Chalice+CDKでSQSのメッセージをLambdaで処理する
準備
Chaliceで新規プロジェクトを作成します。 今回はプロジェクト名は「sqs-example」としました。 テンプレートはCDKのものを選択しています。
$ chalice new-project ___ _ _ _ _ ___ ___ ___ / __|| || | /_\ | | |_ _|/ __|| __| | (__ | __ | / _ \ | |__ | || (__ | _| \___||_||_|/_/ \_\|____||___|\___||___| The python serverless microframework for AWS allows you to quickly create and deploy applications using Amazon API Gateway and AWS Lambda. Please enter the project name [?] Enter the project name: sqs-example [?] Select your project type: [CDK] Rest API with a DynamoDB table REST API S3 Event Handler Lambda Functions only Legacy REST API Template > [CDK] Rest API with a DynamoDB table Your project has been generated in ./sqs-example
最終的には以下のようになります。
sqs-example/ ├── README.rst ├── infrastructure │ ├── app.py │ ├── cdk.json │ ├── requirements.txt │ └── stacks │ ├── __init__.py │ ├── chaliceapp.py │ └── sqs.py ├── requirements.txt └── runtime ├── app.py └── requirements.txt
アプリケーションの用意
今回はLambdaでSQSをイベントソースとしてメッセージを処理します。
import logging from chalice import Chalice app = Chalice(app_name='sqs-example') app.log.setLevel(logging.INFO) @app.on_sqs_message(queue='chalice-queue', batch_size=10) def handle_sqs_message(event): event = list(event) app.log.info(f"len(event): {len(event)}") for record in event: app.log.info(f"Body: {record.body}")
イベントの数とその内容を表示するだけのシンプルな関数です。
デコレータ@app.on_sqs_message
でソースのキューの名前とバッチサイズを指定しています。
インフラの用意
デプロイはCDKで行います。
今回はSQSを先にデプロイする必要があったのでマルチスタック構成にしました。
SQS
from aws_cdk import ( aws_sqs as sqs, core as cdk ) class ChaliceSQS(cdk.Stack): def __init__(self, scope, id, **kwargs): super().__init__(scope, id, **kwargs) dlq = sqs.DeadLetterQueue( max_receive_count=3, queue=sqs.Queue(self, "ChaliceDLQ") ) self.queue = sqs.Queue( self, "ChaliceQueue", queue_name='chalice-queue', visibility_timeout=cdk.Duration.seconds(180), dead_letter_queue=dlq )
一応、デッドレターキューも設定しておきます。 注意点として可視性タイムアウト(visibility_timeout)がLambda関数の実行時間より短いとデプロイ時にエラーになります。
Chalice
import os from aws_cdk import ( core as cdk ) from chalice.cdk import Chalice RUNTIME_SOURCE_DIR = os.path.join( os.path.dirname(os.path.dirname(__file__)), os.pardir, 'runtime') class ChaliceApp(cdk.Stack): def __init__(self, scope, id, queue, **kwargs): super().__init__(scope, id, **kwargs) self.chalice = Chalice( self, 'ChaliceApp', source_dir=RUNTIME_SOURCE_DIR, stage_config={ 'automatic_layer': True, } ) queue.grant_consume_messages(self.chalice.get_role('DefaultRole'))
SQSのQueueを引数としてとって、このスタック内でイベントソースとする権限を与えています。
また、automatic_layer
とすることでLambda関数のレイヤーをChaliceが自動で作成してくれます。
App
#!/usr/bin/env python3 from aws_cdk import core as cdk from stacks.chaliceapp import ChaliceApp from stacks.sqs import ChaliceSQS app = cdk.App() sqs = ChaliceSQS(app, 'chalice-sqs') chalice = ChaliceApp(app, 'sqs-example', queue=sqs.queue) chalice.add_dependency(sqs) app.synth()
今回デプロイするLambda関数はSQSに依存するため、その依存関係を指定しておきます。
デプロイする
コードの準備が終わったのでデプロイします
$ cd infrastructure $ cdk bootstrap $ cdk deploy --all
実行
デプロイが完了したので実行してみます。
SQSにはAWS CLIからメッセージを送信します。
そのために、メッセージが入ったファイルを用意します。
[ { "Id": "1", "MessageBody": "1" }, { "Id": "2", "MessageBody": "2" }, { "Id": "3", "MessageBody": "3" }, { "Id": "4", "MessageBody": "4" }, { "Id": "5", "MessageBody": "5" }, { "Id": "6", "MessageBody": "6" }, { "Id": "7", "MessageBody": "7" }, { "Id": "8", "MessageBody": "8" }, { "Id": "9", "MessageBody": "9" }, { "Id": "10", "MessageBody": "10" } ]
このファイルを指定して実行します。10個のメッセージをバッチで書き込みます。
aws sqs send-message-batch --queue-url ${QUEUE_URL} --entries file://./msg.json
結果をCloudWatch Logsから確認してみます。
ログストリーム1 sqs-example - INFO - len(event): 2 sqs-example - INFO - Body: 2 sqs-example - INFO - Body: 4 sqs-example - INFO - len(event): 2 sqs-example - INFO - Body: 10 sqs-example - INFO - Body: 3 ログストリーム2 sqs-example - INFO - len(event): 4 sqs-example - INFO - Body: 8 sqs-example - INFO - Body: 5 sqs-example - INFO - Body: 1 sqs-example - INFO - Body: 9 ログストリーム3 sqs-example - INFO - len(event): 2 sqs-example - INFO - Body: 7 sqs-example - INFO - Body: 6
無事にメッセージが処理されていますね。 ただ、なぜかバッチ数は10としたのにEventはそれ以下に分割されています。 しかも、実行順はバラバラです。
これはLambdaとSQSの仕様が原因です。 実行順については標準キューではベストエフォートなため、保証されません。 実行順を制御したい場合はFIFOキューを使う必要があります。
分割されて実行されているのはSQSをソースとした時のLambdaの仕様によるものです。 マネージメントコンソールから確認するとわかるのですが、これはバッチのメッセージの「最大数」であって、関数の実行ごとに含まれるレコードの数の上限しか保証しません。 (本質的な原因はSQSのReceiveMessageのAPIの仕様によるものです)
おわりに
Chalice+CDKで簡単にSQSをイベントソースとしてLambdaを設定することができました。 Chaliceには他にもイベントソースがあるのでまた記事にしたいと思います。