[serverless] AWS LambdaのイベントソースにSQSを使う

2021.07.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Introduction

皆さんご存知のとおり、AWS Lambdaとはサーバーレスでプログラムを実行できる
環境を提供するAWSのサービスです。
実行したい処理を関数として定義すればそのまま実行できますが、
S3/DynamoDB/Kinesis/SNS/SQSなどをトリガーとしてLambdaを起動させることも可能です。

本稿ではSQSをトリガーとしてLambdaを実行させてみます。
(先日この仕組みで確認事項があり、試してみたのでそのアウトプット)  

SQS + Lambdaの仕組み

仕組みは単純です。
イベントソースマッピングを行い、SQSとLambdaを関連付けるだけです。
Lambdaは自動的にスケールされ、設定により細かく制御可能です。
イベントソースマッピングは、現状標準キューとFIFOキューがサポートされています。
Lambda はキューをポーリングし、キューからメッセージを取得すると
Lambda関数を実行します。Lambda関数が正常に処理を完了すると、
キューから対象のメッセージを自動的に削除します。

Environment

  • Python : 3.8.8
  • Node : v16.2.0
  • aws-cli : 2.2.5 Python/3.8.8

serverless?

今回はServerless Frameworkを使用します。
これはServerlessなアプリを構成管理デプロイするためのツールで、AWS Lambda以外にも
Google CloudFunctionsやAzure Functionに対応してます。

Setup & Create Example

npmでserverlessフレームワークをインストールします。
versionが表示できたらインストールOKです。

% npm install -g serverless
% serverless --version
Framework Core: 2.51.2
Plugin: 5.4.3
SDK: 4.2.4
Components: 3.13.3

そしてこのへんをみて
IAMユーザーの作成やらcredentialsの設定をします。

% sls create -t aws-python3 -p sqs-lambda
Serverless: Generating boilerplate...
Serverless: Generating boilerplate in "sqs-lambda"
 _______                             __
|   _   .-----.----.--.--.-----.----|  .-----.-----.-----.
|   |___|  -__|   _|  |  |  -__|   _|  |  -__|__ --|__ --|
|____   |_____|__|  \___/|_____|__| |__|_____|_____|_____|
|   |   |             The Serverless Application Framework
|       |                           serverless.com, v2.51.2
 -------'

Serverless: Successfully generated boilerplate for template: "aws-python3"

deployコマンド一発でかんたんにデプロイできます。

% sls deploy
Serverless: Packaging service...
Serverless: Excluding development dependencies...
Serverless: Uploading CloudFormation file to S3...
Serverless: Uploading artifacts...
Serverless: Uploading service sqs-lambda.zip file to S3 (748 B)...
.............

デプロイした関数をテスト。
ちゃんと動作してますね。

% sls invoke -f hello
{
    "statusCode": 200,
    "body": "{\"message\": \"Go Serverless v1.0! Your function executed successfully!\", \"input\": {}}"
}

ではserverless.ymlでSQSをイベントソースにしてLambdaを起動するようにしてみます。
(必要なところだけ抜粋)

IAMの設定でSQSのアクセスを許可。
EventSourceQueueとはResource部分で作成するキューを参照しています。

・・・
  iam:
    role:
      statements:
        - Effect: "Allow"
          Action:
            - "sqs:SendMessage"
            - "sqs:GetQueueUrl"
          Resource: "arn:aws:sqs:${self:provider.region}:<AWSアカウントID>:EventSourceQueue"
        - Effect: "Allow"
          Action:
            - "sqs:ListQueues"
          Resource: "arn:aws:sqs:${self:provider.region}:<AWSアカウントID>:*"
・・・

ResourcesでSQSキューを作成し、eventsでそのキューを指定します。

・・・
functions:
  hello:
    handler: handler.sqs_lambda
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - EventSourceQueue
              - Arn
resources:
 Resources:
   EventSourceQueue:
     Type: "AWS::SQS::Queue"
     Properties:
       QueueName: "EventSourceQueue"

設定ファイルができたらLambdaプログラムを記述します。
メッセージはeventから送られてくるので、eventからメッセージを取り出します。
メッセージ数は、最大10件取得することが可能です。
※設定やタイミングに依存する

import json


def sqs_lambda(event, context):
    body = {
        "messages": [],
        "input": event
    }

    for record in event['Records']:
        body['messages'].append(record["body"])


    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }

    print(response["body"])

    return response

プログラムの記述ができたらdeployします。

% sls deploy

CLIで、さきほど作成したキューにメッセージを送ります。

% aws sqs send-message --queue-url https://us-east-1.queue.amazonaws.com/<AWSアカウントID>/EventSourceQueue --message-body "Foo"

LambdaからCloudwatchログを確認すると、
Lambdaが起動してメッセージを取得しているのがわかります。

その後、使い終わったら削除しておきましょう。

% sls remove

setting SQS + Lambda

Lambdaは、SQSに到着するメッセージの数に応じて(リミットまで)
自動的にスケーリングします。
任意のパラメータを設定することでスケーリングの制限をおこなったり
同時実行数を制御できたりします。
これらのパラメータもserverles.ymlで指定可能です。

SQS parameters

SQS側で設定できるパラメータについていくつか紹介します。

  • receiveMessageWaitTimeSeconds
    Lambdaがメッセージをポーリングして応答を返す前に待機する時間(1〜20秒)です。
    0を指定すればショートポーリングになります。

  • visibilityTimeout
    メッセージがキューから読み取られた後、
    メッセージがconsumerに表示されない時間の長さです。
    この値は、Lambda関数に設定したタイムアウトの6倍以上推奨とのことです。

  • maxReceiveCount
    メッセージをDeadLetterQueue(処理できなかったメッセージが送られるキュー) に移動する前に、送信元キューに表示されるように配信できる回数です。
    5以上推奨とのこと。

  • maximumBatchingWindow
    Lamba関数を起動する前に、SQSのレコードを集めるための最大時間です。
    最大300秒まで設定可能です。

Lambda parameters

  • reservedConcurrency
    予約済みの同時実行制限です。
    同時に実行できるLambda関数の実行数を設定します。
    5以上推奨とのこと。

  • batchSize
    Lambdaが1回のバッチでSQSキューから取得するメッセージの最大数です。
    ※指定しても、必ず最大数とれるとは限らない

References