Amazon API Gateway REST API を Amazon SQS と統合して、Lambda 関数を非同期に呼び出す構成を AWS CDK で構築する
こんにちは、製造ビジネステクノロジー部の若槻です。
Amazon API Gateway が Lambda 関数を呼び出す構成で、関数上で DB アクセスなどの重たい処理が行われる場合に、トラフィック状況によっては Lambda のスロットリングが発生する可能性があります。
そのようなユースケースでのプラクティスとして、API Gateway と Lambda 関数の間に Amazon SQS キューを配置する構成があります。

Things to Consider When You Build REST APIs with Amazon API Gateway | AWS Architecture Blog より
この構成では、SQS キューが API Gateway(プロデューサー)から受信したメッセージを一時的に保存し、Lambda 関数(コンシューマー)が非同期にメッセージを処理(消費)することで、下記図のように急増するトラフィックを均一化してスロットリングを回避し、システムの可用性を高めることができます。

https://d1.awsstatic.com/webinars/jp/pdf/services/20190717_AWS-BlackBelt_AmazonSQS.pdf#page=35 より
今回は、API Gateway REST API を Amazon SQS と統合して、Lambda 関数を非同期に呼び出す構成を AWS CDK で実装してみました。
やってみた
下記の構成を実装します。
[クライアント] --> [REST API] --> [SQS キュー] <-- 消費 -- [Lambda 関数]
REST API がクライアントから受信したリクエストのペイロードを SQS キューにメッセージとして送信し、Lambda 関数が SQS キューからメッセージをイベントソースマッピングにより消費する構成です。
この構成は、REST API にリクエストを送信するクライアントが同期的なレスポンスを必要としない場合や、外部システム等とのマシン間通信(M2M)を想定しています。
リソース作成
AWS CDK でリソースを作成します。
import * as cdk from "aws-cdk-lib";
import * as apigateway from "aws-cdk-lib/aws-apigateway";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import { SqsEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import * as lambda_nodejs from "aws-cdk-lib/aws-lambda-nodejs";
import * as log from "aws-cdk-lib/aws-logs";
import * as sqs from "aws-cdk-lib/aws-sqs";
import { Construct } from "constructs";
export class SampleStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
/**
* Lambda 関数作成
*/
const lambdaFunction = new lambda_nodejs.NodejsFunction(
this,
"LambdaFunction",
{
entry: "src/handler.ts",
tracing: lambda.Tracing.ACTIVE, // AWS X-Ray によるトレースを有効化
},
);
/**
* SQS キュー作成
*/
const queue = new sqs.Queue(this, "Queue");
/**
* Lambda 関数に SQS キューのメッセージ受信権限を付与し、イベントソースとして設定
*/
queue.grantConsumeMessages(lambdaFunction);
lambdaFunction.addEventSource(new SqsEventSource(queue));
/**
* API Gateway REST API 作成
*/
const restApi = new apigateway.RestApi(this, "RestApi", {
deployOptions: {
/**
* AWS X-Ray によるトレースを有効化
*/
tracingEnabled: true,
/**
* アクセスログの有効化
*/
accessLogDestination: new apigateway.LogGroupLogDestination(
new log.LogGroup(this, "ApiGatewayAccessLogGroup"),
),
accessLogFormat: apigateway.AccessLogFormat.jsonWithStandardFields(),
/**
* 実行ログの有効化
*/
loggingLevel: apigateway.MethodLoggingLevel.INFO,
dataTraceEnabled: true,
},
});
/**
* SQS キューへメッセージを送信するための AWS 統合を作成
* @see https://docs.aws.amazon.com/ja_jp/apigateway/latest/developerguide/api-gateway-api-integration-types.html
*/
const sendMessageIntegration = new apigateway.AwsIntegration({
service: "sqs",
path: `${cdk.Aws.ACCOUNT_ID}/${queue.queueName}`, // アカウントID/キュー名のパスを指定
integrationHttpMethod: "POST",
options: {
/**
* コンテントタイプを Amazon SQS の POST リクエストがサポートする形式に設定
*
* MEMO: 設定しない場合は SendMessage API 呼び出しで 404 エラーとなる
* @see https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-making-api-requests-xml.html#structure-post-request
*/
requestParameters: {
"integration.request.header.Content-Type":
"'application/x-www-form-urlencoded'",
},
/**
* リクエストテンプレートを設定
*
* MEMO: 入力変数 `$input.body` を使用して、REST API のリクエストボディを SQS の SendMessage アクションのパラメータ形式に変換するテンプレートを指定
* @see https://docs.aws.amazon.com/ja_jp/apigateway/latest/developerguide/api-gateway-mapping-template-reference.html
*/
requestTemplates: {
"application/json":
"Action=SendMessage&MessageBody=$util.urlEncode($input.body)",
},
/**
* 統合レスポンスを設定
*
* MEMO: REST API からクライアントへ固定のレスポンスを返すように設定
*/
integrationResponses: [
{
statusCode: "200",
responseTemplates: {
"application/json": '{"status": "message sent"}',
},
},
],
/**
* REST API から SQS キューへのアクセス権限を付与する IAM ロールを指定
*/
credentialsRole: new iam.Role(this, "ApiGatewaySqsRole", {
assumedBy: new iam.ServicePrincipal("apigateway.amazonaws.com"),
inlinePolicies: {
AllowSqsSendMessage: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ["sqs:SendMessage"],
resources: [queue.queueArn],
}),
],
}),
},
}),
},
});
/**
* API Gateway のリソースとメソッドを作成し、SQS キューへメッセージを送信する統合を設定
*/
const queueResource = restApi.root.addResource("items");
queueResource.addMethod("POST", sendMessageIntegration, {
methodResponses: [
{
statusCode: "200",
},
],
});
/**
* API Gateway の ANY メソッドとプロキシリソースの設定例
*
* MEMO: このように任意のパスと HTTP メソッドで SQS キューにメッセージを送信する設定も可能。
* ただし、ダウンストリーム処理がパスおよびメソッドを識別可能とするためにはマッピングテンプレートの修正が必要となる。
*/
// restApi.root.addMethod("ANY", sendMessageIntegration, {
// methodResponses: [{ statusCode: "200" }],
// });
// const proxyResource = restApi.root.addResource("{proxy+}");
// proxyResource.addMethod("ANY", sendMessageIntegration, {
// methodResponses: [{ statusCode: "200" }],
// });
}
}
API Gateway から SQS キューにメッセージを直接送信するために AWS サービス統合を使用しています。これにより SQS の sendMessage API を呼び出しています。

https://d1.awsstatic.com/webinars/jp/pdf/services/20190514_AWS-Blackbelt_APIGateway_rev.pdf#page=49 より
また、マッピングテンプレートで予約変数 $iuput を使用して、API Gateway で受信したリクエストのボディをキューメッセージのレコードに設定しています。

https://d1.awsstatic.com/webinars/jp/pdf/services/20190514_AWS-Blackbelt_APIGateway_rev.pdf#page=51 より
ここで、CDK コード中では X-Ray トレースの明示的な有効化を API Gateway および Lambda でしか行っていませんが、Amazon SQS キューはアップストリーム (API Gateway) とダウンストリーム (Lambda) でトレースが有効化されていれば、トレースリンキング機能により両者のトレースの接続が自動で行われるため、明示的な有効化は不要です。
これは、AWS X-Ray には AWS サービス自身がサンプリングを行うアクティブ計測と、別のサービスサンプリングされているリクエストを対象にするパッシブ計測があり、Amazon SQS は後者のパッシブ計測をサポートしていることによるものです。
- Amazon Simple Queue Service — パッシブ計測。サービスが X-Ray SDK を使用してリクエストをトレースする場合、Amazon SQS はトレースヘッダーを送信し、整合性のあるトレース ID を持つコンシューマーに、送信者から元のトレースを伝達し続けます。
動作確認
クライアントから REST API エンドポイントに対してメッセージを送信します。
curl -X POST https://${API_ID}.execute-api.ap-northeast-1.amazonaws.com/prod/items \
-H "Content-Type: application/json" \
-d '{"message": "Hello from API Gateway to SQS"}'
実行結果のトレースです。SQS キューのアップストリームとダウンストリームの2つのトレースが接続され、単一のトレースかのようにで1つのページ上で確認可能となっています。

セグメントタイムラインはトレースごとにトグルで開くようになっています。


Lambda 関数のログを見ると、API 呼び出し時に渡されたメッセージが表示されており、キューを経由してペイロードが正しく渡されていることが確認できます。

{
"Records": [
{
"messageId": "a30ee7be-a622-453c-9563-ea145a05ff02",
"receiptHandle": "AQEB598tg7avOpwo6n6v6k3+...<省略>...==",
"body": "{\"message\": \"Hello from API Gateway to SQS\"}",
"attributes": {
"ApproximateReceiveCount": "1",
"AWSTraceHeader": "Root=1-695674c9-59b472dd3a45eaca3912b9c1;Parent=77cf9d03313aaecd;Sampled=1",
"SentTimestamp": "1767273673833",
"SenderId": "AROAUL6WVPY2M6PC7BBKJ:BackplaneAssumeRoleSession",
"ApproximateFirstReceiveTimestamp": "1767273673844"
},
"messageAttributes": {},
"md5OfBody": "60945dbf651c26b11be8821471a2c923",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:ap-northeast-1:XXXXXXXXXXXX:Sample-Queue4A7E3555-5aSwyKDoa011",
"awsRegion": "ap-northeast-1"
}
]
}
SQS キューのメトリクスです。1件のメッセージが送受信されていることが確認できます。

リンクされたトレースに紐づいたログを確認する際の注意点として、選択したトレースがアップストリームかダウンストリームかによって、表示されるログが異なってきます。

アップストリームのトレースを選択した場合

ダウントリームのトレースを選択した場合
任意のパス/メソッドに AWS 統合を設定も可能
コード中のコメントにも記載しましたが、API Gateway の ANY メソッドとプロキシリソースを使用して、任意のパスおよび HTTP メソッドで SQS キューにメッセージを送信する設定も可能です。
restApi.root.addMethod("ANY", sendMessageIntegration, {
methodResponses: [{ statusCode: "200" }],
});
const proxyResource = restApi.root.addResource("{proxy+}");
proxyResource.addMethod("ANY", sendMessageIntegration, {
methodResponses: [{ statusCode: "200" }],
});
明示的に作成していない hello パスに対して POST リクエストを送信する例です。
curl -X POST https://${API_}.execute-api.ap-northeast-1.amazonaws.com/prod/hello \
-H "Content-Type: application/json" \
-d '{"message": "Hello from API Gateway to SQS"}'
{"status": "message sent"}
SQS キューを通じて、Lambda 関数にメッセージが正しく届いていることが確認できました。

ただし、こちらもコード中に記載したように、ダウンストリームの Lambda 関数でリクエストのパスおよび HTTP メソッドを識別可能とするために、マッピングテンプレートの修正が必要となります。
トラブルシュート
はじめ AWS 統合作成の実装で下記のコンテントタイプの設定を忘れてしまい、SQS キューへのメッセージ送信が失敗しました。
requestParameters: {
"integration.request.header.Content-Type":
"'application/x-www-form-urlencoded'",
},
その場合は下記のように sendMessage API の呼び出しが 404 エラーとなり、API Gateway の実行ログに 404 Not Found エラーが記録されていました。

API Gateway の実行ログは有効化しておくと、こういう時に役立ちます。
イベントソースマッピング使用時の注意点
今回は Lambda 関数と SQS キューをイベントソースマッピングで接続しましたが、いくつか注意点があります。
ダウンストリーム側での冪等性確保
Lambda 関数が SQS キューからメッセージを消費する際のイベントソースマッピングの特性として、メッセージが少なくとも1回は処理されることが保証されている一方で、重複して処理される可能性がある点に注意が必要です。
Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。
よって、ダウンストリーム側の Lambda 関数でメッセージ処理の冪等性を確保する実装が必要となります。
レイテンシーの増加
経由するサービスが増えているため当然ではありますが、SQS キューを介在させることで、直接 API Gateway から Lambda 関数を呼び出す場合と比較してレイテンシーが増加します。
単一のデバイス上の同じメモリ空間内ですべてを処理できるモノリシックアプリケーションとは異なり、イベント駆動型アプリケーションはネットワーク間で通信します。この設計では、レイテンシーが変動します。レイテンシーを最小限に抑えるようにアプリケーションを設計することはできますが、モノリシックアプリケーションはほとんどの場合、スケーラビリティと可用性を犠牲にして、レイテンシーを低くするために最適化できます。
銀行の高頻度な取引用途や倉庫のミリ秒未満のロボットオートメーションなど、一貫した低レイテンシーパフォーマンスを必要とするワークロードは、イベント駆動型アーキテクチャの候補として適していません。
今回だと 100ms に満たない数値でしたが SQS キューを経由する際のレイテンシーが発生していました。この部分は可用性とのトレードオフとなるので、リスクを認識した上で採用を検討する必要があります。
Dead Letter Queue
Dead Letter Queue(DLQ) とは、メッセージの処理に失敗した場合に、そのメッセージを通常のキューから分離して保存するための専用の SQS キューです。
イベントソースマッピングで SQS キューを使用する場合、DLQ を設定することで、Lambda 関数がメッセージの処理に失敗した際に、そのメッセージを DLQ に移動させることができます。それにより、失敗したメッセージを後で分析・再処理することが可能となり、システムの信頼性と可用性を向上させることができます。

今回のような非同期処理のユースケースでは、DLQ の設定が推奨されているので、実装を検討することをお勧めします。
おわりに
API Gateway REST API を Amazon SQS と統合して、Lambda 関数を非同期に呼び出す構成を AWS CDK で実装してみました。
実装するにあたり、Amazon SQS や AWS X-Ray 、API Gateway の AWS 統合について理解を深める良い機会となりました。あと今回参考資料として多用した Black Belt 資料が非常に分かりやすくまとまっていて助かりました。
参考
X-Ray の非同期アプリケーションのトレース
以上







