[AWS Step Functions] 「タスクトークンのコールバックまで待機する」サービス統合パターンを試してみた
こんにちは、muroです。Step Functionsでは、ステートマシンから別のステートマシンを呼び出すことができます。
今回は、ステートマシンからタスクトークンのコールバックまで待機する
サービス統合パターンを使って別のステートマシンの完了を待つフローを試してみました。
サービス統合パターン
Step Functionsには、ステートマシンから別のステートマシンを呼び出す際の統合パターンが3パターン用意されています。サービス統合パターン - AWS Step Functions
- レスポンスのリクエスト
- ジョブの実行 (.sync)
- タスクトークンのコールバックまで待機する
ステートマシンから別のステートマシンを呼び出して完了を待つには、2. ジョブの実行 (.sync)
パターンまたは3. タスクトークンのコールバックまで待機する
パターンを利用します。今回は3. タスクトークンのコールバックまで待機する
パターンを試します。
タスクトークンのコールバックまで待機するパターン
CDKのリファレンス class StepFunctionsStartExecution (construct) · AWS CDK のExampleを参考にParentStateMachine
とChildStateMachine
の2つのステートマシンを作成してみます。ParentStateMachine
はWAIT_FOR_TASK_TOKEN
統合パターンでChildStateMachine
を実行し、タスクトークンのコールバックまで待機します。ChildStateMachine
はPass
ステートのみで何も処理をしません。
import { Stack, StackProps } from 'aws-cdk-lib'; import { Construct } from 'constructs'; import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks'; export class CdkStepfunctionsExampleStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props); // Define a state machine with one Pass state const child = new sfn.StateMachine(this, 'ChildStateMachine', { definition: sfn.Chain.start(new sfn.Pass(this, 'PassState')), }); // Include the state machine in a Task state with callback pattern const task = new tasks.StepFunctionsStartExecution(this, 'ChildTask', { stateMachine: child, integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN, input: sfn.TaskInput.fromObject({ token: sfn.JsonPath.taskToken, foo: 'bar', }), }); // Define a second state machine with the Task state above new sfn.StateMachine(this, 'ParentStateMachine', { definition: task, }); } }
これをデプロイすると2つのステートマシンが作成されます。
- ParentStateMachine
-
ChildStateMachine
ParentStateMachineを実行してみると、ChildStateMachineが呼ばれて完了しますが、ParentStateMachineの方は完了しません。
- ParentStateMachine
-
ChildStateMachine
これでParentStateMachine
がタスクトークンのコールバックを待機していることがわかりました。
SendTaskSuccess APIとSendTaskFailure API
ParentStateMachine
がコールバックを受けるには、ChildStateMachine
からSendTaskSuccess
API、またはSendTaskFailure
APIを呼び出します。
- SendTaskSuccess
呼び出し元から渡されたトークンを指定して、タスクが成功したことを呼び出し元に返します。
-
呼び出し元から渡されたトークンを指定して、タスクが失敗したことを呼び出し元に返します。
それでは、ChildStateMachine
にLambda関数を実行するタスクを追加して、SendTaskSuccess
またはSendTaskFailure
を呼び出してParentStateMachine
がコールバックを受けるように変更していきます。
Lambda関数の作成
SendTaskSuccess
またはSendTaskFailure
を呼び出すLambda関数を作成します。ここでは引数のexpected
がsuccess
ならばSendTaskSuccess
を、それ以外はSendTaskFailure
を呼び出すようにします。
import json import boto3 sfn = boto3.client('stepfunctions') def handler(event, context): token = event.get("token") expected = event.get("expected") if expected == "success": sfn.send_task_success( taskToken=token, output=json.dumps({ "result": expected }) ) else: sfn.send_task_failure( taskToken=token, error=json.dumps({ "result": expected }) )
CDKスタックの変更
Lambda関数を作成します。
const sendTaskResultFn = new lambda.Function(this, 'sendTaskResultFn', { runtime: lambda.Runtime.PYTHON_3_8, code: lambda.Code.fromAsset('src/lambda'), handler: 'send_result.handler', initialPolicy: [ new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ 'states:SendTaskSuccess', 'states:SendTaskFailure', ], resources: ['*'] }) ], });
ChildStateMachine
でそのLambda関数を実行するようにタスクを変更します。
const child = new sfn.StateMachine(this, 'ChildStateMachine', { definition: sfn.Chain.start( new tasks.LambdaInvoke(this, 'Send task result.', { lambdaFunction: sendTaskResultFn }) ), });
ParentStateMachine
からChildStateMachine
に渡すパラメータを変更します。
const task = new tasks.StepFunctionsStartExecution(this, 'ChildTask', { stateMachine: child, integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN, input: sfn.TaskInput.fromObject({ token: sfn.JsonPath.taskToken, expected: sfn.JsonPath.stringAt('$.expected'), }), });
成功パターン
それでは動かしてみます。成功パターンでは、以下の入力で実行を開始します。
{ "expected": "success" }
しばらくするとChildTaskが成功して完了しました。
ステップ出力
を見ると以下のメッセージが返されていました。
{ "result": "success" }
失敗パターン
続いて失敗パターンを実行します。以下の入力で実行を開始します。
{ "expected": "failure" }
しばらくするとChildTaskが失敗して完了しました。
ステップ出力
には何も出力されず、例外
として以下のメッセージが返されていました。
{"result": "failure"}
まとめ
WAIT_FOR_TASK_TOKEN
統合パターンとSendTaskSuccess
API/SendTaskFailure
APIを利用して、ネストされたステートマシンの完了を待つフローを実現できました。子ステートマシンの成功・失敗のハンドリングだけでなく、成功時の子ステートマシンからの出力によって、後続処理を分岐させることもできるため、柔軟にフローを組み立てられると思います。なお、今回は触れていませんが、本稼働用のステートマシンでは、何らかの理由でコールバックが受けられないケースを想定して、タイムアウト設定を入れておきたいです。