SQS 内のメッセージを EventBridge Pipes 経由で Step Functions(Express) に同期処理させる構成を CDK で実装してみる

SQS 内のメッセージを EventBridge Pipes 経由で Step Functions(Express) に同期処理させる構成を CDK で実装してみる

こんにちは。クラウド事業本部の枡川です。
今回は SQS 内のメッセージを EventBridge Pipes 経由で読み出して、Step Functions に同期処理させる構成を CDK で実装してみました。
EventBridge Pipes を利用することで Lambda を挟まずに Step Functions ベースの同期処理を行うことができ、面倒なリトライ周りの処理をマネージドサービスに任せることができます。
SQS にメッセージが溜まったら Step Functions を呼び出し、エラー時には SQS の maxReceiveCount 分までリトライを繰り返し、成功したらメッセージ削除、失敗したら DLQ にメッセージを配信する部分までやってくれます。
最高ですね。

※ Lambda を挟む方法との比較は下記記事がわかりやすいので、是非ご参照下さい。
Step Functions からのレスポンスを待つ間、ずっと Lambda を起動させておくのはコスト面でデメリットが大きいです。

https://track3jyo.com/2023/01/pipes-with-sqs/

今回改めて本構成を取り上げる背景として、Step Functions が JSONata と変数に対応して使いやすくなったことが挙げられます。

https://aws.amazon.com/jp/blogs/news/simplifying-developer-experience-with-variables-and-jsonata-in-aws-step-functions/

Step Functions に向いているような複雑な処理を実装する場合、SQS のコンシューマーとして Lambda の代わりに Step Functions を利用できると嬉しいのではないでしょうか。
また、ワークフローの構成を可視化しやすく、実装を簡単に理解しやすくなるメリットもあるかと思います。
運用観点でも、下図のように、どこまで進んで失敗しているかがわかりやすいのはメリットですよね。

error.png

そんな Event Bridge Pipes ですが、CDK の L2 コンストラクトがアルファ版ということもあり、巷に CDK ベースで実装した情報が少なく感じました。

https://docs.aws.amazon.com/cdk/api/v2/docs/aws-pipes-alpha-readme.html

また、Step Functions の JSONata 実装も CDK で行えるようになっています。

https://github.com/aws/aws-cdk/releases/tag/v2.178.0

Step Functions は ASL ファイルをそのまま扱って L1 コンストラクトから参照するくらいが何だかんだ扱いやすい気もしていますが、サクッと作る分には L2 が非常に便利です。

https://dev.classmethod.jp/articles/aws-cdk-asl-stepfunctions/

せっかくなので、今回は EventBridge Pipes も JSONata な Step Functions も全て L2 ベースで実装してみます。

構成

全体的な構成は下図のようにしました。

step-functions-pipes-sqs.png

基本的には SQS と Step Functions を EventBridge Pipes で繋げただけですが、API Gateway を作成して HTTP リクエストベースで SQS にメッセージを入れられるようにしています。
また、Step Functions が扱う外部リソースとして、DynamoDB と SNS も構築します。
EventBridge Pipes 側の設定として、InvocationType は REQUEST_RESPONSE を選択して、同期呼び出しを行います。

REQUEST_RESPONSE (default) - Invoke synchronously. For more information, see StartSyncExecution in the AWS Step Functions API Reference.
https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetStateMachineParameters.html

同期処理を行う際、Step Functions は Express ワークフローを選択する必要があります。

・Express ワークフロー (SYNC または ASYNC)
・Standard ワークフロー (ASYNC)
Amazon EventBridge Pipes ターゲット

また、SQS は可視性タイムアウトを 30 秒、maxReceiveCount を 5 回とします。
Step Functions が失敗した際、30 秒ごとに 5 回リトライを行って、それでも失敗したら DLQ にメッセージを配信して元のメッセージを削除します。
ワークフローとしては下図のようなものとしました。

workflow.png

単なる通知処理ですが、DynamoDB に通知記録を取っておき、直近 5 分以内に通知済みであれば通知をスキップします。
通知した時のみ、直近の通知時間情報を更新します。

CDK でデプロイする

では、CDK で AWS リソースを作っていきます。
ソースコードは下記リポジトリに格納しているので、細かい部分が気になった場合はご参照下さい。

https://github.com/masutaro99/cdk-sqs-pipe-step-functions/blob/main/lib/cdk-sqs-pipe-step-functions-stack.ts

まず、CDK アプリケーションを作成します。

npx cdk init app --language typescript

EventBridge Pipes はアルファモジュールなので、明示的にインストールします。

npm install @aws-cdk/aws-pipes-alpha
npm install @aws-cdk/aws-pipes-sources-alpha
npm install @aws-cdk/aws-pipes-targets-alpha

lib/cdk-sqs-pipe-step-functions-stack.ts は下記のように書きました。

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as logs from "aws-cdk-lib/aws-logs";
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as pipes from "@aws-cdk/aws-pipes-alpha";
import * as pipes_source from "@aws-cdk/aws-pipes-sources-alpha";
import * as pipes_target from "@aws-cdk/aws-pipes-targets-alpha";
import * as apigateway from "aws-cdk-lib/aws-apigateway";
import * as iam from "aws-cdk-lib/aws-iam";
import * as sfn_tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as sns from "aws-cdk-lib/aws-sns";

export class CdkSqsPipeStepFunctionsStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // SQS Dead Letter Queue
    const dlQueue = new sqs.Queue(this, "DeadLetterQueue", {
      fifo: false,
      visibilityTimeout: cdk.Duration.seconds(300),
    });

    // SQS Queue
    const queue = new sqs.Queue(this, "SqsQueue", {
      fifo: false,
      visibilityTimeout: cdk.Duration.seconds(30),
      deadLetterQueue: {
        maxReceiveCount: 5,
        queue: dlQueue,
      },
    });

    // API Gateway
    const api = new apigateway.RestApi(this, "ApiGatewaySqsApi", {
      description: "API that sends requests to SQS",
      deployOptions: {
        stageName: "prod",
      },
    });
    const apiGatewayRole = new iam.Role(this, "ApiGatewaySqsRole", {
      assumedBy: new iam.ServicePrincipal("apigateway.amazonaws.com"),
    });
    queue.grantSendMessages(apiGatewayRole);

    // Integration with SQS
    const sqsIntegration = new apigateway.AwsIntegration({
      service: "sqs",
      path: `${cdk.Aws.ACCOUNT_ID}/${queue.queueName}`,
      integrationHttpMethod: "POST",
      options: {
        credentialsRole: apiGatewayRole,
        requestParameters: {
          "integration.request.header.Content-Type":
            "'application/x-www-form-urlencoded'",
        },
        requestTemplates: {
          "application/json":
            "Action=SendMessage&MessageBody=$util.urlEncode($input.body)",
        },
        integrationResponses: [
          {
            statusCode: "200",
            responseTemplates: {
              "application/json": JSON.stringify({
                success: true,
                message: "Message sent to SQS",
              }),
            },
          },
        ],
      },
    });

    // POST method for API Gateway
    api.root.addMethod("POST", sqsIntegration, {
      methodResponses: [
        {
          statusCode: "200",
          responseModels: {
            "application/json": apigateway.Model.EMPTY_MODEL,
          },
        },
      ],
    });

    // SNS Topic
    const topic = new sns.Topic(this, "SnsTopic", {
      displayName: "SQS Topic",
    });

    // SNS Notification Task
    const snsTask = sfn_tasks.SnsPublish.jsonata(this, "SnsTask", {
      topic,
      message: sfn.TaskInput.fromText("{% $message %}"),
    });

    // Dynamodb Table
    const table = new dynamodb.Table(this, "Table", {
      partitionKey: { name: "id", type: dynamodb.AttributeType.STRING },
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });

    // CloudWatch Log Group for Step Functions
    const sfnLogGroup = new logs.LogGroup(this, "SfnLogGroup", {
      retention: logs.RetentionDays.ONE_MONTH,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    // Set Variables for Step Functions
    const setVariables = sfn.Pass.jsonata(this, "SetVariables", {
      assign: {
        id: "{% $parse($states.input.body).id %}",
        message: "{% $parse($states.input.body).message %}",
        startTime: "{% $toMillis($states.context.Execution.StartTime) %}",
      },
    });

    // Step Functions Task to Get item in DynamoDB
    const getItemTask = sfn_tasks.DynamoGetItem.jsonata(this, "GetItemTask", {
      table,
      key: {
        id: sfn_tasks.DynamoAttributeValue.fromString("{% $id %}"),
      },
    });

    // Step Functions Task to put item in DynamoDB
    const putItemTask = sfn_tasks.DynamoPutItem.jsonata(this, "PutItemTask", {
      table,
      item: {
        id: sfn_tasks.DynamoAttributeValue.fromString("{% $id %}"),
        message: sfn_tasks.DynamoAttributeValue.fromString("{% $message %}"),
        time: sfn_tasks.DynamoAttributeValue.fromString(
          "{% $string($startTime) %}"
        ),
      },
    });

    // Pass Task to set the choice
    const passTask = new sfn.Pass(this, "PassTask", {});

    const choice = sfn.Choice.jsonata(this, "Choice")
      .when(
        sfn.Condition.jsonata(
          "{% ($startTime - $number($states.input.Item.time.S)) >= 60 * 1000 * 5 %}"
        ),
        putItemTask.next(snsTask)
      )
      .otherwise(passTask);

    // Step Functions State Machine
    const definitionBody = sfn.DefinitionBody.fromChainable(
      setVariables.next(getItemTask.next(choice))
    );
    const stateMachine = new sfn.StateMachine(this, "StateMachine", {
      stateMachineType: sfn.StateMachineType.EXPRESS,
      definitionBody: definitionBody,
      queryLanguage: sfn.QueryLanguage.JSONATA,
      timeout: cdk.Duration.minutes(5),
      logs: {
        destination: sfnLogGroup,
        level: sfn.LogLevel.ALL,
        includeExecutionData: true,
      },
    });

    // CloudWatch Log Group for EventBridge Pipes
    const pipeLogGroup = new logs.LogGroup(this, "PipeLogGroup", {
      retention: logs.RetentionDays.ONE_MONTH,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });
    const cwlLogDestination = new pipes.CloudwatchLogsLogDestination(
      pipeLogGroup
    );

    // EventBridge Pipes
    const pipe = new pipes.Pipe(this, "Pipe", {
      source: new pipes_source.SqsSource(queue),
      target: new pipes_target.SfnStateMachine(stateMachine, {
        invocationType:
          pipes_target.StateMachineInvocationType.REQUEST_RESPONSE,
      }),
      logLevel: pipes.LogLevel.TRACE,
      logIncludeExecutionData: [pipes.IncludeExecutionData.ALL],
      logDestinations: [cwlLogDestination],
    });
  }
}

Step Functions 周りについて、JSONPath を利用している際はこんな書き方でした。

const notifyFailure = new sfn_tasks.SnsPublish(this, "NotifyFailure", {
  topic: snsTopic,
  subject: "Task failed",
  message: sfn.TaskInput.fromJsonPathAt("$"),
  resultPath: "$.Notify",
});

JSONata の場合は jsonata メソッドを呼ぶ必要があることを抑えておけば、今までと同じ感じで記述可能です。

const snsTask = sfn_tasks.SnsPublish.jsonata(this, "SnsTask", {
  topic,
  message: sfn.TaskInput.fromText("{% $message %}"),
});

変数の Assign も直感的です。

// Set Variables for Step Functions
const setVariables = sfn.Pass.jsonata(this, "SetVariables", {
  assign: {
    id: "{% $parse($states.input.body).id %}",
    message: "{% $parse($states.input.body).message %}",
    startTime: "{% $toMillis($states.context.Execution.StartTime) %}",
  },
});

EventBridge Pipes 周りも、ソースとターゲットの指定には専用のモジュールが必要なことを抑えておけば特に難しい部分は無いです。

https://docs.aws.amazon.com/cdk/api/v2/docs/aws-pipes-sources-alpha-readme.html

https://docs.aws.amazon.com/cdk/api/v2/docs/aws-pipes-targets-alpha-readme.html

では、デプロイします。

npx cdk deploy CdkSqsPipeStepFunctionsStack

動作確認

動作確認を行います。
まず、普通にリクエストを送ってみます。

% curl -v -X POST \
  -H "Content-Type: application/json" \
  -d '{"id":"1", "message": "test"}' \
  https://uxgl2kuuv7.execute-api.ap-northeast-1.amazonaws.com/prod

通知が実行されました。

success.png

すぐ 2 回目のリクエストを送ると、通知がスキップされます。
良い感じです。

% curl -v -X POST \
  -H "Content-Type: application/json" \
  -d '{"id":"1", "message": "test"}' \
  https://uxgl2kuuv7.execute-api.ap-northeast-1.amazonaws.com/prod

pass.png

今回、指定した ID が DynamoDB に存在しないと Step Functions が失敗する仕様としています。
失敗するようなリクエストを送ります。

% curl -v -X POST \
  -H "Content-Type: application/json" \
  -d '{"id":"2", "message": "test"}' \
  https://uxgl2kuuv7.execute-api.ap-northeast-1.amazonaws.com/prod

error.png

期待値通り、30 秒ごとに 5 回リトライされています。

sfn-error.png

maxReceiveCount を超えたので、DLQ にメッセージが配信されました。

dlq.png

Event Bridge Pipes 側のメトリクスでも 5 回分の失敗が記録されています。

pipes2.png

最後に

EventBridge Pipes を利用した Step Functions の同期呼び出しを試してみました。
SQS 内のメッセージの取り回しについて、ほぼマネージドサービスに任せることができて良い感じです。
SQS のコンシューマーとしては Lambda を利用することが多いと思いますが、便利になった Step Functions も是非活用していきたいですね!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.