AWS Glue がETLジョブのデバッグとプロファイリングを可能にするメトリックをサポートしました
はじめに
AWS Glueの中で最も大きなアップデートです。AWS Glue がついにETLジョブのデバッグとプロファイリングを可能にするメトリックを提供するようになりました。DriverとExecutorのメモリー使用量とCPU負荷、Executor間のデータのシャッフルなどのランタイム・メトリクスを容易に追跡できるようになりましたので早速試してみました。
AWS Glue のメトリクスについて
AWS Glueは、Job Profilerを用いてETLジョブの生データを収集して、CloudWatchに保存されているほぼリアルタイムのメトリクスにすることができます。これらの統計情報はCloudWatchに保持され、集約されているため、履歴情報にアクセスしてアプリケーションのパフォーマンスをより正確に把握できます。
AWS GlueでETLジョブを実行すると、以下で説明するメトリクスがCloudWatchに送信されます。これらのメトリクスは、AWS Glueコンソール、CloudWatchコンソールダッシュボード、またはAWS CLIで表示できます。
検証用のETLジョブの作成
以下の条件で検証用のETLジョブを作成しました。
- 2つの約560MBのCSV形式のS3データファイルに入力
- 入力は事前にGlue データカタログにテーブルとして登録
- JSON形式のS3データファイルに出力
- 出力は日付ごとにカラム名ありパーティション分割(意図的にシャッフル)
- DPUは10(デフォルト)
- Job profilingは
Enable
(デフォルトDisable
)に変更
特にJob profilingはデフォルトDisable
なのでEnable
に変更することをお忘れないようご注意ください。ジョブ作成時のAdvanced Properties やScript liblaries and job parameters(optional)は以下のとおりです。
ETLジョブのソースコードは以下のとおりです。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "default", table_name = "lineorder_1998", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "lineorder_1998", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("lo_orderkey", "int", "lo_orderkey", "int"), ("lo_linenumber", "int", "lo_linenumber", "int"), ("lo_custkey", "int", "lo_custkey", "int"), ("lo_partkey", "int", "lo_partkey", "int"), ("lo_suppkey", "int", "lo_suppkey", "int"), ("lo_orderdate", "string", "lo_orderdate", "string"), ("lo_orderpriority", "string", "lo_orderpriority", "string"), ("lo_shippriority", "string", "lo_shippriority", "string"), ("lo_quantity", "int", "lo_quantity", "int"), ("lo_extendedprice", "int", "lo_extendedprice", "int"), ("lo_ordertotalprice", "int", "lo_ordertotalprice", "int"), ("lo_discount", "int", "lo_discount", "int"), ("lo_revenue", "int", "lo_revenue", "int"), ("lo_supplycost", "int", "lo_supplycost", "int"), ("lo_tax", "int", "lo_tax", "int"), ("lo_commitdate", "int", "lo_commitdate", "int"), ("lo_shipmode", "string", "lo_shipmode", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("lo_orderkey", "int", "lo_orderkey", "int"), ("lo_linenumber", "int", "lo_linenumber", "int"), ("lo_custkey", "int", "lo_custkey", "int"), ("lo_partkey", "int", "lo_partkey", "int"), ("lo_suppkey", "int", "lo_suppkey", "int"), ("lo_orderdate", "string", "lo_orderdate", "string"), ("lo_orderpriority", "string", "lo_orderpriority", "string"), ("lo_shippriority", "string", "lo_shippriority", "string"), ("lo_quantity", "int", "lo_quantity", "int"), ("lo_extendedprice", "int", "lo_extendedprice", "int"), ("lo_ordertotalprice", "int", "lo_ordertotalprice", "int"), ("lo_discount", "int", "lo_discount", "int"), ("lo_revenue", "int", "lo_revenue", "int"), ("lo_supplycost", "int", "lo_supplycost", "int"), ("lo_tax", "int", "lo_tax", "int"), ("lo_commitdate", "int", "lo_commitdate", "int"), ("lo_shipmode", "string", "lo_shipmode", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://cm-bucket/tmp", "compression": "gzip"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://cm-ishikawa-satoru/tmp", "partitionKeys": ["lo_orderdate"], "compression": "gzip"}, format = "json", transformation_ctx = "datasink2") job.commit()
ETLジョブのメトリクスの参照
AWS Glueコンソールのメトリクスの参照
AWS Glueコンソールは、ジョブを実行すると5分ごとにメトリクスがグラフに反映されます。参照するには、ETLジョブの一覧から参照したいジョブ名(Name)を選択した後、下の「メトリクス」タブを選択します。ETL Data Movement
とMemory Profile: Driver and Executors
の2つのグラフが参照できます。
さらに詳細メトリクスを表示するには、[View Additional metrics]ボタンを押します。すると、Detailed job metricsとしてさらに
Data Shuffle Across Executors
、CPU Load: Driver and Executors
、Job Execution: Active Executors, Completed Stages & Maximum Needed Executors
のグラフが参照できます。このグラフは、参照したい時間帯のグラフ部分を対角線上に矩形選択することで拡大することができます。
サンプルのETLジョブでは、2つの入力ファイルに対して、17のExecutorが起動され、リソースが有効に利用されていないことを容易に把握することができます。
CloudWatch Metric Dashboardsのメトリクスの参照
AWS Glueは30秒ごとにMetricをCloudWatchに保存します。AWS Glueコンソールのメトリクスの参照では5分毎の表示となりますが、CloudWatch Metric Dashboardsでは、30秒ごとにMetricを参照できます。上記のグラフの右上のから「メトリクスを表示」を選択すると、CloudWatch Metric Dashboardsに遷移します。
ETL Data Movement
というグラフが、glue.ALL.s3.filesystem.read_bytes
とglue.ALL.s3.filesystem.write_bytes
の2つの情報であることも確認できます。
CloudWatch Metric Dashboardsのメトリクスの監視
CloudWatch Metric Dashboardsで参照したメトリクスに対して、条件やしきい値を設定することでメトリクスを監視できます。
AWS Glue のメトリクス
AWS Glueプロファイルを使用して30秒ごとにCloudWatchに測定値を送信し、AWS Glue Metrics Dashboardが1分に1回報告します。 メトリクスの詳細については、AWS Glue Metricsを御覧ください。
- glue.driver.aggregate.bytesRead
- glue.driver.aggregate.elapsedTime
- glue.driver.aggregate.numCompletedStages
- glue.driver.aggregate.numCompletedTasks
- glue.driver.aggregate.numFailedTasks
- glue.driver.aggregate.numKilledTasks
- glue.driver.aggregate.recordsRead
- glue.driver.aggregate.shuffleBytesWritten
- glue.driver.aggregate.shuffleLocalBytesRead
- glue.driver.BlockManager.disk.diskSpaceUsed_MB
- glue.driver.ExecutorAllocationManager.executors.numberAllExecutors
- glue.driver.ExecutorAllocationManager.executors.numberMaxNeededExecutors
- glue.driver.jvm.heap.usage
- glue.executorId.jvm.heap.usage
- glue.ALL.jvm.heap.usage
- glue.driver.jvm.heap.used
- glue.executorId.jvm.heap.used
- glue.ALL.jvm.heap.used
- glue.driver.s3.filesystem.read_bytes
- glue.executorId.s3.filesystem.read_bytes
- glue.ALL.s3.filesystem.read_bytes
- glue.driver.s3.filesystem.write_bytes
- glue.executorId.s3.filesystem.write_bytes
- glue.ALL.s3.filesystem.write_bytes
- glue.driver.system.cpuSystemLoad
- glue.executorId.system.cpuSystemLoad
- glue.ALL.system.cpuSystemLoad
最後に
これまで、ETLジョブのパフォーマンスの低下、メモリ不足が発生して処理が中断するといった問題の調査はCloudWatchLogsのログの参照のみでした。従来は、特定のExecutorがホットな状態なのか、そしてボトルネックの要因(CPU、メモリ、IO)が何かを把握するのはほぼ不可能でした。
新たに追加された AWS Glue のメトリクスは、読み書きされたバイト数、DriverとExecutorのメモリー使用量とCPU負荷、Executor間のデータのシャッフルなどのランタイム・メトリクスを容易に追跡できるようになりました。これらのメトリクスは、コードのデバッグ、データの問題の特定、CPUキャパシティの計画に役立ちます。もちろん、特定のジョブ条件でAmazon CloudWatchでアラームを設定することもできます。今日からこのメトリクスを有効(Enable)にして、クラスタのアクティビティを計測・監視にお役立てください。
Don’t guess, measure!(推測するな、計測せよ!)