この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは。インテグレーション部 の大高です。
Glueと仲良くなるべく色々と触っています。今回はGlueのSparkジョブでTSVからParquetへファイルを変換してみたいと思います。
ジョブの作成
GlueのジョブはSparkジョブを作成します。また、今回はせっかくなので最近リリースされたPython3でジョブを作成しました。
ジョブのスクリプト
大きな処理の流れとしては、S3からTSVファイルを取得し、Parquet形式にしてS3へ保存という流れとして作成します。
スクリプト全体
スクリプトは以下のようにしました。S3バケットや取得元のファイルは事前に用意してあります。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# ジョブ初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# from_options関数でTSV読み込み
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type = "s3",
connection_options = {"paths": [ "s3://foobar/glue/src/Sample.tsv"]},
format="csv",
format_options={"separator": "\t", "withHeader": True})
# データの情報を表示
print('Count: {0}'.format(dynamic_frame.count()))
dynamic_frame.printSchema()
# データの中身を表示
dynamic_frame.show(dynamic_frame.count())
# Parquet変換して書き込み
data_frame = dynamic_frame.toDF()
## snappy
data_frame.write.mode("overwrite").format("parquet").option("compression", "snappy").save("s3://foobar/glue/parquet_snappy")
# ジョブコミット
job.commit()
TSVファイルの読み込み
TSVファイルをfrom_options関数を利用してDynamicFrameとして読み込んでいます。 Data Catalogを利用して読み込むことが多いと思いますが、今回は敢えてData Catalogは利用せずにファイルを直接指定して読み込んでみました。
なお、今回connection_options
ではpaths
しか指定していませんが、以下のヘルプのように様々なオプションも指定できます。
AWS Glue での ETL の接続タイプとオプション - AWS Glue
また、format_options
でwithHeader
オプションを利用することで先頭行をヘッダ行としています。
データの情報・中身を表示
DynamicFrame化したデータの情報を表示しています。実際に出力されたログとしては以下のように表示されていました。
Count: 3
root
|-- No: string
|-- Title: string
|-- Price: string
{
"No": "1",
"Title": "劇場版 仮面ライダービルド Be The One",
"Price": "6192.1"
}
{
"No": "2",
"Title": "平成仮面ライダー20作記念 仮面ライダー平成ジェネレーションズ FOREVER",
"Price": "7115.2"
}
{
"No": "3",
"Title": "ビルド NEW WORLD 仮面ライダークローズ",
"Price": "9720.3"
}
カラムについては、すべて string
として判定されていることが分かります。データは元データと変わらず想定通りです。
なお、元データのTSVは以下のようになっています。
Sample.tsv
No Title Price
1 劇場版 仮面ライダービルド Be The One 6192.1
2 平成仮面ライダー20作記念 仮面ライダー平成ジェネレーションズ FOREVER 7115.2
3 ビルド NEW WORLD 仮面ライダークローズ 9720.3
Parquet変換して書き込み
ここではまずtoDF()
でSparkのDataFrameへ変換しています。
変換後はwrite
関数でDataFrameWriterとして、保存時のモードやフォーマットを指定してParquetファイルとしてS3に保存します。
処理結果の確認
実際に保存されたファイルを確認してみます。今回はParquetViewerというツールで確認してみました。
以下のとおりちゃんとデータが入っていますね。
まとめ
以上、GlueのSparkジョブでTSVからParquetへ変換してみた結果でした。今回は非常に単純なファイル形式の変換だけでしたが、個人的には色々と学ぶところが多かったです。
どなたかのお役に立てれば幸いです。それでは!