GlueのSparkジョブでTSVからParquetへ変換してみた
こんにちは。インテグレーション部 の大高です。
Glueと仲良くなるべく色々と触っています。今回はGlueのSparkジョブでTSVからParquetへファイルを変換してみたいと思います。
ジョブの作成
GlueのジョブはSparkジョブを作成します。また、今回はせっかくなので最近リリースされたPython3でジョブを作成しました。
ジョブのスクリプト
大きな処理の流れとしては、S3からTSVファイルを取得し、Parquet形式にしてS3へ保存という流れとして作成します。
スクリプト全体
スクリプトは以下のようにしました。S3バケットや取得元のファイルは事前に用意してあります。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) # ジョブ初期化 sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) # from_options関数でTSV読み込み dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type = "s3", connection_options = {"paths": [ "s3://foobar/glue/src/Sample.tsv"]}, format="csv", format_options={"separator": "\t", "withHeader": True}) # データの情報を表示 print('Count: {0}'.format(dynamic_frame.count())) dynamic_frame.printSchema() # データの中身を表示 dynamic_frame.show(dynamic_frame.count()) # Parquet変換して書き込み data_frame = dynamic_frame.toDF() ## snappy data_frame.write.mode("overwrite").format("parquet").option("compression", "snappy").save("s3://foobar/glue/parquet_snappy") # ジョブコミット job.commit()
TSVファイルの読み込み
TSVファイルをfrom_options関数を利用してDynamicFrameとして読み込んでいます。 Data Catalogを利用して読み込むことが多いと思いますが、今回は敢えてData Catalogは利用せずにファイルを直接指定して読み込んでみました。
なお、今回connection_options
ではpaths
しか指定していませんが、以下のヘルプのように様々なオプションも指定できます。
AWS Glue での ETL の接続タイプとオプション - AWS Glue
また、format_options
でwithHeader
オプションを利用することで先頭行をヘッダ行としています。
データの情報・中身を表示
DynamicFrame化したデータの情報を表示しています。実際に出力されたログとしては以下のように表示されていました。
Count: 3
root |-- No: string |-- Title: string |-- Price: string
{ "No": "1", "Title": "劇場版 仮面ライダービルド Be The One", "Price": "6192.1" } { "No": "2", "Title": "平成仮面ライダー20作記念 仮面ライダー平成ジェネレーションズ FOREVER", "Price": "7115.2" } { "No": "3", "Title": "ビルド NEW WORLD 仮面ライダークローズ", "Price": "9720.3" }
カラムについては、すべて string
として判定されていることが分かります。データは元データと変わらず想定通りです。
なお、元データのTSVは以下のようになっています。
No Title Price 1 劇場版 仮面ライダービルド Be The One 6192.1 2 平成仮面ライダー20作記念 仮面ライダー平成ジェネレーションズ FOREVER 7115.2 3 ビルド NEW WORLD 仮面ライダークローズ 9720.3
Parquet変換して書き込み
ここではまずtoDF()
でSparkのDataFrameへ変換しています。
変換後はwrite
関数でDataFrameWriterとして、保存時のモードやフォーマットを指定してParquetファイルとしてS3に保存します。
処理結果の確認
実際に保存されたファイルを確認してみます。今回はParquetViewerというツールで確認してみました。
以下のとおりちゃんとデータが入っていますね。
まとめ
以上、GlueのSparkジョブでTSVからParquetへ変換してみた結果でした。今回は非常に単純なファイル形式の変換だけでしたが、個人的には色々と学ぶところが多かったです。
どなたかのお役に立てれば幸いです。それでは!