Step Functions + Batchでシンプルなステートマシンを作ってみた

前回Step Functionsの開発環境を整えたのでBatchを使ったシンプルなステートマシンを構築してみました。
2021.05.18

こんにちは。AWS事業本部のKyoです。

前回VSCodeにStep Functionsの開発環境を整えたので、Batchと連携させて科学計算をイメージしつつ、できるだけシンプルなステートマシンを構築してみます。

Step Functions開発の「うーむ...」にVSCodeとAWS ToolKitで挑んでみる話

やりたいこと

直列での計算を行うステートマシンを構築します。

下図のFirstCalcとSecondCalcはそれぞれ 「increment」というBatchジョブを呼び出します。FirstCalcの出力ファイルをSecondCalcの入力ファイルとします。

incrementは以下の処理を行います。

  • 入力ファイルをS3からダウンロード
  • 入力ファイルに含まれる数値に1を足す
  • 計算結果をファイルに出力し、S3にアップロード

入力と出力ファイルの場所(S3のURI)は環境変数で決められるようになっているので、これを利用してFirstCalcの出力ファイルをSecondCalcの入力ファイルとします。

以下にコードを示します。

Dockerfile

FROM amazonlinux:2

# Install AWS CLI v2
RUN yum install -y unzip less && \
    curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
    unzip awscliv2.zip && \
    ./aws/install

WORKDIR /usr/local/bin
ADD increment.sh increment.sh

CMD [ "sh", "increment.sh" ]

increment.sh

INPUT_FILE_NAME=$(basename $INPUT_S3_URI)
OUTPUT_FILE_NAME=$(basename $OUTPUT_S3_URI)

echo "START: $STEP_NAME"

# S3から入力ファイルをダウンロード
aws s3 cp $INPUT_S3_URI $INPUT_FILE_NAME  || exit 1

# 入力ファイル内の数値を表示
input_num=`cat $INPUT_FILE_NAME`
echo "input: $input_num"

# 計算
output_num=$(( input_num + 1))
echo "output: $output_num"

# 計算結果を出力ファイルへ
echo $output_num >$OUTPUT_FILE_NAME

# S3へファイルをアップロード
aws s3 cp $OUTPUT_FILE_NAME $OUTPUT_S3_URI || exit 1

echo "COMPLETE: $STEP_NAME"

環境

今回はStep Functionsが話の中心なのでBatchの環境については簡単に説明します。

  • Fargate起動タイプを利用
  • 前述のincrementを実行するジョブ定義
    • イメージレジストリとしてはECRを利用
    • S3FullAccessを含むジョブロール
    • 1vCPU, 2048メモリ

また、入力ファイルとして「1」と記入しただけのテキストファイルをアップロードしてあります。

なお今回のステートマシンはBatchとStep Functionsでゲノミクスのワークフローに入門してみたを参考にしています。

やってみる

入力JSON

ステートマシンの入力となるJSONを決定しました。これはステートマシンの最初のステート(今回ではFirstCalc)の入力になります。これを利用することで、ステートマシンと処理を疎結合にすることができます。今回はBatchを利用するため、キュー、ジョブ定義、 環境変数を定義しました。なお値はダミーです。

{
  "params": {
    "queue": "arn:aws:batch:ap-northeast-1:123456789012:job-queue/fargate-batch-queu",
		"jobdefs": {
	    "increment": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/increment-job-def:1"
		}
    "environment": {
      "INPUT_S3_URI": "s3://sfs-batch-bucket-123456789012/input.txt",
      "FIRST_OUTPUT_S3_URI": "s3://sfs-batch-bucket-123456789012/first_result.txt"
      "SECOND_OUTPUT_S3_URI": "s3://sfs-batch-bucket-123456789012/second_result.txt"
    }
  }
}

ステートマシン

以下のようになりました(yamlで書けるって素晴らしいですね)。

Comment: >-
  First state machine.
StartAt: FirstCalc
States:
    FirstCalc:
        Type: Task
        Resource: 'arn:aws:states:::batch:submitJob.sync'
        ResultPath: null
        Parameters:
            JobName.$: $$.State.Name
            JobDefinition.$: $.params.jobdefs.increment
            JobQueue.$: $.params.queue
            ContainerOverrides:
                Environment:
                    -   Name: STEP_NAME
                        Value.$: $$.State.Name
                    -   Name: INPUT_S3_URI
                        Value.$: $.params.environment.INPUT_S3_URI
                    -   Name: OUTPUT_S3_URI
                        Value.$: $.params.environment.FIRST_OUTPUT_S3_URI
                Command:
                    - 'sh'
                    - 'increment.sh'
        Next: SecondCalc
    SecondCalc:
        Type: Task
        Resource: arn:aws:states:::batch:submitJob.sync
        ResultPath: null
        Parameters:
            JobName.$: $$.State.Name
            JobDefinition.$: $.params.jobdefs.increment
            JobQueue.$: $.params.queue
            ContainerOverrides:
                Environment:
                    -   Name: STEP_NAME
                        Value.$: $$.State.Name
                    -   Name: INPUT_S3_URI
                        Value.$: $.params.environment.FIRST_OUTPUT_S3_URI
                    -   Name: OUTPUT_S3_URI
                        Value.$: $.params.environment.SECOND_OUTPUT_S3_URI
                Command:
                    - 'sh'
                    - 'increment.sh'
        End: true

ステート概要

ステートであるFirstCalc, SecondCalcではBatchジョブのincrementを呼び出しを行います。

Resource: 'arn:aws:states:::batch:submitJob.sync'でBatchジョブであることを示し、Parametersでジョブ登録を行う際のパラメータを指定しています。

コンテキストオブジェクト

JobName.$: $$.State.Name等では$$という見慣れない表記があります。これはコンテキストオブジェクトから値を取得しています。

コンテキストオブジェクトは、実行中に使用できる内部の JSON 構造です。また、状態マシンと実行に関する情報が含まれます。

コンテキストオブジェクトはざっくりいうとメタデータのようなものです。今回は以下の形でステート名(FirstCalc or SecondCalc)を取得し、Batchジョブの環境変数に渡しています。これによって、YAML上の表記は同じにしつつ、Batchジョブがどのステートとして実行されているか判断できるようにしています(ジョブ名およびログ用の環境変数で利用)。

入力値の取得

JobQueue.$: $.params.queueという表記もあります。これはステートへの入力(FirstCalcにおいては入力JSON)からparamsフィールド内のqueueフィールドを取得しています。

同じジョブを呼び出しているのでFirstCalc, SecondCalcの記述はあまり変わりません。違いとしては、SecondCalcの 「Batchジョブ」環境変数にFirstCalcと同じ「Step Functions」環境変数を先程の方法として指定しています。これによってFirstCalcの出力ファイルをSecondCalcの入力ファイルとしています。

# FirstCalcより抜粋
-   Name: OUTPUT_S3_URI
    Value.$: $.params.environment.FIRST_OUTPUT_S3_URI

# SecondCalcより抜粋
-   Name: INPUT_S3_URI
    Value.$: $.params.environment.FIRST_OUTPUT_S3_URI

軽くハマったところ

軽くハマったところとしてはResultPathの指定がありました。FirstCalcではステートへの入力として入力JSONが渡されますが、ResultPathを指定しない場合、SecondCalcではステートへの入力は、FirstCalcの結果(ここではBatchの処理結果)になります。こうなると、入力JSONにはあった$.params.queueは消えてしまうので、SecondCalcはキャンセルされてしまいます。

対策として、FirstCalcの中でResultPath: null を指定するとFirstCalcへの入力をそのまま、SecondCalcの入力にすることができます(FirstCalcの結果をnullにして、元の入力を維持しています)。また、ResultPath: $.Containerと書けば、FirstCalcへの入力に加えて、FirstCalcの結果からContainerというフィールドを埋め込むことができます。今回は後続の処理で利用しないのでnullを指定しています。

このあたりの話は以下のブログがわかりやすいと思います。

動かしてみる

VSCodeから実行します。個人的に実行はVSCode、結果の確認はマネジメントコンソールが便利そうだと思っています。

詳しい操作は次のブログをご覧ください。

Step Functionsの開発がVSCodeで完結!?AWS ToolkitでStep Functionsがサポートされました

起動

AWS ToolkitのExplorerで対象のステートマシンを右クリック→ Start Executionで以下の画面がでます。先程決めた入力JSONの値を貼り付けて、Executeを押します(複数回クリックするとその分だけ実行されるので注意が必要です。デバッグには便利でした)。

結果

マネジメントコンソールから確認できます。各ステートがすべて成功になっていることが確認できました。グラフインスペクターや実行イベント履歴から各ステートへの入出力を追えるのがいいですね。

SECOND_OUTPUT_S3_URIで指定したパスを見てみると結果のファイルがアップロードされていました。

ダウンロードして中身を確認すると「3」でした。入力に使った値が「1」でincrementつまり「+1」をFirstCalc, SecondCalcの計2回行ったので正しい結果といえます。計算の中間産物であるFIRST_OUTPUT_S3_URIも同様に確認できました。

Batchの方でも、ジョブが成功していました。

おわりに

今回はStep Functions でBatchを使ったステートマシンを組んでみました。

VSCodeでの開発者体験は良く、エラーの指摘やプレビューの表示はとても助かりました。また、デプロイと実行が簡単で、デバッグも行いやすかったです。何よりYAMLで書けるというのが良かった。

今回行った処理は簡単な足し算でしたが、入力ファイルをダウンロード、計算、出力ファイルをアップロード、そして次の処理に渡すという流れは多くのステートマシンで共通する部分であると考えています。まだまだ使っていない機能もあるので並列での計算やループ、エラーハンドリングの実装も行ってみたいなと考えています。

以上、なにかのお役に立てれば幸いです。

合わせて読みたい

Step Functionsを利用するにあたり、以下の3つのエントリが役立ちました。特に3本目のエントリで解説されているステート間のパラメータ受け渡しは重要だと思います。リファレンス的にもおすすめです!