Chalice+CDKでSQSのメッセージをLambdaで処理する

SQSのメッセージをLambdaで処理します。 今回はChaliceとCDKを用いてシステムを構築します。
2022.03.31

準備

Chaliceで新規プロジェクトを作成します。 今回はプロジェクト名は「sqs-example」としました。 テンプレートはCDKのものを選択しています。

プロジェクトの作成

$ chalice new-project


   ___  _  _    _    _     ___  ___  ___
  / __|| || |  /_\  | |   |_ _|/ __|| __|
 | (__ | __ | / _ \ | |__  | || (__ | _|
  \___||_||_|/_/ \_\|____||___|\___||___|


The python serverless microframework for AWS allows
you to quickly create and deploy applications using
Amazon API Gateway and AWS Lambda.

Please enter the project name
[?] Enter the project name: sqs-example
[?] Select your project type: [CDK] Rest API with a DynamoDB table
   REST API
   S3 Event Handler
   Lambda Functions only
   Legacy REST API Template
 > [CDK] Rest API with a DynamoDB table

Your project has been generated in ./sqs-example

最終的には以下のようになります。

ディレクトリ構成

sqs-example/
├── README.rst
├── infrastructure
│   ├── app.py
│   ├── cdk.json
│   ├── requirements.txt
│   └── stacks
│       ├── __init__.py
│       ├── chaliceapp.py
│       └── sqs.py
├── requirements.txt
└── runtime
    ├── app.py
    └── requirements.txt

アプリケーションの用意

今回はLambdaでSQSをイベントソースとしてメッセージを処理します。

runtime/app.py

import logging

from chalice import Chalice

app = Chalice(app_name='sqs-example')
app.log.setLevel(logging.INFO)

@app.on_sqs_message(queue='chalice-queue', batch_size=10)
def handle_sqs_message(event):
    event = list(event)
    app.log.info(f"len(event): {len(event)}")
    for record in event:
        app.log.info(f"Body: {record.body}")

イベントの数とその内容を表示するだけのシンプルな関数です。

デコレータ@app.on_sqs_messageでソースのキューの名前とバッチサイズを指定しています。

インフラの用意

デプロイはCDKで行います。

今回はSQSを先にデプロイする必要があったのでマルチスタック構成にしました。

SQS

infrastructure/stacks/sqs.py

from aws_cdk import (
    aws_sqs as sqs,
    core as cdk
)

class ChaliceSQS(cdk.Stack):
    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)
        dlq = sqs.DeadLetterQueue(
            max_receive_count=3,
            queue=sqs.Queue(self, "ChaliceDLQ")
        )

        self.queue = sqs.Queue(
            self,
            "ChaliceQueue",
            queue_name='chalice-queue',
            visibility_timeout=cdk.Duration.seconds(180),
            dead_letter_queue=dlq
        )

一応、デッドレターキューも設定しておきます。 注意点として可視性タイムアウト(visibility_timeout)がLambda関数の実行時間より短いとデプロイ時にエラーになります。

Chalice

infrastructure/stacks/chaliceapp.py

import os

from aws_cdk import (
    core as cdk
)
from chalice.cdk import Chalice


RUNTIME_SOURCE_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), os.pardir, 'runtime')


class ChaliceApp(cdk.Stack):

    def __init__(self, scope, id, queue, **kwargs):
        super().__init__(scope, id, **kwargs)
        self.chalice = Chalice(
            self, 'ChaliceApp', source_dir=RUNTIME_SOURCE_DIR,
            stage_config={
                'automatic_layer': True,
            }
        )
        queue.grant_consume_messages(self.chalice.get_role('DefaultRole'))

SQSのQueueを引数としてとって、このスタック内でイベントソースとする権限を与えています。

また、automatic_layerとすることでLambda関数のレイヤーをChaliceが自動で作成してくれます。

App

infrastructure/app.py

#!/usr/bin/env python3
from aws_cdk import core as cdk
from stacks.chaliceapp import ChaliceApp
from stacks.sqs import ChaliceSQS

app = cdk.App()
sqs = ChaliceSQS(app, 'chalice-sqs')
chalice = ChaliceApp(app, 'sqs-example', queue=sqs.queue)
chalice.add_dependency(sqs)

app.synth()

今回デプロイするLambda関数はSQSに依存するため、その依存関係を指定しておきます。

デプロイする

コードの準備が終わったのでデプロイします

デプロイ

$ cd infrastructure
$ cdk bootstrap
$ cdk deploy --all

実行

デプロイが完了したので実行してみます。

SQSにはAWS CLIからメッセージを送信します。

そのために、メッセージが入ったファイルを用意します。

msg.json

[
  { "Id": "1", "MessageBody": "1" },
  { "Id": "2", "MessageBody": "2" },
  { "Id": "3", "MessageBody": "3" },
  { "Id": "4", "MessageBody": "4" },
  { "Id": "5", "MessageBody": "5" },
  { "Id": "6", "MessageBody": "6" },
  { "Id": "7", "MessageBody": "7" },
  { "Id": "8", "MessageBody": "8" },
  { "Id": "9", "MessageBody": "9" },
  { "Id": "10", "MessageBody": "10" }
]

このファイルを指定して実行します。10個のメッセージをバッチで書き込みます。

メッセージの送信

aws sqs send-message-batch --queue-url ${QUEUE_URL} --entries file://./msg.json

結果をCloudWatch Logsから確認してみます。

結果の確認

ログストリーム1
sqs-example - INFO - len(event): 2
sqs-example - INFO - Body: 2
sqs-example - INFO - Body: 4
sqs-example - INFO - len(event): 2
sqs-example - INFO - Body: 10
sqs-example - INFO - Body: 3

ログストリーム2
sqs-example - INFO - len(event): 4
sqs-example - INFO - Body: 8
sqs-example - INFO - Body: 5
sqs-example - INFO - Body: 1
sqs-example - INFO - Body: 9

ログストリーム3
sqs-example - INFO - len(event): 2
sqs-example - INFO - Body: 7
sqs-example - INFO - Body: 6

無事にメッセージが処理されていますね。 ただ、なぜかバッチ数は10としたのにEventはそれ以下に分割されています。 しかも、実行順はバラバラです。

これはLambdaとSQSの仕様が原因です。 実行順については標準キューではベストエフォートなため、保証されません。 実行順を制御したい場合はFIFOキューを使う必要があります。

分割されて実行されているのはSQSをソースとした時のLambdaの仕様によるものです。 マネージメントコンソールから確認するとわかるのですが、これはバッチのメッセージの「最大数」であって、関数の実行ごとに含まれるレコードの数の上限しか保証しません。 (本質的な原因はSQSのReceiveMessageのAPIの仕様によるものです)

おわりに

Chalice+CDKで簡単にSQSをイベントソースとしてLambdaを設定することができました。 Chaliceには他にもイベントソースがあるのでまた記事にしたいと思います。

参考