[AWS Step Functions] 「タスクトークンのコールバックまで待機する」サービス統合パターンを試してみた

2022.04.01

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、muroです。Step Functionsでは、ステートマシンから別のステートマシンを呼び出すことができます。

[アップデート]Step Functions ステートマシンのネストが可能になりました!

今回は、ステートマシンからタスクトークンのコールバックまで待機するサービス統合パターンを使って別のステートマシンの完了を待つフローを試してみました。

サービス統合パターン

Step Functionsには、ステートマシンから別のステートマシンを呼び出す際の統合パターンが3パターン用意されています。サービス統合パターン - AWS Step Functions

  1. レスポンスのリクエスト
  2. ジョブの実行 (.sync)
  3. タスクトークンのコールバックまで待機する

ステートマシンから別のステートマシンを呼び出して完了を待つには、2. ジョブの実行 (.sync)パターンまたは3. タスクトークンのコールバックまで待機するパターンを利用します。今回は3. タスクトークンのコールバックまで待機するパターンを試します。

タスクトークンのコールバックまで待機するパターン

CDKのリファレンス class StepFunctionsStartExecution (construct) · AWS CDK のExampleを参考にParentStateMachineChildStateMachineの2つのステートマシンを作成してみます。ParentStateMachineWAIT_FOR_TASK_TOKEN統合パターンでChildStateMachineを実行し、タスクトークンのコールバックまで待機します。ChildStateMachinePassステートのみで何も処理をしません。

import { Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';

export class CdkStepfunctionsExampleStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // Define a state machine with one Pass state
    const child = new sfn.StateMachine(this, 'ChildStateMachine', {
      definition: sfn.Chain.start(new sfn.Pass(this, 'PassState')),
    });

    // Include the state machine in a Task state with callback pattern
    const task = new tasks.StepFunctionsStartExecution(this, 'ChildTask', {
      stateMachine: child,
      integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
      input: sfn.TaskInput.fromObject({
        token: sfn.JsonPath.taskToken,
        foo: 'bar',
      }),
    });

    // Define a second state machine with the Task state above
    new sfn.StateMachine(this, 'ParentStateMachine', {
      definition: task,
    });
  }
}

これをデプロイすると2つのステートマシンが作成されます。

  1. ParentStateMachine

  2. ChildStateMachine

ParentStateMachineを実行してみると、ChildStateMachineが呼ばれて完了しますが、ParentStateMachineの方は完了しません。

  1. ParentStateMachine

  2. ChildStateMachine

これでParentStateMachineがタスクトークンのコールバックを待機していることがわかりました。

SendTaskSuccess APIとSendTaskFailure API

ParentStateMachineがコールバックを受けるには、ChildStateMachineからSendTaskSuccessAPI、またはSendTaskFailureAPIを呼び出します。

  1. SendTaskSuccess

    呼び出し元から渡されたトークンを指定して、タスクが成功したことを呼び出し元に返します。

  2. SendTaskFailure

    呼び出し元から渡されたトークンを指定して、タスクが失敗したことを呼び出し元に返します。

それでは、ChildStateMachineにLambda関数を実行するタスクを追加して、SendTaskSuccessまたはSendTaskFailureを呼び出してParentStateMachineがコールバックを受けるように変更していきます。

Lambda関数の作成

SendTaskSuccessまたはSendTaskFailureを呼び出すLambda関数を作成します。ここでは引数のexpectedsuccessならばSendTaskSuccessを、それ以外はSendTaskFailureを呼び出すようにします。

import json
import boto3

sfn = boto3.client('stepfunctions')


def handler(event, context):
    token = event.get("token")
    expected = event.get("expected")

    if expected == "success":
        sfn.send_task_success(
            taskToken=token,
            output=json.dumps({
                "result": expected
            })
        )
    else:
        sfn.send_task_failure(
            taskToken=token,
            error=json.dumps({
                "result": expected
            })
        )

CDKスタックの変更

Lambda関数を作成します。

    const sendTaskResultFn = new lambda.Function(this, 'sendTaskResultFn', {
      runtime: lambda.Runtime.PYTHON_3_8,
      code: lambda.Code.fromAsset('src/lambda'),
      handler: 'send_result.handler',
      initialPolicy: [
        new iam.PolicyStatement({
          effect: iam.Effect.ALLOW,
          actions: [
            'states:SendTaskSuccess',
            'states:SendTaskFailure',
          ],
          resources: ['*']
        })
      ],
    });

ChildStateMachineでそのLambda関数を実行するようにタスクを変更します。

    const child = new sfn.StateMachine(this, 'ChildStateMachine', {
      definition: sfn.Chain.start(
        new tasks.LambdaInvoke(this, 'Send task result.', {
          lambdaFunction: sendTaskResultFn
        })
      ),
    });

ParentStateMachineからChildStateMachineに渡すパラメータを変更します。

    const task = new tasks.StepFunctionsStartExecution(this, 'ChildTask', {
      stateMachine: child,
      integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
      input: sfn.TaskInput.fromObject({
        token: sfn.JsonPath.taskToken,
        expected: sfn.JsonPath.stringAt('$.expected'),
      }),
    });

成功パターン

それでは動かしてみます。成功パターンでは、以下の入力で実行を開始します。

{
    "expected": "success"
}

しばらくするとChildTaskが成功して完了しました。

ステップ出力を見ると以下のメッセージが返されていました。

{
  "result": "success"
}

失敗パターン

続いて失敗パターンを実行します。以下の入力で実行を開始します。

{
    "expected": "failure"
}

しばらくするとChildTaskが失敗して完了しました。

ステップ出力には何も出力されず、例外として以下のメッセージが返されていました。

{"result": "failure"}

まとめ

WAIT_FOR_TASK_TOKEN統合パターンとSendTaskSuccessAPI/SendTaskFailureAPIを利用して、ネストされたステートマシンの完了を待つフローを実現できました。子ステートマシンの成功・失敗のハンドリングだけでなく、成功時の子ステートマシンからの出力によって、後続処理を分岐させることもできるため、柔軟にフローを組み立てられると思います。なお、今回は触れていませんが、本稼働用のステートマシンでは、何らかの理由でコールバックが受けられないケースを想定して、タイムアウト設定を入れておきたいです。