この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
準備
Chaliceで新規プロジェクトを作成します。 今回はプロジェクト名は「sqs-example」としました。 テンプレートはCDKのものを選択しています。
プロジェクトの作成
$ chalice new-project
___ _ _ _ _ ___ ___ ___
/ __|| || | /_\ | | |_ _|/ __|| __|
| (__ | __ | / _ \ | |__ | || (__ | _|
\___||_||_|/_/ \_\|____||___|\___||___|
The python serverless microframework for AWS allows
you to quickly create and deploy applications using
Amazon API Gateway and AWS Lambda.
Please enter the project name
[?] Enter the project name: sqs-example
[?] Select your project type: [CDK] Rest API with a DynamoDB table
REST API
S3 Event Handler
Lambda Functions only
Legacy REST API Template
> [CDK] Rest API with a DynamoDB table
Your project has been generated in ./sqs-example
最終的には以下のようになります。
ディレクトリ構成
sqs-example/
├── README.rst
├── infrastructure
│ ├── app.py
│ ├── cdk.json
│ ├── requirements.txt
│ └── stacks
│ ├── __init__.py
│ ├── chaliceapp.py
│ └── sqs.py
├── requirements.txt
└── runtime
├── app.py
└── requirements.txt
アプリケーションの用意
今回はLambdaでSQSをイベントソースとしてメッセージを処理します。
runtime/app.py
import logging
from chalice import Chalice
app = Chalice(app_name='sqs-example')
app.log.setLevel(logging.INFO)
@app.on_sqs_message(queue='chalice-queue', batch_size=10)
def handle_sqs_message(event):
event = list(event)
app.log.info(f"len(event): {len(event)}")
for record in event:
app.log.info(f"Body: {record.body}")
イベントの数とその内容を表示するだけのシンプルな関数です。
デコレータ@app.on_sqs_message
でソースのキューの名前とバッチサイズを指定しています。
インフラの用意
デプロイはCDKで行います。
今回はSQSを先にデプロイする必要があったのでマルチスタック構成にしました。
SQS
infrastructure/stacks/sqs.py
from aws_cdk import (
aws_sqs as sqs,
core as cdk
)
class ChaliceSQS(cdk.Stack):
def __init__(self, scope, id, **kwargs):
super().__init__(scope, id, **kwargs)
dlq = sqs.DeadLetterQueue(
max_receive_count=3,
queue=sqs.Queue(self, "ChaliceDLQ")
)
self.queue = sqs.Queue(
self,
"ChaliceQueue",
queue_name='chalice-queue',
visibility_timeout=cdk.Duration.seconds(180),
dead_letter_queue=dlq
)
一応、デッドレターキューも設定しておきます。 注意点として可視性タイムアウト(visibility_timeout)がLambda関数の実行時間より短いとデプロイ時にエラーになります。
Chalice
infrastructure/stacks/chaliceapp.py
import os
from aws_cdk import (
core as cdk
)
from chalice.cdk import Chalice
RUNTIME_SOURCE_DIR = os.path.join(
os.path.dirname(os.path.dirname(__file__)), os.pardir, 'runtime')
class ChaliceApp(cdk.Stack):
def __init__(self, scope, id, queue, **kwargs):
super().__init__(scope, id, **kwargs)
self.chalice = Chalice(
self, 'ChaliceApp', source_dir=RUNTIME_SOURCE_DIR,
stage_config={
'automatic_layer': True,
}
)
queue.grant_consume_messages(self.chalice.get_role('DefaultRole'))
SQSのQueueを引数としてとって、このスタック内でイベントソースとする権限を与えています。
また、automatic_layer
とすることでLambda関数のレイヤーをChaliceが自動で作成してくれます。
App
infrastructure/app.py
#!/usr/bin/env python3
from aws_cdk import core as cdk
from stacks.chaliceapp import ChaliceApp
from stacks.sqs import ChaliceSQS
app = cdk.App()
sqs = ChaliceSQS(app, 'chalice-sqs')
chalice = ChaliceApp(app, 'sqs-example', queue=sqs.queue)
chalice.add_dependency(sqs)
app.synth()
今回デプロイするLambda関数はSQSに依存するため、その依存関係を指定しておきます。
デプロイする
コードの準備が終わったのでデプロイします
デプロイ
$ cd infrastructure
$ cdk bootstrap
$ cdk deploy --all
実行
デプロイが完了したので実行してみます。
SQSにはAWS CLIからメッセージを送信します。
そのために、メッセージが入ったファイルを用意します。
msg.json
[
{ "Id": "1", "MessageBody": "1" },
{ "Id": "2", "MessageBody": "2" },
{ "Id": "3", "MessageBody": "3" },
{ "Id": "4", "MessageBody": "4" },
{ "Id": "5", "MessageBody": "5" },
{ "Id": "6", "MessageBody": "6" },
{ "Id": "7", "MessageBody": "7" },
{ "Id": "8", "MessageBody": "8" },
{ "Id": "9", "MessageBody": "9" },
{ "Id": "10", "MessageBody": "10" }
]
このファイルを指定して実行します。10個のメッセージをバッチで書き込みます。
メッセージの送信
aws sqs send-message-batch --queue-url ${QUEUE_URL} --entries file://./msg.json
結果をCloudWatch Logsから確認してみます。
結果の確認
ログストリーム1
sqs-example - INFO - len(event): 2
sqs-example - INFO - Body: 2
sqs-example - INFO - Body: 4
sqs-example - INFO - len(event): 2
sqs-example - INFO - Body: 10
sqs-example - INFO - Body: 3
ログストリーム2
sqs-example - INFO - len(event): 4
sqs-example - INFO - Body: 8
sqs-example - INFO - Body: 5
sqs-example - INFO - Body: 1
sqs-example - INFO - Body: 9
ログストリーム3
sqs-example - INFO - len(event): 2
sqs-example - INFO - Body: 7
sqs-example - INFO - Body: 6
無事にメッセージが処理されていますね。 ただ、なぜかバッチ数は10としたのにEventはそれ以下に分割されています。 しかも、実行順はバラバラです。
これはLambdaとSQSの仕様が原因です。 実行順については標準キューではベストエフォートなため、保証されません。 実行順を制御したい場合はFIFOキューを使う必要があります。
分割されて実行されているのはSQSをソースとした時のLambdaの仕様によるものです。 マネージメントコンソールから確認するとわかるのですが、これはバッチのメッセージの「最大数」であって、関数の実行ごとに含まれるレコードの数の上限しか保証しません。 (本質的な原因はSQSのReceiveMessageのAPIの仕様によるものです)
おわりに
Chalice+CDKで簡単にSQSをイベントソースとしてLambdaを設定することができました。 Chaliceには他にもイベントソースがあるのでまた記事にしたいと思います。