KinesisデータストリームをLambdaで処理する時のエラー制御方法をまとめてみた

2023.05.08

Kinesis Data Streams/DynamoDB Streamsのようなストリームを安定して処理するには、適切なエラー処理が重要です。

本記事では

  • 不正な入力データによるリトライ
  • バッチ処理時に一部のレコードが原因でバッチ全体をリトライ
  • 正しく処理できなかったレコードを通知

といった異常系処理を制御する方法を紹介します。

前提として、ワーカーはLambdaで実装し、Kinesisとは EventBridge Pipes、または、Lambdaイベントソースマッピングで連携しているものとします。

データソースがDynamoDB Streamsの場合も、同様のことが成り立つと思われます。

不正な入力データによるリトライを制御

一時的な障害に対してはリトライが有効ですが、不具合・不正データによるエラーが発生している場合、リトライしても処理は成功しません。 また、一時的な障害が長引いた場合は、適切なタイミングで諦めることが必要です。

例えば、EventBridge Pipesの場合、デフォルトでは24時間、または、リトライ185回のどちらかの条件を満たすまで再試行します。速やかに再試行を諦めるには、以下の値をカスタマイズします。

  • リトライ数(Retry attempts)
  • イベントの経過時間(Maximum age of event/record)

EventBridge Pipesの場合、Pipe Settingsで設定します。

Lambda Event Sourceの場合、Trigger configurationのAdditional settingsで設定します。

試しに意図的にエラーを発生させます。

デフォルト設定では、延々とリトライを繰り返します。また、exponential backoffにより、リトライ間隔が伸びていることもわかります。

リトライ 実行時刻 間隔(秒)
0 7.123
1 7.374 0.251
2 7.893 0.519
3 8.890 0.997
4 10.922 2.032
5 14.573 3.651
6 19.773 5.200
7 34.525 14.752

次に、リトライ数を2にして再度エラーを発生させます。

初回呼び出しでエラーが起きた後、2回再試行し、合計3回実行して処理を終えました。

バッチ処理時に一部のレコードが原因でバッチ全体をリトライされるのを防ぐ

バッチ処理時に一部のレコードが原因でバッチ全体が再実行されるのを防ぎたい場合

  1. 二分探索法(bisect)でバッチを分割し、エラーを誘発するレコードを含むバッチを絞り込む
  2. バッチ内で失敗したレコード(のシーケンス)を特定し、失敗箇所からやり直す

の2種類のアプローチがあります。

前者はアプリケーション側の作り込みなしに、Kinesisの基盤側でエラーを絞り込んでくれますが、同じレコードに対して何度も再実行される可能性があり、バッチサイズが大きい場合は無駄が多くなります。

後者はアプリケーション側で軽微な作り込みが伴うため、複雑さが増す一方で、リトライ時にはエラー箇所から再実行され、先行する処理に成功したレコードは再実行されないため、無駄が減ります。

実装のシンプルさを優先するなら前者、処理の効率を重視するなら後者です。

この動作を確認します。

最大リトライ数を1に設定し、不正なレコードを3つ目に含む4つのデータを渡します。

$ cat test.json 
[
        {
            "Data": "0",
            "PartitionKey": "0"
        },
        {
            "Data": "1",
            "PartitionKey": "1"
        },
        {
            "Data": "c",
            "PartitionKey": "c"
        },
        {
            "Data": "3",
            "PartitionKey": "3"
        }
]

$ aws kinesis put-records \
  --stream-name foo \
  --cli-binary-format \
  raw-in-base64-out \
  --records file://test.json
{
    "FailedRecordCount": 0,
    "Records": [
        {
            "SequenceNumber": "49640346564938479822052999557991794161517120249185435650",
            "ShardId": "shardId-000000000000"
        },
        {
            "SequenceNumber": "49640346564938479822052999557993003087336734878360141826",
            "ShardId": "shardId-000000000000"
        },
        {
            "SequenceNumber": "49640346564938479822052999557994212013156349507534848002",
            "ShardId": "shardId-000000000000"
        },
        {
            "SequenceNumber": "49640346564938479822052999557995420938975964136709554178",
            "ShardId": "shardId-000000000000"
        }
    ]
}

1. 二分探索法(bisect)でバッチを分割し、エラーを誘発するレコードを含むバッチを絞り込む

bisectオプションを有効にすると、エラー発生時にバッチを2分割して再実行します。

アプリケーションの作り込みなしにKinesis の基盤側でエラーを絞り込んでくれる一方で、同じレコードに対して何度も再実行される可能性があり、バッチサイズが大きい場合は無駄が大きくなるリスクがあります。

bisectは一度正常に処理したメッセージも再処理するため、ワーカーは冪等に実装する必要があります。

EventBridge Pipesの場合、Source設定のAdditional settingsのOn partial batch item failureAUTOMATIC_BISECTに設定します。

Lambda Event Sourceの場合、Trigger configurationのAdditional settingsのSplit batch on errorをチェックします。

Pipes向けに以下のようなConsumerを用意し、動作確認します。

import base64
import json
def lambda_handler3(event, context):
    print([base64.b64decode(record['data']) for record in event])

    for record in event:
        data = base64.b64decode(record['data'])
        int(data)

エラーが起きる度に、バッチを2分割して再実行していることがわかります。

2. バッチ内で失敗したレコード(のシーケンス)を特定し、失敗箇所からやり直す

Lambdaのレスポンスに失敗したレコード情報を batchItemFailures をキーにして返すと、失敗位置から再実行されます。

EventBridge Pipesの場合、デフォルトで有効です。

Lambda Event Sourceの場合、Trigger configurationのAdditional settingsのReport batch item failuresをチェックします。

Pipes向け実装例

import base64
import json

def lambda_handler(event, context):
    print([base64.b64decode(record['data']) for record in event])
    for record in event:
        try:
            data = base64.b64decode(record['data'])
            int(data)
        except:
            result = {"batchItemFailures" : [{'itemIdentifier' : record['sequenceNumber']}]}
            print(result)
            return result
    return {"batchItemFailures":[]}

Event Source向け実装例

import base64
import json

def lambda_handler(event, context):
    print([base64.b64decode(record['kinesis']['data']) for record in event['Records']])
    for record in event['Records']:
        data = base64.b64decode(record['kinesis']['data'])
        try:
            int(data)
        except:
            result = {"batchItemFailures" : [{'itemIdentifier' : record['kinesis']['sequenceNumber']}]}
            print(result)
            return result

Kinesis は時系列(シーケンス番号)順に処理するため、処理の失敗を検知したらバッチ全体を処理せずに、すぐにレスポンスを返してしまって問題有りません。 仮にレスポンスに複数のシーケンス番号が含まれていても、リトライ時には、一番小さなシーケンス番号から再試行されます。

If the batchItemFailures array contains multiple items, Lambda uses the record with the lowest sequence number as the checkpoint. Lambda then retries all records starting from that checkpoint.

Using AWS Lambda with Amazon Kinesis - AWS Lambda

このオプションを有効にすると、エラーの起きたレコードから再実行されますが、リトライの上限に達すると、同じバッチの他のレコードも道連れにして処理を終了します。

※4レコード目の「3」が処理されていません

bisect オプションも有効にすることで、エラー箇所から再実行し、更に、異常レコードだけピンポイントに処理をスキップできます。

CloudWatch メトリクス

EventBridge Pipesの場合、エラー発生時にメトリクスが生成され、ワーカーのエラーハンドリングによって生成されるメトリクスが異なります。

  • TargetStageFailed (batchItemFailures を利用しなかった場合)
  • TargetStagePartiallyFailed (batchItemFailures を利用した場合)

ご注意ください。

EventBridge Pipesのメトリクス

正しく処理できなかったレコードを通知

デッドレターキュー(DLQ)を利用すると、処理に失敗したメッセージを通知できます。

EventBridge Pipesの場合、Pipe settingsのDead-letter queueで設定します。

Lambda Event Sourceの場合、On-failure destinationで設定します。

Pipes DLQのメッセージ例

{
  "context": {
    "partnerResourceArn": "arn:aws:pipes:eu-central-1:12345:pipe/kine_pipe",
    "condition": "RetryAttemptsExhausted"
  },
  "version": "1.0",
  "timestamp": "2023-05-01T10:52:00.579Z",
  "KinesisBatchInfo": {
    "shardId": "shardId-000000000000",
    "startSequenceNumber": "49640346564938479822052999558037733342662992515972464642",
    "endSequenceNumber": "49640346564938479822052999558037733342662992515972464642",
    "approximateArrivalOfFirstRecord": "2023-05-01T10:51:59.005Z",
    "approximateArrivalOfLastRecord": "2023-05-01T10:51:59.005Z",
    "batchSize": 1,
    "streamArn": "arn:aws:kinesis:eu-central-1:12345:stream/foo"
  }
}

このメッセージには

  • 通知された原因(condition)
  • 対象のストリーム(streamArn)
  • 対象のシャード(shardId)
  • 対象のシーケンス番号(startSequenceNumber)

といった情報が含まれています

DLQメッセージから、エラーを引き起こしたレコードを取得する方法については、次の記事を参照ください。

まとめ

Amazon Kinesis Data StreamsとLambdaの組み合わせを例に、ストリーム処理の異常処理方法をまとめました。 DynamoDB StreamsやLambda以外のワーカーを利用する場合も、考え方は同じです。

  • リトライ数
  • エラー範囲を絞り込むbisect
  • エラーレコードを通知するDLQ

は無条件で設定し、部分成功(batchItemFailures)は余力があれば対応しましょう。

更に、ワーカーの処理は冪等に実装しましょう。

運用周りは後回しにしがちですが、早い段階でエンドツーエンドでデータを連携させ、エラー処理も仕込んでおくと、開発効率も向上すると思います。

それでは。