この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、muroです。Step Functionsでは、ステートマシンから別のステートマシンを呼び出すことができます。
今回は、ステートマシンからタスクトークンのコールバックまで待機する
サービス統合パターンを使って別のステートマシンの完了を待つフローを試してみました。
サービス統合パターン
Step Functionsには、ステートマシンから別のステートマシンを呼び出す際の統合パターンが3パターン用意されています。サービス統合パターン - AWS Step Functions
- レスポンスのリクエスト
- ジョブの実行 (.sync)
- タスクトークンのコールバックまで待機する
ステートマシンから別のステートマシンを呼び出して完了を待つには、2. ジョブの実行 (.sync)
パターンまたは3. タスクトークンのコールバックまで待機する
パターンを利用します。今回は3. タスクトークンのコールバックまで待機する
パターンを試します。
タスクトークンのコールバックまで待機するパターン
CDKのリファレンス class StepFunctionsStartExecution (construct) · AWS CDK のExampleを参考にParentStateMachine
とChildStateMachine
の2つのステートマシンを作成してみます。ParentStateMachine
はWAIT_FOR_TASK_TOKEN
統合パターンでChildStateMachine
を実行し、タスクトークンのコールバックまで待機します。ChildStateMachine
はPass
ステートのみで何も処理をしません。
import { Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
export class CdkStepfunctionsExampleStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// Define a state machine with one Pass state
const child = new sfn.StateMachine(this, 'ChildStateMachine', {
definition: sfn.Chain.start(new sfn.Pass(this, 'PassState')),
});
// Include the state machine in a Task state with callback pattern
const task = new tasks.StepFunctionsStartExecution(this, 'ChildTask', {
stateMachine: child,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
input: sfn.TaskInput.fromObject({
token: sfn.JsonPath.taskToken,
foo: 'bar',
}),
});
// Define a second state machine with the Task state above
new sfn.StateMachine(this, 'ParentStateMachine', {
definition: task,
});
}
}
これをデプロイすると2つのステートマシンが作成されます。
- ParentStateMachine
-
ChildStateMachine
ParentStateMachineを実行してみると、ChildStateMachineが呼ばれて完了しますが、ParentStateMachineの方は完了しません。
- ParentStateMachine
-
ChildStateMachine
これでParentStateMachine
がタスクトークンのコールバックを待機していることがわかりました。
SendTaskSuccess APIとSendTaskFailure API
ParentStateMachine
がコールバックを受けるには、ChildStateMachine
からSendTaskSuccess
API、またはSendTaskFailure
APIを呼び出します。
- SendTaskSuccess
呼び出し元から渡されたトークンを指定して、タスクが成功したことを呼び出し元に返します。
-
呼び出し元から渡されたトークンを指定して、タスクが失敗したことを呼び出し元に返します。
それでは、ChildStateMachine
にLambda関数を実行するタスクを追加して、SendTaskSuccess
またはSendTaskFailure
を呼び出してParentStateMachine
がコールバックを受けるように変更していきます。
Lambda関数の作成
SendTaskSuccess
またはSendTaskFailure
を呼び出すLambda関数を作成します。ここでは引数のexpected
がsuccess
ならばSendTaskSuccess
を、それ以外はSendTaskFailure
を呼び出すようにします。
import json
import boto3
sfn = boto3.client('stepfunctions')
def handler(event, context):
token = event.get("token")
expected = event.get("expected")
if expected == "success":
sfn.send_task_success(
taskToken=token,
output=json.dumps({
"result": expected
})
)
else:
sfn.send_task_failure(
taskToken=token,
error=json.dumps({
"result": expected
})
)
CDKスタックの変更
Lambda関数を作成します。
const sendTaskResultFn = new lambda.Function(this, 'sendTaskResultFn', {
runtime: lambda.Runtime.PYTHON_3_8,
code: lambda.Code.fromAsset('src/lambda'),
handler: 'send_result.handler',
initialPolicy: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'states:SendTaskSuccess',
'states:SendTaskFailure',
],
resources: ['*']
})
],
});
ChildStateMachine
でそのLambda関数を実行するようにタスクを変更します。
const child = new sfn.StateMachine(this, 'ChildStateMachine', {
definition: sfn.Chain.start(
new tasks.LambdaInvoke(this, 'Send task result.', {
lambdaFunction: sendTaskResultFn
})
),
});
ParentStateMachine
からChildStateMachine
に渡すパラメータを変更します。
const task = new tasks.StepFunctionsStartExecution(this, 'ChildTask', {
stateMachine: child,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
input: sfn.TaskInput.fromObject({
token: sfn.JsonPath.taskToken,
expected: sfn.JsonPath.stringAt('$.expected'),
}),
});
成功パターン
それでは動かしてみます。成功パターンでは、以下の入力で実行を開始します。
{
"expected": "success"
}
しばらくするとChildTaskが成功して完了しました。
ステップ出力
を見ると以下のメッセージが返されていました。
{
"result": "success"
}
失敗パターン
続いて失敗パターンを実行します。以下の入力で実行を開始します。
{
"expected": "failure"
}
しばらくするとChildTaskが失敗して完了しました。
ステップ出力
には何も出力されず、例外
として以下のメッセージが返されていました。
{"result": "failure"}
まとめ
WAIT_FOR_TASK_TOKEN
統合パターンとSendTaskSuccess
API/SendTaskFailure
APIを利用して、ネストされたステートマシンの完了を待つフローを実現できました。子ステートマシンの成功・失敗のハンドリングだけでなく、成功時の子ステートマシンからの出力によって、後続処理を分岐させることもできるため、柔軟にフローを組み立てられると思います。なお、今回は触れていませんが、本稼働用のステートマシンでは、何らかの理由でコールバックが受けられないケースを想定して、タイムアウト設定を入れておきたいです。