この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
ふと気になりました。「Lambdaのデプロイ中にKinesis Data Streamsにデータを流し続けると、どういう動きになるんだろう?」と。
- ある時刻を境にバッチリ変わる?
- デプロイ付近で新しいLambdaと古いLambdaがランダムに実行される?
試してみました。
おすすめの方
- AWS SAMを使ってみたい方
- LambdaとDynamoDBを使ってみたい方
- Kinesis Data Streamsにデータが送信されている状態でLambdaをデプロイした場合の挙動に興味がある方
ざっくり概要
Kinesis Data Streamsとその後段にLambdaがあります。このLambdaでは受け取ったデータをDynamoDBに書き込みます。 Kinesis Data Stremasにデータを送信し続けている状態でLambdaをデプロイ(コード変更)したとき、どのような動作になるのかを試してみました。
まずはKinesis Data StreamsとLambdaを作成する
SAM Init
sam init \
--runtime python3.7 \
--name kinesis-lambda-deploy-sample \
--app-template hello-world
SAMテンプレートファイル
Kinesis Data Streamsのシャード数は外部パラメータで渡すようにしています。
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: kinesis-lambda-deploy-sample
Parameters:
ShardCount:
Type: Number
Resources:
TestStream:
Type: AWS::Kinesis::Stream
Properties:
Name: !Sub todo-sample-${ShardCount}-stream
ShardCount: !Sub ${ShardCount}
TodoFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: hello_world/
Handler: app.lambda_handler
Runtime: python3.7
Timeout: 10
AutoPublishAlias: Sample
Environment:
Variables:
TABLE_NAME: !Ref TodoTable
Policies:
- arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
Events:
Stream:
Type: Kinesis
Properties:
Stream: !GetAtt TestStream.Arn
StartingPosition: LATEST
BatchSize: 2
TodoFunctionLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/lambda/${TodoFunction}
TodoTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub todo-sample-${ShardCount}-table
AttributeDefinitions:
- AttributeName: todoId
AttributeType: S
KeySchema:
- AttributeName: todoId
KeyType: HASH
BillingMode: PAY_PER_REQUEST
Lambdaコード
DynamoDBに書き込むデータにタイムスタンプとlambda:aaa
を含めています。新しいLambdaではlambda:aaa
を変更することで、デプロイ前後どちらのLambdaが書き込んだのかを区別できるようにしています。
app.py
import base64
import json
import os
import boto3
from datetime import datetime, timezone
dynamodb = boto3.resource('dynamodb')
table_name = os.environ['TABLE_NAME']
def lambda_handler(event, context):
table = dynamodb.Table(table_name)
for record in event['Records']:
b64_data = record['kinesis']['data']
data = base64.b64decode(b64_data)
payload = json.loads(data)
table.put_item(Item={
'todoId': payload['todoId'],
'title': payload['title'],
'createdAt': int(datetime.now(timezone.utc).timestamp() * 1000),
'lambda': 'aaa'
})
AWS SAMデプロイ
下記でデプロイします。ShardCount
は1
にしています。
sam build
sam package \
--output-template-file packaged.yaml \
--s3-bucket cm-fujii.genki-sam-test-bucket
sam deploy \
--template-file packaged.yaml \
--stack-name kinesis-lambda-deploy-sample-1-stack \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset \
--parameter-overrides ShardCount=1
Kinesis Data Streamsにデータを送信し続けるスクリプトを作成する
下記のPythonスクリプトを作成します。同ディレクトリにfinish.txt
が置かれるまで、APIアクセスを続けます。KINESIS_STREAM_NAME
には作成したKinesis Data Stremasの名前を記載します。
putter.py
import sys
import os
import json
import time
import boto3
FINISH_FILE = 'finish.txt'
KINESIS_STREAM_NAME = 'todo-sample-1-stream'
client = boto3.client('kinesis')
def put(index):
count = 1
result_ok = 0
result_ng = 0
while True:
payload = get_payload(index, count)
try:
client.put_record(**payload)
except Exception:
result_ng += 1
raise
else:
result_ok += 1
if is_finish():
break
time.sleep(0.3)
count += 1
print(f'ok: {result_ok}')
print(f'ng: {result_ng}')
def get_payload(index, count):
return {
'StreamName': KINESIS_STREAM_NAME,
'PartitionKey': f'test-{index:02}',
'Data': json.dumps({
'todoId': f't{index:02}-{count:04}',
'title': 'アレを買う'
})
}
def is_finish():
if os.path.isfile(FINISH_FILE):
return True
return False
if __name__ == "__main__":
args = sys.argv
if len(args) == 2:
put(int(args[1]))
Kinesis Data Stremasにデータを送信しながらLambdaをデプロイする(シャード数:1)
Lambdaコードを修正する
下記に変更します。DynamoDBに書き込むデータについて、一部をaaa
からxxxxxx
に変更しています。 これによって、デプロイ前後のどちらのLambdaから書き込んだのか区別できます。
app.py
import base64
import json
import os
import boto3
from datetime import datetime, timezone
dynamodb = boto3.resource('dynamodb')
table_name = os.environ['TABLE_NAME']
def lambda_handler(event, context):
table = dynamodb.Table(table_name)
for record in event['Records']:
b64_data = record['kinesis']['data']
data = base64.b64decode(b64_data)
payload = json.loads(data)
table.put_item(Item={
'todoId': payload['todoId'],
'title': payload['title'],
'createdAt': int(datetime.now(timezone.utc).timestamp() * 1000),
'lambda': 'xxxxxx'
})
Kinesis Data Stremasにデータ送信開始する
手元のPCからKinesis Data Stremasに対して、データ送信を開始します。
python putter.py 1 &
python putter.py 2 &
python putter.py 3 &
デプロイする
下記コマンドでデプロイします。
sam build
sam package \
--output-template-file packaged.yaml \
--s3-bucket cm-fujii.genki-sam-test-bucket
sam deploy \
--template-file packaged.yaml \
--stack-name kinesis-lambda-deploy-sample-1-stack \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset \
--parameter-overrides ShardCount=1
デプロイ完了後、データ送信を停止する
touch finish.txt
結果
CloudWatch Logsの様子
ログストリームは3つありましたが、ログの時刻的にデプロイ前Lambdaの同時実行数は1つでした。
DynamoDBテーブルの様子
DynamoDBテーブルから全データを取得し、時刻順にソートしました(一部抜粋)。 デプロイ時刻付近で新しいLambdaと古いLambdaが混在実行されていることが分かります。
1604298637442: t03-0096, aaa
1604298637542: t02-0184, aaa
1604298637562: t01-0246, aaa
1604298637645: t03-0097, aaa
1604298639483: t02-0185, xxxxxx
1604298639712: t01-0247, xxxxxx
1604298640558: t03-0098, aaa
1604298640773: t02-0186, aaa
1604298640872: t01-0248, xxxxxx
1604298640925: t03-0099, xxxxxx
1604298641023: t02-0187, aaa
1604298641078: t01-0249, aaa
1604298641142: t03-0100, aaa
1604298641174: t02-0188, aaa
1604298641285: t01-0250, xxxxxx
1604298641343: t03-0101, xxxxxx
1604298641443: t02-0189, xxxxxx
1604298641483: t01-0251, xxxxxx
1604298641583: t03-0102, xxxxxx
1604298641633: t02-0190, xxxxxx
1604298641723: t01-0252, xxxxxx
1604298641763: t03-0103, xxxxxx
1604298641826: t02-0191, aaa
1604298641874: t01-0253, aaa
1604298641945: t03-0104, xxxxxx
1604298642003: t02-0192, xxxxxx
1604298642093: t01-0254, aaa
1604298642134: t03-0105, aaa
1604298642213: t02-0193, xxxxxx
1604298642263: t01-0255, xxxxxx
1604298642364: t03-0106, xxxxxx
1604298642403: t02-0194, xxxxxx
1604298642487: t01-0256, aaa
1604298642534: t03-0107, aaa
1604298642620: t02-0195, xxxxxx
1604298642683: t01-0257, xxxxxx
1604298642765: t03-0108, xxxxxx
1604298642823: t02-0196, xxxxxx
1604298642903: t01-0258, xxxxxx
1604298642962: t03-0109, xxxxxx
1604298643047: t02-0197, aaa
1604298643144: t01-0259, xxxxxx
1604298643183: t03-0110, xxxxxx
1604298643290: t02-0198, xxxxxx
1604298643345: t01-0260, xxxxxx
1604298643446: t03-0111, xxxxxx
1604298643503: t02-0199, xxxxxx
Kinesis Data Stremasにデータを送信しながらLambdaをデプロイする(シャード数:2)
シャード数を2
にして、Lambdaの同時実行数を増やして試しました。
デプロイ
下記でデプロイします。ShardCount
を2
にしています。
sam build
sam package \
--output-template-file packaged.yaml \
--s3-bucket cm-fujii.genki-sam-test-bucket
sam deploy \
--template-file packaged.yaml \
--stack-name kinesis-lambda-deploy-sample-2-stack \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset \
--parameter-overrides ShardCount=2
実験の手順はさきほどと同じのため省略します。
結果
CloudWatch Logsの様子
Lambdaの同時期同数は2になりました。
DynamoDBテーブルの様子
DynamoDBテーブルから全データを取得し、時刻順にソートしました。 こちらも、デプロイ時刻付近で新しいLambdaと古いLambdaが混在実行されていることが分かります。
1604300178013: t03-0128, aaa
1604300178022: t01-0235, aaa
1604300178733: t02-0201, aaa
1604300178774: t02-0202, aaa
1604300179379: t03-0129, yyyyyy
1604300179437: t02-0203, yyyyyy
1604300179603: t01-0236, yyyyyy
1604300179698: t03-0130, yyyyyy
1604300179707: t01-0237, yyyyyy
1604300180344: t02-0204, aaa
1604300180509: t03-0131, aaa
1604300180572: t02-0205, aaa
1604300180725: t02-0206, yyyyyy
1604300180768: t01-0238, aaa
1604300180832: t03-0132, aaa
1604300180863: t02-0207, aaa
1604300180911: t02-0208, aaa
1604300180923: t01-0239, aaa
1604300180970: t02-0209, aaa
1604300181004: t02-0210, aaa
1604300181011: t03-0133, yyyyyy
1604300181039: t01-0240, yyyyyy
1604300181104: t03-0134, aaa
1604300181144: t01-0241, aaa
1604300181753: t02-0211, yyyyyy
1604300181758: t03-0135, yyyyyy
1604300181799: t02-0212, yyyyyy
1604300181816: t01-0242, yyyyyy
1604300181884: t02-0213, aaa
1604300181916: t03-0136, yyyyyy
1604300181956: t01-0243, yyyyyy
1604300182058: t03-0137, aaa
1604300182104: t01-0244, aaa
1604300182182: t03-0138, yyyyyy
1604300182236: t01-0245, yyyyyy
1604300182316: t01-0246, yyyyyy
1604300182396: t03-0139, yyyyyy
1604300182457: t03-0140, aaa
1604300182504: t01-0247, aaa
1604300182584: t03-0141, yyyyyy
1604300182616: t01-0248, yyyyyy
1604300182766: t02-0214, yyyyyy
1604300182770: t03-0142, yyyyyy
1604300182799: t01-0249, yyyyyy
さいごに
「デプロイ付近で新しいLambdaと古いLambdaがランダムに実行される」という結果になりました。だからどうだというわけではないですが、なにかの際に役立つかもしれません。