SQSとDynamoDBを使ってサーバーレスな処理の負荷を分散させてみる

こんにちは、CX事業本部の夏目です。

AWS LambdaとServerless #1 Advent Calendar 2019 - Qiitaということで、今回はサーバーレスで軽く負荷分散みたいなのを実装してみようと思います。

やりたいこと

  • ユーザーごとにS3にJSONファイルを置いている
  • 同時実行数を1に制限したSQSトリガーのLambdaを使って、同じファイルに対して同時書き込みが起きないように制限している
  • これをある程度並列で動かせるようにする

構成

  • 同時実行数を1に制限したSQSトリガーのLambdaを複数用意して処理を分散させる
    • 一つのキューから複数のLambdaが生えているのではない
    • Lambdaとキューの組がいっぱい
    • Lambdaの実態としてのコードは同じものを使用する
      • 同じLambdaがいっぱい
  • ユーザーのデータを書き込むのに直近でしようしたキューはDynamoDBに保存しておいて、一定期間は再利用する
    • TTLを設定し、削除されるまでは使用する
  • 最初に書き込みに使用するキューを決める際には、トリガーを設定していないSQSを使用する
    • トリガーを設定したSQSのQueue URLを入れる
    • ここから取り出すことで処理を分散させる

これを図示するとこうなる。

これから実際にこれを作ってみる。

実装

sam.yml

AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31


Resources:
  UrlStoreQueue:
    Type:  AWS::SQS::Queue
    Properties:
      ReceiveMessageWaitTimeSeconds: 0

  ParallelTriggerQueue01:
    Type: AWS::SQS::Queue

  ParallelTriggerQueue02:
    Type: AWS::SQS::Queue

  ParallelTriggerQueue03:
    Type: AWS::SQS::Queue

  ParallelTriggerQueue04:
    Type: AWS::SQS::Queue

  RecentUserQueueTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: userId
          AttributeType: S
      KeySchema:
        - AttributeName: userId
          KeyType: HASH
      TimeToLiveSpecification:
        AttributeName: expired
        Enabled: true

  IngesterFunction: # SQSにデータを突っ込むためのLambda
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Timeout: 30
      MemorySize: 128
      AutoPublishAlias: dev
      CodeUri: src/ingester
      Handler: index.handler
      Policies:
        - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
        - arn:aws:iam::aws:policy/AmazonSQSFullAccess
      Environment:
        Variables:
          PARALLELE_TRIGGER_QUEUE_URLS: !Join
            - ','
            - - !Ref ParallelTriggerQueue01
              - !Ref ParallelTriggerQueue02
              - !Ref ParallelTriggerQueue03
              - !Ref ParallelTriggerQueue04
          URL_STORE_QUEUE_URL: !Ref UrlStoreQueue
          RECENT_USER_QUEUE_TABLE_NAME: !Ref RecentUserQueueTable

  DataBucket:
    Type: AWS::S3::Bucket

  DispatcherFunction01: # 動かしたい処理。01〜04とあるが、同じコードが動く
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Timeout: 30
      MemorySize: 128
      AutoPublishAlias: dev
      CodeUri: src/dispatcher
      Handler: index.handler
      ReservedConcurrentExecutions: 1
      Policies:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Environment:
        Variables:
          DATA_BUCKET_NAME: !Ref DataBucket
      Events:
        SQSTrigger:
          Type: SQS
          Properties:
            Queue: !GetAtt ParallelTriggerQueue01.Arn
            BatchSize: 1
            Enabled: true

  DispatcherFunction02: # 動かしたい処理。01〜04とあるが、同じコードが動く
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Timeout: 30
      MemorySize: 128
      AutoPublishAlias: dev
      CodeUri: src/dispatcher
      Handler: index.handler
      ReservedConcurrentExecutions: 1
      Policies:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Environment:
        Variables:
          DATA_BUCKET_NAME: !Ref DataBucket
      Events:
        SQSTrigger:
          Type: SQS
          Properties:
            Queue: !GetAtt ParallelTriggerQueue02.Arn
            BatchSize: 1
            Enabled: true

  DispatcherFunction03: # 動かしたい処理。01〜04とあるが、同じコードが動く
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Timeout: 30
      MemorySize: 128
      AutoPublishAlias: dev
      CodeUri: src/dispatcher
      Handler: index.handler
      ReservedConcurrentExecutions: 1
      Policies:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Environment:
        Variables:
          DATA_BUCKET_NAME: !Ref DataBucket
      Events:
        SQSTrigger:
          Type: SQS
          Properties:
            Queue: !GetAtt ParallelTriggerQueue03.Arn
            BatchSize: 1
            Enabled: true

  DispatcherFunction04: # 動かしたい処理。01〜04とあるが、同じコードが動く
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Timeout: 30
      MemorySize: 128
      AutoPublishAlias: dev
      CodeUri: src/dispatcher
      Handler: index.handler
      ReservedConcurrentExecutions: 1
      Policies:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Environment:
        Variables:
          DATA_BUCKET_NAME: !Ref DataBucket
      Events:
        SQSTrigger:
          Type: SQS
          Properties:
            Queue: !GetAtt ParallelTriggerQueue04.Arn
            BatchSize: 1
            Enabled: true

おなじみのSAM定義。

論理IDとトリガーに紐付けてるSQS Queueだけが異なる4つのLambdaが、分散処理をさせたいLambda。 つまりは、4並列で動かせるようにしている。
ここでLambdaとSQSの組を増やせばもっと処理を分散させることができる。

Lambdaのトリガーになっている4つのSQS Queueに、IngesterFunctionでデータを入れる。

src/ingester/index.py

import json
import os
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from uuid import uuid4

import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError

DELTA = timedelta(minutes=5)

sqs = boto3.client("sqs")
dynamodb = boto3.resource("dynamodb")


def handler(event, context):
    data = create_put_data(event)
    queue_url = get_queue_url(data["userId"])
    send_data(data, queue_url)


def create_put_data(event):
    result = {"userId": event["userId"], "message": event["message"]}

    return result


# フローチャートで示したSQS Queue URLを取得するための処理
def get_queue_url(user_id):
    from_dynamodb = update_ttl_and_get_queue_url(user_id)
    if from_dynamodb is not None:
        return from_dynamodb
    for _ in range(1):
        from_queue = get_queue_url_from_queue()
        if from_queue is not None:
            break
        send_queue_urls()
        from_queue = get_queue_url_from_queue()
    put_user_queue_data(user_id, from_queue)
    return from_queue


# フローチャートの①の処理
# 引数で指定したuserIdで直近SQS Queueを使用していたら、Queue URLを取得する
def update_ttl_and_get_queue_url(user_id):
    table = dynamodb.Table(os.environ["RECENT_USER_QUEUE_TABLE_NAME"])
    now = datetime.now(timezone.utc)
    expired = now + DELTA
    option = {
        "Key": {"userId": user_id},
        "ReturnValues": "ALL_NEW",
        "UpdateExpression": "set #expired = :expired",
        "ExpressionAttributeNames": {"#expired": "expired"},
        "ExpressionAttributeValues": {":expired": int(expired.timestamp())},
        "ConditionExpression": Key("userId").eq(user_id),
    }
    try:
        resp = table.update_item(**option)
        return resp["Attributes"]["queueUrl"]
    except ClientError as e:
        try:
            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
                return None
        except Exception:
            pass
        raise e


# フローチャートの②と④の処理
# トリガーに設定していないSQS Queueからトリガーに設定しているSQS QueueのURLを取得する処理
def get_queue_url_from_queue():
    url = os.environ["URL_STORE_QUEUE_URL"]
    option = {"QueueUrl": url, "MaxNumberOfMessages": 1, "WaitTimeSeconds": 0}
    resp = sqs.receive_message(**option)
    if "Messages" not in resp:
        return None
    message = resp["Messages"][0]
    del_option = {"QueueUrl": url, "ReceiptHandle": message["ReceiptHandle"]}
    sqs.delete_message(**del_option)
    return message["Body"]


# フローチャートの③の処理
# トリガーに設定していないSQS Queueに対して、トリガーに設定しているSQS QueueのURLを詰め込む
def send_queue_urls():
    option = {
        "QueueUrl": os.environ["URL_STORE_QUEUE_URL"],
        "Entries": [
            {"Id": str(uuid4()), "MessageBody": url}
            for url in os.environ["PARALLELE_TRIGGER_QUEUE_URLS"].split(",")
        ],
    }
    for _ in range(3):
        resp = sqs.send_message_batch(**option)
        if len(resp["Successful"]) > 0:
            return
    raise Exception("I could not send queue urls to sqs queue.")


# フローチャートの⑤の処理
# DynamoDBにSQS Queueとuser_idの組を保存する
def put_user_queue_data(user_id, queue_url):
    table = dynamodb.Table(os.environ["RECENT_USER_QUEUE_TABLE_NAME"])
    now = datetime.now(timezone.utc)
    expired = now + DELTA
    item = {
        "userId": user_id,
        "queueUrl": queue_url,
        "expired": int(expired.timestamp()),
    }
    table.put_item(Item=item)


def default(obj):
    if isinstance(obj, Decimal):
        return int(obj) if int(obj) == obj else float(obj)
    try:
        return str(obj)
    except Exception:
        return None


def send_data(data, queue_url):
    option = {
        "QueueUrl": queue_url,
        "MessageBody": json.dumps(data, default=default, ensure_ascii=False),
    }
    sqs.send_message(**option)

処理を分散させるために、一番大事なコード。
基本的にはフローチャートの通りの処理を書いている。

ただし、フローチャートの①に該当する処理だけはちょっとした工夫を入れている。

①に該当する処理では、DynamoDBに保存したデータの有効期限を5分延長する処理の結果としてQueue URLを取得している。
これはUpdateItemの以下2つの機能を利用している。

  • "ConditionExpression": Key("userId").eq(user_id)という条件付き書き込みを使用することで、データがある場合のみ書き込みを行うようにしている
    • DynamoDB Tableにデータがないと書き込みが行われない
  • "ReturnValues": "ALL_NEW"というオプションで、更新後のデータ全体を返り値として取得することができる

まあ、直近でQueueを使用したから有効期限を伸ばしたいけども、取得有効期限延長を別の処理で行うと2つの処理が必要になる。 これは、それを1つの処理で同時に行っている。

src/dispatcher/index.py

import json
import os
from datetime import datetime, timezone

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")


def handler(event, context):
    queue_name, body = parse(event)
    data = get_json_data(queue_name)
    put_data = create_put_data(data, body)
    put_json_data(queue_name, put_data)


def parse(event):
    record = event["Records"][0]
    arn = record["eventSourceARN"]
    queue_name = arn.split(":")[-1]
    body = json.loads(record["body"])
    return queue_name, body


def get_json_data(queue_name):
    option = {"Bucket": os.environ["DATA_BUCKET_NAME"], "Key": f"{queue_name}.json"}
    try:
        resp = s3.get_object(**option)
        text = resp["Body"].read().decode()
        return json.loads(text)
    except ClientError as e:
        try:
            if e.response["Error"]["Code"] == "NoSuchKey":
                return []
        except Exception:
            pass
        raise e


def create_put_data(data, body):
    now = datetime.now(timezone.utc)
    result = data + [{"date": str(now), "unixtime": now.timestamp(), "body": body}]
    return result


def put_json_data(queue_name, put_data):
    option = {
        "Bucket": os.environ["DATA_BUCKET_NAME"],
        "Key": f"{queue_name}.json",
        "ContentType": "application/json",
        "Body": json.dumps(put_data, indent=2, ensure_ascii=False).encode(),
    }
    s3.put_object(**option)

分散させた先で実行している処理内容。

今回はトリガーとして使用したSQS Queueの名前でS3上のJSONファイルに保存するようにしている。
(JSONファイルがなければ作成、あれば追記させている)

使ってみる

[
    {"userId": "003", "message": "04393fba-2477-44ab-ac96-d46a0547251d"},
    {"userId": "006", "message": "5d09cf72-4a88-4349-96c3-e0530a4d8f28"},
    {"userId": "006", "message": "253c6a9c-2210-4ac2-8bba-65a6bf19c731"},
    {"userId": "003", "message": "1aba6743-bf6f-4801-9872-fcd51e4ae568"},
    {"userId": "001", "message": "f7ddfee9-84be-4a9c-ba78-556fcf773106"},
    {"userId": "006", "message": "b662dfee-86d2-43fa-931e-95f2b18ab19a"},
    {"userId": "006", "message": "d147662e-6de7-4f9d-b0d4-2e2478221c29"},
    {"userId": "005", "message": "26ba573c-a02e-4d59-9bc6-0603264f4c78"},
    {"userId": "002", "message": "774bf3f8-4bb8-4efb-b554-2f597235e38d"},
    {"userId": "002", "message": "046b7bdb-2d4b-4000-be71-f845d3aa9bcf"},
    {"userId": "005", "message": "aa1259c8-a82c-4e3a-9c48-50f976646529"},
    {"userId": "001", "message": "bfc03b3e-32c6-471e-9f3d-59a14342f8fb"},
    {"userId": "006", "message": "b09ca16c-f135-4108-bfb2-ab5ac57b3a74"},
    {"userId": "001", "message": "39b5fbc9-6b2e-4dbf-8f39-4f722600534d"},
    {"userId": "004", "message": "6e5f7941-5a56-4e2c-aa9f-30141ceec48e"},
    {"userId": "002", "message": "e98108bc-8d44-4312-ab1f-4ea0e6ec968e"},
    {"userId": "005", "message": "ea4fcc8b-69a4-4f0a-b6cd-acd05eb23ee3"},
    {"userId": "003", "message": "5bacf9fe-7114-4d03-a0cd-2241b54412d0"},
    {"userId": "004", "message": "240d3e22-bf40-4916-8b3f-52cc754c78f3"},
    {"userId": "005", "message": "beda4aa7-e6e5-4555-96a9-2ecbff7ba5c1"},
    {"userId": "002", "message": "1b803336-b150-4658-b27e-20c062bcbbd3"},
    {"userId": "003", "message": "5da6e350-51c2-430b-a4c5-a43142945480"},
    {"userId": "003", "message": "bea1a36d-bb60-48b7-8881-7ace0cb908d3"},
    {"userId": "004", "message": "705d92d8-d029-4d8a-a841-3df4a2ec7cf4"},
    {"userId": "003", "message": "6cd88837-da35-4fd7-a91e-5c9b9909a816"},
]

上記25個のpayloadをingesterのLambdaに連続で渡してみた。
(userId001006までの6個)
(InvocationTypeEvent)

どのキューにどのuserIdのデータが入ったかをまとめると下記表のようになる。

Queue01 Queue02 Queue03 Queue04
1 003 005 006 002
2 003 005 006 002
3 004 005 001 002
4 003 005 006 002
5 004 006
6 003 001
7 003 006
8 004 001
9 004

ユーザーIDで見ると一つのQueueしか使っていないので、JSONへの同時書き込みは制限できていると考えられる。

まとめ

サーバーレスで簡単な(?)分散処理をしてみました。
サービスを組み合わせればこんなこともできますよ、程度に思ってもらえればと思います。

基本サーバーレス開発していると、今回みたいにちょっと複雑なものを考えたりすると思うので、頑張ってみてください。