AWS CLIを使ってKinesis Data Streamsのレコードをシーケンス番号から取得してみた

2023.05.01

Amazon Kinesis Data Streamsのストリームにレコードを追加すると、時間とともに単調増加するシーケンス番号が割り振られ、ポインターとしてデータが保存されます。 このようにすることで、シャード内ではレコードが時系列順に並びます。

逆に、ストリーム内のデータにアクセスする場合、シャードを指定し、イテレーターを移動させながら、シーケンシャルにレコードをアクセスします。

Producerはシャードを束ねたストリームに対して操作しますが、Consumerは特定のシャードに対して操作します。 ご注意ください。

本記事では

  • シャード番号
  • シーケンス番号

が与えられた時に、その実レコードを AWS CLIで取得する方法を紹介します。

Kinesisストリームを処理するワーカーにデッドレターキュー(DLQ)を設定し、エラーの原因になったレコードを確認するようなユースケースを想定しています。

デッドレターキューメッセージのサンプル

Kinesis Data StreamsをEventBridge Pipes経由でLambdaで処理し、リトライ上限に到達すると、以下の様なメッセージが DLQ に送信されます。

{
  "context": {
    "partnerResourceArn": "arn:aws:pipes:eu-central-1:123456789012:pipe/k_worker",
    "condition": "RetryAttemptsExhausted"
  },
  "version": "1.0",
  "timestamp": "2023-04-29T15:47:42.068Z",
  "KinesisBatchInfo": {
    "shardId": "shardId-000000000000",
    "startSequenceNumber": "49640293986203916304020474349387293118703111245603536898",
    "endSequenceNumber": "49640293986203916304020474349387293118703111245603536898",
    "approximateArrivalOfFirstRecord": "2023-04-29T15:46:40.681Z",
    "approximateArrivalOfLastRecord": "2023-04-29T15:46:40.681Z",
    "batchSize": 1,
    "streamArn": "arn:aws:kinesis:eu-central-1:123456789012:stream/dummy"
  }
}
  • condition からリトライ上限に達したこと
  • KinesisBatchInfo の各種属性から、問題の起きたストリーム、シャード、シーケンス情報

などがわかります。

対象は1レコードのため

  • startSequenceNumber
  • endSequenceNumber

が同じです。

レコードを復元

以下の流れでレコードを復元します。

  1. シャードのイテレーターを取得
  2. レコードを取得
  3. レコードをデコード

AWS CLIで実際に操作してみます。

1. シャードのイテレーターを取得

まずはシャードのイテレーターを取得します。

DLQメッセージからシャード番号(shardId)とシーケンス番号(startSequenceNumber)がわかるため、この位置(AT_SEQUENCE_NUMBER)のイテレーターを Kinesis:GetShardIterator API で取得します。

$ aws kinesis get-shard-iterator \
    --stream-name dummy \
    --shard-id shardId-000000000000 \
    --starting-sequence-number 49640293986203916304020474349387293118703111245603536898 \
    --shard-iterator-type AT_SEQUENCE_NUMBER
{
    "ShardIterator": "AAAAAAAAAAFcBeKIp1K/3NLqn0Dvh6eg0HAeZqfCbBP2Ll32+6ULArVn9TwbQTp0o3oGqJ/nd5W8CiCivPC0rph6JxTJHEzdP1PnTYTHl5yZDcDMZl9sxhfVTF6km/7OoH0LQDd3L/G2x3ny0yuY8dxEI3B7kddheD3YeQ4k0j9gleYDpvDcIS2C8I2/kFk7Bf++3DTW0BV4Ey+gk+cYY7dXoapfGPoJLXTmWXCByU9X3Q7amxjWyg=="
}

イテレーターの位置の指定方法は、今回紹介したシーケンス番号以外にも、

  • 最古のシーケンス番号(TRIM_HORIZON)
  • 最新のシーケンス番号=新しく受け取ったレコードを処理したい(LATEST)
  • 時刻指定(AT_TIMESTAMP)

など、様々な指定方法があります。

2. レコードを取得

次に、イテレーターの位置にあるレコードを Kinesis:GetRecords API で取得します。

$ aws kinesis get-records \
  --shard-iterator "AAAAAAAAAAG7Ge6krGEz+zKHgkvwuwXru3G7U+yGWrxQTrnHKBaqh1rrG5PBeOCjlxTjD51Un3buWuzv8TuP/5CscJtodQik5YITrD8c4QhzHNuU2kMGqgOwWq3bNyGucxNmPW3/GSMc2mR/ncaZsThlK8qjFpQCIqTfkPjFsaZjloXHhZNKDbkRylJ13Gy6jWGClv1koNKQVs7juREKJdhT3/UiMGmh3UlBz5XmF47DN09AbicsiQ=="
{
    "Records": [
        {
            "SequenceNumber": "49640293986203916304020474349387293118703111245603536898",
            "ApproximateArrivalTimestamp": "2023-04-29T15:46:40.681000+00:00",
            "Data": "Zm9vCg==",
            "PartitionKey": "1"
        },
        {
            "SequenceNumber": "49640293986203916304020474349388502044522725874778243074",
            "ApproximateArrivalTimestamp": "2023-04-29T15:46:40.681000+00:00",
            "Data": "YmFyCg==",
            "PartitionKey": "2"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAHxP+XECJ/Nh5l3VqaeH1tQzICjBoXBrvFWwxV00F6Nq07yXKK0aW5T66ktKoICag1XD/+oYds1ghVw5KBSc+wYUSDPVrTGRBp/RISWrjn80UtMk/h/vcv3YFKUMJj6YzZ7u5/D2yZDVIYvYKQ9Gfvxi1dS2UUjWfTAIwXmYzCGGX2VEHVMAUP91zD9EF9V7jc4GJKJL8LGnUFBH0of9EAKQF8DAGDRPMKh4o9QPJLuJQ==",
    "MillisBehindLatest": 825000
}
  • イテレーター(ShardIterator / NextShardIterator)
  • 2つのレコード、及びその SequenceNumber

を図示すると、次の様になります。

ShardIteratorNextShardIterator の間にレコードがあり、レコードの SequenceNumber は単調増加しています。

取得するレコード数を --limit で指定することもできます。

$ aws kinesis get-records \
  --shard-iterator "AAAAAAAAAAG7Ge6krGEz+zKHgkvwuwXru3G7U+yGWrxQTrnHKBaqh1rrG5PBeOCjlxTjD51Un3buWuzv8TuP/5CscJtodQik5YITrD8c4QhzHNuU2kMGqgOwWq3bNyGucxNmPW3/GSMc2mR/ncaZsThlK8qjFpQCIqTfkPjFsaZjloXHhZNKDbkRylJ13Gy6jWGClv1koNKQVs7juREKJdhT3/UiMGmh3UlBz5XmF47DN09AbicsiQ==" \
  --limit 1
{
    "Records": [
        {
            "SequenceNumber": "49640293986203916304020474349387293118703111245603536898",
            "ApproximateArrivalTimestamp": "2023-04-29T15:46:40.681000+00:00",
            "Data": "Zm9vCg==",
            "PartitionKey": "1"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAGu9ytcyEWrMtXHn23JxQKROdrhWR0Dsx5ja88iIID0X5qoKv3QKByQlnkNLzyJ+VY2J9FbwZUEwKdUtryRpI52/Ha1RtNkKJxbrGyrSDTzeidQezxuXgKLq03+d50Qwl24hI7B+g4t70rfOlJkHaeD3IfofjZZm6oWtCUoUTyn/oewUSSA8pgxuP6dz0lorI9yzGVBQuF6xW0oBuN9XDlXU0FrDc+brkpWajB4jYQ2GA==",
    "MillisBehindLatest": 1555000
}

3. レコードをデコード

レコードは base64 エンコードされているため、デコードします。

$ echo 'Zm9vCg==' | base64 -d
foo

最後に

Kinesis Data Streamsを利用する際、Lambda が登場してからは

  • Lambda Event Source Mapping
  • Event Bridge Pipes

等を利用することで、シャードやシーケンスを意識することなく簡単にストリーム処理できるようになりました。 とはいえ、障害発生時には、今回紹介したような手順で、シャードやシーケンスを明示的に指定してエラーの原因になったレコードを確認することも必要です。

Kinesisの実運用では、エラーの起きたレコードを DLQ SQS などに一度飛ばし、Lambda ワーカー経由でKinesisから問題のレコードを取得してSlackなどに通知すると、障害対応の初動の手間が省けると思います。

それでは。