Kinesis Data Streams へのデータ格納をトリガーに Lambda を起動する際の Batch size を変更して挙動を確認した(ParallelizationFactor)
コーヒーが好きな emi です。
今回は Kinesis Data Streams へのデータ格納をトリガーに Lambda を起動する際の Batch size を変更して挙動を確認してみました。
Kinesis Data Streams についての詳細は以下のブログで紹介しておりますので、参考にしてください。
最初にまとめ
- Batch size を大きくすると Lambda での処理速度は速くなるが、順番は保証されない
- Batch size を小さくすると Lambda での処理速度は遅くなるが、順番は保証される
Kinesis Data Streams へのデータ格納をトリガーに Lambda 関数を起動することができる
Lambda はイベント駆動サービスです。
Lambda は様々なイベントをトリガーに設定することができ、Kinesis Data Streams へのデータ流入をトリガーに Lambda を起動することもできます。
Kinesis Data Streams へのデータ格納をトリガーに Lambda 関数を起動する際の設定項目
Kinesis Data Streams へのデータ流入をトリガーに Lambda 関数を実行するための設定パラメーターはたくさんあります。要件に応じて緻密にチューニング可能というわけです。
マネジメントコンソールから設定画面を見てみましょう。
Lambda コンソールからトリガーしたい Lambda 関数を選択し、[設定] - [トリガー] - [トリガーを追加] から、トリガーとなるサービスを選択します。
たくさんサービスが表示されますが、今回は Kinesis Data Streams へのデータ格納をトリガーにしたいので Kinesis を選択します。
その後は以下のような項目が表示されるので、システム要件に応じて設定値を決めてください。
Amazon Kinesis で AWS Lambda を使用する - イベントソースとしてストリームを設定する - AWS Lambda
- Activate trigger
- このオプションが「はい」に設定されている場合、トリガーは有効です。定義されたアクションが実行されます。
- Batch size★
- 一度に処理するレコードの最大数を指定します。デフォルト 100、最大 10,000 です。
- Batch window
- レコードをバッチ処理するために Lambda が待機する最大時間を秒単位で指定します。この設定は、Batch size が満たされる前にバッチを処理するために使用されます。
- Concurrent batches per shard
- Lambda 関数が各シャードから同時に処理できるバッチの最大数です。例えばこの数が 3 なら、Lambda は同じシャードから同時に最大 3 つのバッチを処理することができます。Batch size が 10 で Concurrent batches per shard が 3 の場合、Lambda は 1 つのシャードから同時に最大 30 レコード(3 バッチ x 各バッチ 10 レコード)までを処理することができます。ParallelizationFactor と呼ぶようです。
- Maximum age of record
- 処理するレコードの最大寿命を指定します。最大 604,800 秒(7 日間)です。-1 はレコードの寿命が無制限であることを示します。
- On-failure destination
- 処理に失敗したレコードの送信先を指定します。
- Report batch item failures
- 個々のレコードの処理失敗をレポートするかどうかを指定します。
- Retry attempts
- レコードの処理が失敗した場合に、そのレコードの処理を再試行する回数を指定します。「なし」は再試行しないことを示します。
- Split batch on error
- バッチ内のレコードの処理中にエラーが発生した場合に、バッチを分割して再処理するかどうかを指定します。「いいえ」に設定されている場合、エラーが発生してもバッチは分割されません。
- Starting position
- ストリームからレコードを読み取る開始位置を指定します。「LATEST」は最新のレコードから読み取りを開始することを示します。
- Tumbling window duration
- 特定の時間枠内で Kinesis Data Streams のストリームデータを集約し、その集約されたデータに対して一度に Lambda 関数を実行することが可能になります。これにより Lambda 関数は一回の実行でより大きなデータセットを処理できるようになります。
今回注目するのは「Batch size」です。Batch size を変更することでどのように結果が出力されるか観察します。
構成図
以下のような構成で、Batch size を変更して挙動を確認します。
Kinesis Data Streams の作成
Kinesis Data Streams の作成 を参考に Kinesis Data Streams を作成します。名前は「Kinesis_Kinesis_Lambda_test」としました。
作成できたら、Kinesis Data Streams の ARN をコピーしておいてください。
Lambda 関数の作成
InLambda_Kinesis_Lambda_test
Kinesis Data Streams にデータを流し込む「InLambda_Kinesis_Lambda_test」という Lambda 関数は Lambda の作成 と同じ権限、同じコードで作成します。
この関数は、Lambda から「"オーレオレ"」「"サンバ"」「"ソーレソレ"」「"ポンポコ"」の 4 つの文字列の中からランダムに選択した文字列を Kinesis Data Streams のストリームに送信します。
OutLambda_Kinesis_Lambda_test
トリガーとして Kinesis Data Streams を設定する「OutLambda_Kinesis_Lambda_test」という関数を作成します。
Lambda コンソールを開き、[関数] - [関数の作成] をクリックします。
- 一から作成
- 関数名:今回は「OutLambda_Kinesis_Lambda_test」という名前で作成しました。
- ランタイム:Python 3.11
- アーキテクチャ:x86_64
「関数の作成」をクリックします。
OutLambda_Kinesis_Lambda_test に権限を付与する
AWS マネジメントコンソールから Lambda 関数を作成すると、デフォルトで CloudWatch Logs にログを書き込むアクセス許可を持つ実行ロールが自動作成され付与されます。
このデフォルトの IAM ロールに、Kinesis Data Streams 内のレコードを Get するための権限を追加しましょう。
- ポリシーを作成
IAM コンソールより [ポリシー] - [ポリシーの作成] で、以下のようなカスタマー管理ポリシーを作成してください。 IAM ポリシーの名前は「PermissionsToKinesis」としました。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "PermissionsToKinesis", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListShards", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/Kinesis_Kinesis_Lambda_test" } ] }
"Resource"
には、コピーしておいた Kinesis Data Streams の ARN を入れてください。 これが、Kinesis Data Streams のレコードデータを Get するための IAM ポリシーです。
- ポリシーのアタッチ
作成した「OutLambda_Kinesis_Lambda_test」の [設定タブ] - [アクセス権限] - [実行ロール] より、Lambda に付与された IAM ロールのリンクをクリックしてください。IAM ロールの詳細が別タブで開きます。
IAM ロールの詳細画面で、[許可を追加] - [ポリシーをアタッチ] より、先程作成した Kinesis Data Streams のレコードデータを Get するための IAM ポリシー「PermissionsToKinesis」をアタッチします。
これで Kinesis Data Streams のレコードデータを Get するための権限を Lambda 関数に追加できました。
関数コードの作成
以下のように関数コードを作成します。 「lambda_function.py」の中身を以下のコードに書き換えてください。
import json import boto3 import base64 import logging # CloudWatch Logsにログを出力するための設定 logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): for record in event['Records']: # Kinesis Data Streamsからデータを取得し、デコードする payload = json.loads(base64.b64decode(record['kinesis']['data'])) if payload == "オーレオレ": # デコードしたデータをCloudWatch Logsに表示 logger.info('マツケンサンバ!') elif payload == "サンバ": # デコードしたデータをCloudWatch Logsに表示 logger.info('ビバサンバ!') else: # デコードしたデータをCloudWatch Logsに表示 logger.info('マツケンサンバできません。') return { 'statusCode': 200, 'body': json.dumps('Successfully processed messages from Kinesis Data Streams!') }
この関数は、Kinesis Data Streams から取得したレコードの文字列によって異なるログメッセージを CloudWatch Logs に表示します。
- 「"オーレオレ"」を取得した場合→「マツケンサンバ!」を表示
- 「"サンバ"」を取得した場合→「ビバサンバ!」を表示
- それ以外の文字列を取得した場合→「マツケンサンバできません。」を表示
Batch size を変更して挙動を確認する
Batch size を変更して挙動を確認していきます。Lambda コンソールから「OutLambda_Kinesis_Lambda_test」を選択し、[設定] - [トリガー] - [トリガーを追加] から、トリガーとなる Kinesis Data Streams を選択します。Batch size 以外のパラメーターは以下のように設定しました。
- Activate trigger:はい
- Batch window:なし
- Concurrent batches per shard:1
- Maximum age of record:-1
- On-failure destination:なし
- Report batch item failures:はい
- Retry attempts:なし
- Split batch on error:いいえ
- Starting position:LATEST
- Tumbling window duration:なし
Batch size = 10
「OutLambda_Kinesis_Lambda_test」のトリガーに Kinesis Data Streams を指定し、Batch size = 10 にした状態で「InLambda_Kinesis_Lambda_test」をテスト実行します。
テストタブから「テスト」を一度クリックします。「成功」と表示されたら OK です。
CloudWatch Logs でログを確認します。
- ①CloudWatch コンソールで [ログ] - [ログのインサイト(CloudWatch Logs Insight)] を開きます。
- ②タイムゾーンが「Local timezone」(日本時間)になっていることを確認します。
- ③Lambda 関数を実行した時間を指定します。
- ④ロググループを選択します。今回は Lambda 関数「InLambda_Kinesis_Lambda_test」のロググループである「/aws/lambda/InLambda_Kinesis_Lambda_test」を選択しています。
- ⑤ログを検索するためのクエリを書きます。今回は 50 行のログを昇順(asc、早い時間→遅い時間)で表示するため、以下のようにクエリを書きます。
fields @timestamp, @message | sort @timestamp asc | limit 50
- ⑥「クエリの実行」をクリックすると、以下のようにログが表示されます。
同様に、今度は Lambda 関数「OutLambda_Kinesis_Lambda_test」のロググループである「/aws/lambda/OutLambda_Kinesis_Lambda_test」でログを抽出すると、以下のように表示されます。
画像が小さく恐縮ですが、実行時間は @duration
フィールドに記載されており、2 行目に注目いただくと、この場合は 12.92 ms(ミリ秒)です。
実際に文字列が流し込まれた箇所だけログ取得したいので、「InLambda_Kinesis_Lambda_test」のロググループに対し以下のクエリでもう一度実行します。
fields @timestamp, @message | filter @message like '[INFO]' and @message not like 'Found credentials in environment variables.' | sort @timestamp desc | limit 15
以下のように、文字列が流し込まれたログのみ抽出されます。
同様に、「OutLambda_Kinesis_Lambda_test」のロググループ対し以下のクエリでもう一度実行します。
fields @timestamp, @message | filter @message like '[INFO]' | sort @timestamp desc | limit 15
以下のように、文字列が表示されたログのみ抽出されます。
注目いただきたいのはリクエスト ID です。
リクエスト ID の文字列が同じログは、Lambda 関数 1 回の実行の中で処理されたものです。
「InLambda_Kinesis_Lambda_test」の方は 1 回の Labmda 関数実行で 15 行のメッセージを出力しているのでリクエスト ID がすべて同じですが、「OutLambda_Kinesis_Lambda_test」の方は 10 レコード処理したところでリクエスト ID が変わっています。 これは、Batch size = 10 と設定したためです。
小さくなってしまい恐縮ですが、以下が抽出した「InLambda_Kinesis_Lambda_test」のログ「OutLambda_Kinesis_Lambda_test」のログを左右に並べたものです。
以下の想定処理に従って線を引いてみました。
- 「オーレオレ」に対し「マツケンサンバ!」が返ってきている
- 「サンバ」に対し「ビバサンバ!」が返って来ている
- それ以外は「マツケンサンバできません。」が返ってきている
一度の Lambda 関数実行の中で処理されたレコードは順不同になる模様です。
Batch size = 15
同様に、Batch size = 15 でも実験します。
1 回の Lambda 関数実行で 15 レコードすべてを処理しましたので、リクエスト ID はすべて同じです。
1 回の Lambda 関数実行で 15 レコードすべてを処理できたので、処理時間は短くなりました。
レコードの順番はバラバラです。
Batch size = 1
同様に、Batch size = 1 でも実験します。
1 回の Lambda 関数実行で 1 レコードだけ処理するので、リクエスト ID はすべてバラバラです。
1 回の Lambda 関数実行で 1 レコードだけ処理するので、処理時間は最も長くなりました。
レコードの順番は流し込んだ順番と同じで、保たれています。
おわりに
Kinesis Data Streams へのデータ格納をトリガーに Lambda を起動する際の Batch size を変更し、パフォーマンスチューニングをしてみました。
他にも設定可能なパラメーターがたくさんありますので、いろいろ実験してみたいところです。
順番が多少担保されなくてもよい、もしくはタイムスタンプ等で後から順序が判断可能ならば、SQS キューに入れて非同期実装とした方が良い場合もありそうです。
参考
Kinesis と DynamoDB をイベントソースにする際の AWS Lambda の新しいスケーリング管理