AWS Glue がETLジョブのデバッグとプロファイリングを可能にするメトリックをサポートしました

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWS Glueの中で最も大きなアップデートです。AWS Glue がついにETLジョブのデバッグとプロファイリングを可能にするメトリックを提供するようになりました。DriverとExecutorのメモリー使用量とCPU負荷、Executor間のデータのシャッフルなどのランタイム・メトリクスを容易に追跡できるようになりましたので早速試してみました。

AWS Glue のメトリクスについて

AWS Glueは、Job Profilerを用いてETLジョブの生データを収集して、CloudWatchに保存されているほぼリアルタイムのメトリクスにすることができます。これらの統計情報はCloudWatchに保持され、集約されているため、履歴情報にアクセスしてアプリケーションのパフォーマンスをより正確に把握できます。

AWS GlueでETLジョブを実行すると、以下で説明するメトリクスがCloudWatchに送信されます。これらのメトリクスは、AWS Glueコンソール、CloudWatchコンソールダッシュボード、またはAWS CLIで表示できます。

検証用のETLジョブの作成

以下の条件で検証用のETLジョブを作成しました。

  • 2つの約560MBのCSV形式のS3データファイルに入力
  • 入力は事前にGlue データカタログにテーブルとして登録
  • JSON形式のS3データファイルに出力
  • 出力は日付ごとにカラム名ありパーティション分割(意図的にシャッフル)
  • DPUは10(デフォルト)
  • Job profilingはEnable(デフォルトDisable)に変更

特にJob profilingはデフォルトDisableなのでEnableに変更することをお忘れないようご注意ください。ジョブ作成時のAdvanced Properties やScript liblaries and job parameters(optional)は以下のとおりです。

ETLジョブのソースコードは以下のとおりです。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "lineorder_1998", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "lineorder_1998", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("lo_orderkey", "int", "lo_orderkey", "int"), ("lo_linenumber", "int", "lo_linenumber", "int"), ("lo_custkey", "int", "lo_custkey", "int"), ("lo_partkey", "int", "lo_partkey", "int"), ("lo_suppkey", "int", "lo_suppkey", "int"), ("lo_orderdate", "string", "lo_orderdate", "string"), ("lo_orderpriority", "string", "lo_orderpriority", "string"), ("lo_shippriority", "string", "lo_shippriority", "string"), ("lo_quantity", "int", "lo_quantity", "int"), ("lo_extendedprice", "int", "lo_extendedprice", "int"), ("lo_ordertotalprice", "int", "lo_ordertotalprice", "int"), ("lo_discount", "int", "lo_discount", "int"), ("lo_revenue", "int", "lo_revenue", "int"), ("lo_supplycost", "int", "lo_supplycost", "int"), ("lo_tax", "int", "lo_tax", "int"), ("lo_commitdate", "int", "lo_commitdate", "int"), ("lo_shipmode", "string", "lo_shipmode", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("lo_orderkey", "int", "lo_orderkey", "int"), ("lo_linenumber", "int", "lo_linenumber", "int"), ("lo_custkey", "int", "lo_custkey", "int"), ("lo_partkey", "int", "lo_partkey", "int"), ("lo_suppkey", "int", "lo_suppkey", "int"), ("lo_orderdate", "string", "lo_orderdate", "string"), ("lo_orderpriority", "string", "lo_orderpriority", "string"), ("lo_shippriority", "string", "lo_shippriority", "string"), ("lo_quantity", "int", "lo_quantity", "int"), ("lo_extendedprice", "int", "lo_extendedprice", "int"), ("lo_ordertotalprice", "int", "lo_ordertotalprice", "int"), ("lo_discount", "int", "lo_discount", "int"), ("lo_revenue", "int", "lo_revenue", "int"), ("lo_supplycost", "int", "lo_supplycost", "int"), ("lo_tax", "int", "lo_tax", "int"), ("lo_commitdate", "int", "lo_commitdate", "int"), ("lo_shipmode", "string", "lo_shipmode", "string")], transformation_ctx = "applymapping1")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://cm-bucket/tmp", "compression": "gzip"}, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://cm-ishikawa-satoru/tmp", "partitionKeys": ["lo_orderdate"], "compression": "gzip"}, format = "json", transformation_ctx = "datasink2")
job.commit()

ETLジョブのメトリクスの参照

AWS Glueコンソールのメトリクスの参照

AWS Glueコンソールは、ジョブを実行すると5分ごとにメトリクスがグラフに反映されます。参照するには、ETLジョブの一覧から参照したいジョブ名(Name)を選択した後、下の「メトリクス」タブを選択します。ETL Data MovementMemory Profile: Driver and Executorsの2つのグラフが参照できます。

さらに詳細メトリクスを表示するには、[View Additional metrics]ボタンを押します。すると、Detailed job metricsとしてさらに Data Shuffle Across ExecutorsCPU Load: Driver and ExecutorsJob Execution: Active Executors, Completed Stages & Maximum Needed Executorsのグラフが参照できます。このグラフは、参照したい時間帯のグラフ部分を対角線上に矩形選択することで拡大することができます。

サンプルのETLジョブでは、2つの入力ファイルに対して、17のExecutorが起動され、リソースが有効に利用されていないことを容易に把握することができます。

CloudWatch Metric Dashboardsのメトリクスの参照

AWS Glueは30秒ごとにMetricをCloudWatchに保存します。AWS Glueコンソールのメトリクスの参照では5分毎の表示となりますが、CloudWatch Metric Dashboardsでは、30秒ごとにMetricを参照できます。上記のグラフの右上のから「メトリクスを表示」を選択すると、CloudWatch Metric Dashboardsに遷移します。

ETL Data Movementというグラフが、glue.ALL.s3.filesystem.read_bytesglue.ALL.s3.filesystem.write_bytesの2つの情報であることも確認できます。

CloudWatch Metric Dashboardsのメトリクスの監視

CloudWatch Metric Dashboardsで参照したメトリクスに対して、条件やしきい値を設定することでメトリクスを監視できます。

AWS Glue のメトリクス

AWS Glueプロファイルを使用して30秒ごとにCloudWatchに測定値を送信し、AWS Glue Metrics Dashboardが1分に1回報告します。 メトリクスの詳細については、AWS Glue Metricsを御覧ください。

  • glue.driver.aggregate.bytesRead
  • glue.driver.aggregate.elapsedTime
  • glue.driver.aggregate.numCompletedStages
  • glue.driver.aggregate.numCompletedTasks
  • glue.driver.aggregate.numFailedTasks
  • glue.driver.aggregate.numKilledTasks
  • glue.driver.aggregate.recordsRead
  • glue.driver.aggregate.shuffleBytesWritten
  • glue.driver.aggregate.shuffleLocalBytesRead
  • glue.driver.BlockManager.disk.diskSpaceUsed_MB
  • glue.driver.ExecutorAllocationManager.executors.numberAllExecutors
  • glue.driver.ExecutorAllocationManager.executors.numberMaxNeededExecutors
  • glue.driver.jvm.heap.usage
  • glue.executorId.jvm.heap.usage
  • glue.ALL.jvm.heap.usage
  • glue.driver.jvm.heap.used
  • glue.executorId.jvm.heap.used
  • glue.ALL.jvm.heap.used
  • glue.driver.s3.filesystem.read_bytes
  • glue.executorId.s3.filesystem.read_bytes
  • glue.ALL.s3.filesystem.read_bytes
  • glue.driver.s3.filesystem.write_bytes
  • glue.executorId.s3.filesystem.write_bytes
  • glue.ALL.s3.filesystem.write_bytes
  • glue.driver.system.cpuSystemLoad
  • glue.executorId.system.cpuSystemLoad
  • glue.ALL.system.cpuSystemLoad

最後に

これまで、ETLジョブのパフォーマンスの低下、メモリ不足が発生して処理が中断するといった問題の調査はCloudWatchLogsのログの参照のみでした。従来は、特定のExecutorがホットな状態なのか、そしてボトルネックの要因(CPU、メモリ、IO)が何かを把握するのはほぼ不可能でした。

新たに追加された AWS Glue のメトリクスは、読み​​書きされたバイト数、DriverとExecutorのメモリー使用量とCPU負荷、Executor間のデータのシャッフルなどのランタイム・メトリクスを容易に追跡できるようになりました。これらのメトリクスは、コードのデバッグ、データの問題の特定、CPUキャパシティの計画に役立ちます。もちろん、特定のジョブ条件でAmazon CloudWatchでアラームを設定することもできます。今日からこのメトリクスを有効(Enable)にして、クラスタのアクティビティを計測・監視にお役立てください。

Don’t guess, measure!(推測するな、計測せよ!)