[新機能]Kinesis Streamsが時刻ベースのイテレーターに対応しました

2016.04.22

Kinesis Streamsからデータを読み取るときには

  • シャードの位置へのポインターを取得し(GetShardIterator API)
  • ポインターからのデータを取得(GetRecords API)

という2本立てが必要です。

今回の新機能により、シャード内の位置を指定する方法としてタイムスタンプも追加されました。

2016/04/22時点でシャードの位置を指定する方法は以下のものが有ります。

位置を指定する方法 説明
AT_SEQUENCE_NUMBER あるシーケンス番号
AFTER_SEQUENCE_NUMBER あるシーケンス番号の後
TRIM_HORIZON 最も古いレコード
LATEST 最も新しいレコード
AT_TIMESTAMP(今回追加) タイムスタンプ

ユースケース

Lambda関数のバグなどにより、Kinesis Stream でバッファリングされたレコードを途中からやり直さなければいけない時、これまでは再開開始するシーケンス番号をがんばって探し当てていましたが、今後は不具合発生日時がわかっていれば、その日時を指定するだけで済みます。

機能の使い方

Kinesis Streamの設定変更は不要です。

GetShardIterator API 呼び出し時に

  • ShardIteratorTypeAT_TIMESTAMP
  • Timestamp に基点となるユニックスタイム

を指定するだけです。

従来の ShardIteratorType はTypeだけを指定すれば済みましたが、ShardIteratorType 引数に AT_TIMESTAMP を指定した場合は、Timestamp 引数で起点となるタイムスタンプの指定も必要です。

以下では AWS CLI と Python SDK の boto3 から AT_TIMESTAMP を使って実際のレコードを取得します。

AWS CLI から使ってみる

GetShardIterator API でイテレーターを取得

$ aws kinesis get-shard-iterator \
  --stream-name foo \
  --shard-id shardId-000000000000 \
  --shard-iterator-type AT_TIMESTAMP \
  --timestamp 1461179497.1234
{
    "ShardIterator": "AAAAAAAAAAG5xiH3Mzb718+7duHMHm8H6BDhVPS7V21cX29b6deqFQVRkWVO5Fjw/1ghrcfOM9Ta+tgkLAn/0RPRPUn3W4+YQsCDuzARkxr+KA9lFMtVxslnqpW62JvYMxCK/N2nVHYB6EmAS9UrZ1X4DIiafMIZYWfMBFjLmU7owdieOVCvW2PKVh0Lf4j3k6Ulf7MLoKfoLrhXNqm/nxyRGvyqHdF2XMRsL+Xu3ZZ1hkefFGJPqg=="
}

GetRecords API でレコードを取得

--shard-iterator の引数として、レスポンスにある ShardIterator を指定します。

大量のレコードがかえってこないように、--limit 15 でレコード数を制限しています。

$ aws kinesis get-records \
  --shard-iterator "AAAAAAAAAAG5xiH3Mzb718+7duHMHm8H6BDhVPS7V21cX29b6deqFQVRkWVO5Fjw/1ghrcfOM9Ta+tgkLAn/0RPRPUn3W4+YQsCDuzARkxr+KA9lFMtVxslnqpW62JvYMxCK/N2nVHYB6EmAS9UrZ1X4DIiafMIZYWfMBFjLmU7owdieOVCvW2PKVh0Lf4j3k6Ulf7MLoKfoLrhXNqm/nxyRGvyqHdF2XMRsL+Xu3ZZ1hkefFGJPqg==" \
  --limit 10
{
    "Records": [
        {
            "Data": "MTQ2MTE3OTQ5Ny43Mg==",
            "PartitionKey": "1461179497.72",
            "ApproximateArrivalTimestamp": 1461179497.723,
            "SequenceNumber": "49561217150810828701271391730195637510528297060444143618"
        },
        ...,
        {
            "Data": "MTQ2MTE3OTUwMC4yOQ==",
            "PartitionKey": "1461179500.29",
            "ApproximateArrivalTimestamp": 1461179500.297,
            "SequenceNumber": "49561217150810828701271391730201682139626370412476104706"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAEpARFJ2z+prIxsEnqLaWIR65WgdgzAhYTgSI+3kdtEEF95uvA8uT3gH6JPvgdy4S6Z05WsoHbA29Q4Hc/CKi3YFNKrmaq0GDNmKbZ4xFjvOwGNw1vikvzrEDkEEJKBWzWxViJoJaoMuJ6ZNjIFx43zj3+lNnje7gVERhp76B8b2stCiiiq+B/59+JRARHVpqSpYqVnm4UOIoWmhpbZG3Dh",
    "MillisBehindLatest": 76276000
}

今回作成したレコードでは、DataPartitionKey にクライアントのデータ送信日時を設定しています。 また ApproximateArrivalTimestamp はサーバーのレコード取得日時です。

--timestamp 1461179497.1234 という指定に対して、一番古いデータは以下です。

{
    "Data": "MTQ2MTE3OTQ5Ny43Mg==",
    "PartitionKey": "1461179497.72",
    "ApproximateArrivalTimestamp": 1461179497.723,
    "SequenceNumber": "49561217150810828701271391730195637510528297060444143618"
},
  • Timestamp < PartitionKey
  • Timestamp < ApproximateArrivalTimestamp

なので期待通りですね。

boto3 から使ってみる

同じ要領で Python SDK の boto3 からもデータ取得します。

プログラム全体

# kinesis-timestamp.py
import boto3
import pprint

client = boto3.client('kinesis')

# get shard iterator at specified time
response = client.get_shard_iterator(
    StreamName='foo',
    ShardIteratorType="AT_TIMESTAMP",
    Timestamp=1461179497.1234,
    ShardId="shardId-000000000000",
)

# retrieve data
shard_iterator = response['ShardIterator']
response = client.get_records(ShardIterator=shard_iterator, Limit=10)
for record in response['Records']:
    pprint.pprint(record)

Timestampを指定し、各レコードを pretty print しているだけです。

なお、引数 Timestamp には、ユニックスタイムを直接渡すだけでなく、以下のように datetime 型を渡すこともできます。AWSへのリクエスト時に自動でユニックスタイムスタンプに変換されます。

import datetime
...

response = client.get_shard_iterator(
    StreamName='foo',
    ShardIteratorType="AT_TIMESTAMP",
    Timestamp=datetime.datetime(2016, 4, 1),
    ShardId="shardId-000000000000",
)
...

実際に実行してみましょう

$ python kinesis-timestamp.py
{u'ApproximateArrivalTimestamp': datetime.datetime(2016, 4, 20, 19, 11, 37, 723000, tzinfo=tzlocal()),
 u'Data': '1461179497.72',
 u'PartitionKey': u'1461179497.72',
 u'SequenceNumber': u'49561217150810828701271391730195637510528297060444143618'}
...
{u'ApproximateArrivalTimestamp': datetime.datetime(2016, 4, 20, 19, 11, 40, 297000, tzinfo=tzlocal()),
 u'Data': '1461179500.29',
 u'PartitionKey': u'1461179500.29',
 u'SequenceNumber': u'49561217150810828701271391730201682139626370412476104706'}

AWS CLIの時と同じ結果が得られました。

AWS Lambda のイベントソースにKinesis Streamsを指定したらどうなる?

AWS Lambda はイベントソースとしてKinesis Streamsを指定できます。

Kinesis Streamsを指定した時の開始位置の選択肢は、従来通り

  • TRIM_HORIZON(最も古いレコード)
  • LATEST(最も新しいレコード)

のままで変更はありません。

lambda-kinesis

残念。

参考