Step Functions + Batchでシンプルなステートマシンを作ってみた
こんにちは。AWS事業本部のKyoです。
前回VSCodeにStep Functionsの開発環境を整えたので、Batchと連携させて科学計算をイメージしつつ、できるだけシンプルなステートマシンを構築してみます。
Step Functions開発の「うーむ...」にVSCodeとAWS ToolKitで挑んでみる話
やりたいこと
直列での計算を行うステートマシンを構築します。
下図のFirstCalcとSecondCalcはそれぞれ 「increment」というBatchジョブを呼び出します。FirstCalcの出力ファイルをSecondCalcの入力ファイルとします。
incrementは以下の処理を行います。
- 入力ファイルをS3からダウンロード
- 入力ファイルに含まれる数値に1を足す
- 計算結果をファイルに出力し、S3にアップロード
入力と出力ファイルの場所(S3のURI)は環境変数で決められるようになっているので、これを利用してFirstCalcの出力ファイルをSecondCalcの入力ファイルとします。
以下にコードを示します。
Dockerfile
FROM amazonlinux:2 # Install AWS CLI v2 RUN yum install -y unzip less && \ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \ unzip awscliv2.zip && \ ./aws/install WORKDIR /usr/local/bin ADD increment.sh increment.sh CMD [ "sh", "increment.sh" ]
increment.sh
INPUT_FILE_NAME=$(basename $INPUT_S3_URI) OUTPUT_FILE_NAME=$(basename $OUTPUT_S3_URI) echo "START: $STEP_NAME" # S3から入力ファイルをダウンロード aws s3 cp $INPUT_S3_URI $INPUT_FILE_NAME || exit 1 # 入力ファイル内の数値を表示 input_num=`cat $INPUT_FILE_NAME` echo "input: $input_num" # 計算 output_num=$(( input_num + 1)) echo "output: $output_num" # 計算結果を出力ファイルへ echo $output_num >$OUTPUT_FILE_NAME # S3へファイルをアップロード aws s3 cp $OUTPUT_FILE_NAME $OUTPUT_S3_URI || exit 1 echo "COMPLETE: $STEP_NAME"
環境
今回はStep Functionsが話の中心なのでBatchの環境については簡単に説明します。
- Fargate起動タイプを利用
- 前述のincrementを実行するジョブ定義
- イメージレジストリとしてはECRを利用
- S3FullAccessを含むジョブロール
- 1vCPU, 2048メモリ
また、入力ファイルとして「1」と記入しただけのテキストファイルをアップロードしてあります。
なお今回のステートマシンはBatchとStep Functionsでゲノミクスのワークフローに入門してみたを参考にしています。
やってみる
入力JSON
ステートマシンの入力となるJSONを決定しました。これはステートマシンの最初のステート(今回ではFirstCalc)の入力になります。これを利用することで、ステートマシンと処理を疎結合にすることができます。今回はBatchを利用するため、キュー、ジョブ定義、 環境変数を定義しました。なお値はダミーです。
{ "params": { "queue": "arn:aws:batch:ap-northeast-1:123456789012:job-queue/fargate-batch-queu", "jobdefs": { "increment": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/increment-job-def:1" } "environment": { "INPUT_S3_URI": "s3://sfs-batch-bucket-123456789012/input.txt", "FIRST_OUTPUT_S3_URI": "s3://sfs-batch-bucket-123456789012/first_result.txt" "SECOND_OUTPUT_S3_URI": "s3://sfs-batch-bucket-123456789012/second_result.txt" } } }
ステートマシン
以下のようになりました(yamlで書けるって素晴らしいですね)。
Comment: >- First state machine. StartAt: FirstCalc States: FirstCalc: Type: Task Resource: 'arn:aws:states:::batch:submitJob.sync' ResultPath: null Parameters: JobName.$: $$.State.Name JobDefinition.$: $.params.jobdefs.increment JobQueue.$: $.params.queue ContainerOverrides: Environment: - Name: STEP_NAME Value.$: $$.State.Name - Name: INPUT_S3_URI Value.$: $.params.environment.INPUT_S3_URI - Name: OUTPUT_S3_URI Value.$: $.params.environment.FIRST_OUTPUT_S3_URI Command: - 'sh' - 'increment.sh' Next: SecondCalc SecondCalc: Type: Task Resource: arn:aws:states:::batch:submitJob.sync ResultPath: null Parameters: JobName.$: $$.State.Name JobDefinition.$: $.params.jobdefs.increment JobQueue.$: $.params.queue ContainerOverrides: Environment: - Name: STEP_NAME Value.$: $$.State.Name - Name: INPUT_S3_URI Value.$: $.params.environment.FIRST_OUTPUT_S3_URI - Name: OUTPUT_S3_URI Value.$: $.params.environment.SECOND_OUTPUT_S3_URI Command: - 'sh' - 'increment.sh' End: true
ステート概要
ステートであるFirstCalc, SecondCalcではBatchジョブのincrementを呼び出しを行います。
Resource: 'arn:aws:states:::batch:submitJob.sync'
でBatchジョブであることを示し、Parameters
でジョブ登録を行う際のパラメータを指定しています。
コンテキストオブジェクト
JobName.$: $$.State.Name
等では$$
という見慣れない表記があります。これはコンテキストオブジェクトから値を取得しています。
コンテキストオブジェクトは、実行中に使用できる内部の JSON 構造です。また、状態マシンと実行に関する情報が含まれます。
コンテキストオブジェクトはざっくりいうとメタデータのようなものです。今回は以下の形でステート名(FirstCalc or SecondCalc)を取得し、Batchジョブの環境変数に渡しています。これによって、YAML上の表記は同じにしつつ、Batchジョブがどのステートとして実行されているか判断できるようにしています(ジョブ名およびログ用の環境変数で利用)。
入力値の取得
JobQueue.$: $.params.queue
という表記もあります。これはステートへの入力(FirstCalcにおいては入力JSON)からparams
フィールド内のqueue
フィールドを取得しています。
同じジョブを呼び出しているのでFirstCalc, SecondCalcの記述はあまり変わりません。違いとしては、SecondCalcの 「Batchジョブ」環境変数にFirstCalcと同じ「Step Functions」環境変数を先程の方法として指定しています。これによってFirstCalcの出力ファイルをSecondCalcの入力ファイルとしています。
# FirstCalcより抜粋 - Name: OUTPUT_S3_URI Value.$: $.params.environment.FIRST_OUTPUT_S3_URI # SecondCalcより抜粋 - Name: INPUT_S3_URI Value.$: $.params.environment.FIRST_OUTPUT_S3_URI
軽くハマったところ
軽くハマったところとしてはResultPath
の指定がありました。FirstCalcではステートへの入力として入力JSONが渡されますが、ResultPath
を指定しない場合、SecondCalcではステートへの入力は、FirstCalcの結果(ここではBatchの処理結果)になります。こうなると、入力JSONにはあった$.params.queue
は消えてしまうので、SecondCalcはキャンセルされてしまいます。
対策として、FirstCalcの中でResultPath: null
を指定するとFirstCalcへの入力をそのまま、SecondCalcの入力にすることができます(FirstCalcの結果をnullにして、元の入力を維持しています)。また、ResultPath: $.Container
と書けば、FirstCalcへの入力に加えて、FirstCalcの結果からContainer
というフィールドを埋め込むことができます。今回は後続の処理で利用しないのでnullを指定しています。
このあたりの話は以下のブログがわかりやすいと思います。
動かしてみる
VSCodeから実行します。個人的に実行はVSCode、結果の確認はマネジメントコンソールが便利そうだと思っています。
詳しい操作は次のブログをご覧ください。
Step Functionsの開発がVSCodeで完結!?AWS ToolkitでStep Functionsがサポートされました
起動
AWS ToolkitのExplorerで対象のステートマシンを右クリック→ Start Executionで以下の画面がでます。先程決めた入力JSONの値を貼り付けて、Executeを押します(複数回クリックするとその分だけ実行されるので注意が必要です。デバッグには便利でした)。
結果
マネジメントコンソールから確認できます。各ステートがすべて成功になっていることが確認できました。グラフインスペクターや実行イベント履歴から各ステートへの入出力を追えるのがいいですね。
SECOND_OUTPUT_S3_URI
で指定したパスを見てみると結果のファイルがアップロードされていました。
ダウンロードして中身を確認すると「3」でした。入力に使った値が「1」でincrementつまり「+1」をFirstCalc, SecondCalcの計2回行ったので正しい結果といえます。計算の中間産物であるFIRST_OUTPUT_S3_URI
も同様に確認できました。
Batchの方でも、ジョブが成功していました。
おわりに
今回はStep Functions でBatchを使ったステートマシンを組んでみました。
VSCodeでの開発者体験は良く、エラーの指摘やプレビューの表示はとても助かりました。また、デプロイと実行が簡単で、デバッグも行いやすかったです。何よりYAMLで書けるというのが良かった。
今回行った処理は簡単な足し算でしたが、入力ファイルをダウンロード、計算、出力ファイルをアップロード、そして次の処理に渡すという流れは多くのステートマシンで共通する部分であると考えています。まだまだ使っていない機能もあるので並列での計算やループ、エラーハンドリングの実装も行ってみたいなと考えています。
以上、なにかのお役に立てれば幸いです。
合わせて読みたい
Step Functionsを利用するにあたり、以下の3つのエントリが役立ちました。特に3本目のエントリで解説されているステート間のパラメータ受け渡しは重要だと思います。リファレンス的にもおすすめです!