[新機能]AWS Glue Studio Spark SQLクエリを使用した変換が可能になりました
データアナリティクス事業本部コンサルティングチームの石川です。AWS Glue Studioは SQLを使用して変換を定義する新しいTransform「Spark SQL」が追加されました。Spark SQLによる結合・集計・フィルタ・変換をAWS Glue Studio のビジュアルトランスフォームとシームレスに融合させることができるようになりました。本日は、新しいTransform「Spark SQL」を用いたETLを試します!
Spark SQLクエリを使用した変換とは
Spark SQLとは、Apache SparkがサポートするANSI準拠のSQLです。Spark SQL用いて複数のDataFrameを結合・集計・変換したDataFrameを生成できます。
従来、Spark SQLクエリを利用するには、Custom Transformを用いてSparkのスクリプトを書く必要がありました。DynamicFrameをDataFrameに変換した後、DataFrameにエイリアスを付与してSQL実行、生成されたDataFrameをもとのDynamicFrame変換するなどの煩雑な作業が伴いました。
今後は、Custom Transformの代わりに「Spark SQL」というTransformを利用して、SQL以外のコードの記載が不要になりました。
新たに追加された Transform - Spark SQL
TransformからSpark SQLが新たに追加されたことが確認できます。
Spark SQLの設定は、データソース(内部的にはDataFrame)に対するエイリアスの設定と、SQLの記載のみです。ここで設定したエイリアスは、下のSQLの中でテーブル名として利用します。
Spark SQLによるETLを試してみる
では、実際に Transform - Spark SQLを使ってデータを変換します。
検証用データ
今回は、サンプルデータベースのsales, date, usersの3つのテーブルを結合・集計・変換します。
- DATE: 365 レコード
- SALES: 172456 レコード
- USERS: 49990 レコード
AWS Glue Studioの操作については、以下のブログを参照してください。(下記の「ぐるぐるぐるぐるグルコサミン」は、見なかったことでお願いします。)
事前に出力用のテーブルをデータレイク上に作成しています。
CREATE EXTERNAL TABLE `sales_summay`( `sellerid` int, `username` varchar(8), `name` varchar(32), `city` varchar(30), `qtysold_total` bigint) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://cm-bucket/datalake/tickitdb/sales_summay' TBLPROPERTIES ( 'transient_lastDdlTime'='1618626620')
ワークフローの全体像
今回は、先に全体像をお見せします。sales, date, usersの3つのデータソースをSpark SQLで変換して最後にファイル出力します。
1.データソース:SALES、USERS、DATE の設定
ワークフロー上段のデータソース SALES、USERS、DATEの設定です。ともにデータベース名とテーブル名を指定するのみです。下記の設定は、SALESのものですが、USERS、DATEも同様です。
2.ETL:Spark SQL の設定
ワークフロー中央のデータのETLを担うSpark SQLの設定です。
データソースにエイリアスを設定
データソースにエイリアスを設定します。SQL内では、エイリアスがテーブル名として利用します。
- DATE: date_t
- SALES: sales_t
- USERS: users_t
ETL用のSQL
上記のエイリアスを使って、作成したSQLです。
select sellerid, username, (firstname ||' '|| lastname) as name, city, sum(qtysold) as qtysold_total from sales_t, date_t, users_t where sales_t.sellerid = users_t.userid and sales_t.dateid = date_t.dateid and year = 2008 group by 1,2,3,4 order by 5 desc
3.データレイクへの出力:S3 設定
ワークフロー下段、データレイクへの出力するS3 設定です。出力フォーマットは、Glue Parquet、圧縮タイプは、snappyを選択しました。Glue Parquetは、Glueに最適化されたParquetフォーマットで、出力ファイルのスキーマを動的に計算して、高速にParquetファイルを書き込めるそうです。
4.ジョブの実行
G.1Xの2worker、Glue 2.0、Spark 2.4、Python3で、約1分ちょっとで実行を終えました。
Athenaから以下の通り参照できます。
ファイルのコンパクションもお忘れなく...
上記のジョブを実行すると、小さな14のParquetファイルに分割されます。データの増加にともないワーカーの数が増えた場合、このファイル数も増えてしまいます。実際のプロダクションでは、ファイルのコンパクション(複数の小さなファイルをまとめること)をおすすめします。
下記のワークフローでは、ファイルのコンパクションするCustom Transformをファイル出力する手前に追加しています。
ファイルのコンパクションするCustom Transformのコードは、以下のとおりです。下記の例では、1つに設定しています。
def MyTransform (glueContext, dfc) -> DynamicFrameCollection: selected = dfc.select(list(dfc.keys())[0]).toDF() reprep = selected.repartition(1) results = DynamicFrame.fromDF(reprep, glueContext, "results") return DynamicFrameCollection({"results": results}, glueContext)
あれ、結局Custom Transform使ってるよね…どうか見逃してください。
まとめ
今回やったことをGlueのビルトインのTransformでやろうとすると、Joinを2回、DropFields、Filter、Custom transformで値の生成と手順が多く、その手順(手続き)を自分で考えて組まなければなりません。一方、SQLは宣言的言語なので、SQLで条件を与えればCatalystオプティマイザが最適な実行プランを生成して実行してくれます。Apache Sparkは、将来的にSpark3.0がサポートされると設定の変更なしで最適化が期待できますので、Transform Spark SQLのご利用をおすすめします。
最後に述べたとおり、データレイクでは小さな大量のファイルに常に悩まされます。できれば、repartitionのトランスフォームを追加していただけたらと嬉しいです。