Chalice+CDKでSQSのメッセージをLambdaで処理する

Chalice+CDKでSQSのメッセージをLambdaで処理する

SQSのメッセージをLambdaで処理します。 今回はChaliceとCDKを用いてシステムを構築します。
Clock Icon2022.03.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

準備

Chaliceで新規プロジェクトを作成します。 今回はプロジェクト名は「sqs-example」としました。 テンプレートはCDKのものを選択しています。

$ chalice new-project


   ___  _  _    _    _     ___  ___  ___
  / __|| || |  /_\  | |   |_ _|/ __|| __|
 | (__ | __ | / _ \ | |__  | || (__ | _|
  \___||_||_|/_/ \_\|____||___|\___||___|


The python serverless microframework for AWS allows
you to quickly create and deploy applications using
Amazon API Gateway and AWS Lambda.

Please enter the project name
[?] Enter the project name: sqs-example
[?] Select your project type: [CDK] Rest API with a DynamoDB table
   REST API
   S3 Event Handler
   Lambda Functions only
   Legacy REST API Template
 > [CDK] Rest API with a DynamoDB table

Your project has been generated in ./sqs-example

最終的には以下のようになります。

sqs-example/
├── README.rst
├── infrastructure
│   ├── app.py
│   ├── cdk.json
│   ├── requirements.txt
│   └── stacks
│       ├── __init__.py
│       ├── chaliceapp.py
│       └── sqs.py
├── requirements.txt
└── runtime
    ├── app.py
    └── requirements.txt

アプリケーションの用意

今回はLambdaでSQSをイベントソースとしてメッセージを処理します。

import logging

from chalice import Chalice

app = Chalice(app_name='sqs-example')
app.log.setLevel(logging.INFO)

@app.on_sqs_message(queue='chalice-queue', batch_size=10)
def handle_sqs_message(event):
    event = list(event)
    app.log.info(f"len(event): {len(event)}")
    for record in event:
        app.log.info(f"Body: {record.body}")

イベントの数とその内容を表示するだけのシンプルな関数です。

デコレータ@app.on_sqs_messageでソースのキューの名前とバッチサイズを指定しています。

インフラの用意

デプロイはCDKで行います。

今回はSQSを先にデプロイする必要があったのでマルチスタック構成にしました。

SQS

from aws_cdk import (
    aws_sqs as sqs,
    core as cdk
)

class ChaliceSQS(cdk.Stack):
    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)
        dlq = sqs.DeadLetterQueue(
            max_receive_count=3,
            queue=sqs.Queue(self, "ChaliceDLQ")
        )

        self.queue = sqs.Queue(
            self,
            "ChaliceQueue",
            queue_name='chalice-queue',
            visibility_timeout=cdk.Duration.seconds(180),
            dead_letter_queue=dlq
        )

一応、デッドレターキューも設定しておきます。 注意点として可視性タイムアウト(visibility_timeout)がLambda関数の実行時間より短いとデプロイ時にエラーになります。

Chalice

import os

from aws_cdk import (
    core as cdk
)
from chalice.cdk import Chalice


RUNTIME_SOURCE_DIR = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), os.pardir, 'runtime')


class ChaliceApp(cdk.Stack):

    def __init__(self, scope, id, queue, **kwargs):
        super().__init__(scope, id, **kwargs)
        self.chalice = Chalice(
            self, 'ChaliceApp', source_dir=RUNTIME_SOURCE_DIR,
            stage_config={
                'automatic_layer': True,
            }
        )
        queue.grant_consume_messages(self.chalice.get_role('DefaultRole'))

SQSのQueueを引数としてとって、このスタック内でイベントソースとする権限を与えています。

また、automatic_layerとすることでLambda関数のレイヤーをChaliceが自動で作成してくれます。

App

#!/usr/bin/env python3
from aws_cdk import core as cdk
from stacks.chaliceapp import ChaliceApp
from stacks.sqs import ChaliceSQS

app = cdk.App()
sqs = ChaliceSQS(app, 'chalice-sqs')
chalice = ChaliceApp(app, 'sqs-example', queue=sqs.queue)
chalice.add_dependency(sqs)

app.synth()

今回デプロイするLambda関数はSQSに依存するため、その依存関係を指定しておきます。

デプロイする

コードの準備が終わったのでデプロイします

$ cd infrastructure
$ cdk bootstrap
$ cdk deploy --all

実行

デプロイが完了したので実行してみます。

SQSにはAWS CLIからメッセージを送信します。

そのために、メッセージが入ったファイルを用意します。

[
  { "Id": "1", "MessageBody": "1" },
  { "Id": "2", "MessageBody": "2" },
  { "Id": "3", "MessageBody": "3" },
  { "Id": "4", "MessageBody": "4" },
  { "Id": "5", "MessageBody": "5" },
  { "Id": "6", "MessageBody": "6" },
  { "Id": "7", "MessageBody": "7" },
  { "Id": "8", "MessageBody": "8" },
  { "Id": "9", "MessageBody": "9" },
  { "Id": "10", "MessageBody": "10" }
]

このファイルを指定して実行します。10個のメッセージをバッチで書き込みます。

aws sqs send-message-batch --queue-url ${QUEUE_URL} --entries file://./msg.json

結果をCloudWatch Logsから確認してみます。

ログストリーム1
sqs-example - INFO - len(event): 2
sqs-example - INFO - Body: 2
sqs-example - INFO - Body: 4
sqs-example - INFO - len(event): 2
sqs-example - INFO - Body: 10
sqs-example - INFO - Body: 3

ログストリーム2
sqs-example - INFO - len(event): 4
sqs-example - INFO - Body: 8
sqs-example - INFO - Body: 5
sqs-example - INFO - Body: 1
sqs-example - INFO - Body: 9

ログストリーム3
sqs-example - INFO - len(event): 2
sqs-example - INFO - Body: 7
sqs-example - INFO - Body: 6

無事にメッセージが処理されていますね。 ただ、なぜかバッチ数は10としたのにEventはそれ以下に分割されています。 しかも、実行順はバラバラです。

これはLambdaとSQSの仕様が原因です。 実行順については標準キューではベストエフォートなため、保証されません。 実行順を制御したい場合はFIFOキューを使う必要があります。

分割されて実行されているのはSQSをソースとした時のLambdaの仕様によるものです。 マネージメントコンソールから確認するとわかるのですが、これはバッチのメッセージの「最大数」であって、関数の実行ごとに含まれるレコードの数の上限しか保証しません。 (本質的な原因はSQSのReceiveMessageのAPIの仕様によるものです)

おわりに

Chalice+CDKで簡単にSQSをイベントソースとしてLambdaを設定することができました。 Chaliceには他にもイベントソースがあるのでまた記事にしたいと思います。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.