SQS 内のメッセージを EventBridge Pipes 経由で Step Functions(Express) に同期処理させる構成を CDK で実装してみる
こんにちは。クラウド事業本部の枡川です。
今回は SQS 内のメッセージを EventBridge Pipes 経由で読み出して、Step Functions に同期処理させる構成を CDK で実装してみました。
EventBridge Pipes を利用することで Lambda を挟まずに Step Functions ベースの同期処理を行うことができ、面倒なリトライ周りの処理をマネージドサービスに任せることができます。
SQS にメッセージが溜まったら Step Functions を呼び出し、エラー時には SQS の maxReceiveCount
分までリトライを繰り返し、成功したらメッセージ削除、失敗したら DLQ にメッセージを配信する部分までやってくれます。
最高ですね。
※ Lambda を挟む方法との比較は下記記事がわかりやすいので、是非ご参照下さい。
Step Functions からのレスポンスを待つ間、ずっと Lambda を起動させておくのはコスト面でデメリットが大きいです。
今回改めて本構成を取り上げる背景として、Step Functions が JSONata と変数に対応して使いやすくなったことが挙げられます。
Step Functions に向いているような複雑な処理を実装する場合、SQS のコンシューマーとして Lambda の代わりに Step Functions を利用できると嬉しいのではないでしょうか。
また、ワークフローの構成を可視化しやすく、実装を簡単に理解しやすくなるメリットもあるかと思います。
運用観点でも、下図のように、どこまで進んで失敗しているかがわかりやすいのはメリットですよね。
そんな Event Bridge Pipes ですが、CDK の L2 コンストラクトがアルファ版ということもあり、巷に CDK ベースで実装した情報が少なく感じました。
また、Step Functions の JSONata 実装も CDK で行えるようになっています。
Step Functions は ASL ファイルをそのまま扱って L1 コンストラクトから参照するくらいが何だかんだ扱いやすい気もしていますが、サクッと作る分には L2 が非常に便利です。
せっかくなので、今回は EventBridge Pipes も JSONata な Step Functions も全て L2 ベースで実装してみます。
構成
全体的な構成は下図のようにしました。
基本的には SQS と Step Functions を EventBridge Pipes で繋げただけですが、API Gateway を作成して HTTP リクエストベースで SQS にメッセージを入れられるようにしています。
また、Step Functions が扱う外部リソースとして、DynamoDB と SNS も構築します。
EventBridge Pipes 側の設定として、InvocationType は REQUEST_RESPONSE
を選択して、同期呼び出しを行います。
REQUEST_RESPONSE (default) - Invoke synchronously. For more information, see StartSyncExecution in the AWS Step Functions API Reference.
https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetStateMachineParameters.html
同期処理を行う際、Step Functions は Express ワークフローを選択する必要があります。
・Express ワークフロー (SYNC または ASYNC)
・Standard ワークフロー (ASYNC)
Amazon EventBridge Pipes ターゲット
また、SQS は可視性タイムアウトを 30 秒、maxReceiveCount
を 5 回とします。
Step Functions が失敗した際、30 秒ごとに 5 回リトライを行って、それでも失敗したら DLQ にメッセージを配信して元のメッセージを削除します。
ワークフローとしては下図のようなものとしました。
単なる通知処理ですが、DynamoDB に通知記録を取っておき、直近 5 分以内に通知済みであれば通知をスキップします。
通知した時のみ、直近の通知時間情報を更新します。
CDK でデプロイする
では、CDK で AWS リソースを作っていきます。
ソースコードは下記リポジトリに格納しているので、細かい部分が気になった場合はご参照下さい。
まず、CDK アプリケーションを作成します。
npx cdk init app --language typescript
EventBridge Pipes はアルファモジュールなので、明示的にインストールします。
npm install @aws-cdk/aws-pipes-alpha
npm install @aws-cdk/aws-pipes-sources-alpha
npm install @aws-cdk/aws-pipes-targets-alpha
lib/cdk-sqs-pipe-step-functions-stack.ts
は下記のように書きました。
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as logs from "aws-cdk-lib/aws-logs";
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as pipes from "@aws-cdk/aws-pipes-alpha";
import * as pipes_source from "@aws-cdk/aws-pipes-sources-alpha";
import * as pipes_target from "@aws-cdk/aws-pipes-targets-alpha";
import * as apigateway from "aws-cdk-lib/aws-apigateway";
import * as iam from "aws-cdk-lib/aws-iam";
import * as sfn_tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as sns from "aws-cdk-lib/aws-sns";
export class CdkSqsPipeStepFunctionsStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// SQS Dead Letter Queue
const dlQueue = new sqs.Queue(this, "DeadLetterQueue", {
fifo: false,
visibilityTimeout: cdk.Duration.seconds(300),
});
// SQS Queue
const queue = new sqs.Queue(this, "SqsQueue", {
fifo: false,
visibilityTimeout: cdk.Duration.seconds(30),
deadLetterQueue: {
maxReceiveCount: 5,
queue: dlQueue,
},
});
// API Gateway
const api = new apigateway.RestApi(this, "ApiGatewaySqsApi", {
description: "API that sends requests to SQS",
deployOptions: {
stageName: "prod",
},
});
const apiGatewayRole = new iam.Role(this, "ApiGatewaySqsRole", {
assumedBy: new iam.ServicePrincipal("apigateway.amazonaws.com"),
});
queue.grantSendMessages(apiGatewayRole);
// Integration with SQS
const sqsIntegration = new apigateway.AwsIntegration({
service: "sqs",
path: `${cdk.Aws.ACCOUNT_ID}/${queue.queueName}`,
integrationHttpMethod: "POST",
options: {
credentialsRole: apiGatewayRole,
requestParameters: {
"integration.request.header.Content-Type":
"'application/x-www-form-urlencoded'",
},
requestTemplates: {
"application/json":
"Action=SendMessage&MessageBody=$util.urlEncode($input.body)",
},
integrationResponses: [
{
statusCode: "200",
responseTemplates: {
"application/json": JSON.stringify({
success: true,
message: "Message sent to SQS",
}),
},
},
],
},
});
// POST method for API Gateway
api.root.addMethod("POST", sqsIntegration, {
methodResponses: [
{
statusCode: "200",
responseModels: {
"application/json": apigateway.Model.EMPTY_MODEL,
},
},
],
});
// SNS Topic
const topic = new sns.Topic(this, "SnsTopic", {
displayName: "SQS Topic",
});
// SNS Notification Task
const snsTask = sfn_tasks.SnsPublish.jsonata(this, "SnsTask", {
topic,
message: sfn.TaskInput.fromText("{% $message %}"),
});
// Dynamodb Table
const table = new dynamodb.Table(this, "Table", {
partitionKey: { name: "id", type: dynamodb.AttributeType.STRING },
removalPolicy: cdk.RemovalPolicy.DESTROY,
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
});
// CloudWatch Log Group for Step Functions
const sfnLogGroup = new logs.LogGroup(this, "SfnLogGroup", {
retention: logs.RetentionDays.ONE_MONTH,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
// Set Variables for Step Functions
const setVariables = sfn.Pass.jsonata(this, "SetVariables", {
assign: {
id: "{% $parse($states.input.body).id %}",
message: "{% $parse($states.input.body).message %}",
startTime: "{% $toMillis($states.context.Execution.StartTime) %}",
},
});
// Step Functions Task to Get item in DynamoDB
const getItemTask = sfn_tasks.DynamoGetItem.jsonata(this, "GetItemTask", {
table,
key: {
id: sfn_tasks.DynamoAttributeValue.fromString("{% $id %}"),
},
});
// Step Functions Task to put item in DynamoDB
const putItemTask = sfn_tasks.DynamoPutItem.jsonata(this, "PutItemTask", {
table,
item: {
id: sfn_tasks.DynamoAttributeValue.fromString("{% $id %}"),
message: sfn_tasks.DynamoAttributeValue.fromString("{% $message %}"),
time: sfn_tasks.DynamoAttributeValue.fromString(
"{% $string($startTime) %}"
),
},
});
// Pass Task to set the choice
const passTask = new sfn.Pass(this, "PassTask", {});
const choice = sfn.Choice.jsonata(this, "Choice")
.when(
sfn.Condition.jsonata(
"{% ($startTime - $number($states.input.Item.time.S)) >= 60 * 1000 * 5 %}"
),
putItemTask.next(snsTask)
)
.otherwise(passTask);
// Step Functions State Machine
const definitionBody = sfn.DefinitionBody.fromChainable(
setVariables.next(getItemTask.next(choice))
);
const stateMachine = new sfn.StateMachine(this, "StateMachine", {
stateMachineType: sfn.StateMachineType.EXPRESS,
definitionBody: definitionBody,
queryLanguage: sfn.QueryLanguage.JSONATA,
timeout: cdk.Duration.minutes(5),
logs: {
destination: sfnLogGroup,
level: sfn.LogLevel.ALL,
includeExecutionData: true,
},
});
// CloudWatch Log Group for EventBridge Pipes
const pipeLogGroup = new logs.LogGroup(this, "PipeLogGroup", {
retention: logs.RetentionDays.ONE_MONTH,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
const cwlLogDestination = new pipes.CloudwatchLogsLogDestination(
pipeLogGroup
);
// EventBridge Pipes
const pipe = new pipes.Pipe(this, "Pipe", {
source: new pipes_source.SqsSource(queue),
target: new pipes_target.SfnStateMachine(stateMachine, {
invocationType:
pipes_target.StateMachineInvocationType.REQUEST_RESPONSE,
}),
logLevel: pipes.LogLevel.TRACE,
logIncludeExecutionData: [pipes.IncludeExecutionData.ALL],
logDestinations: [cwlLogDestination],
});
}
}
Step Functions 周りについて、JSONPath を利用している際はこんな書き方でした。
const notifyFailure = new sfn_tasks.SnsPublish(this, "NotifyFailure", {
topic: snsTopic,
subject: "Task failed",
message: sfn.TaskInput.fromJsonPathAt("$"),
resultPath: "$.Notify",
});
JSONata の場合は jsonata
メソッドを呼ぶ必要があることを抑えておけば、今までと同じ感じで記述可能です。
const snsTask = sfn_tasks.SnsPublish.jsonata(this, "SnsTask", {
topic,
message: sfn.TaskInput.fromText("{% $message %}"),
});
変数の Assign も直感的です。
// Set Variables for Step Functions
const setVariables = sfn.Pass.jsonata(this, "SetVariables", {
assign: {
id: "{% $parse($states.input.body).id %}",
message: "{% $parse($states.input.body).message %}",
startTime: "{% $toMillis($states.context.Execution.StartTime) %}",
},
});
EventBridge Pipes 周りも、ソースとターゲットの指定には専用のモジュールが必要なことを抑えておけば特に難しい部分は無いです。
では、デプロイします。
npx cdk deploy CdkSqsPipeStepFunctionsStack
動作確認
動作確認を行います。
まず、普通にリクエストを送ってみます。
% curl -v -X POST \
-H "Content-Type: application/json" \
-d '{"id":"1", "message": "test"}' \
https://uxgl2kuuv7.execute-api.ap-northeast-1.amazonaws.com/prod
通知が実行されました。
すぐ 2 回目のリクエストを送ると、通知がスキップされます。
良い感じです。
% curl -v -X POST \
-H "Content-Type: application/json" \
-d '{"id":"1", "message": "test"}' \
https://uxgl2kuuv7.execute-api.ap-northeast-1.amazonaws.com/prod
今回、指定した ID が DynamoDB に存在しないと Step Functions が失敗する仕様としています。
失敗するようなリクエストを送ります。
% curl -v -X POST \
-H "Content-Type: application/json" \
-d '{"id":"2", "message": "test"}' \
https://uxgl2kuuv7.execute-api.ap-northeast-1.amazonaws.com/prod
期待値通り、30 秒ごとに 5 回リトライされています。
maxReceiveCount
を超えたので、DLQ にメッセージが配信されました。
Event Bridge Pipes 側のメトリクスでも 5 回分の失敗が記録されています。
最後に
EventBridge Pipes を利用した Step Functions の同期呼び出しを試してみました。
SQS 内のメッセージの取り回しについて、ほぼマネージドサービスに任せることができて良い感じです。
SQS のコンシューマーとしては Lambda を利用することが多いと思いますが、便利になった Step Functions も是非活用していきたいですね!