Amazon Kinesis Data Streamsのストリームにレコードを追加すると、時間とともに単調増加するシーケンス番号が割り振られ、ポインターとしてデータが保存されます。 このようにすることで、シャード内ではレコードが時系列順に並びます。
逆に、ストリーム内のデータにアクセスする場合、シャードを指定し、イテレーターを移動させながら、シーケンシャルにレコードをアクセスします。
Producerはシャードを束ねたストリームに対して操作しますが、Consumerは特定のシャードに対して操作します。 ご注意ください。
本記事では
- シャード番号
- シーケンス番号
が与えられた時に、その実レコードを AWS CLIで取得する方法を紹介します。
Kinesisストリームを処理するワーカーにデッドレターキュー(DLQ)を設定し、エラーの原因になったレコードを確認するようなユースケースを想定しています。
デッドレターキューメッセージのサンプル
Kinesis Data StreamsをEventBridge Pipes経由でLambdaで処理し、リトライ上限に到達すると、以下の様なメッセージが DLQ に送信されます。
{
"context": {
"partnerResourceArn": "arn:aws:pipes:eu-central-1:123456789012:pipe/k_worker",
"condition": "RetryAttemptsExhausted"
},
"version": "1.0",
"timestamp": "2023-04-29T15:47:42.068Z",
"KinesisBatchInfo": {
"shardId": "shardId-000000000000",
"startSequenceNumber": "49640293986203916304020474349387293118703111245603536898",
"endSequenceNumber": "49640293986203916304020474349387293118703111245603536898",
"approximateArrivalOfFirstRecord": "2023-04-29T15:46:40.681Z",
"approximateArrivalOfLastRecord": "2023-04-29T15:46:40.681Z",
"batchSize": 1,
"streamArn": "arn:aws:kinesis:eu-central-1:123456789012:stream/dummy"
}
}
condition
からリトライ上限に達したことKinesisBatchInfo
の各種属性から、問題の起きたストリーム、シャード、シーケンス情報
などがわかります。
対象は1レコードのため
startSequenceNumber
endSequenceNumber
が同じです。
レコードを復元
以下の流れでレコードを復元します。
- シャードのイテレーターを取得
- レコードを取得
- レコードをデコード
AWS CLIで実際に操作してみます。
1. シャードのイテレーターを取得
まずはシャードのイテレーターを取得します。
DLQメッセージからシャード番号(shardId
)とシーケンス番号(startSequenceNumber
)がわかるため、この位置(AT_SEQUENCE_NUMBER
)のイテレーターを Kinesis:GetShardIterator API で取得します。
$ aws kinesis get-shard-iterator \
--stream-name dummy \
--shard-id shardId-000000000000 \
--starting-sequence-number 49640293986203916304020474349387293118703111245603536898 \
--shard-iterator-type AT_SEQUENCE_NUMBER
{
"ShardIterator": "AAAAAAAAAAFcBeKIp1K/3NLqn0Dvh6eg0HAeZqfCbBP2Ll32+6ULArVn9TwbQTp0o3oGqJ/nd5W8CiCivPC0rph6JxTJHEzdP1PnTYTHl5yZDcDMZl9sxhfVTF6km/7OoH0LQDd3L/G2x3ny0yuY8dxEI3B7kddheD3YeQ4k0j9gleYDpvDcIS2C8I2/kFk7Bf++3DTW0BV4Ey+gk+cYY7dXoapfGPoJLXTmWXCByU9X3Q7amxjWyg=="
}
イテレーターの位置の指定方法は、今回紹介したシーケンス番号以外にも、
- 最古のシーケンス番号(
TRIM_HORIZON
) - 最新のシーケンス番号=新しく受け取ったレコードを処理したい(
LATEST
) - 時刻指定(
AT_TIMESTAMP
)
など、様々な指定方法があります。
2. レコードを取得
次に、イテレーターの位置にあるレコードを Kinesis:GetRecords API で取得します。
$ aws kinesis get-records \
--shard-iterator "AAAAAAAAAAG7Ge6krGEz+zKHgkvwuwXru3G7U+yGWrxQTrnHKBaqh1rrG5PBeOCjlxTjD51Un3buWuzv8TuP/5CscJtodQik5YITrD8c4QhzHNuU2kMGqgOwWq3bNyGucxNmPW3/GSMc2mR/ncaZsThlK8qjFpQCIqTfkPjFsaZjloXHhZNKDbkRylJ13Gy6jWGClv1koNKQVs7juREKJdhT3/UiMGmh3UlBz5XmF47DN09AbicsiQ=="
{
"Records": [
{
"SequenceNumber": "49640293986203916304020474349387293118703111245603536898",
"ApproximateArrivalTimestamp": "2023-04-29T15:46:40.681000+00:00",
"Data": "Zm9vCg==",
"PartitionKey": "1"
},
{
"SequenceNumber": "49640293986203916304020474349388502044522725874778243074",
"ApproximateArrivalTimestamp": "2023-04-29T15:46:40.681000+00:00",
"Data": "YmFyCg==",
"PartitionKey": "2"
}
],
"NextShardIterator": "AAAAAAAAAAHxP+XECJ/Nh5l3VqaeH1tQzICjBoXBrvFWwxV00F6Nq07yXKK0aW5T66ktKoICag1XD/+oYds1ghVw5KBSc+wYUSDPVrTGRBp/RISWrjn80UtMk/h/vcv3YFKUMJj6YzZ7u5/D2yZDVIYvYKQ9Gfvxi1dS2UUjWfTAIwXmYzCGGX2VEHVMAUP91zD9EF9V7jc4GJKJL8LGnUFBH0of9EAKQF8DAGDRPMKh4o9QPJLuJQ==",
"MillisBehindLatest": 825000
}
- イテレーター(
ShardIterator
/NextShardIterator
) - 2つのレコード、及びその
SequenceNumber
を図示すると、次の様になります。
ShardIterator
と NextShardIterator
の間にレコードがあり、レコードの SequenceNumber
は単調増加しています。
取得するレコード数を --limit
で指定することもできます。
$ aws kinesis get-records \
--shard-iterator "AAAAAAAAAAG7Ge6krGEz+zKHgkvwuwXru3G7U+yGWrxQTrnHKBaqh1rrG5PBeOCjlxTjD51Un3buWuzv8TuP/5CscJtodQik5YITrD8c4QhzHNuU2kMGqgOwWq3bNyGucxNmPW3/GSMc2mR/ncaZsThlK8qjFpQCIqTfkPjFsaZjloXHhZNKDbkRylJ13Gy6jWGClv1koNKQVs7juREKJdhT3/UiMGmh3UlBz5XmF47DN09AbicsiQ==" \
--limit 1
{
"Records": [
{
"SequenceNumber": "49640293986203916304020474349387293118703111245603536898",
"ApproximateArrivalTimestamp": "2023-04-29T15:46:40.681000+00:00",
"Data": "Zm9vCg==",
"PartitionKey": "1"
}
],
"NextShardIterator": "AAAAAAAAAAGu9ytcyEWrMtXHn23JxQKROdrhWR0Dsx5ja88iIID0X5qoKv3QKByQlnkNLzyJ+VY2J9FbwZUEwKdUtryRpI52/Ha1RtNkKJxbrGyrSDTzeidQezxuXgKLq03+d50Qwl24hI7B+g4t70rfOlJkHaeD3IfofjZZm6oWtCUoUTyn/oewUSSA8pgxuP6dz0lorI9yzGVBQuF6xW0oBuN9XDlXU0FrDc+brkpWajB4jYQ2GA==",
"MillisBehindLatest": 1555000
}
3. レコードをデコード
レコードは base64 エンコードされているため、デコードします。
$ echo 'Zm9vCg==' | base64 -d
foo
最後に
Kinesis Data Streamsを利用する際、Lambda が登場してからは
- Lambda Event Source Mapping
- Event Bridge Pipes
等を利用することで、シャードやシーケンスを意識することなく簡単にストリーム処理できるようになりました。 とはいえ、障害発生時には、今回紹介したような手順で、シャードやシーケンスを明示的に指定してエラーの原因になったレコードを確認することも必要です。
Kinesisの実運用では、エラーの起きたレコードを DLQ SQS などに一度飛ばし、Lambda ワーカー経由でKinesisから問題のレコードを取得してSlackなどに通知すると、障害対応の初動の手間が省けると思います。
それでは。