この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Kinesis Data Streams/DynamoDB Streamsのようなストリームを安定して処理するには、適切なエラー処理が重要です。
本記事では
- 不正な入力データによるリトライ
- バッチ処理時に一部のレコードが原因でバッチ全体をリトライ
- 正しく処理できなかったレコードを通知
といった異常系処理を制御する方法を紹介します。
前提として、ワーカーはLambdaで実装し、Kinesisとは EventBridge Pipes、または、Lambdaイベントソースマッピングで連携しているものとします。
データソースがDynamoDB Streamsの場合も、同様のことが成り立つと思われます。
不正な入力データによるリトライを制御
一時的な障害に対してはリトライが有効ですが、不具合・不正データによるエラーが発生している場合、リトライしても処理は成功しません。 また、一時的な障害が長引いた場合は、適切なタイミングで諦めることが必要です。
例えば、EventBridge Pipesの場合、デフォルトでは24時間、または、リトライ185回のどちらかの条件を満たすまで再試行します。速やかに再試行を諦めるには、以下の値をカスタマイズします。
- リトライ数(Retry attempts)
- イベントの経過時間(Maximum age of event/record)
EventBridge Pipesの場合、Pipe Settingsで設定します。
Lambda Event Sourceの場合、Trigger configurationのAdditional settingsで設定します。
試しに意図的にエラーを発生させます。
デフォルト設定では、延々とリトライを繰り返します。また、exponential backoffにより、リトライ間隔が伸びていることもわかります。
リトライ | 実行時刻 | 間隔(秒) |
---|---|---|
0 | 7.123 | |
1 | 7.374 | 0.251 |
2 | 7.893 | 0.519 |
3 | 8.890 | 0.997 |
4 | 10.922 | 2.032 |
5 | 14.573 | 3.651 |
6 | 19.773 | 5.200 |
7 | 34.525 | 14.752 |
次に、リトライ数を2にして再度エラーを発生させます。
初回呼び出しでエラーが起きた後、2回再試行し、合計3回実行して処理を終えました。
バッチ処理時に一部のレコードが原因でバッチ全体をリトライされるのを防ぐ
バッチ処理時に一部のレコードが原因でバッチ全体が再実行されるのを防ぎたい場合
- 二分探索法(bisect)でバッチを分割し、エラーを誘発するレコードを含むバッチを絞り込む
- バッチ内で失敗したレコード(のシーケンス)を特定し、失敗箇所からやり直す
の2種類のアプローチがあります。
前者はアプリケーション側の作り込みなしに、Kinesisの基盤側でエラーを絞り込んでくれますが、同じレコードに対して何度も再実行される可能性があり、バッチサイズが大きい場合は無駄が多くなります。
後者はアプリケーション側で軽微な作り込みが伴うため、複雑さが増す一方で、リトライ時にはエラー箇所から再実行され、先行する処理に成功したレコードは再実行されないため、無駄が減ります。
実装のシンプルさを優先するなら前者、処理の効率を重視するなら後者です。
この動作を確認します。
最大リトライ数を1に設定し、不正なレコードを3つ目に含む4つのデータを渡します。
$ cat test.json
[
{
"Data": "0",
"PartitionKey": "0"
},
{
"Data": "1",
"PartitionKey": "1"
},
{
"Data": "c",
"PartitionKey": "c"
},
{
"Data": "3",
"PartitionKey": "3"
}
]
$ aws kinesis put-records \
--stream-name foo \
--cli-binary-format \
raw-in-base64-out \
--records file://test.json
{
"FailedRecordCount": 0,
"Records": [
{
"SequenceNumber": "49640346564938479822052999557991794161517120249185435650",
"ShardId": "shardId-000000000000"
},
{
"SequenceNumber": "49640346564938479822052999557993003087336734878360141826",
"ShardId": "shardId-000000000000"
},
{
"SequenceNumber": "49640346564938479822052999557994212013156349507534848002",
"ShardId": "shardId-000000000000"
},
{
"SequenceNumber": "49640346564938479822052999557995420938975964136709554178",
"ShardId": "shardId-000000000000"
}
]
}
1. 二分探索法(bisect)でバッチを分割し、エラーを誘発するレコードを含むバッチを絞り込む
bisectオプションを有効にすると、エラー発生時にバッチを2分割して再実行します。
アプリケーションの作り込みなしにKinesis の基盤側でエラーを絞り込んでくれる一方で、同じレコードに対して何度も再実行される可能性があり、バッチサイズが大きい場合は無駄が大きくなるリスクがあります。
bisectは一度正常に処理したメッセージも再処理するため、ワーカーは冪等に実装する必要があります。
EventBridge Pipesの場合、Source設定のAdditional settingsのOn partial batch item failureをAUTOMATIC_BISECT
に設定します。
Lambda Event Sourceの場合、Trigger configurationのAdditional settingsのSplit batch on errorをチェックします。
Pipes向けに以下のようなConsumerを用意し、動作確認します。
import base64
import json
def lambda_handler3(event, context):
print([base64.b64decode(record['data']) for record in event])
for record in event:
data = base64.b64decode(record['data'])
int(data)
エラーが起きる度に、バッチを2分割して再実行していることがわかります。
2. バッチ内で失敗したレコード(のシーケンス)を特定し、失敗箇所からやり直す
Lambdaのレスポンスに失敗したレコード情報を batchItemFailures
をキーにして返すと、失敗位置から再実行されます。
EventBridge Pipesの場合、デフォルトで有効です。
Lambda Event Sourceの場合、Trigger configurationのAdditional settingsのReport batch item failuresをチェックします。
Pipes向け実装例
import base64
import json
def lambda_handler(event, context):
print([base64.b64decode(record['data']) for record in event])
for record in event:
try:
data = base64.b64decode(record['data'])
int(data)
except:
result = {"batchItemFailures" : [{'itemIdentifier' : record['sequenceNumber']}]}
print(result)
return result
return {"batchItemFailures":[]}
Event Source向け実装例
import base64
import json
def lambda_handler(event, context):
print([base64.b64decode(record['kinesis']['data']) for record in event['Records']])
for record in event['Records']:
data = base64.b64decode(record['kinesis']['data'])
try:
int(data)
except:
result = {"batchItemFailures" : [{'itemIdentifier' : record['kinesis']['sequenceNumber']}]}
print(result)
return result
Kinesis は時系列(シーケンス番号)順に処理するため、処理の失敗を検知したらバッチ全体を処理せずに、すぐにレスポンスを返してしまって問題有りません。 仮にレスポンスに複数のシーケンス番号が含まれていても、リトライ時には、一番小さなシーケンス番号から再試行されます。
If the batchItemFailures array contains multiple items, Lambda uses the record with the lowest sequence number as the checkpoint. Lambda then retries all records starting from that checkpoint.
このオプションを有効にすると、エラーの起きたレコードから再実行されますが、リトライの上限に達すると、同じバッチの他のレコードも道連れにして処理を終了します。
※4レコード目の「3」が処理されていません
bisect オプションも有効にすることで、エラー箇所から再実行し、更に、異常レコードだけピンポイントに処理をスキップできます。
CloudWatch メトリクス
EventBridge Pipesの場合、エラー発生時にメトリクスが生成され、ワーカーのエラーハンドリングによって生成されるメトリクスが異なります。
- TargetStageFailed (
batchItemFailures
を利用しなかった場合) - TargetStagePartiallyFailed (
batchItemFailures
を利用した場合)
ご注意ください。
正しく処理できなかったレコードを通知
デッドレターキュー(DLQ)を利用すると、処理に失敗したメッセージを通知できます。
EventBridge Pipesの場合、Pipe settingsのDead-letter queueで設定します。
Lambda Event Sourceの場合、On-failure destinationで設定します。
Pipes DLQのメッセージ例
{
"context": {
"partnerResourceArn": "arn:aws:pipes:eu-central-1:12345:pipe/kine_pipe",
"condition": "RetryAttemptsExhausted"
},
"version": "1.0",
"timestamp": "2023-05-01T10:52:00.579Z",
"KinesisBatchInfo": {
"shardId": "shardId-000000000000",
"startSequenceNumber": "49640346564938479822052999558037733342662992515972464642",
"endSequenceNumber": "49640346564938479822052999558037733342662992515972464642",
"approximateArrivalOfFirstRecord": "2023-05-01T10:51:59.005Z",
"approximateArrivalOfLastRecord": "2023-05-01T10:51:59.005Z",
"batchSize": 1,
"streamArn": "arn:aws:kinesis:eu-central-1:12345:stream/foo"
}
}
このメッセージには
- 通知された原因(
condition
) - 対象のストリーム(
streamArn
) - 対象のシャード(
shardId
) - 対象のシーケンス番号(
startSequenceNumber
)
といった情報が含まれています
DLQメッセージから、エラーを引き起こしたレコードを取得する方法については、次の記事を参照ください。
まとめ
Amazon Kinesis Data StreamsとLambdaの組み合わせを例に、ストリーム処理の異常処理方法をまとめました。 DynamoDB StreamsやLambda以外のワーカーを利用する場合も、考え方は同じです。
- リトライ数
- エラー範囲を絞り込むbisect
- エラーレコードを通知するDLQ
は無条件で設定し、部分成功(batchItemFailures
)は余力があれば対応しましょう。
更に、ワーカーの処理は冪等に実装しましょう。
運用周りは後回しにしがちですが、早い段階でエンドツーエンドでデータを連携させ、エラー処理も仕込んでおくと、開発効率も向上すると思います。
それでは。