[AWS Step Functions] Express WorkflowからStandard Workflowの同期的な呼び出しができるのか試してみた

[AWS Step Functions] Express WorkflowからStandard Workflowの同期的な呼び出しができるのか試してみた

結果:できませんでした
Clock Icon2022.09.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

以前にAmazon API Gateway REST APIからStep Functions Express Workflowを同期的に呼び出して結果をレスポンスするという構成を作りました。

REST APIのバックの処理を、LambdaではなくStep Functionsによりローコードで作れるというのはとてもとても魅力的です。しかし、Express Workflowはマネジメントコンソールから実行履歴をGraph ViewやEvent Viewで確認できず、CloudWatch Logsのイベントログを確認する必要があるため、これではビジュアルワークフローとしての利便性は半減してしまうというもの。

そこで「Express WorkflowからStandard Workflowを同期的に呼び出しし、Standard Workflow内で必要な処理を行えば履歴の問題は解決できるのでは?」と考え、今回その構成を試してみました。

試してみた

実装は次のようなAWS CDK(TypeScript)のスタックで行いました。

import {
  aws_stepfunctions,
  aws_stepfunctions_tasks,
  Stack,
  StackProps,
  Duration,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

export class AwsCdkAppStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // Srandard Workflow
    const standardWorkflow = new aws_stepfunctions.StateMachine(
      this,
      'standardWorkflow',
      {
        stateMachineName: 'standardWorkflow',
        definition: new aws_stepfunctions.Wait(this, 'wait', {
          time: aws_stepfunctions.WaitTime.duration(Duration.seconds(15)),
        }),
      }
    );

    // Srandard Workflowを実行するタスク
    const executeStandardWorkflowTask =
      new aws_stepfunctions_tasks.StepFunctionsStartExecution(
        this,
        'executeStandardWorkflowTask',
        {
          stateMachine: standardWorkflow,
          integrationPattern:
            aws_stepfunctions.IntegrationPattern.REQUEST_RESPONSE,
          input: aws_stepfunctions.TaskInput.fromObject({
            token: aws_stepfunctions.JsonPath.taskToken,
          }),
        }
      );

    // Express Workflow
    new aws_stepfunctions.StateMachine(this, 'expressWorkflow', {
      stateMachineName: 'expressWorkflow',
      stateMachineType: aws_stepfunctions.StateMachineType.EXPRESS,
      definition: executeStandardWorkflowTask,
    });
  }
}

ステートマシンから別のステートマシンを同期的に呼び出す方法は、Service Integration Patterns(サービス統合パターン)のうちRun a Job (.sync)またはWait for a Callback with the Task Token(.waitForTaskToken)を利用する必要があります。

ここでは後者の.waitForTaskTokenによる方法で実装しています。

そして上記をCDK Deployしようとしたところエラーとなりました。

$ cdk deploy AwsCdkAppStack

...

AwsCdkAppStack: deploying...
[0%] start: Publishing aa1a622a388cbb466f351832e108e4e25cd7c212db366843659b09900cd8e9ac:current_account-current_region
[100%] success: Published aa1a622a388cbb466f351832e108e4e25cd7c212db366843659b09900cd8e9ac:current_account-current_region
AwsCdkAppStack: creating CloudFormation changeset...
11:30:30 PM | CREATE_FAILED        | AWS::StepFunctions::StateMachine | expressWorkflow8143AF42
Resource handler returned message: "Invalid State Machine Definition: 'SCHEMA_VALIDATION_FAILED: Express state machine does n
ot support '.waitForTaskToken' service integration  at /States/executeStandardWorkflowTask/Resource' (Service: AWSStepFunctio
ns; Status Code: 400; Error Code: InvalidDefinition; Request ID: 4164e9fa-c3e7-41e3-a9cf-a595349995a7; Proxy: null)" (Request
Token: 5931ac05-11a1-f6cf-7a72-58241efe7fc8, HandlerErrorCode: InvalidRequest)

Express state machine does not support '.waitForTaskToken' service integrationとある通り、Express Workflowでは'.waitForTaskToken' service integrationをサポートしていないようです。残念。

ドキュメントにちゃんと記載があった

AWSのドキュメントを読むと、今回確認しようとしたことについてちゃんと記載がありました。

Service integrationsについて、Standard Workflowはすべてのパターン対応しているようです。

Supports all service integrations and patterns.

一方で、Express Workflowは.syncおよび.waitForTaskTokenのService Integration Patternsには対応していないとのことです。

Supports all service integrations. Note Express Workflows do not support Job-run (.sync) or Callback (.waitForTaskToken) service integration patterns.

REST APIからStandard Workflowを非同期に実行して結果取得する方法ならある

下記エントリで紹介されている内容での実装なら、REST APIからStandard Workflowを実行することができます。

上記で紹介されているのは次のようなシーケンスの処理です。

  1. リクエストでStandard Workflowを非同期実行
  2. レスポンスで実行名を取得
  3. 以降のリクエストで実行名を使用してStandard Workflowの実行が完了するまでポーリング
  4. 実行が完了したら実行結果をリクエストしてレスポンスで取得する

これによりREST APIでStandard Workflowを使用でき、またAPI Gatewayの29秒制限も回避することが可能となります。

ただしフロントエンド側でポーリングをするための作り込みが必要となるので、その点を考慮する必要がありますね。

おわりに

AWSにフィードバックは投げてみました。何卒よろしくお願いいたします。

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.