[レポート] ANT326 : AWS Glue ETL Jobの Metrics-Driven パフォーマンスチューニング #reinvent

2018.12.03

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

ANT326: Metrics-Driven Performance Tuning for AWS Glue ETL Jobs のChalk Talkのレポートとなります。本セッションではパフォーマンスチューニング手法とのその効果について2018年7月に機能追加されたメトリクスをエビデンスに用いて解説しています。

セッション概要

AWS Glue provides a horizontally scalable platform for running ETL jobs against a wide variety of data sources. In this builder's session, we cover techniques for understanding and optimizing the performance of your jobs using AWS Glue job metrics. Learn how to identify bottlenecks on the driver and executors, identify and fix data skew, tune the number of DPUs, and address common memory errors.

AWS Glueは、さまざまなデータソースに対してETLジョブを実行するための、水平方向にスケーラブルなプラットフォームを提供します。このビルダーのセッションでは、AWS Glueジョブ・メトリックを使用してジョブのパフォーマンスを理解し、最適化するためのテクニックについて説明します。ドライバとエグゼキュータのボトルネックを特定する方法、データスキューを特定および修正する方法、DPUの数を調整する方法、共通のメモリエラーに対処する方法について説明します。

スピーカー

Ben Sowell - Principal Engineer

Apache Spark と AWS Glue とは

Apache Sparkは、複雑な分析のための豊富な機能を持つ分散データ処理エンジンです。AWS Glueは、Apache Sparkランタイムを用いて開発されたETL用途の機能を提供します。

Apache SparkのDataframeとSparkSQLは、AWS GlueのDynamicframeとAWS Glue ETLに対応しますが、 DataframeとDynamicframeは相互変換が可能です。

AWS Glue execution model

ジョブとステージ

  • アクション(Action): ETLコードの各行
  • ジョブ(jobs): 処理のまとまり
  • ステージ(stages): ジョブをシャッフル前後で分けます

1〜6行目までは、1つのジョブとなり、4行目のrepartition()で、シャッフルが発生するのでステージが別れます。7行目は1つのアクションに見えますが、Sparkは遅延評価なので、1〜3行目と7行目を実行することになります。

AWS Glue execution model: data partitions

Apache SparkとAWS Glueはデータ並列で、データをパーティションに並列分散処理します。

  • 1 stage x 1 partition = 1 task

全体のスループットは、パーティション数によって制限されません。

そこで生じる2つの疑問

  • アプリケーションのジョブとステージはどのように分けたら良いのか?
  • データセットをどのように分割するか?

多くの小さなデータファイルの処理の問題

よく起こる問題の一つは、多数の小さなファイルを扱うことです。この処理は多くのメモリを必要とし、パフォーマンスオーバーヘッドにつながる可能性があります。簡単なJSONからParquetへの変換で、640個のパーティションに128万個のJSONファイルがあるとします。

パフォーマンスをより理解するためにAWS Glueのジョブメトリクスを使用します。ジョブメトリクスを有効にするにはマネジメントコンソールのJobMetricsをEnableに設定、CLI/SDKで有効にするには、--enable-metricsを指定します。

Case1: 一般的なApache Spark を用いた変換

data = spark.read.format("json").load("<input_location>")
data.write.format("parquet").save("<output_location>")

Apache Sparkで、多くの小さなデータファイルのParquetに変換のジョブを実行するとcommand faildexit code 1のようなエラーが発生します。ドライバーのメモリー使用量は急速に増えており、5GB以下に達しています。

Case2: AWS Glue Dynamicframe を用いた変換

AWS Glue Dynamicframe を用いると、ドライバのメモリは、実行の全期間にわたって50%以下に収まります。

df = glueContext.create_dynamic_frame_from_catalog("<database>", "<table>")
glueContext.write_from_options(df, "s3", {"path":"<output_location>"}, "parquet")

ファイルのグループ化の設定

  • groupFiles
    • パーティション内のファイルのグループ化を有効にするには、 "inPartition"に設定する
    • 異なるパーティションからのファイルのグループ化を有効にするには、 "acrossPartition"に設定します。 パーティションの値は各レコードに追加されない
    • 50,000を超えるファイルがある場合、グループ化は自動的に有効になる
  • groupSize
    • 各グループのターゲットサイズ(バイト)
    • デフォルトはクラスタ内のコア数に基づく
    • グループのサイズを増やして試す

実行は遅いのですが、クラッシュは回避できました。

大きなファイル処理の問題

大きなテキストファイルを分割して処理できない場合にも問題が発生します。データがどのように圧縮されているかは、パフォーマンスに大きな違いをもたらします。非圧縮またはbzip2圧縮されたファイルは、分割して並列処理できますが、gzip圧縮されたファイルは分割できません。また、ファイルをカラムなファイルフォーマット(Parquet)に変換する対策も検討してください。

大きなgzipファイルの場合

5つのPartitionにそれぞれ大きな1つのファイル(1.6GB)がある場合、2時間後にジョブエラーが発生します。

大きなbzip2ファイルの場合

bzip2ファイル(1.3GB)はブロックの中に分割できるため、104タスクに分割して18分後に処理が終了します。

大きな非圧縮ファイルの場合

非圧縮ファイル(12.5GB)は行ごとに分割できるため、64MBのパーティションに分割して12分後に処理が終了します。

解決方針

  • 圧縮タイプを選択するならbzip2が好ましい
  • gzipを選択する場合はリソースが十分であるか確認が必要
  • AWS Glueは帯域幅がジョブのボトムネックになることはまれなのでファイルを解凍しない

ファイル数を削減するには

データセットをCoalesceする

データセットをCoalesceするのは、ファイル数を削減したい場合です。下記のコードでは、Dataframeのパーティションを1つにCoalesceして出力ファイルを1つにまとめています。読み込みは並列化しますが、1つのExecutorにすべてのデータが送られるため遅れが生じ、1時間52分かかっています。

df = spark.read.format("json").load(...)
df2 = df.coalesce(1)
df.write.format("json").save(...)

Coalesceにgroupingを組み合わせる

Coalesceにgroupingを組み合わせてると、groupingのサイズは500MBになります。4つのexecutorは26分間に2GB未満で16パーティション処理します。

Coalesceしないときのベースラインパフォーマンス

ジョブをすべて並列し、処理を10分します。その時のデータサイズは500MBとなります。

DynamicFrameとスキーマコンピレーション

  • DynamicFrameは、Spark RDDのレコードのself-describing(AWS Glueの独自実装)です
  • 基本的なETL操作には全体的なスキーマは必要なく、他のレコードを参照せずに"age"フィールドを削除できます
  • 一部の完全なスキーマが必要な操作では、アプリケーションで余分な作業が強制される可能性があります。
  • 一部のオペレーションはアプリケーション中の特別なジョブで完全なスキーマが必要です。スキーマを必要とする操作は以下です
    • DropNullFields
    • Relationalize
    • ResolveChoice

DynamicFrame Schema Extra Example : DropNullFields

下記の例では、データカタログからデータを読み込み、タイプが NullType のすべての null フィールドを削除した後、出力しています。

df = glueContext.create_dynamic_frame_from_catalog(...)
df2 = DropNullFields.apply(df)
glueContext.write_dynamic_frame_option(df2, ...)

最初のステージはスキーマ変換用です。次にカラム名やデータ型を変換するApplyMappingを、データを読み込んだ直後に追加します。

df = glueContext.create_dynamic_frame_from_catalog(...)
df2 = df.applyMapping([
    ('col1', 'col1', 'string'),
    ('col2', 'col2', 'string'),
    ('col3', 'col3', 'string'),
    ('col4', 'col4', 'string'),
    ('col5', 'col5', 'boolean')
])
glueContext.write_dynamic_frame_option(df3, ...)

処理を追加しても、ステージは1つのままです。最初に解説したとおり、処理を追加してもシャッフルが生じないので、ステージ数は1つのままです。

DynamicFrameの最適化

  • ApplyMappingはスキーマの設定に用います
  • DropNullFieldsのような変換に追加します
  • デフォルトで生成するスクリプトでは安全に設計されていますが、データを把握しているなら最適化できるでしょう
  • データ変換に要する時間は昨年よりも4倍改善していますので、過去のスクリプトもお試しください!

AWS GlueにおけるPythonパフォーマンス

  • Pythonでマップとフィルタを大きなデータセットで使用するとコストがかかります
  • すべてのデータはシリアル化されて、JVMとPythonの間で送信されます
  • 回避策
    • AWS Glue のスクリプトをScalaで記述する
    • DataFrameに変換してSpark SQL式を用いる

最後に

AWS GlueやSparkを使い倒している人向けのChalk Talkなので、答え合わせという観点で良い内容でしたが、groupFilesのacrossPartitionなど現時点でマニュアルにない設定も当たり前のように解説されております。Chalk Talkは発表者に対して質問できる醍醐味がありますが、オフレコの内容を含むので割愛している部分もあります。用語や前提がよくわからない人は以下のブログも合わせてお読みください。

AWS Glue がETLジョブのデバッグとプロファイリングを可能にするメトリックをサポートしました

AWS GlueのDynamicFrameの動きを見てみる

AWS Glueで多くの小さなファイルをまとめて読み込むgroupFiles/groupSize指定でパフォーマンスを改善する

合わせて読みたい

[レポート] ANT308 : AWS Glue のサーバレスアナリティクスパイプライン構築する #reinvent