データアナリティクス事業本部の鈴木です。
最近、AWS CDK v2で入れ子のステートマシン※を作成したく試行錯誤していました。
※この記事では2つのステートマシンがあり、一方からもう一方を実行するような構成のこととします。
ようやくできたステートマシンを実行したところ、内側のステートマシンの完了を待たずに外側のステートマシンが完了してしまい、同期実行するための実装を調べたので紹介します。
入れ子のステートマシンの同期実行方法
入れ子になっているステートマシンを同期実行する方法は、以下のドキュメントに紹介がありました。
以下の2種が紹介されていました。
- サービスを呼び出し、ジョブが完了するまでStep Functionsが待機するよう実装する。
- コールバックサービス統合パターンにて、タスクトークンでサービスを呼び出し、トークンがペイロードとともに返されるまでStep Functionsが待機する。
1点目は、arn:aws:states:::states:startExecution.sync
およびarn:aws:states:::states:startExecution.sync:2
を使って内側のステートマシンを呼び出す方法です。この2つの違いは、Outputの形式が異なることがドキュメントに記載されています。
2点目は、内側のステートマシン実行時にタスクトークンを渡し、内側のステートマシン終了時にSendTaskSuccessかSendTaskFailureの呼び出しを持って終了を通知する方法です。
サービス統合パターンのドキュメントページに記載の通り、コールバックサービス統合パターンは人間による承認・サードパーティーとの統合・レガシーシステムの呼び出しまで待機するため必要になる場合があり、単純にステートマシンを管理しやすくするために入れ子にしたい場合には、1つ目の方法でよさそうです。
この記事では、AWS CDK v2を使って、まず1つ目の方法の実装例を紹介します。2つ目についても具体的な実装例が意外となかったので、作ってみたものを紹介します。
前提
環境
- AWS CDK:
2.80.0
実装方法
StepFunctionsStartExecution
クラスを使って実装します。
今回の実装のポイントはConstruct Props
のintegrationPattern
です。IntegrationPattern
型の値を取るものですが、IntegrationPattern
型は以下の3つの値をとります。
# | Name | Description |
---|---|---|
1 | REQUEST_RESPONSE | サービスを呼び出し、Step FunctionsがHTTPレスポンスを取得した直後に次の状態に進む。 |
2 | RUN_JOB | サービスを呼び出し、ジョブが完了するまでStep Functionsが待機する。 |
3 | WAIT_FOR_TASK_TOKEN | タスクトークンでサービスを呼び出し、トークンがペイロードとともに返されるまでStep Functionsが待機する。 |
一つ目は非同期呼び出しなので、残りの2つを使って同期呼び出しを実装しました。
CDKプロジェクトの作成
以下のようにサンプルのCDKプロジェクトを作成しました。
# プロジェクト用のディレクトリ作成
mkdir statemachine_cdk
# プロジェクトの初期化
cd statemachine_cdk
cdk init sample-app --language typescript
やってみる
1. ジョブが完了するまで待機する方法
まず1つ目の『サービスを呼び出し、ジョブが完了するまでStep Functionsが待機するよう実装する』方法です。
integrationPattern
にRUN_JOB
を指定しました。
実装例
lib/statemachine_cdk-stack.ts
import { Stack, StackProps } from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import {
Duration
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
export class StatemachineCdkStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// 呼び出される側のステートマシン
// 10秒待機するステート
const wait10seconds = new sfn.Wait(this, 'wait10seconds', {
time: sfn.WaitTime.duration(Duration.seconds(10)),
});
const InnerStateMachine = new sfn.StateMachine(this, 'InnerStateMachine', {
stateMachineName: 'InnerStateMachine',
definition: wait10seconds
});
// 呼び出す側のステートマシン
const OuterStateMachine = new sfn.StateMachine(this, 'OuterStateMachine', {
definition: new tasks.StepFunctionsStartExecution(
this,
"StartExecuteInnerStateMachine",
{
stateMachine: InnerStateMachine,
inputPath: "$",
associateWithParent: true,
integrationPattern: sfn.IntegrationPattern.RUN_JOB
}
)
});
}
}
cdk deploy
コマンドでデプロイしました。
作成されたステートマシンの定義
できた外側のステートマシンはarn:aws:states:::states:startExecution.sync:2
にて内側のステートマシンを呼び出す形となっていました。
# 外側のステートマシン
{
"StartAt": "StartExecuteInnerStateMachine",
"States": {
"StartExecuteInnerStateMachine": {
"End": true,
"Type": "Task",
"InputPath": "$",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"Input": {
"AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
},
"StateMachineArn": "arn:aws:states:ap-northeast-1:アカウントID:stateMachine:InnerStateMachine"
}
}
}
}
# 内側のステートマシン
{
"Type": "Wait",
"Seconds": 10,
"End": true
}
動作確認結果
実行すると、外側のステートマシンが内側のステートマシンを呼び出した後、10秒間待機していることが分かります。
これは内側に確かに10秒待機するステートが入っているためですね。
2. トークンの返却まで待機する方法
次に『トークンの返却まで待機する方法』の実装例です。
実装例
integrationPattern
にWAIT_FOR_TASK_TOKEN
を指定しました。また、input
にaws_stepfunctions.JsonPath.taskToken
を設定しました。
トークンの返却は、内側のステートマシンのSendTaskSuccess
ステートにて、CallAwsService
クラスを使って返すようにしました。
lib/statemachine_cdk-stack.ts
import { Stack, StackProps } from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import {
Duration,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
export class StatemachineCdkStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// 呼び出される側のステートマシン
// 10秒待機するステート
const wait10seconds = new sfn.Wait(this, 'wait10seconds', {
time: sfn.WaitTime.duration(Duration.seconds(10)),
});
// 成功としてトークンを返却するステート
const SendTaskSuccess = new tasks.CallAwsService(
this,
'SendTaskSuccess',
{
service: 'sfn',
action: 'sendTaskSuccess',
parameters: {
"TaskToken": sfn.JsonPath.stringAt('$.token'),
"Output": sfn.JsonPath.objectAt('$')
},
iamResources: ["arn:aws:states:*"],
iamAction: 'states:SendTaskSuccess'
},
);
const InnerStateMachine = new sfn.StateMachine(this, 'InnerStateMachine', {
stateMachineName: 'InnerStateMachine',
definition:
wait10seconds
.next(
SendTaskSuccess
)
});
// 呼び出す側のステートマシン
const OuterStateMachine = new sfn.StateMachine(this, 'OuterStateMachine', {
definition: new tasks.StepFunctionsStartExecution(
this,
"StartExecuteInnerStateMachine",
{
stateMachine: InnerStateMachine,
inputPath: "$",
input: sfn.TaskInput.fromObject({
"token": sfn.JsonPath.taskToken,
}),
heartbeatTimeout: sfn.Timeout.duration(Duration.minutes(30)), //30min
associateWithParent: true,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN
}
)
});
}
}
内側のステートマシンは10秒待機するというシンプルなものなので、今回はsendTaskSuccessのアクションを実行するステートのみを入れています。実際はもっと複雑な処理を実行する可能性があり、当然処理に失敗してエラーになってしまうことがあるため、SendTaskFailureの呼び出しをするステートも必要になると考えられます。
その場合は失敗の可能性があるステートにはaddCatch
メソッドでsendTaskFailureのアクションを実行するステートへの遷移を追加しておく必要があります。
外側のステートマシンでは、万が一トークンの返却がうまく出来なかった場合を考えて、heartbeatTimeout
でタイムアウトを入れています。
作成されたステートマシンの定義
# 外側のステートマシン
{
"StartAt": "StartExecuteInnerStateMachine",
"States": {
"StartExecuteInnerStateMachine": {
"End": true,
"Type": "Task",
"HeartbeatSeconds": 1800,
"InputPath": "$",
"Resource": "arn:aws:states:::states:startExecution.waitForTaskToken",
"Parameters": {
"Input": {
"token.$": "$$.Task.Token",
"AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
},
"StateMachineArn": "arn:aws:states:ap-northeast-1:アカウントID:stateMachine:InnerStateMachine"
}
}
}
}
# 内側のステートマシン
{
"StartAt": "wait10seconds",
"States": {
"wait10seconds": {
"Type": "Wait",
"Seconds": 10,
"Next": "SendTaskSuccess"
},
"SendTaskSuccess": {
"End": true,
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess",
"Parameters": {
"TaskToken.$": "$.token",
"Output.$": "$"
}
}
}
}
動作確認結果
こちらの実装パターンでも、実行すると、外側のステートマシンが内側のステートマシンを呼び出した後、10秒間待機していることが分かります。
内側では、10秒待機した後にトークンを返却するステートも成功していますね。
なお、先述した通り、実際の内側のステートマシンは、処理の失敗でエラーになってしまう可能性があるため、SendTaskFailure
のステートも必要になると思います。
最後に
AWS CDK v2で入れ子のステートマシンを作成する際に、同期実行するための実装を2パターン紹介しました。
特にコールバックサービス統合パターンはよいサンプルが見つからなかったので、自分で作成してみたものをご紹介しました。
参考になりましたら幸いです。