この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部の夏目です。
AWS LambdaとServerless #1 Advent Calendar 2019 - Qiitaということで、今回はサーバーレスで軽く負荷分散みたいなのを実装してみようと思います。
やりたいこと
- ユーザーごとにS3にJSONファイルを置いている
- 同時実行数を
1
に制限したSQSトリガーのLambdaを使って、同じファイルに対して同時書き込みが起きないように制限している - これをある程度並列で動かせるようにする
構成
- 同時実行数を
1
に制限したSQSトリガーのLambdaを複数用意して処理を分散させる- 一つのキューから複数のLambdaが生えているのではない
- Lambdaとキューの組がいっぱい
- Lambdaの実態としてのコードは同じものを使用する
- 同じLambdaがいっぱい
- ユーザーのデータを書き込むのに直近でしようしたキューはDynamoDBに保存しておいて、一定期間は再利用する
- TTLを設定し、削除されるまでは使用する
- 最初に書き込みに使用するキューを決める際には、トリガーを設定していないSQSを使用する
- トリガーを設定したSQSのQueue URLを入れる
- ここから取り出すことで処理を分散させる
これを図示するとこうなる。
これから実際にこれを作ってみる。
実装
sam.yml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Resources:
UrlStoreQueue:
Type: AWS::SQS::Queue
Properties:
ReceiveMessageWaitTimeSeconds: 0
ParallelTriggerQueue01:
Type: AWS::SQS::Queue
ParallelTriggerQueue02:
Type: AWS::SQS::Queue
ParallelTriggerQueue03:
Type: AWS::SQS::Queue
ParallelTriggerQueue04:
Type: AWS::SQS::Queue
RecentUserQueueTable:
Type: AWS::DynamoDB::Table
Properties:
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: userId
AttributeType: S
KeySchema:
- AttributeName: userId
KeyType: HASH
TimeToLiveSpecification:
AttributeName: expired
Enabled: true
IngesterFunction: # SQSにデータを突っ込むためのLambda
Type: AWS::Serverless::Function
Properties:
Runtime: python3.7
Timeout: 30
MemorySize: 128
AutoPublishAlias: dev
CodeUri: src/ingester
Handler: index.handler
Policies:
- arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
- arn:aws:iam::aws:policy/AmazonSQSFullAccess
Environment:
Variables:
PARALLELE_TRIGGER_QUEUE_URLS: !Join
- ','
- - !Ref ParallelTriggerQueue01
- !Ref ParallelTriggerQueue02
- !Ref ParallelTriggerQueue03
- !Ref ParallelTriggerQueue04
URL_STORE_QUEUE_URL: !Ref UrlStoreQueue
RECENT_USER_QUEUE_TABLE_NAME: !Ref RecentUserQueueTable
DataBucket:
Type: AWS::S3::Bucket
DispatcherFunction01: # 動かしたい処理。01〜04とあるが、同じコードが動く
Type: AWS::Serverless::Function
Properties:
Runtime: python3.7
Timeout: 30
MemorySize: 128
AutoPublishAlias: dev
CodeUri: src/dispatcher
Handler: index.handler
ReservedConcurrentExecutions: 1
Policies:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
Environment:
Variables:
DATA_BUCKET_NAME: !Ref DataBucket
Events:
SQSTrigger:
Type: SQS
Properties:
Queue: !GetAtt ParallelTriggerQueue01.Arn
BatchSize: 1
Enabled: true
DispatcherFunction02: # 動かしたい処理。01〜04とあるが、同じコードが動く
Type: AWS::Serverless::Function
Properties:
Runtime: python3.7
Timeout: 30
MemorySize: 128
AutoPublishAlias: dev
CodeUri: src/dispatcher
Handler: index.handler
ReservedConcurrentExecutions: 1
Policies:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
Environment:
Variables:
DATA_BUCKET_NAME: !Ref DataBucket
Events:
SQSTrigger:
Type: SQS
Properties:
Queue: !GetAtt ParallelTriggerQueue02.Arn
BatchSize: 1
Enabled: true
DispatcherFunction03: # 動かしたい処理。01〜04とあるが、同じコードが動く
Type: AWS::Serverless::Function
Properties:
Runtime: python3.7
Timeout: 30
MemorySize: 128
AutoPublishAlias: dev
CodeUri: src/dispatcher
Handler: index.handler
ReservedConcurrentExecutions: 1
Policies:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
Environment:
Variables:
DATA_BUCKET_NAME: !Ref DataBucket
Events:
SQSTrigger:
Type: SQS
Properties:
Queue: !GetAtt ParallelTriggerQueue03.Arn
BatchSize: 1
Enabled: true
DispatcherFunction04: # 動かしたい処理。01〜04とあるが、同じコードが動く
Type: AWS::Serverless::Function
Properties:
Runtime: python3.7
Timeout: 30
MemorySize: 128
AutoPublishAlias: dev
CodeUri: src/dispatcher
Handler: index.handler
ReservedConcurrentExecutions: 1
Policies:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
Environment:
Variables:
DATA_BUCKET_NAME: !Ref DataBucket
Events:
SQSTrigger:
Type: SQS
Properties:
Queue: !GetAtt ParallelTriggerQueue04.Arn
BatchSize: 1
Enabled: true
おなじみのSAM定義。
論理IDとトリガーに紐付けてるSQS Queueだけが異なる4つのLambdaが、分散処理をさせたいLambda。
つまりは、4並列で動かせるようにしている。
ここでLambdaとSQSの組を増やせばもっと処理を分散させることができる。
Lambdaのトリガーになっている4つのSQS Queueに、IngesterFunction
でデータを入れる。
src/ingester/index.py
import json
import os
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from uuid import uuid4
import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
DELTA = timedelta(minutes=5)
sqs = boto3.client("sqs")
dynamodb = boto3.resource("dynamodb")
def handler(event, context):
data = create_put_data(event)
queue_url = get_queue_url(data["userId"])
send_data(data, queue_url)
def create_put_data(event):
result = {"userId": event["userId"], "message": event["message"]}
return result
# フローチャートで示したSQS Queue URLを取得するための処理
def get_queue_url(user_id):
from_dynamodb = update_ttl_and_get_queue_url(user_id)
if from_dynamodb is not None:
return from_dynamodb
for _ in range(1):
from_queue = get_queue_url_from_queue()
if from_queue is not None:
break
send_queue_urls()
from_queue = get_queue_url_from_queue()
put_user_queue_data(user_id, from_queue)
return from_queue
# フローチャートの①の処理
# 引数で指定したuserIdで直近SQS Queueを使用していたら、Queue URLを取得する
def update_ttl_and_get_queue_url(user_id):
table = dynamodb.Table(os.environ["RECENT_USER_QUEUE_TABLE_NAME"])
now = datetime.now(timezone.utc)
expired = now + DELTA
option = {
"Key": {"userId": user_id},
"ReturnValues": "ALL_NEW",
"UpdateExpression": "set #expired = :expired",
"ExpressionAttributeNames": {"#expired": "expired"},
"ExpressionAttributeValues": {":expired": int(expired.timestamp())},
"ConditionExpression": Key("userId").eq(user_id),
}
try:
resp = table.update_item(**option)
return resp["Attributes"]["queueUrl"]
except ClientError as e:
try:
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
return None
except Exception:
pass
raise e
# フローチャートの②と④の処理
# トリガーに設定していないSQS Queueからトリガーに設定しているSQS QueueのURLを取得する処理
def get_queue_url_from_queue():
url = os.environ["URL_STORE_QUEUE_URL"]
option = {"QueueUrl": url, "MaxNumberOfMessages": 1, "WaitTimeSeconds": 0}
resp = sqs.receive_message(**option)
if "Messages" not in resp:
return None
message = resp["Messages"][0]
del_option = {"QueueUrl": url, "ReceiptHandle": message["ReceiptHandle"]}
sqs.delete_message(**del_option)
return message["Body"]
# フローチャートの③の処理
# トリガーに設定していないSQS Queueに対して、トリガーに設定しているSQS QueueのURLを詰め込む
def send_queue_urls():
option = {
"QueueUrl": os.environ["URL_STORE_QUEUE_URL"],
"Entries": [
{"Id": str(uuid4()), "MessageBody": url}
for url in os.environ["PARALLELE_TRIGGER_QUEUE_URLS"].split(",")
],
}
for _ in range(3):
resp = sqs.send_message_batch(**option)
if len(resp["Successful"]) > 0:
return
raise Exception("I could not send queue urls to sqs queue.")
# フローチャートの⑤の処理
# DynamoDBにSQS Queueとuser_idの組を保存する
def put_user_queue_data(user_id, queue_url):
table = dynamodb.Table(os.environ["RECENT_USER_QUEUE_TABLE_NAME"])
now = datetime.now(timezone.utc)
expired = now + DELTA
item = {
"userId": user_id,
"queueUrl": queue_url,
"expired": int(expired.timestamp()),
}
table.put_item(Item=item)
def default(obj):
if isinstance(obj, Decimal):
return int(obj) if int(obj) == obj else float(obj)
try:
return str(obj)
except Exception:
return None
def send_data(data, queue_url):
option = {
"QueueUrl": queue_url,
"MessageBody": json.dumps(data, default=default, ensure_ascii=False),
}
sqs.send_message(**option)
処理を分散させるために、一番大事なコード。
基本的にはフローチャートの通りの処理を書いている。
ただし、フローチャートの①に該当する処理だけはちょっとした工夫を入れている。
①に該当する処理では、DynamoDBに保存したデータの有効期限を5分延長する処理の結果としてQueue URLを取得している。
これはUpdateItemの以下2つの機能を利用している。
"ConditionExpression": Key("userId").eq(user_id)
という条件付き書き込みを使用することで、データがある場合のみ書き込みを行うようにしている- DynamoDB Tableにデータがないと書き込みが行われない
"ReturnValues": "ALL_NEW"
というオプションで、更新後のデータ全体を返り値として取得することができる
まあ、直近でQueueを使用したから有効期限を伸ばしたいけども、取得
と有効期限延長
を別の処理で行うと2つの処理が必要になる。
これは、それを1つの処理で同時に行っている。
src/dispatcher/index.py
import json
import os
from datetime import datetime, timezone
import boto3
from botocore.exceptions import ClientError
s3 = boto3.client("s3")
def handler(event, context):
queue_name, body = parse(event)
data = get_json_data(queue_name)
put_data = create_put_data(data, body)
put_json_data(queue_name, put_data)
def parse(event):
record = event["Records"][0]
arn = record["eventSourceARN"]
queue_name = arn.split(":")[-1]
body = json.loads(record["body"])
return queue_name, body
def get_json_data(queue_name):
option = {"Bucket": os.environ["DATA_BUCKET_NAME"], "Key": f"{queue_name}.json"}
try:
resp = s3.get_object(**option)
text = resp["Body"].read().decode()
return json.loads(text)
except ClientError as e:
try:
if e.response["Error"]["Code"] == "NoSuchKey":
return []
except Exception:
pass
raise e
def create_put_data(data, body):
now = datetime.now(timezone.utc)
result = data + [{"date": str(now), "unixtime": now.timestamp(), "body": body}]
return result
def put_json_data(queue_name, put_data):
option = {
"Bucket": os.environ["DATA_BUCKET_NAME"],
"Key": f"{queue_name}.json",
"ContentType": "application/json",
"Body": json.dumps(put_data, indent=2, ensure_ascii=False).encode(),
}
s3.put_object(**option)
分散させた先で実行している処理内容。
今回はトリガーとして使用したSQS Queueの名前でS3上のJSONファイルに保存するようにしている。
(JSONファイルがなければ作成、あれば追記させている)
使ってみる
[
{"userId": "003", "message": "04393fba-2477-44ab-ac96-d46a0547251d"},
{"userId": "006", "message": "5d09cf72-4a88-4349-96c3-e0530a4d8f28"},
{"userId": "006", "message": "253c6a9c-2210-4ac2-8bba-65a6bf19c731"},
{"userId": "003", "message": "1aba6743-bf6f-4801-9872-fcd51e4ae568"},
{"userId": "001", "message": "f7ddfee9-84be-4a9c-ba78-556fcf773106"},
{"userId": "006", "message": "b662dfee-86d2-43fa-931e-95f2b18ab19a"},
{"userId": "006", "message": "d147662e-6de7-4f9d-b0d4-2e2478221c29"},
{"userId": "005", "message": "26ba573c-a02e-4d59-9bc6-0603264f4c78"},
{"userId": "002", "message": "774bf3f8-4bb8-4efb-b554-2f597235e38d"},
{"userId": "002", "message": "046b7bdb-2d4b-4000-be71-f845d3aa9bcf"},
{"userId": "005", "message": "aa1259c8-a82c-4e3a-9c48-50f976646529"},
{"userId": "001", "message": "bfc03b3e-32c6-471e-9f3d-59a14342f8fb"},
{"userId": "006", "message": "b09ca16c-f135-4108-bfb2-ab5ac57b3a74"},
{"userId": "001", "message": "39b5fbc9-6b2e-4dbf-8f39-4f722600534d"},
{"userId": "004", "message": "6e5f7941-5a56-4e2c-aa9f-30141ceec48e"},
{"userId": "002", "message": "e98108bc-8d44-4312-ab1f-4ea0e6ec968e"},
{"userId": "005", "message": "ea4fcc8b-69a4-4f0a-b6cd-acd05eb23ee3"},
{"userId": "003", "message": "5bacf9fe-7114-4d03-a0cd-2241b54412d0"},
{"userId": "004", "message": "240d3e22-bf40-4916-8b3f-52cc754c78f3"},
{"userId": "005", "message": "beda4aa7-e6e5-4555-96a9-2ecbff7ba5c1"},
{"userId": "002", "message": "1b803336-b150-4658-b27e-20c062bcbbd3"},
{"userId": "003", "message": "5da6e350-51c2-430b-a4c5-a43142945480"},
{"userId": "003", "message": "bea1a36d-bb60-48b7-8881-7ace0cb908d3"},
{"userId": "004", "message": "705d92d8-d029-4d8a-a841-3df4a2ec7cf4"},
{"userId": "003", "message": "6cd88837-da35-4fd7-a91e-5c9b9909a816"},
]
上記25個のpayloadをingesterのLambdaに連続で渡してみた。
(userId
は001
〜006
までの6個)
(InvocationType
はEvent
)
どのキューにどのuserId
のデータが入ったかをまとめると下記表のようになる。
Queue01 | Queue02 | Queue03 | Queue04 | |
---|---|---|---|---|
1 | 003 | 005 | 006 | 002 |
2 | 003 | 005 | 006 | 002 |
3 | 004 | 005 | 001 | 002 |
4 | 003 | 005 | 006 | 002 |
5 | 004 | 006 | ||
6 | 003 | 001 | ||
7 | 003 | 006 | ||
8 | 004 | 001 | ||
9 | 004 |
ユーザーIDで見ると一つのQueueしか使っていないので、JSONへの同時書き込みは制限できていると考えられる。
まとめ
サーバーレスで簡単な(?)分散処理をしてみました。
サービスを組み合わせればこんなこともできますよ、程度に思ってもらえればと思います。
基本サーバーレス開発していると、今回みたいにちょっと複雑なものを考えたりすると思うので、頑張ってみてください。