この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Kinesis Streamsからデータを読み取るときには
- シャードの位置へのポインターを取得し(
GetShardIterator
API) - ポインターからのデータを取得(
GetRecords
API)
という2本立てが必要です。
今回の新機能により、シャード内の位置を指定する方法としてタイムスタンプも追加されました。
2016/04/22時点でシャードの位置を指定する方法は以下のものが有ります。
位置を指定する方法 | 説明 |
AT_SEQUENCE_NUMBER | あるシーケンス番号 |
AFTER_SEQUENCE_NUMBER | あるシーケンス番号の後 |
TRIM_HORIZON | 最も古いレコード |
LATEST | 最も新しいレコード |
AT_TIMESTAMP(今回追加) | タイムスタンプ |
ユースケース
Lambda関数のバグなどにより、Kinesis Stream でバッファリングされたレコードを途中からやり直さなければいけない時、これまでは再開開始するシーケンス番号をがんばって探し当てていましたが、今後は不具合発生日時がわかっていれば、その日時を指定するだけで済みます。
機能の使い方
Kinesis Streamの設定変更は不要です。
GetShardIterator
API 呼び出し時に
ShardIteratorType
にAT_TIMESTAMP
Timestamp
に基点となるユニックスタイム
を指定するだけです。
従来の ShardIteratorType
はTypeだけを指定すれば済みましたが、ShardIteratorType
引数に AT_TIMESTAMP
を指定した場合は、Timestamp
引数で起点となるタイムスタンプの指定も必要です。
以下では AWS CLI と Python SDK の boto3 から AT_TIMESTAMP
を使って実際のレコードを取得します。
AWS CLI から使ってみる
GetShardIterator API でイテレーターを取得
$ aws kinesis get-shard-iterator \
--stream-name foo \
--shard-id shardId-000000000000 \
--shard-iterator-type AT_TIMESTAMP \
--timestamp 1461179497.1234
{
"ShardIterator": "AAAAAAAAAAG5xiH3Mzb718+7duHMHm8H6BDhVPS7V21cX29b6deqFQVRkWVO5Fjw/1ghrcfOM9Ta+tgkLAn/0RPRPUn3W4+YQsCDuzARkxr+KA9lFMtVxslnqpW62JvYMxCK/N2nVHYB6EmAS9UrZ1X4DIiafMIZYWfMBFjLmU7owdieOVCvW2PKVh0Lf4j3k6Ulf7MLoKfoLrhXNqm/nxyRGvyqHdF2XMRsL+Xu3ZZ1hkefFGJPqg=="
}
GetRecords API でレコードを取得
--shard-iterator
の引数として、レスポンスにある ShardIterator
を指定します。
大量のレコードがかえってこないように、--limit 15
でレコード数を制限しています。
$ aws kinesis get-records \
--shard-iterator "AAAAAAAAAAG5xiH3Mzb718+7duHMHm8H6BDhVPS7V21cX29b6deqFQVRkWVO5Fjw/1ghrcfOM9Ta+tgkLAn/0RPRPUn3W4+YQsCDuzARkxr+KA9lFMtVxslnqpW62JvYMxCK/N2nVHYB6EmAS9UrZ1X4DIiafMIZYWfMBFjLmU7owdieOVCvW2PKVh0Lf4j3k6Ulf7MLoKfoLrhXNqm/nxyRGvyqHdF2XMRsL+Xu3ZZ1hkefFGJPqg==" \
--limit 10
{
"Records": [
{
"Data": "MTQ2MTE3OTQ5Ny43Mg==",
"PartitionKey": "1461179497.72",
"ApproximateArrivalTimestamp": 1461179497.723,
"SequenceNumber": "49561217150810828701271391730195637510528297060444143618"
},
...,
{
"Data": "MTQ2MTE3OTUwMC4yOQ==",
"PartitionKey": "1461179500.29",
"ApproximateArrivalTimestamp": 1461179500.297,
"SequenceNumber": "49561217150810828701271391730201682139626370412476104706"
}
],
"NextShardIterator": "AAAAAAAAAAEpARFJ2z+prIxsEnqLaWIR65WgdgzAhYTgSI+3kdtEEF95uvA8uT3gH6JPvgdy4S6Z05WsoHbA29Q4Hc/CKi3YFNKrmaq0GDNmKbZ4xFjvOwGNw1vikvzrEDkEEJKBWzWxViJoJaoMuJ6ZNjIFx43zj3+lNnje7gVERhp76B8b2stCiiiq+B/59+JRARHVpqSpYqVnm4UOIoWmhpbZG3Dh",
"MillisBehindLatest": 76276000
}
今回作成したレコードでは、Data
と PartitionKey
にクライアントのデータ送信日時を設定しています。
また ApproximateArrivalTimestamp
はサーバーのレコード取得日時です。
--timestamp 1461179497.1234
という指定に対して、一番古いデータは以下です。
{
"Data": "MTQ2MTE3OTQ5Ny43Mg==",
"PartitionKey": "1461179497.72",
"ApproximateArrivalTimestamp": 1461179497.723,
"SequenceNumber": "49561217150810828701271391730195637510528297060444143618"
},
- Timestamp < PartitionKey
- Timestamp < ApproximateArrivalTimestamp
なので期待通りですね。
boto3 から使ってみる
同じ要領で Python SDK の boto3 からもデータ取得します。
プログラム全体
# kinesis-timestamp.py
import boto3
import pprint
client = boto3.client('kinesis')
# get shard iterator at specified time
response = client.get_shard_iterator(
StreamName='foo',
ShardIteratorType="AT_TIMESTAMP",
Timestamp=1461179497.1234,
ShardId="shardId-000000000000",
)
# retrieve data
shard_iterator = response['ShardIterator']
response = client.get_records(ShardIterator=shard_iterator, Limit=10)
for record in response['Records']:
pprint.pprint(record)
Timestampを指定し、各レコードを pretty print しているだけです。
なお、引数 Timestamp には、ユニックスタイムを直接渡すだけでなく、以下のように datetime 型を渡すこともできます。AWSへのリクエスト時に自動でユニックスタイムスタンプに変換されます。
import datetime
...
response = client.get_shard_iterator(
StreamName='foo',
ShardIteratorType="AT_TIMESTAMP",
Timestamp=datetime.datetime(2016, 4, 1),
ShardId="shardId-000000000000",
)
...
実際に実行してみましょう
$ python kinesis-timestamp.py
{u'ApproximateArrivalTimestamp': datetime.datetime(2016, 4, 20, 19, 11, 37, 723000, tzinfo=tzlocal()),
u'Data': '1461179497.72',
u'PartitionKey': u'1461179497.72',
u'SequenceNumber': u'49561217150810828701271391730195637510528297060444143618'}
...
{u'ApproximateArrivalTimestamp': datetime.datetime(2016, 4, 20, 19, 11, 40, 297000, tzinfo=tzlocal()),
u'Data': '1461179500.29',
u'PartitionKey': u'1461179500.29',
u'SequenceNumber': u'49561217150810828701271391730201682139626370412476104706'}
AWS CLIの時と同じ結果が得られました。
AWS Lambda のイベントソースにKinesis Streamsを指定したらどうなる?
AWS Lambda はイベントソースとしてKinesis Streamsを指定できます。
Kinesis Streamsを指定した時の開始位置の選択肢は、従来通り
TRIM_HORIZON
(最も古いレコード)LATEST
(最も新しいレコード)
のままで変更はありません。
残念。
参考
- https://aws.amazon.com/jp/blogs/news/amazon-kinesis-update-amazon-elasticsearch-service-integration-shard-level-metrics-time-based-iterators/
- http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
- http://docs.aws.amazon.com/lambda/latest/dg/API_CreateEventSourceMapping.html