[レポート] テラバイト単位のMLワークロード用データをAWS Step Functionsで準備するセッションに参加してきました #AWSreInvent #API312

2023.12.06

福岡オフィスのyoshihitohです。

AWS Step Functionsを活用して大量のデータを処理する構成についてのコードトークに参加してきました。例によってEMR/EMR ServerlessやGlue Jobとどうやって使い分けるんだろう?と気になっていましたが、こういう選択肢もあるんだなと学びになりました。

以降でセッションの内容についてレポートします。

セッション概要

タイトル

API312 | Prepare terabytes of data for ML workloads with AWS Step Functions

概要

Is your data ready to be used for machine learning workloads? Is it ready to tune generative AI use cases? In this code talk, learn to build an AWS Step Functions workflow that easily scales from running one task at a time to thousands in parallel. Explore how a single data preparation routine, written to process one set of data, scales using Step Functions distributed map to reduce the time and cost of data preparation. Learn how to use Step Functions distributed map in your organization to manage a variety of data processing tasks.

以降は意訳です。

あなたのデータを機械学習のワークロードで活用するための準備はできていますか?生成AI向けのチューニングについてはどうですか?このコードトークセッションでは、単一のタスクから数千におよびタスクを並列できるようにスケールするAWS Step Functionsの構築方法を学びます。単一データの処理から初めて、Step Functionsの分散マップを使用してスケールアウトし、データの準備にかかる時間とコストを削減する方法を探索していきます。あなたの組織で様々なデータ処理タスクを管理するためにStep Functionsの分散マップ (Distributed Map)をどのように活用するのか学びましょう。

スピーカー

  • Christian Mueller
  • Cory McBain

セッション内容

セッションでは大量のデータをAWS Step Functionsでどのように効果的に処理するかというテーマについて、以下の内容でLambda FunctionのコードとSteap Functionsのステートマシンを変更しながら動作確認しました。

  1. 対象のファイルをシーケンシャルに処理する
  2. 対象のファイルを分散・並列処理する

それぞれで実行時間を測定し、どれくらい差がでるかを確認していきました。セッションで利用したソースコードについてはGitHubで公開されていました。AWSアカウントを用意すれば比較的簡単に試せるようになっているようです。

ETLやデータの移行など様々な用途で大量のデータコピーが必要になるケースがあると思います。このようなシーンで活用できる選択肢を増やすためにも、ぜひともリポジトリの内容を実際に動かしてみましょう!

ある程度多くのデータを転送・保存します。実際に動かす場合はリージョン指定の誤りやデータの消し忘れに注意してください。

以降はセッション中に紹介されていた内容を参考に動作確認していきます。

前提条件

データセットについて

セッション中の動作確認では NOAA Global Surface Summary of Day のデータセットを利用していました。このデータセットはAWSの公開S3バケットに配置されているため事前にデータを用意する必要はありません。

2023年12月時点ではバケット所有者が支払う設定になっているためデータ転送についての利用費は気にする必要はありません。ただし以降の操作で自アカウントのS3バケットにデータを保存するため、削除し忘れないように注意してください。

ソースコードについて

READMEに記載されているとおり、AWS SAM CLIを使ってビルド・デプロイします。インストール方法についてはAWSの公式ドキュメントを参照してください。私はvenv環境を作ってから導入しました。

python -m venv .venv
source .venv/bin/activate
pip install aws-sam-cli

ディレクトリ構成は以下のようになっています。デプロイ対象のソースコード・ディレクトリは functions/statemachine/ の直下に配置し、バージョンごとの実装を v1v6 に配置しています。9faff8d221 のコミット時点だとデプロイ対象はv1になっていました。

./functions/
├── pass_through
├── prepare_dataset
├── v1
├── v2
├── v3
├── v4
├── v5
└── v6
./statemachine/
├── sfn-template.asl.json
├── v1
├── v2
├── v3
├── v4
├── v5
└── v6

各バージョンの実装内容について

初期バージョン(v1)では対象のファイルをシーケンシャルに処理します。セッションではこれをベースに変更しながら処理速度をあげていきました。以降にバージョンごとの実装内容と、直前のバージョンとの差分について概要をまとめました。

v1 (初期バージョン)

こんな感じの処理シーケンスです。1個ずつ確認していきます。

# ステート 概要
1 Source Data 入力ファイルを列挙する
2 Prepare Sourced 入力ファイルからデータセットを用意する
2.1 Read File 入力ファイルの内容を読み込む (getObject)
2.2 Pre Process 前処理する (このデモでは何もしない)
2.3 Prepare Dataset データセットを作成する (華氏→摂氏など)
2.4 Stage Dataset データセットを出力する (putObject)
2.5 Publish Event SNSに完了イベントを通知する

v1は処理内容ごとにタスクを細かく分割する方針で実装されているようです。

v2

Lambda Functionについては変更なしです。1回の実行で1ファイルを処理します。

Step Functionsのステートマシーンを変更しています。を以下の内容で変更しています。

  • MaxConcurrency(同時実行数の最大値) を500に変更
  • 対象オブジェクト一覧取得を独立したタスクからマップタスクの ItemReader に移動
  • インラインモードから分散モードに変更
  • エクスプレスワークフローに変更

分散モードに変更することで複数のファイルを同時に処理できるようになります。インラインモードと分散モードの違いについてはこちらのドキュメントを参照してください。

また、ステートマシンで扱うワークフローは標準ワークフローとExpressワークフローと種類が別れているようです。例えば承認フローを伴うような長期間実行するワークフローは標準ワークフローを、IoTデータの取り込みなど短期間で大容量のデータを扱う場合はExpressワークフローを、と使い分けるようです。詳細については以下のドキュメントを参照してください。

v3

Lambda Functionの prepare_dataset について以下の内容で変更しています。

  • 処理対象が単一ファイルから複数ファイルに変更
  • 複数のファイルを単一ファイルに集約して出力 (putObject)するように変更

Step Functionsのステートマシーンは上記の変更に伴い不要になったタスクを削除しています。

  • Read File : ファイル読み込み (getObject)
  • Stage Dataset : ファイル出力 (putObject)

ファイルを1個ずつ出力するとS3へのリクエスト回数増加によるオーバーヘッドで全体的なパフォーマンス低下に繋ります。これを軽減するための変更だと思います。

こんな感じのシーケンスになりました。Lambda Functionに処理を寄せたこともありだいぶすっきりしましたね。

v4

当日のセッションは時間の都合でこのあたりから取り扱えなかった範囲だと記憶しています。リポジトリの内容と参考資料からわかる範囲で読み解いていきます。

Lambda Functionについては変更なしです。

Step Functionsのステートマシーンを以下の内容で変更しています。

  • 同時実行数を500 → 1,000に変更
  • バッチ処理件数を 20 → 100に変更
  • 対象オブジェクト一覧取得を listObjectV2 から S3インベントリのマニフェスト利用に変更

InputType: MANIFEST についてはセッション中に共有されていた資料で補足されていました。

listObjectsV2はオブジェクトの一覧を高速に取得でき、パフォーマンスで問題が生じることはほとんどありません。ただし、1回あたりのAPI呼び出しでは1,000件分しか取得できないため、仮に20万件のオブジェクトを取得する場合はAPI呼び出しが200回必要になるためオーバーヘッドが大きくなります。これを軽減するために、S3のインベントリマニフェストを利用して高速化を図る変更だと思います。

v5

Lambda Functionは pass_through を20回起動するごとにエラー停止するように変更されています。こちらはセッション中に触れられておらずはっきりした意図は掴めておりませんが、おそらく途中でエラーが発生した場合の再実行の検証目的かなと推測しています。

Step Functionsのステートマシンは対象オブジェクト一覧取得をlistObjectV2に戻しています。

v6

Lambda Functionについては変更なしです。

Step Functionsのステートマシーンを以下の内容で変更しています。

  • マップタスクの実行で許容するエラーの割合(ToleratedFailurePercentage)を5%に変更

こちらもマップタスク中にエラーが発生した場合の対処について確認するための変更のようです。

動作確認

ここまでで実装内容を確認できました。実行性能に関連する変更はv1〜v4までです。NOAA Global Surface Summary of DayのデータセットはS3インベントリのマニフェストファイルを出力していないようなので、v1とv3で比較してみます。

環境構築

AWS SAM CLIでスタックをデプロイします。READMEはus-east-1前提になっていますが、ap-northeast-1で動かしてみます。

sam build \
  && sam deploy \
    --region ap-northeast-1 \
    --stack-name reinvent2023-api312 \
    --capabilities CAPABILITY_IAM \
    --resolve-s3 \
    --no-confirm-changeset

スタック作成が成功したら、NOAAのデータをS3バケットに配置します。出力先のS3バケット名はCloudFormationスタックのOutputに設定されているので、以下のコマンドで確認してください。

aws cloudformation describe-stacks \
    --stack-name reinvent2023-api312 \
    --query "Stacks[0].Outputs[?OutputKey=='MarketDataSourceS3BucketName'].OutputValue" \
    --output text

NOAAデータセットの一部をコピーします。コピー先のバケット名は $SouceBucket で表記します。動かす場合は前述のコマンドで確認したバケット名に置き換えてください 。あまり大きいデータを利用するとコピーおよび動作確認に時間がかかるので今回は 1939年 のデータを利用します。

aws s3 cp --recursive "s3://noaa-gsod-pds/1939/" "s3://$SourceBucket/1939/"

ここまでで準備完了です。1939年のデータの規模感は以下の通りです。

  • データサイズ: 14.0MB
  • ファイル数: 275

v1を動かす

v1は初期実装のシーケンシャルな処理なのでそこそこ時間がかかる見込みです。

環境構築直後はv1の内容でデプロイされているため再デプロイは不要です。以下のペイロードを指定してステートマシンを実行します。

{
  "Prefix": "1939/"
}

無事動きました。実行時間は1分10秒〜1分20秒程度でした。

試行回数 実行時間(分:秒)
1回目 01:13
2回目 01:08
3回目 01:20
4回目 01:16
5回目 01:23

v3を動かす

v2・v3で分散実行・prepare_datasetの実装最適化が入っているため大幅な高速化が見込めそうです。

まずv3のソースをデプロイします。以下のコマンドを実行してソースコードを更新します。

cp functions/v3/pass_through/*      functions/pass_through/
cp functions/v3/prepare_dataset/*   functions/prepare_dataset/
cp statemachine/v3/*                statemachine/

ビルドしてデプロイします。

sam build \
  && sam deploy \
    --region ap-northeast-1 \
    --stack-name reinvent2023-api312 \
    --capabilities CAPABILITY_IAM \
    --resolve-s3 \
    --no-confirm-changeset

v1と同じペイロードを指定してステートマシンを実行します。

{
  "Prefix": "1939/"
}

無事動きました。実行時間は5秒〜10秒程度でした、だいぶ高速化しましたね!1回目はコールドスタート、2回目以降はウォームスタートとなっているため差がでていますが、概ね安定した時間で処理できていそうです。

試行回数 実行時間(分:秒)
1回目 00:13
2回目 00:04
3回目 00:04
4回目 00:04
5回目 00:08

おまけ

1939年のデータだと規模が小さくv3実装の性能を活かしきれていなさそうだったので、もう少し大きめの2020年のデータを試してみました。以下、データの規模感です。カッコ内は1939年データ比です。

  • データサイズ: 911.6MB (65倍)
  • ファイル数: 12,299 (44倍)
試行回数 実行時間(分:秒)
1回目 00:47
2回目 01:22
3回目 00:43
4回目 01:28
5回目 00:17

実行時間にばらつきがでるようになりましたが、データの増加具合の割には短い時間で処理が完了していますね。いい感じに高速化できていそうです。

おわりに

今までこの手の前処理を行う場合はAthena・Spark on Glue/EMR といった構成で対処することが多かったのですが、Step Functionsをうまく活用すると同等のことを行えることを学べました。AthenaやSparkを使い慣れていない場合や既存の資産(コード)を活用して実現したいといった要件がある場合は積極的にStep Functionsを活用してみたいと思います!