SQSとDynamoDBを使ってサーバーレスな処理の負荷を分散させてみる
こんにちは、CX事業本部の夏目です。
AWS LambdaとServerless #1 Advent Calendar 2019 - Qiitaということで、今回はサーバーレスで軽く負荷分散みたいなのを実装してみようと思います。
やりたいこと
- ユーザーごとにS3にJSONファイルを置いている
- 同時実行数を
1
に制限したSQSトリガーのLambdaを使って、同じファイルに対して同時書き込みが起きないように制限している - これをある程度並列で動かせるようにする
構成
- 同時実行数を
1
に制限したSQSトリガーのLambdaを複数用意して処理を分散させる- 一つのキューから複数のLambdaが生えているのではない
- Lambdaとキューの組がいっぱい
- Lambdaの実態としてのコードは同じものを使用する
- 同じLambdaがいっぱい
- ユーザーのデータを書き込むのに直近でしようしたキューはDynamoDBに保存しておいて、一定期間は再利用する
- TTLを設定し、削除されるまでは使用する
- 最初に書き込みに使用するキューを決める際には、トリガーを設定していないSQSを使用する
- トリガーを設定したSQSのQueue URLを入れる
- ここから取り出すことで処理を分散させる
これを図示するとこうなる。
これから実際にこれを作ってみる。
実装
sam.yml
AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 Resources: UrlStoreQueue: Type: AWS::SQS::Queue Properties: ReceiveMessageWaitTimeSeconds: 0 ParallelTriggerQueue01: Type: AWS::SQS::Queue ParallelTriggerQueue02: Type: AWS::SQS::Queue ParallelTriggerQueue03: Type: AWS::SQS::Queue ParallelTriggerQueue04: Type: AWS::SQS::Queue RecentUserQueueTable: Type: AWS::DynamoDB::Table Properties: BillingMode: PAY_PER_REQUEST AttributeDefinitions: - AttributeName: userId AttributeType: S KeySchema: - AttributeName: userId KeyType: HASH TimeToLiveSpecification: AttributeName: expired Enabled: true IngesterFunction: # SQSにデータを突っ込むためのLambda Type: AWS::Serverless::Function Properties: Runtime: python3.7 Timeout: 30 MemorySize: 128 AutoPublishAlias: dev CodeUri: src/ingester Handler: index.handler Policies: - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess - arn:aws:iam::aws:policy/AmazonSQSFullAccess Environment: Variables: PARALLELE_TRIGGER_QUEUE_URLS: !Join - ',' - - !Ref ParallelTriggerQueue01 - !Ref ParallelTriggerQueue02 - !Ref ParallelTriggerQueue03 - !Ref ParallelTriggerQueue04 URL_STORE_QUEUE_URL: !Ref UrlStoreQueue RECENT_USER_QUEUE_TABLE_NAME: !Ref RecentUserQueueTable DataBucket: Type: AWS::S3::Bucket DispatcherFunction01: # 動かしたい処理。01〜04とあるが、同じコードが動く Type: AWS::Serverless::Function Properties: Runtime: python3.7 Timeout: 30 MemorySize: 128 AutoPublishAlias: dev CodeUri: src/dispatcher Handler: index.handler ReservedConcurrentExecutions: 1 Policies: - arn:aws:iam::aws:policy/AmazonS3FullAccess Environment: Variables: DATA_BUCKET_NAME: !Ref DataBucket Events: SQSTrigger: Type: SQS Properties: Queue: !GetAtt ParallelTriggerQueue01.Arn BatchSize: 1 Enabled: true DispatcherFunction02: # 動かしたい処理。01〜04とあるが、同じコードが動く Type: AWS::Serverless::Function Properties: Runtime: python3.7 Timeout: 30 MemorySize: 128 AutoPublishAlias: dev CodeUri: src/dispatcher Handler: index.handler ReservedConcurrentExecutions: 1 Policies: - arn:aws:iam::aws:policy/AmazonS3FullAccess Environment: Variables: DATA_BUCKET_NAME: !Ref DataBucket Events: SQSTrigger: Type: SQS Properties: Queue: !GetAtt ParallelTriggerQueue02.Arn BatchSize: 1 Enabled: true DispatcherFunction03: # 動かしたい処理。01〜04とあるが、同じコードが動く Type: AWS::Serverless::Function Properties: Runtime: python3.7 Timeout: 30 MemorySize: 128 AutoPublishAlias: dev CodeUri: src/dispatcher Handler: index.handler ReservedConcurrentExecutions: 1 Policies: - arn:aws:iam::aws:policy/AmazonS3FullAccess Environment: Variables: DATA_BUCKET_NAME: !Ref DataBucket Events: SQSTrigger: Type: SQS Properties: Queue: !GetAtt ParallelTriggerQueue03.Arn BatchSize: 1 Enabled: true DispatcherFunction04: # 動かしたい処理。01〜04とあるが、同じコードが動く Type: AWS::Serverless::Function Properties: Runtime: python3.7 Timeout: 30 MemorySize: 128 AutoPublishAlias: dev CodeUri: src/dispatcher Handler: index.handler ReservedConcurrentExecutions: 1 Policies: - arn:aws:iam::aws:policy/AmazonS3FullAccess Environment: Variables: DATA_BUCKET_NAME: !Ref DataBucket Events: SQSTrigger: Type: SQS Properties: Queue: !GetAtt ParallelTriggerQueue04.Arn BatchSize: 1 Enabled: true
おなじみのSAM定義。
論理IDとトリガーに紐付けてるSQS Queueだけが異なる4つのLambdaが、分散処理をさせたいLambda。
つまりは、4並列で動かせるようにしている。
ここでLambdaとSQSの組を増やせばもっと処理を分散させることができる。
Lambdaのトリガーになっている4つのSQS Queueに、IngesterFunction
でデータを入れる。
src/ingester/index.py
import json import os from datetime import datetime, timedelta, timezone from decimal import Decimal from uuid import uuid4 import boto3 from boto3.dynamodb.conditions import Key from botocore.exceptions import ClientError DELTA = timedelta(minutes=5) sqs = boto3.client("sqs") dynamodb = boto3.resource("dynamodb") def handler(event, context): data = create_put_data(event) queue_url = get_queue_url(data["userId"]) send_data(data, queue_url) def create_put_data(event): result = {"userId": event["userId"], "message": event["message"]} return result # フローチャートで示したSQS Queue URLを取得するための処理 def get_queue_url(user_id): from_dynamodb = update_ttl_and_get_queue_url(user_id) if from_dynamodb is not None: return from_dynamodb for _ in range(1): from_queue = get_queue_url_from_queue() if from_queue is not None: break send_queue_urls() from_queue = get_queue_url_from_queue() put_user_queue_data(user_id, from_queue) return from_queue # フローチャートの①の処理 # 引数で指定したuserIdで直近SQS Queueを使用していたら、Queue URLを取得する def update_ttl_and_get_queue_url(user_id): table = dynamodb.Table(os.environ["RECENT_USER_QUEUE_TABLE_NAME"]) now = datetime.now(timezone.utc) expired = now + DELTA option = { "Key": {"userId": user_id}, "ReturnValues": "ALL_NEW", "UpdateExpression": "set #expired = :expired", "ExpressionAttributeNames": {"#expired": "expired"}, "ExpressionAttributeValues": {":expired": int(expired.timestamp())}, "ConditionExpression": Key("userId").eq(user_id), } try: resp = table.update_item(**option) return resp["Attributes"]["queueUrl"] except ClientError as e: try: if e.response["Error"]["Code"] == "ConditionalCheckFailedException": return None except Exception: pass raise e # フローチャートの②と④の処理 # トリガーに設定していないSQS Queueからトリガーに設定しているSQS QueueのURLを取得する処理 def get_queue_url_from_queue(): url = os.environ["URL_STORE_QUEUE_URL"] option = {"QueueUrl": url, "MaxNumberOfMessages": 1, "WaitTimeSeconds": 0} resp = sqs.receive_message(**option) if "Messages" not in resp: return None message = resp["Messages"][0] del_option = {"QueueUrl": url, "ReceiptHandle": message["ReceiptHandle"]} sqs.delete_message(**del_option) return message["Body"] # フローチャートの③の処理 # トリガーに設定していないSQS Queueに対して、トリガーに設定しているSQS QueueのURLを詰め込む def send_queue_urls(): option = { "QueueUrl": os.environ["URL_STORE_QUEUE_URL"], "Entries": [ {"Id": str(uuid4()), "MessageBody": url} for url in os.environ["PARALLELE_TRIGGER_QUEUE_URLS"].split(",") ], } for _ in range(3): resp = sqs.send_message_batch(**option) if len(resp["Successful"]) > 0: return raise Exception("I could not send queue urls to sqs queue.") # フローチャートの⑤の処理 # DynamoDBにSQS Queueとuser_idの組を保存する def put_user_queue_data(user_id, queue_url): table = dynamodb.Table(os.environ["RECENT_USER_QUEUE_TABLE_NAME"]) now = datetime.now(timezone.utc) expired = now + DELTA item = { "userId": user_id, "queueUrl": queue_url, "expired": int(expired.timestamp()), } table.put_item(Item=item) def default(obj): if isinstance(obj, Decimal): return int(obj) if int(obj) == obj else float(obj) try: return str(obj) except Exception: return None def send_data(data, queue_url): option = { "QueueUrl": queue_url, "MessageBody": json.dumps(data, default=default, ensure_ascii=False), } sqs.send_message(**option)
処理を分散させるために、一番大事なコード。
基本的にはフローチャートの通りの処理を書いている。
ただし、フローチャートの①に該当する処理だけはちょっとした工夫を入れている。
①に該当する処理では、DynamoDBに保存したデータの有効期限を5分延長する処理の結果としてQueue URLを取得している。
これはUpdateItemの以下2つの機能を利用している。
"ConditionExpression": Key("userId").eq(user_id)
という条件付き書き込みを使用することで、データがある場合のみ書き込みを行うようにしている- DynamoDB Tableにデータがないと書き込みが行われない
"ReturnValues": "ALL_NEW"
というオプションで、更新後のデータ全体を返り値として取得することができる
まあ、直近でQueueを使用したから有効期限を伸ばしたいけども、取得
と有効期限延長
を別の処理で行うと2つの処理が必要になる。
これは、それを1つの処理で同時に行っている。
src/dispatcher/index.py
import json import os from datetime import datetime, timezone import boto3 from botocore.exceptions import ClientError s3 = boto3.client("s3") def handler(event, context): queue_name, body = parse(event) data = get_json_data(queue_name) put_data = create_put_data(data, body) put_json_data(queue_name, put_data) def parse(event): record = event["Records"][0] arn = record["eventSourceARN"] queue_name = arn.split(":")[-1] body = json.loads(record["body"]) return queue_name, body def get_json_data(queue_name): option = {"Bucket": os.environ["DATA_BUCKET_NAME"], "Key": f"{queue_name}.json"} try: resp = s3.get_object(**option) text = resp["Body"].read().decode() return json.loads(text) except ClientError as e: try: if e.response["Error"]["Code"] == "NoSuchKey": return [] except Exception: pass raise e def create_put_data(data, body): now = datetime.now(timezone.utc) result = data + [{"date": str(now), "unixtime": now.timestamp(), "body": body}] return result def put_json_data(queue_name, put_data): option = { "Bucket": os.environ["DATA_BUCKET_NAME"], "Key": f"{queue_name}.json", "ContentType": "application/json", "Body": json.dumps(put_data, indent=2, ensure_ascii=False).encode(), } s3.put_object(**option)
分散させた先で実行している処理内容。
今回はトリガーとして使用したSQS Queueの名前でS3上のJSONファイルに保存するようにしている。
(JSONファイルがなければ作成、あれば追記させている)
使ってみる
[ {"userId": "003", "message": "04393fba-2477-44ab-ac96-d46a0547251d"}, {"userId": "006", "message": "5d09cf72-4a88-4349-96c3-e0530a4d8f28"}, {"userId": "006", "message": "253c6a9c-2210-4ac2-8bba-65a6bf19c731"}, {"userId": "003", "message": "1aba6743-bf6f-4801-9872-fcd51e4ae568"}, {"userId": "001", "message": "f7ddfee9-84be-4a9c-ba78-556fcf773106"}, {"userId": "006", "message": "b662dfee-86d2-43fa-931e-95f2b18ab19a"}, {"userId": "006", "message": "d147662e-6de7-4f9d-b0d4-2e2478221c29"}, {"userId": "005", "message": "26ba573c-a02e-4d59-9bc6-0603264f4c78"}, {"userId": "002", "message": "774bf3f8-4bb8-4efb-b554-2f597235e38d"}, {"userId": "002", "message": "046b7bdb-2d4b-4000-be71-f845d3aa9bcf"}, {"userId": "005", "message": "aa1259c8-a82c-4e3a-9c48-50f976646529"}, {"userId": "001", "message": "bfc03b3e-32c6-471e-9f3d-59a14342f8fb"}, {"userId": "006", "message": "b09ca16c-f135-4108-bfb2-ab5ac57b3a74"}, {"userId": "001", "message": "39b5fbc9-6b2e-4dbf-8f39-4f722600534d"}, {"userId": "004", "message": "6e5f7941-5a56-4e2c-aa9f-30141ceec48e"}, {"userId": "002", "message": "e98108bc-8d44-4312-ab1f-4ea0e6ec968e"}, {"userId": "005", "message": "ea4fcc8b-69a4-4f0a-b6cd-acd05eb23ee3"}, {"userId": "003", "message": "5bacf9fe-7114-4d03-a0cd-2241b54412d0"}, {"userId": "004", "message": "240d3e22-bf40-4916-8b3f-52cc754c78f3"}, {"userId": "005", "message": "beda4aa7-e6e5-4555-96a9-2ecbff7ba5c1"}, {"userId": "002", "message": "1b803336-b150-4658-b27e-20c062bcbbd3"}, {"userId": "003", "message": "5da6e350-51c2-430b-a4c5-a43142945480"}, {"userId": "003", "message": "bea1a36d-bb60-48b7-8881-7ace0cb908d3"}, {"userId": "004", "message": "705d92d8-d029-4d8a-a841-3df4a2ec7cf4"}, {"userId": "003", "message": "6cd88837-da35-4fd7-a91e-5c9b9909a816"}, ]
上記25個のpayloadをingesterのLambdaに連続で渡してみた。
(userId
は001
〜006
までの6個)
(InvocationType
はEvent
)
どのキューにどのuserId
のデータが入ったかをまとめると下記表のようになる。
Queue01 | Queue02 | Queue03 | Queue04 | |
---|---|---|---|---|
1 | 003 | 005 | 006 | 002 |
2 | 003 | 005 | 006 | 002 |
3 | 004 | 005 | 001 | 002 |
4 | 003 | 005 | 006 | 002 |
5 | 004 | 006 | ||
6 | 003 | 001 | ||
7 | 003 | 006 | ||
8 | 004 | 001 | ||
9 | 004 |
ユーザーIDで見ると一つのQueueしか使っていないので、JSONへの同時書き込みは制限できていると考えられる。
まとめ
サーバーレスで簡単な(?)分散処理をしてみました。
サービスを組み合わせればこんなこともできますよ、程度に思ってもらえればと思います。
基本サーバーレス開発していると、今回みたいにちょっと複雑なものを考えたりすると思うので、頑張ってみてください。