AWS CDK v2で同期的に実行する入れ子のステートマシンを作成してみた

2023.05.29

データアナリティクス事業本部の鈴木です。

最近、AWS CDK v2で入れ子のステートマシンを作成したく試行錯誤していました。

※この記事では2つのステートマシンがあり、一方からもう一方を実行するような構成のこととします。

ようやくできたステートマシンを実行したところ、内側のステートマシンの完了を待たずに外側のステートマシンが完了してしまい、同期実行するための実装を調べたので紹介します。

入れ子のステートマシンの同期実行方法

入れ子になっているステートマシンを同期実行する方法は、以下のドキュメントに紹介がありました。

以下の2種が紹介されていました。

  1. サービスを呼び出し、ジョブが完了するまでStep Functionsが待機するよう実装する。
  2. コールバックサービス統合パターンにて、タスクトークンでサービスを呼び出し、トークンがペイロードとともに返されるまでStep Functionsが待機する。

1点目は、arn:aws:states:::states:startExecution.syncおよびarn:aws:states:::states:startExecution.sync:2を使って内側のステートマシンを呼び出す方法です。この2つの違いは、Outputの形式が異なることがドキュメントに記載されています。

2点目は、内側のステートマシン実行時にタスクトークンを渡し、内側のステートマシン終了時にSendTaskSuccessSendTaskFailureの呼び出しを持って終了を通知する方法です。

サービス統合パターンのドキュメントページに記載の通り、コールバックサービス統合パターンは人間による承認・サードパーティーとの統合・レガシーシステムの呼び出しまで待機するため必要になる場合があり、単純にステートマシンを管理しやすくするために入れ子にしたい場合には、1つ目の方法でよさそうです。

この記事では、AWS CDK v2を使って、まず1つ目の方法の実装例を紹介します。2つ目についても具体的な実装例が意外となかったので、作ってみたものを紹介します。

前提

環境

  • AWS CDK: 2.80.0

実装方法

StepFunctionsStartExecutionクラスを使って実装します。

今回の実装のポイントはConstruct PropsintegrationPatternです。IntegrationPattern型の値を取るものですが、IntegrationPattern型は以下の3つの値をとります。

# Name Description
1 REQUEST_RESPONSE サービスを呼び出し、Step FunctionsがHTTPレスポンスを取得した直後に次の状態に進む。
2 RUN_JOB サービスを呼び出し、ジョブが完了するまでStep Functionsが待機する。
3 WAIT_FOR_TASK_TOKEN タスクトークンでサービスを呼び出し、トークンがペイロードとともに返されるまでStep Functionsが待機する。

一つ目は非同期呼び出しなので、残りの2つを使って同期呼び出しを実装しました。

CDKプロジェクトの作成

以下のようにサンプルのCDKプロジェクトを作成しました。

# プロジェクト用のディレクトリ作成
mkdir statemachine_cdk

# プロジェクトの初期化
cd statemachine_cdk
cdk init sample-app --language typescript

やってみる

1. ジョブが完了するまで待機する方法

まず1つ目の『サービスを呼び出し、ジョブが完了するまでStep Functionsが待機するよう実装する』方法です。

integrationPatternRUN_JOBを指定しました。

実装例

lib/statemachine_cdk-stack.ts

import { Stack, StackProps } from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import {
  Duration
} from 'aws-cdk-lib';
import { Construct } from 'constructs';


export class StatemachineCdkStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // 呼び出される側のステートマシン
    // 10秒待機するステート
    const wait10seconds = new sfn.Wait(this, 'wait10seconds', {
      time: sfn.WaitTime.duration(Duration.seconds(10)),
    });

    const InnerStateMachine = new sfn.StateMachine(this, 'InnerStateMachine', {
      stateMachineName: 'InnerStateMachine',
      definition: wait10seconds
    });

    // 呼び出す側のステートマシン
    const OuterStateMachine = new sfn.StateMachine(this, 'OuterStateMachine', {
      definition: new tasks.StepFunctionsStartExecution(
          this,
          "StartExecuteInnerStateMachine",
          {
            stateMachine: InnerStateMachine,
            inputPath: "$",
            associateWithParent: true,
            integrationPattern: sfn.IntegrationPattern.RUN_JOB
          }
        )
    });
  }
}

cdk deployコマンドでデプロイしました。

作成されたステートマシンの定義

できた外側のステートマシンはarn:aws:states:::states:startExecution.sync:2にて内側のステートマシンを呼び出す形となっていました。

# 外側のステートマシン
{
  "StartAt": "StartExecuteInnerStateMachine",
  "States": {
    "StartExecuteInnerStateMachine": {
      "End": true,
      "Type": "Task",
      "InputPath": "$",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Parameters": {
        "Input": {
          "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
        },
        "StateMachineArn": "arn:aws:states:ap-northeast-1:アカウントID:stateMachine:InnerStateMachine"
      }
    }
  }
}

# 内側のステートマシン
{
  "Type": "Wait",
  "Seconds": 10,
  "End": true
}

動作確認結果

実行すると、外側のステートマシンが内側のステートマシンを呼び出した後、10秒間待機していることが分かります。

1つ目の方法の実行結果

これは内側に確かに10秒待機するステートが入っているためですね。

1つ目の方法の内側の実行結果

2. トークンの返却まで待機する方法

次に『トークンの返却まで待機する方法』の実装例です。

実装例

integrationPatternWAIT_FOR_TASK_TOKENを指定しました。また、inputaws_stepfunctions.JsonPath.taskTokenを設定しました。

トークンの返却は、内側のステートマシンのSendTaskSuccessステートにて、CallAwsServiceクラスを使って返すようにしました。

lib/statemachine_cdk-stack.ts

import { Stack, StackProps } from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import {
  Duration,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';


export class StatemachineCdkStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // 呼び出される側のステートマシン
    // 10秒待機するステート
    const wait10seconds = new sfn.Wait(this, 'wait10seconds', {
      time: sfn.WaitTime.duration(Duration.seconds(10)),
    });

    // 成功としてトークンを返却するステート
    const SendTaskSuccess = new tasks.CallAwsService(
      this,
      'SendTaskSuccess',
      {
        service: 'sfn',
        action: 'sendTaskSuccess',
        parameters: {
          "TaskToken": sfn.JsonPath.stringAt('$.token'),
          "Output": sfn.JsonPath.objectAt('$')
        },
        iamResources: ["arn:aws:states:*"],
        iamAction: 'states:SendTaskSuccess'
      },
    );

    const InnerStateMachine = new sfn.StateMachine(this, 'InnerStateMachine', {
      stateMachineName: 'InnerStateMachine',
      definition: 
        wait10seconds
        .next(
          SendTaskSuccess
        )
    });

    // 呼び出す側のステートマシン
    const OuterStateMachine = new sfn.StateMachine(this, 'OuterStateMachine', {
      definition: new tasks.StepFunctionsStartExecution(
          this,
          "StartExecuteInnerStateMachine",
          {
            stateMachine: InnerStateMachine,
            inputPath: "$",
            input: sfn.TaskInput.fromObject({
              "token": sfn.JsonPath.taskToken,
            }),
            heartbeatTimeout: sfn.Timeout.duration(Duration.minutes(30)), //30min
            associateWithParent: true,
            integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN
          }
        )
    });
  }
}

内側のステートマシンは10秒待機するというシンプルなものなので、今回はsendTaskSuccessのアクションを実行するステートのみを入れています。実際はもっと複雑な処理を実行する可能性があり、当然処理に失敗してエラーになってしまうことがあるため、SendTaskFailureの呼び出しをするステートも必要になると考えられます。

その場合は失敗の可能性があるステートにはaddCatchメソッドでsendTaskFailureのアクションを実行するステートへの遷移を追加しておく必要があります。

外側のステートマシンでは、万が一トークンの返却がうまく出来なかった場合を考えて、heartbeatTimeoutでタイムアウトを入れています。

作成されたステートマシンの定義

# 外側のステートマシン
{
  "StartAt": "StartExecuteInnerStateMachine",
  "States": {
    "StartExecuteInnerStateMachine": {
      "End": true,
      "Type": "Task",
      "HeartbeatSeconds": 1800,
      "InputPath": "$",
      "Resource": "arn:aws:states:::states:startExecution.waitForTaskToken",
      "Parameters": {
        "Input": {
          "token.$": "$$.Task.Token",
          "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
        },
        "StateMachineArn": "arn:aws:states:ap-northeast-1:アカウントID:stateMachine:InnerStateMachine"
      }
    }
  }
}

# 内側のステートマシン
{
  "StartAt": "wait10seconds",
  "States": {
    "wait10seconds": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "SendTaskSuccess"
    },
    "SendTaskSuccess": {
      "End": true,
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess",
      "Parameters": {
        "TaskToken.$": "$.token",
        "Output.$": "$"
      }
    }
  }
}

動作確認結果

こちらの実装パターンでも、実行すると、外側のステートマシンが内側のステートマシンを呼び出した後、10秒間待機していることが分かります。

2つ目の方法の実行結果

内側では、10秒待機した後にトークンを返却するステートも成功していますね。

2つ目の方法の内側の実行結果

なお、先述した通り、実際の内側のステートマシンは、処理の失敗でエラーになってしまう可能性があるため、SendTaskFailureのステートも必要になると思います。

最後に

AWS CDK v2で入れ子のステートマシンを作成する際に、同期実行するための実装を2パターン紹介しました。

特にコールバックサービス統合パターンはよいサンプルが見つからなかったので、自分で作成してみたものをご紹介しました。

参考になりましたら幸いです。

参考