Kinesis Data Streamsにデータが送信されている状態で、Lambdaをデプロイした際の動作を確認してみた

Kinesis Data Streamsと後段のLambdaがあります。Kinesis Data Streamsにデータが送信されている状態でデプロイしたとき、どのような動作になるか試してみました。
2020.11.04

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

ふと気になりました。「Lambdaのデプロイ中にKinesis Data Streamsにデータを流し続けると、どういう動きになるんだろう?」と。

  • ある時刻を境にバッチリ変わる?
  • デプロイ付近で新しいLambdaと古いLambdaがランダムに実行される?

試してみました。

おすすめの方

  • AWS SAMを使ってみたい方
  • LambdaとDynamoDBを使ってみたい方
  • Kinesis Data Streamsにデータが送信されている状態でLambdaをデプロイした場合の挙動に興味がある方

ざっくり概要

Kinesis Data Streamsとその後段にLambdaがあります。このLambdaでは受け取ったデータをDynamoDBに書き込みます。 Kinesis Data Stremasにデータを送信し続けている状態でLambdaをデプロイ(コード変更)したとき、どのような動作になるのかを試してみました。

概要

まずはKinesis Data StreamsとLambdaを作成する

SAM Init

sam init \
    --runtime python3.7 \
    --name kinesis-lambda-deploy-sample \
    --app-template hello-world

SAMテンプレートファイル

Kinesis Data Streamsのシャード数は外部パラメータで渡すようにしています。

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: kinesis-lambda-deploy-sample

Parameters:
  ShardCount:
    Type: Number

Resources:
  TestStream:
    Type: AWS::Kinesis::Stream
    Properties:
        Name: !Sub todo-sample-${ShardCount}-stream
        ShardCount: !Sub ${ShardCount}

  TodoFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.7
      Timeout: 10
      AutoPublishAlias: Sample
      Environment:
        Variables:
          TABLE_NAME: !Ref TodoTable
      Policies:
        - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
      Events:
        Stream:
          Type: Kinesis
          Properties:
            Stream: !GetAtt TestStream.Arn
            StartingPosition: LATEST
            BatchSize: 2

  TodoFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${TodoFunction}

  TodoTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub todo-sample-${ShardCount}-table
      AttributeDefinitions:
        - AttributeName: todoId
          AttributeType: S
      KeySchema:
        - AttributeName: todoId
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

Lambdaコード

DynamoDBに書き込むデータにタイムスタンプとlambda:aaaを含めています。新しいLambdaではlambda:aaaを変更することで、デプロイ前後どちらのLambdaが書き込んだのかを区別できるようにしています。

app.py

import base64
import json
import os

import boto3

from datetime import datetime, timezone

dynamodb = boto3.resource('dynamodb')
table_name = os.environ['TABLE_NAME']

def lambda_handler(event, context):
    table = dynamodb.Table(table_name)

    for record in event['Records']:
        b64_data = record['kinesis']['data']
        data = base64.b64decode(b64_data)
        payload = json.loads(data)

        table.put_item(Item={
            'todoId': payload['todoId'],
            'title': payload['title'],
            'createdAt': int(datetime.now(timezone.utc).timestamp() * 1000),
            'lambda': 'aaa'
        })

AWS SAMデプロイ

下記でデプロイします。ShardCount1にしています。

sam build

sam package \
    --output-template-file packaged.yaml \
    --s3-bucket cm-fujii.genki-sam-test-bucket

sam deploy \
    --template-file packaged.yaml \
    --stack-name kinesis-lambda-deploy-sample-1-stack \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset \
    --parameter-overrides ShardCount=1

Kinesis Data Streamsにデータを送信し続けるスクリプトを作成する

下記のPythonスクリプトを作成します。同ディレクトリにfinish.txtが置かれるまで、APIアクセスを続けます。KINESIS_STREAM_NAMEには作成したKinesis Data Stremasの名前を記載します。

putter.py

import sys
import os
import json
import time
import boto3

FINISH_FILE = 'finish.txt'
KINESIS_STREAM_NAME = 'todo-sample-1-stream'

client = boto3.client('kinesis')

def put(index):
    count = 1
    result_ok = 0
    result_ng = 0

    while True:
        payload = get_payload(index, count)
        try:
            client.put_record(**payload)
        except Exception:
            result_ng += 1
            raise
        else:
            result_ok += 1

        if is_finish():
            break
        time.sleep(0.3)
        count += 1

    print(f'ok: {result_ok}')
    print(f'ng: {result_ng}')


def get_payload(index, count):
    return {
        'StreamName': KINESIS_STREAM_NAME,
        'PartitionKey': f'test-{index:02}',
        'Data': json.dumps({
            'todoId': f't{index:02}-{count:04}',
            'title': 'アレを買う'
        })
    }

def is_finish():
    if os.path.isfile(FINISH_FILE):
        return True
    return False

if __name__ == "__main__":
    args = sys.argv
    if len(args) == 2:
        put(int(args[1]))

Kinesis Data Stremasにデータを送信しながらLambdaをデプロイする(シャード数:1)

Lambdaコードを修正する

下記に変更します。DynamoDBに書き込むデータについて、一部をaaaからxxxxxxに変更しています。 これによって、デプロイ前後のどちらのLambdaから書き込んだのか区別できます。

app.py

import base64
import json
import os

import boto3

from datetime import datetime, timezone

dynamodb = boto3.resource('dynamodb')
table_name = os.environ['TABLE_NAME']

def lambda_handler(event, context):
    table = dynamodb.Table(table_name)

    for record in event['Records']:
        b64_data = record['kinesis']['data']
        data = base64.b64decode(b64_data)
        payload = json.loads(data)

        table.put_item(Item={
            'todoId': payload['todoId'],
            'title': payload['title'],
            'createdAt': int(datetime.now(timezone.utc).timestamp() * 1000),
            'lambda': 'xxxxxx'
        })

Kinesis Data Stremasにデータ送信開始する

手元のPCからKinesis Data Stremasに対して、データ送信を開始します。

python putter.py 1 &
python putter.py 2 &
python putter.py 3 &

デプロイする

下記コマンドでデプロイします。

sam build

sam package \
    --output-template-file packaged.yaml \
    --s3-bucket cm-fujii.genki-sam-test-bucket

sam deploy \
    --template-file packaged.yaml \
    --stack-name kinesis-lambda-deploy-sample-1-stack \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset \
    --parameter-overrides ShardCount=1

デプロイ完了後、データ送信を停止する

touch finish.txt

結果

CloudWatch Logsの様子

ログストリームは3つありましたが、ログの時刻的にデプロイ前Lambdaの同時実行数は1つでした。

CloudWatch Logsの様子

DynamoDBテーブルの様子

DynamoDBテーブルから全データを取得し、時刻順にソートしました(一部抜粋)。 デプロイ時刻付近で新しいLambdaと古いLambdaが混在実行されていることが分かります。

1604298637442: t03-0096, aaa
1604298637542: t02-0184, aaa
1604298637562: t01-0246, aaa
1604298637645: t03-0097, aaa
1604298639483: t02-0185, xxxxxx
1604298639712: t01-0247, xxxxxx
1604298640558: t03-0098, aaa
1604298640773: t02-0186, aaa
1604298640872: t01-0248, xxxxxx
1604298640925: t03-0099, xxxxxx
1604298641023: t02-0187, aaa
1604298641078: t01-0249, aaa
1604298641142: t03-0100, aaa
1604298641174: t02-0188, aaa
1604298641285: t01-0250, xxxxxx
1604298641343: t03-0101, xxxxxx
1604298641443: t02-0189, xxxxxx
1604298641483: t01-0251, xxxxxx
1604298641583: t03-0102, xxxxxx
1604298641633: t02-0190, xxxxxx
1604298641723: t01-0252, xxxxxx
1604298641763: t03-0103, xxxxxx
1604298641826: t02-0191, aaa
1604298641874: t01-0253, aaa
1604298641945: t03-0104, xxxxxx
1604298642003: t02-0192, xxxxxx
1604298642093: t01-0254, aaa
1604298642134: t03-0105, aaa
1604298642213: t02-0193, xxxxxx
1604298642263: t01-0255, xxxxxx
1604298642364: t03-0106, xxxxxx
1604298642403: t02-0194, xxxxxx
1604298642487: t01-0256, aaa
1604298642534: t03-0107, aaa
1604298642620: t02-0195, xxxxxx
1604298642683: t01-0257, xxxxxx
1604298642765: t03-0108, xxxxxx
1604298642823: t02-0196, xxxxxx
1604298642903: t01-0258, xxxxxx
1604298642962: t03-0109, xxxxxx
1604298643047: t02-0197, aaa
1604298643144: t01-0259, xxxxxx
1604298643183: t03-0110, xxxxxx
1604298643290: t02-0198, xxxxxx
1604298643345: t01-0260, xxxxxx
1604298643446: t03-0111, xxxxxx
1604298643503: t02-0199, xxxxxx

Kinesis Data Stremasにデータを送信しながらLambdaをデプロイする(シャード数:2)

シャード数を2にして、Lambdaの同時実行数を増やして試しました。

デプロイ

下記でデプロイします。ShardCount2にしています。

sam build

sam package \
    --output-template-file packaged.yaml \
    --s3-bucket cm-fujii.genki-sam-test-bucket

sam deploy \
    --template-file packaged.yaml \
    --stack-name kinesis-lambda-deploy-sample-2-stack \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset \
    --parameter-overrides ShardCount=2

実験の手順はさきほどと同じのため省略します。

結果

CloudWatch Logsの様子

Lambdaの同時期同数は2になりました。

CloudWatch Logsの様子

DynamoDBテーブルの様子

DynamoDBテーブルから全データを取得し、時刻順にソートしました。 こちらも、デプロイ時刻付近で新しいLambdaと古いLambdaが混在実行されていることが分かります。

1604300178013: t03-0128, aaa
1604300178022: t01-0235, aaa
1604300178733: t02-0201, aaa
1604300178774: t02-0202, aaa
1604300179379: t03-0129, yyyyyy
1604300179437: t02-0203, yyyyyy
1604300179603: t01-0236, yyyyyy
1604300179698: t03-0130, yyyyyy
1604300179707: t01-0237, yyyyyy
1604300180344: t02-0204, aaa
1604300180509: t03-0131, aaa
1604300180572: t02-0205, aaa
1604300180725: t02-0206, yyyyyy
1604300180768: t01-0238, aaa
1604300180832: t03-0132, aaa
1604300180863: t02-0207, aaa
1604300180911: t02-0208, aaa
1604300180923: t01-0239, aaa
1604300180970: t02-0209, aaa
1604300181004: t02-0210, aaa
1604300181011: t03-0133, yyyyyy
1604300181039: t01-0240, yyyyyy
1604300181104: t03-0134, aaa
1604300181144: t01-0241, aaa
1604300181753: t02-0211, yyyyyy
1604300181758: t03-0135, yyyyyy
1604300181799: t02-0212, yyyyyy
1604300181816: t01-0242, yyyyyy
1604300181884: t02-0213, aaa
1604300181916: t03-0136, yyyyyy
1604300181956: t01-0243, yyyyyy
1604300182058: t03-0137, aaa
1604300182104: t01-0244, aaa
1604300182182: t03-0138, yyyyyy
1604300182236: t01-0245, yyyyyy
1604300182316: t01-0246, yyyyyy
1604300182396: t03-0139, yyyyyy
1604300182457: t03-0140, aaa
1604300182504: t01-0247, aaa
1604300182584: t03-0141, yyyyyy
1604300182616: t01-0248, yyyyyy
1604300182766: t02-0214, yyyyyy
1604300182770: t03-0142, yyyyyy
1604300182799: t01-0249, yyyyyy

さいごに

「デプロイ付近で新しいLambdaと古いLambdaがランダムに実行される」という結果になりました。だからどうだというわけではないですが、なにかの際に役立つかもしれません。

参考