GlueのSparkジョブでTSVからParquetへ変換してみた

こんにちは。インテグレーション部 の大高です。

Glueと仲良くなるべく色々と触っています。今回はGlueのSparkジョブでTSVからParquetへファイルを変換してみたいと思います。

ジョブの作成

GlueのジョブはSparkジョブを作成します。また、今回はせっかくなので最近リリースされたPython3でジョブを作成しました。

ジョブのスクリプト

大きな処理の流れとしては、S3からTSVファイルを取得し、Parquet形式にしてS3へ保存という流れとして作成します。

スクリプト全体

スクリプトは以下のようにしました。S3バケットや取得元のファイルは事前に用意してあります。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])


# ジョブ初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


# from_options関数でTSV読み込み
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type = "s3",
                                                              connection_options = {"paths": [ "s3://foobar/glue/src/Sample.tsv"]},
                                                              format="csv",
                                                              format_options={"separator": "\t", "withHeader": True})

# データの情報を表示
print('Count: {0}'.format(dynamic_frame.count()))
dynamic_frame.printSchema()

# データの中身を表示
dynamic_frame.show(dynamic_frame.count())


# Parquet変換して書き込み
data_frame = dynamic_frame.toDF()

## snappy
data_frame.write.mode("overwrite").format("parquet").option("compression", "snappy").save("s3://foobar/glue/parquet_snappy")


# ジョブコミット
job.commit()

TSVファイルの読み込み

TSVファイルをfrom_options関数を利用してDynamicFrameとして読み込んでいます。 Data Catalogを利用して読み込むことが多いと思いますが、今回は敢えてData Catalogは利用せずにファイルを直接指定して読み込んでみました。

なお、今回connection_optionsではpathsしか指定していませんが、以下のヘルプのように様々なオプションも指定できます。

AWS Glue での ETL の接続タイプとオプション - AWS Glue

また、format_optionswithHeaderオプションを利用することで先頭行をヘッダ行としています。

データの情報・中身を表示

DynamicFrame化したデータの情報を表示しています。実際に出力されたログとしては以下のように表示されていました。

Count: 3
root
|-- No: string
|-- Title: string
|-- Price: string
{
    "No": "1",
    "Title": "劇場版 仮面ライダービルド Be The One",
    "Price": "6192.1"
}

{
    "No": "2",
    "Title": "平成仮面ライダー20作記念 仮面ライダー平成ジェネレーションズ FOREVER",
    "Price": "7115.2"
}

{
    "No": "3",
    "Title": "ビルド NEW WORLD 仮面ライダークローズ",
    "Price": "9720.3"
}

カラムについては、すべて string として判定されていることが分かります。データは元データと変わらず想定通りです。

なお、元データのTSVは以下のようになっています。

No	Title	Price
1	劇場版 仮面ライダービルド Be The One	6192.1
2	平成仮面ライダー20作記念 仮面ライダー平成ジェネレーションズ FOREVER	7115.2
3	ビルド NEW WORLD 仮面ライダークローズ	9720.3

Parquet変換して書き込み

ここではまずtoDF()でSparkのDataFrameへ変換しています。

変換後はwrite関数でDataFrameWriterとして、保存時のモードやフォーマットを指定してParquetファイルとしてS3に保存します。

処理結果の確認

実際に保存されたファイルを確認してみます。今回はParquetViewerというツールで確認してみました。

以下のとおりちゃんとデータが入っていますね。

まとめ

以上、GlueのSparkジョブでTSVからParquetへ変換してみた結果でした。今回は非常に単純なファイル形式の変換だけでしたが、個人的には色々と学ぶところが多かったです。

どなたかのお役に立てれば幸いです。それでは!