AWS CDK v2で同期的に実行する入れ子のステートマシンを作成してみた
データアナリティクス事業本部の鈴木です。
最近、AWS CDK v2で入れ子のステートマシン※を作成したく試行錯誤していました。
※この記事では2つのステートマシンがあり、一方からもう一方を実行するような構成のこととします。
ようやくできたステートマシンを実行したところ、内側のステートマシンの完了を待たずに外側のステートマシンが完了してしまい、同期実行するための実装を調べたので紹介します。
入れ子のステートマシンの同期実行方法
入れ子になっているステートマシンを同期実行する方法は、以下のドキュメントに紹介がありました。
以下の2種が紹介されていました。
- サービスを呼び出し、ジョブが完了するまでStep Functionsが待機するよう実装する。
- コールバックサービス統合パターンにて、タスクトークンでサービスを呼び出し、トークンがペイロードとともに返されるまでStep Functionsが待機する。
1点目は、arn:aws:states:::states:startExecution.sync
およびarn:aws:states:::states:startExecution.sync:2
を使って内側のステートマシンを呼び出す方法です。この2つの違いは、Outputの形式が異なることがドキュメントに記載されています。
2点目は、内側のステートマシン実行時にタスクトークンを渡し、内側のステートマシン終了時にSendTaskSuccessかSendTaskFailureの呼び出しを持って終了を通知する方法です。
サービス統合パターンのドキュメントページに記載の通り、コールバックサービス統合パターンは人間による承認・サードパーティーとの統合・レガシーシステムの呼び出しまで待機するため必要になる場合があり、単純にステートマシンを管理しやすくするために入れ子にしたい場合には、1つ目の方法でよさそうです。
この記事では、AWS CDK v2を使って、まず1つ目の方法の実装例を紹介します。2つ目についても具体的な実装例が意外となかったので、作ってみたものを紹介します。
前提
環境
- AWS CDK:
2.80.0
実装方法
StepFunctionsStartExecution
クラスを使って実装します。
今回の実装のポイントはConstruct Props
のintegrationPattern
です。IntegrationPattern
型の値を取るものですが、IntegrationPattern
型は以下の3つの値をとります。
# | Name | Description |
---|---|---|
1 | REQUEST_RESPONSE | サービスを呼び出し、Step FunctionsがHTTPレスポンスを取得した直後に次の状態に進む。 |
2 | RUN_JOB | サービスを呼び出し、ジョブが完了するまでStep Functionsが待機する。 |
3 | WAIT_FOR_TASK_TOKEN | タスクトークンでサービスを呼び出し、トークンがペイロードとともに返されるまでStep Functionsが待機する。 |
一つ目は非同期呼び出しなので、残りの2つを使って同期呼び出しを実装しました。
CDKプロジェクトの作成
以下のようにサンプルのCDKプロジェクトを作成しました。
# プロジェクト用のディレクトリ作成 mkdir statemachine_cdk # プロジェクトの初期化 cd statemachine_cdk cdk init sample-app --language typescript
やってみる
1. ジョブが完了するまで待機する方法
まず1つ目の『サービスを呼び出し、ジョブが完了するまでStep Functionsが待機するよう実装する』方法です。
integrationPattern
にRUN_JOB
を指定しました。
実装例
import { Stack, StackProps } from 'aws-cdk-lib'; import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks'; import { Duration } from 'aws-cdk-lib'; import { Construct } from 'constructs'; export class StatemachineCdkStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props); // 呼び出される側のステートマシン // 10秒待機するステート const wait10seconds = new sfn.Wait(this, 'wait10seconds', { time: sfn.WaitTime.duration(Duration.seconds(10)), }); const InnerStateMachine = new sfn.StateMachine(this, 'InnerStateMachine', { stateMachineName: 'InnerStateMachine', definition: wait10seconds }); // 呼び出す側のステートマシン const OuterStateMachine = new sfn.StateMachine(this, 'OuterStateMachine', { definition: new tasks.StepFunctionsStartExecution( this, "StartExecuteInnerStateMachine", { stateMachine: InnerStateMachine, inputPath: "$", associateWithParent: true, integrationPattern: sfn.IntegrationPattern.RUN_JOB } ) }); } }
cdk deploy
コマンドでデプロイしました。
作成されたステートマシンの定義
できた外側のステートマシンはarn:aws:states:::states:startExecution.sync:2
にて内側のステートマシンを呼び出す形となっていました。
# 外側のステートマシン { "StartAt": "StartExecuteInnerStateMachine", "States": { "StartExecuteInnerStateMachine": { "End": true, "Type": "Task", "InputPath": "$", "Resource": "arn:aws:states:::states:startExecution.sync:2", "Parameters": { "Input": { "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" }, "StateMachineArn": "arn:aws:states:ap-northeast-1:アカウントID:stateMachine:InnerStateMachine" } } } } # 内側のステートマシン { "Type": "Wait", "Seconds": 10, "End": true }
動作確認結果
実行すると、外側のステートマシンが内側のステートマシンを呼び出した後、10秒間待機していることが分かります。
これは内側に確かに10秒待機するステートが入っているためですね。
2. トークンの返却まで待機する方法
次に『トークンの返却まで待機する方法』の実装例です。
実装例
integrationPattern
にWAIT_FOR_TASK_TOKEN
を指定しました。また、input
にaws_stepfunctions.JsonPath.taskToken
を設定しました。
トークンの返却は、内側のステートマシンのSendTaskSuccess
ステートにて、CallAwsService
クラスを使って返すようにしました。
import { Stack, StackProps } from 'aws-cdk-lib'; import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks'; import { Duration, } from 'aws-cdk-lib'; import { Construct } from 'constructs'; export class StatemachineCdkStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props); // 呼び出される側のステートマシン // 10秒待機するステート const wait10seconds = new sfn.Wait(this, 'wait10seconds', { time: sfn.WaitTime.duration(Duration.seconds(10)), }); // 成功としてトークンを返却するステート const SendTaskSuccess = new tasks.CallAwsService( this, 'SendTaskSuccess', { service: 'sfn', action: 'sendTaskSuccess', parameters: { "TaskToken": sfn.JsonPath.stringAt('$.token'), "Output": sfn.JsonPath.objectAt('$') }, iamResources: ["arn:aws:states:*"], iamAction: 'states:SendTaskSuccess' }, ); const InnerStateMachine = new sfn.StateMachine(this, 'InnerStateMachine', { stateMachineName: 'InnerStateMachine', definition: wait10seconds .next( SendTaskSuccess ) }); // 呼び出す側のステートマシン const OuterStateMachine = new sfn.StateMachine(this, 'OuterStateMachine', { definition: new tasks.StepFunctionsStartExecution( this, "StartExecuteInnerStateMachine", { stateMachine: InnerStateMachine, inputPath: "$", input: sfn.TaskInput.fromObject({ "token": sfn.JsonPath.taskToken, }), heartbeatTimeout: sfn.Timeout.duration(Duration.minutes(30)), //30min associateWithParent: true, integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN } ) }); } }
内側のステートマシンは10秒待機するというシンプルなものなので、今回はsendTaskSuccessのアクションを実行するステートのみを入れています。実際はもっと複雑な処理を実行する可能性があり、当然処理に失敗してエラーになってしまうことがあるため、SendTaskFailureの呼び出しをするステートも必要になると考えられます。
その場合は失敗の可能性があるステートにはaddCatch
メソッドでsendTaskFailureのアクションを実行するステートへの遷移を追加しておく必要があります。
外側のステートマシンでは、万が一トークンの返却がうまく出来なかった場合を考えて、heartbeatTimeout
でタイムアウトを入れています。
作成されたステートマシンの定義
# 外側のステートマシン { "StartAt": "StartExecuteInnerStateMachine", "States": { "StartExecuteInnerStateMachine": { "End": true, "Type": "Task", "HeartbeatSeconds": 1800, "InputPath": "$", "Resource": "arn:aws:states:::states:startExecution.waitForTaskToken", "Parameters": { "Input": { "token.$": "$$.Task.Token", "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" }, "StateMachineArn": "arn:aws:states:ap-northeast-1:アカウントID:stateMachine:InnerStateMachine" } } } } # 内側のステートマシン { "StartAt": "wait10seconds", "States": { "wait10seconds": { "Type": "Wait", "Seconds": 10, "Next": "SendTaskSuccess" }, "SendTaskSuccess": { "End": true, "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess", "Parameters": { "TaskToken.$": "$.token", "Output.$": "$" } } } }
動作確認結果
こちらの実装パターンでも、実行すると、外側のステートマシンが内側のステートマシンを呼び出した後、10秒間待機していることが分かります。
内側では、10秒待機した後にトークンを返却するステートも成功していますね。
なお、先述した通り、実際の内側のステートマシンは、処理の失敗でエラーになってしまう可能性があるため、SendTaskFailure
のステートも必要になると思います。
最後に
AWS CDK v2で入れ子のステートマシンを作成する際に、同期実行するための実装を2パターン紹介しました。
特にコールバックサービス統合パターンはよいサンプルが見つからなかったので、自分で作成してみたものをご紹介しました。
参考になりましたら幸いです。