ストリーム処理のエラー制御がより柔軟に!!Kinesis Data StreamsとDynamoDB Streamsから起動するLambdaでチェックポイントが利用可能になりました #reinvent
CX事業本部@大阪の岩田です。
Kinesis Data StreamsとDynamoDB Streamsから起動するLambdaで新たに「チェックポイント」が利用できるようになり、処理が「途中まで成功した」という情報をLambdaだけで管理できるようになりました。
さっそく試してみたいと思います。
何が嬉しいのか?
Kinesis Data Streams/DynamoDB Streamsから起動するLambdaのイベントデータには、「バッチ」という処理単位で複数レコードの情報が渡ってきます。これまでLambda単体ではバッチ全体での処理成功/失敗しか表現できず、レコード単位での処理成功/失敗を管理するには別途DynamoDBなどを利用する必要がありました。
バッチの途中で処理が異常終了するケースについて考えてみましょう。例えば10レコードがLambdaに渡ってきた場合に7レコードまで処理が成功し、8レコード目で異常終了したとします。この場合、Lambdaのデフォルトの振る舞いでは先程と同じ10レコード分の処理にリトライすることになります。最初のLambda実行で7レコード目までは正常終了しているので、できることなら8レコード目からリトライしたいところですが、従来はそのような制御が実現できませんでした。
2019年11月のアップデートによりエラー発生時にバッチを分割するというオプションが選択可能になり、ある程度リトライ時の振る舞いを制御できるようになりましたが、このオプションではバッチを半分に分割してリトライすることしかできません。10レコード中8レコード目で異常終了した場合は、10レコードを5レコード×2バッチに分割してリトライすることになりますが、リトライの1バッチ目(1~5レコード)は既に正しく処理済みのレコードなのでLambdaを起動するだけ無駄です。2バッチ目(6~10レコード)に関しても6,7レコード目に関しては正しく処理済みなので、ここも無駄な部分になります。
今回のアップデートによって、ピンポイントで「7レコード目からリトライする」といった制御が可能になり、Lambdaの処理効率化や課金の最適化といった効果が期待できます。
やってみる
実際にチェックポイントの機能を試してみます。いくつか設定を変えながらKiensis Data StreamsからLambdaを起動し、Lambdaのログを確認してみます。もろもろ確認しやすいようにKinesisのシャード数は1で作成しておきます。
普通に実行
まずはデフォルトの振る舞いを確認してみましょう。Kinesisのストリームから以下のLambdaを起動するよう設定します。処理の内容は至ってシンプルで
- 1~3レコード目までは処理成功のログを出力
- 4レコード目の処理でエラーのログを出力しつつ、例外を発生させて異常終了する
という処理です。
なおPythonのランタイムは3.8を利用しています。
def lambda_handler(event, context): print(f'records: {len(event["Records"])}') i = 0 for record in event['Records']: i += 1 if i == 4: print(f'Error: {record["kinesis"]["sequenceNumber"]}') raise Exception('fail') print(f'Success: {record["kinesis"]["sequenceNumber"]}') # TODO implement return { 'statusCode': 200, 'body': json.dumps('Hello from Lambda!') }
ローカル環境から 以下のコードを実行し、Kinesisのストリームに10レコードのデータを投げつけます。
import boto3 import time kinesis = boto3.client('kinesis') records = [] for i in range(10): records.append({ 'Data': str(time.time()).encode('utf-8'), 'PartitionKey': '1' }) kinesis.put_records(StreamName='checkpoint-test',Records=records)
CloudWatchLogsのログを確認してみます。
10レコードが1つのバッチにまとめられており、リトライする度に同一レコード(1=3レコード目)の処理が繰り返し実行されていることが分かります。Lambdaの実装は4レコード目で必ず失敗するようになっているので、このまま延々とエラー&リトライを繰り返すことになります。
エラー時のバッチ分割を有効化して実行
次は昨年のアップデートで利用可能になったオプション「エラー時にバッチを分割」を有効化し、再度同じ処理を試します。
出力されたログです。
今度はバッチサイズが10→5→2 or 3と半分づつに小さくなっていることが分かります。バッチサイズが2 or 3であればLambdaが正常終了するので、この設定であれば重複実行による無駄はありつつもストリームが「詰まる」ということは回避できます。
エラー時にチェックポイントを返却するようにして実行
最後に今回のアップデートであるチェックポイントを利用して実行してみましょう。Lambdaのコードを以下のように変更します。
def lambda_handler(event, context): print(f'records: {len(event["Records"])}') i = 0 for record in event['Records']: try: i += 1 if i == 4: print(f'Error: {record["kinesis"]["sequenceNumber"]}') raise Exception('fail') except Exception: return fail(record) print(f'Success: {record["kinesis"]["sequenceNumber"]}') # TODO implement return { 'statusCode': 200, 'body': json.dumps('Hello from Lambda!') } def fail(record): return { 'batchItemFailures': [ { 'itemIdentifier': record['kinesis']['sequenceNumber'] } ] }
例外発生時に返却している
{ 'batchItemFailures': [ { 'itemIdentifier': record['kinesis']['sequenceNumber'] } ] }
がポイントです。LambdaのレスポンスにStreamsEventResponse
と呼ばれるこのような構造のデータを返却することで、Lambdaの実行基盤がチェックポイントを管理してくるようになります。Lambdaの実行基盤は以下の仕様に基づいてStreamsEventResponse
をチェックし、バッチ全体の成功/失敗を判定します。
バッチ全体を成功と判定する場合
- batchItemFailureに空のリストが設定されている場合
- batchItemFailureがnullの場合
- StreamsEventResponseが空の場合
- StreamsEventResponseがnullの場合
バッチ全体を失敗と判定する場合
- itemIdentifierに空文字列が設定されている場合
- itemIdentifierがnullの場合
- itemIdentifierに指定されたキー名が不正な場合
チェックポイントを利用する際は意図せずバッチ全体が成功/失敗と判定されないように、データの構造を適切に設定して下さい。
Lambdaのコードを変更したらKinesisとのマッピングの設定を変更して再度テストを行います。
出力されたログです
リトライを重ねる度にイベントデータで受け取るレコード数が10→7→4→1と減少しており、前回処理の途中からリトライできていることが分かります。
まとめ
Kinesis Data Streams及びDynamoDB Streams起動のLambdaで利用可能になったチェックポイントについてご紹介しました。チェックポイントを活用することでリトライ時の処理時間や課金を最適化できる可能性があるので、Lambdaでストリーム処理を実装されている方は是非チェックポイントの利用をご検討下さい。