SQSとLambdaで実装する直列処理

2020.06.19

CX事業本部の夏目です。

今回は、SQSとLambdaを使って直列処理を実装する方法について紹介します。

方法

  • Lambda Functionの予約された同時実行数を 1に制限する
  • Lambda FunctionのトリガーとしてSQSを指定する

これだけで簡単にLambdaを直列に動かすことができます。

通常のSQSのキューではキューに入った順序で実行されるとは限りませんが、FIFOキューを使えば先入れ先出しの順序で処理することができます。
また、FIFOキューを使っていてLambda Functionがエラー終了した場合でも、先入れ先出し順序は保たれます。

まとめ

S3のファイルを扱うときなど、直列的に処理を行う必要がある場合に使うことができると思います。

今回検証した内容では、同時実行数1でもLambdaでスロットルが発生しない規模のものを想定しています。 実際には1件処理するのにかかる時間やSQSが一度に受け取ることができるメッセージの上限などの制約が存在するため、一度に大量のメッセージが飛んできた場合など、うまく処理が走らない場合があるのでご注意ください。

説明 / 検証

なんでLambda Functionの予約された同時実行数を 1に制限するの?

SQSトリガーのLambdaはSQSに入っているデータを一気に処理できるよう、最大限にスケールして動きます。
例えば、バッチサイズが3でキューに50個データが有る場合、Lambdaが 17個並列で起動します。

予約された同時実行数を1にしないと、Lambdaが複数起動されてしまい直列に処理ができなくなります。

FIFOキューをトリガーに使う際、Lambdaでエラーが起きても順序は保証されるの?

これは実際にLambdaを動かして確認してみました。

FIFOキューの準備

SQSのFIFOキューを作成します。

lambda_serial_test.fifoという名前を指定して、ここではキューのクイック作成をクリックして作成します。

作成したキューに対して、キュー操作メッセージを送信から事前に複数のメッセージをキューに入れておきます。

5件のメッセージを準備しました。

Lambda関数の準備

次にLambdaを作っていきます。

FIFOキューと同様に、sqs_serial_testという名前でPython3.7を使用するLambdaを作成します。
SQSトリガーのLambdaになるので、ポリシーテンプレートからSQSのポーリングアクセス権限を付与しておきます。

作成したら、Lambdaのコードを書き換え、保存します。

lambda_function.py

from random import randrange
import json

RANGE_MAX: int = 100
THRESHOLD: int = 50
TAG = "AA635220-6123-42A9-9E8F-7142A8D126E4"


def puts(data: str, flag: bool):
    message = f"{TAG} ({'success' if flag else ' failed'}): {data}"
    print(message)


def lambda_handler(event, context):
    text = event["Records"][0]["body"]
    flag = randrange(RANGE_MAX) > THRESHOLD
    puts(text, flag)
    if not flag:
        raise Exception()

ざっくり何をしているかと言うと、 - 50%の確率で例外を投げる (エラー終了する) - 正常に終了するのか、エラー終了するのか、またその際のキューの中身は何か、をログに出力する

次に、事前に作成しデータも入れておいたFIFOキューをトリガーに設定します。

バッチサイズを1にして、1件ずつ処理するようにします。
トリガーの有効化にチェックが入った状態で追加して、すぐに動かします。

しばらく待ってから、CloudWatch LogsにあるLambdaのログを見に行きます。
/aws/lambda/sqs_serial_testのロググループを開き、ログストリームを開きます。

検索窓(イベントをフィルターとプレースホルダーが見えるテキストボックス)に "AA635220-6123-42A9-9E8F-7142A8D126E4"と入力し、エンターを押して検索します。

見ると、4thというデータのときにLambdaがエラー終了していることがわかります。
しかし、その次が5thではなく、もう一度4thというデータを取り出しています。

このことから、Lambdaがエラー終了してもFIFOキューの順番は保証されていることがわかります。