[新機能]AWS Glue Studio Spark SQLクエリを使用した変換が可能になりました

2021.04.17

データアナリティクス事業本部コンサルティングチームの石川です。AWS Glue Studioは SQLを使用して変換を定義する新しいTransform「Spark SQL」が追加されました。Spark SQLによる結合・集計・フィルタ・変換をAWS Glue Studio のビジュアルトランスフォームとシームレスに融合させることができるようになりました。本日は、新しいTransform「Spark SQL」を用いたETLを試します!

Spark SQLクエリを使用した変換とは

Spark SQLとは、Apache SparkがサポートするANSI準拠のSQLです。Spark SQL用いて複数のDataFrameを結合・集計・変換したDataFrameを生成できます。

従来、Spark SQLクエリを利用するには、Custom Transformを用いてSparkのスクリプトを書く必要がありました。DynamicFrameをDataFrameに変換した後、DataFrameにエイリアスを付与してSQL実行、生成されたDataFrameをもとのDynamicFrame変換するなどの煩雑な作業が伴いました。

今後は、Custom Transformの代わりに「Spark SQL」というTransformを利用して、SQL以外のコードの記載が不要になりました。

新たに追加された Transform - Spark SQL

TransformからSpark SQLが新たに追加されたことが確認できます。

Spark SQLの設定は、データソース(内部的にはDataFrame)に対するエイリアスの設定と、SQLの記載のみです。ここで設定したエイリアスは、下のSQLの中でテーブル名として利用します。

Spark SQLによるETLを試してみる

では、実際に Transform - Spark SQLを使ってデータを変換します。

検証用データ

今回は、サンプルデータベースのsales, date, usersの3つのテーブルを結合・集計・変換します。

  • DATE: 365 レコード
  • SALES: 172456 レコード
  • USERS: 49990 レコード

AWS Glue Studioの操作については、以下のブログを参照してください。(下記の「ぐるぐるぐるぐるグルコサミン」は、見なかったことでお願いします。)

事前に出力用のテーブルをデータレイク上に作成しています。

CREATE EXTERNAL TABLE `sales_summay`(
  `sellerid` int, 
  `username` varchar(8), 
  `name` varchar(32), 
  `city` varchar(30), 
  `qtysold_total` bigint)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://cm-bucket/datalake/tickitdb/sales_summay'
TBLPROPERTIES (
  'transient_lastDdlTime'='1618626620')

ワークフローの全体像

今回は、先に全体像をお見せします。sales, date, usersの3つのデータソースをSpark SQLで変換して最後にファイル出力します。

1.データソース:SALES、USERS、DATE の設定

ワークフロー上段のデータソース SALES、USERS、DATEの設定です。ともにデータベース名とテーブル名を指定するのみです。下記の設定は、SALESのものですが、USERS、DATEも同様です。

2.ETL:Spark SQL の設定

ワークフロー中央のデータのETLを担うSpark SQLの設定です。

データソースにエイリアスを設定

データソースにエイリアスを設定します。SQL内では、エイリアスがテーブル名として利用します。

  • DATE: date_t
  • SALES: sales_t
  • USERS: users_t

ETL用のSQL

上記のエイリアスを使って、作成したSQLです。

select 
    sellerid, 
    username, 
    (firstname ||' '|| lastname) as name,
    city, 
    sum(qtysold) as qtysold_total
from sales_t, date_t, users_t
where sales_t.sellerid = users_t.userid
    and sales_t.dateid = date_t.dateid
    and year = 2008
group by 1,2,3,4
order by 5 desc

3.データレイクへの出力:S3 設定

ワークフロー下段、データレイクへの出力するS3 設定です。出力フォーマットは、Glue Parquet、圧縮タイプは、snappyを選択しました。Glue Parquetは、Glueに最適化されたParquetフォーマットで、出力ファイルのスキーマを動的に計算して、高速にParquetファイルを書き込めるそうです。

4.ジョブの実行

G.1Xの2worker、Glue 2.0、Spark 2.4、Python3で、約1分ちょっとで実行を終えました。

Athenaから以下の通り参照できます。

ファイルのコンパクションもお忘れなく...

上記のジョブを実行すると、小さな14のParquetファイルに分割されます。データの増加にともないワーカーの数が増えた場合、このファイル数も増えてしまいます。実際のプロダクションでは、ファイルのコンパクション(複数の小さなファイルをまとめること)をおすすめします。

下記のワークフローでは、ファイルのコンパクションするCustom Transformをファイル出力する手前に追加しています。

ファイルのコンパクションするCustom Transformのコードは、以下のとおりです。下記の例では、1つに設定しています。

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
    selected = dfc.select(list(dfc.keys())[0]).toDF()
    reprep = selected.repartition(1)
    results = DynamicFrame.fromDF(reprep, glueContext, "results")
    return DynamicFrameCollection({"results": results}, glueContext)

あれ、結局Custom Transform使ってるよね…どうか見逃してください。

まとめ

今回やったことをGlueのビルトインのTransformでやろうとすると、Joinを2回、DropFields、Filter、Custom transformで値の生成と手順が多く、その手順(手続き)を自分で考えて組まなければなりません。一方、SQLは宣言的言語なので、SQLで条件を与えればCatalystオプティマイザが最適な実行プランを生成して実行してくれます。Apache Sparkは、将来的にSpark3.0がサポートされると設定の変更なしで最適化が期待できますので、Transform Spark SQLのご利用をおすすめします。

最後に述べたとおり、データレイクでは小さな大量のファイルに常に悩まされます。できれば、repartitionのトランスフォームを追加していただけたらと嬉しいです。

合わせて読みたい