この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Introduction
皆さんご存知のとおり、AWS Lambdaとはサーバーレスでプログラムを実行できる
環境を提供するAWSのサービスです。
実行したい処理を関数として定義すればそのまま実行できますが、
S3/DynamoDB/Kinesis/SNS/SQSなどをトリガーとしてLambdaを起動させることも可能です。
本稿ではSQSをトリガーとしてLambdaを実行させてみます。
(先日この仕組みで確認事項があり、試してみたのでそのアウトプット)
SQS + Lambdaの仕組み
仕組みは単純です。
イベントソースマッピングを行い、SQSとLambdaを関連付けるだけです。
Lambdaは自動的にスケールされ、設定により細かく制御可能です。
イベントソースマッピングは、現状標準キューとFIFOキューがサポートされています。
Lambda はキューをポーリングし、キューからメッセージを取得すると
Lambda関数を実行します。Lambda関数が正常に処理を完了すると、
キューから対象のメッセージを自動的に削除します。
Environment
- Python : 3.8.8
- Node : v16.2.0
- aws-cli : 2.2.5 Python/3.8.8
serverless?
今回はServerless Frameworkを使用します。
これはServerlessなアプリを構成管理デプロイするためのツールで、AWS Lambda以外にも
Google CloudFunctionsやAzure Functionに対応してます。
Setup & Create Example
npmでserverlessフレームワークをインストールします。
versionが表示できたらインストールOKです。
% npm install -g serverless
% serverless --version
Framework Core: 2.51.2
Plugin: 5.4.3
SDK: 4.2.4
Components: 3.13.3
そしてこのへんをみて
IAMユーザーの作成やらcredentialsの設定をします。
% sls create -t aws-python3 -p sqs-lambda
Serverless: Generating boilerplate...
Serverless: Generating boilerplate in "sqs-lambda"
_______ __
| _ .-----.----.--.--.-----.----| .-----.-----.-----.
| |___| -__| _| | | -__| _| | -__|__ --|__ --|
|____ |_____|__| \___/|_____|__| |__|_____|_____|_____|
| | | The Serverless Application Framework
| | serverless.com, v2.51.2
-------'
Serverless: Successfully generated boilerplate for template: "aws-python3"
deployコマンド一発でかんたんにデプロイできます。
% sls deploy
Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Uploading CloudFormation file to S3...
Serverless: Uploading artifacts...
Serverless: Uploading service sqs-lambda.zip file to S3 (748 B)...
.............
デプロイした関数をテスト。
ちゃんと動作してますね。
% sls invoke -f hello
{
"statusCode": 200,
"body": "{\"message\": \"Go Serverless v1.0! Your function executed successfully!\", \"input\": {}}"
}
ではserverless.ymlでSQSをイベントソースにしてLambdaを起動するようにしてみます。
(必要なところだけ抜粋)
IAMの設定でSQSのアクセスを許可。
EventSourceQueueとはResource部分で作成するキューを参照しています。
・・・
iam:
role:
statements:
- Effect: "Allow"
Action:
- "sqs:SendMessage"
- "sqs:GetQueueUrl"
Resource: "arn:aws:sqs:${self:provider.region}:<AWSアカウントID>:EventSourceQueue"
- Effect: "Allow"
Action:
- "sqs:ListQueues"
Resource: "arn:aws:sqs:${self:provider.region}:<AWSアカウントID>:*"
・・・
ResourcesでSQSキューを作成し、eventsでそのキューを指定します。
・・・
functions:
hello:
handler: handler.sqs_lambda
events:
- sqs:
arn:
Fn::GetAtt:
- EventSourceQueue
- Arn
resources:
Resources:
EventSourceQueue:
Type: "AWS::SQS::Queue"
Properties:
QueueName: "EventSourceQueue"
設定ファイルができたらLambdaプログラムを記述します。
メッセージはeventから送られてくるので、eventからメッセージを取り出します。
メッセージ数は、最大10件取得することが可能です。
※設定やタイミングに依存する
import json
def sqs_lambda(event, context):
body = {
"messages": [],
"input": event
}
for record in event['Records']:
body['messages'].append(record["body"])
response = {
"statusCode": 200,
"body": json.dumps(body)
}
print(response["body"])
return response
プログラムの記述ができたらdeployします。
% sls deploy
CLIで、さきほど作成したキューにメッセージを送ります。
% aws sqs send-message --queue-url https://us-east-1.queue.amazonaws.com/<AWSアカウントID>/EventSourceQueue --message-body "Foo"
LambdaからCloudwatchログを確認すると、
Lambdaが起動してメッセージを取得しているのがわかります。
その後、使い終わったら削除しておきましょう。
% sls remove
setting SQS + Lambda
Lambdaは、SQSに到着するメッセージの数に応じて(リミットまで)
自動的にスケーリングします。
任意のパラメータを設定することでスケーリングの制限をおこなったり
同時実行数を制御できたりします。
これらのパラメータもserverles.ymlで指定可能です。
SQS parameters
SQS側で設定できるパラメータについていくつか紹介します。
-
receiveMessageWaitTimeSeconds
Lambdaがメッセージをポーリングして応答を返す前に待機する時間(1〜20秒)です。
0を指定すればショートポーリングになります。 -
visibilityTimeout
メッセージがキューから読み取られた後、
メッセージがconsumerに表示されない時間の長さです。
この値は、Lambda関数に設定したタイムアウトの6倍以上推奨とのことです。 -
maxReceiveCount
メッセージをDeadLetterQueue(処理できなかったメッセージが送られるキュー) に移動する前に、送信元キューに表示されるように配信できる回数です。
5以上推奨とのこと。 -
maximumBatchingWindow
Lamba関数を起動する前に、SQSのレコードを集めるための最大時間です。
最大300秒まで設定可能です。
Lambda parameters
-
reservedConcurrency
予約済みの同時実行制限です。
同時に実行できるLambda関数の実行数を設定します。
5以上推奨とのこと。 -
batchSize
Lambdaが1回のバッチでSQSキューから取得するメッセージの最大数です。
※指定しても、必ず最大数とれるとは限らない