[アップデート] AWS LambdaがAmazon SQSをイベントソースとしたバッチ処理で部分応答をサポートしました

Amazon SQSをイベントソースとしたLambda関数のバッチ処理において、一部の失敗したメッセージのIDを返すことで成功したメッセージは削除され失敗したメッセージはキューに残せるようになりました。
2021.11.25

こんにちは。サービスグループの武田です。

AWS LambdaはPythonやJavaScriptで書かれた処理をクラウド上で実行できるAWSのFaaSです。さまざまなサービスと統合されていますが、そのひとつとしてAmazon SQSがあります。イベントソースマッピングという機能を使用することで、SQSへのメッセージ送信をトリガーとしてLambda関数を実行できます。SQSをイベントソースとしたLambda関数のトリガーは、バッチ処理として複数のメッセージをまとめて処理させることができます。さて複数のメッセージをまとめて処理する場合に起こる問題として、一部は成功するが一部は失敗してしまったというものです。

これまでは関数が成功すればすべてのメッセージが成功したとしてキューから削除。関数が失敗すればすべてのメッセージが失敗したとしてキューに残される。のどちらかしかありませんでした。そのため、対応として関数内のエラーハンドリングの一部として成功したメッセージについて個別に削除する実装をする必要がありました。

今回のアップデートで上記のような個別削除を自前実装する必要がなくなり、単に失敗したメッセージのIDを返せばよくなりました。

リソースの作成

それでは実際に試してみましょう。今回は次のリソースをマネジメントコンソールで作成して検証してみます。

  • TestQueue
    • イベントソースとなるSQSキュー
  • TestQueueDLQ
    • 失敗したメッセージ退避用のデッドレターキュー
  • testFunction
    • TestQueueからトリガーされるLambda関数

TestQueueDLQ作成

順序としてDLQを先に作ります。名前をTestQueueDLQとし、そのほかはすべてデフォルト設定で作成します。

TestQueue作成

次にイベントソースとなるSQSキューを作成します。名前はTestQueueです。

デッドレターキューとして先ほど作成したTestQueueDLQを指定し、最大受信数は検証目的のため1とします。ほかはデフォルト設定で作成します。

testFunction作成

続いてトリガーされるLambda関数を作成します。ゼロからやるにはロールの作成などが面倒なためブループリントを活用します。 sqs-poller という設計図があるため、それを選択しましょう。

関数名をtestFunction、ロール名をtestFunction-roleとします。

SQSトリガーの設定として、SQSキューを先ほど作成したTestQueue、バッチウィンドウは10とします。そしてもっとも大事な設定として、追加設定にある バッチ項目の失敗を報告する のチェックをONにしてください。これを忘れると部分応答の機能が有効になりません。

Lambda関数のコードもひとまずそのままで作成しましょう。

トリガーの設定を確認してみるとfunctionResponseTypesがReportBatchItemFailuresとなっていることが確認できます。ついでにこのタイミングでトリガーも有効化しておきましょう。

トリガーの動作確認

ここまでの設定が問題ないか、一度キューにメッセージを送信してみましょう。

送信してしばらく待ったらCloudWatch Logsに出ていました。問題なさそうです。

部分応答を試してみる

それではいよいよ部分応答の確認です。まずはデプロイしたLambda関数を書き換えます。今回は次のようなコードにしました。Test Message xxxxが偶数であれば失敗として扱います。

console.log('Loading function');

exports.handler = async (event) => {
    return { 
        "batchItemFailures": event.Records
            .filter(({ body }) => +body.replace('Test Message ', '') % 2 === 0)
            .map(({ messageId }) => ({
                "itemIdentifier": messageId
            }))
    };
};

関数の結果は次の書式である必要があります。id2などは失敗したメッセージのmessageIdです。先ほどのコードは偶数のメッセージのIDだけを集めて、この形式のレスポンスを組み立てます。

{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "id2"
        },
        {
            "itemIdentifier": "id4"
        }
    ]
}

準備ができたらメッセージを送ってみます。次のようなワンライナーで50個のメッセージをSQSに送信してみます。偶数は半分の25個ですので、25個のメッセージは正常に処理され、もう25個のメッセージは失敗しDLQに移されれば成功です。

seq 50 | xargs -I{} -n 1 -P 10 aws sqs send-message --queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/TestQueue --message-body "Test Message {}" > /dev/null

コマンド実行直後はTestQueueに50個のメッセージが入っています。

しばらく待っていると処理が終わります。結果はDLQに25個のメッセージが移され、意図した結果となりました。

まとめ

一部のメッセージが失敗した場合の対処は、これまで関数内で個別にメッセージを削除する必要がありました。それをサポートするライブラリなどもありましたが、やはりマネージドサービスで面倒を見てくれるのは安心感があります。SQSとLambdaで組まれたバッチシステムがありましたらぜひ見直してみてください。

参考URL