[Kinesis] データ保持の延長(increase-stream-retention-period)を試してみた

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWSチームのすずきです。

高い拡張性、可用性、信頼性を兼ね備えたAmazon Kinesis。 re:Invent2015期間中に発表されたアップデートにより、 Kinesisストリームに登録(Put)されたレコードの有効期間を24時間から最大168時間(7日)まで延長できる 「increase-stream-retention-period」が利用可能となりました。

今回その動作確認を行う機会がありましたので、紹介させて頂きます。

事前準備

OS環境

  • Amazon Linux 2015.09
$ cat /etc/system-release
Amazon Linux AMI release 2015.09

aws-cli

  • 「increase-stream-retention-period」APIに対応したaws-cliを、pipを利用してインストールします。
curl -O https://bootstrap.pypa.io/get-pip.py
sudo /usr/local/bin/pip install  awscli --upgrade
  • バージョン確認
$ /usr/local/bin/aws --version
aws-cli/1.8.12 Python/2.7.10 Linux/4.1.7-15.23.amzn1.x86_64
$ which aws
/usr/local/bin/aws
  • 対応コマンド確認 (タブ補完出力)
$ aws kinesis 
add-tags-to-stream                 get-shard-iterator                 put-records 
create-stream                      increase-stream-retention-period   remove-tags-from-stream 
decrease-stream-retention-period   list-streams                       split-shard 
delete-stream                      list-tags-for-stream               wait
describe-stream                    merge-shards                       
get-records                        put-record                     

確認内容

検証用ストリーム作成

  • kinesis-1d (デフォルト)
  • kinesis-7d (延長検証用)
aws kinesis create-stream --stream-name kinesis-1d --shard-count 1
aws kinesis create-stream --stream-name kinesis-7d --shard-count 1

データ有効期間の延長

  • 「increase-stream-retention-period」を利用し、Kinesisストリーム上に登録されたデータの有効期間168時間(最大7日)に設定しました。
aws kinesis increase-stream-retention-period  --stream-name kinesis-7d --retention-period-hours 168

設置ストリーム確認

aws kinesis list-streams | jq .
{
  "StreamNames": [
    "kinesis-1d",
    "kinesis-7d"
  ]
}

aws kinesis describe-stream --stream-name kinesis-1d | jq .
{
  "StreamDescription": {
    "StreamStatus": "ACTIVE",
    "StreamName": "kinesis-1d",
    "StreamARN": "arn:aws:kinesis:us-west-2:784693731708:stream/kinesis-1d",
    "Shards": [
      {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
          "EndingHashKey": "340282366920938463463374607431768211455",
          "StartingHashKey": "0"
        },
        "SequenceNumberRange": {
          "StartingSequenceNumber": "49555271187326158350874797010000317100074688792485691394"
        }
      }
    ]
  }
}
$ aws kinesis describe-stream --stream-name kinesis-7d | jq .
{
  "StreamDescription": {
    "StreamStatus": "ACTIVE",
    "StreamName": "kinesis-7d",
    "StreamARN": "arn:aws:kinesis:us-west-2:784693731708:stream/kinesis-7d",
    "Shards": [
      {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
          "EndingHashKey": "340282366920938463463374607431768211455",
          "StartingHashKey": "0"
        },
        "SequenceNumberRange": {
          "StartingSequenceNumber": "49555271186969346427698307039735745607712314939670528002"
        }
      }
    ]
  }
}

データ登録

  • 「put-record」により、2つのKinesisストリームに1件のデータを投入しました。
  • レコード確認の効率化の為、Put時の「SequenceNumber」を控えます。
aws kinesis put-record --stream-name kinesis-1d --partition-key pk --data "{\"date\":\"`date +%s.%3N`\"}"

{
    "ShardId": "shardId-000000000000", 
    "SequenceNumber": "49555271187326158350874797120990587673074204737766686722"
}


aws kinesis put-record --stream-name kinesis-7d --partition-key pk --data "{\"date\":\"`date +%s.%3N`\"}"
{
    "ShardId": "shardId-000000000000", 
    "SequenceNumber": "49555271186969346427698307174103014754599916679353860098"
}

レコード参照(直後)

  • 「get-shard-iterator」「get-records」を利用し、2つのKinesisストリームのレコードが参照出来る事を確認しました。

デフォルト

SEQ1d='49555271187326158350874797120990587673074204737766686722'
aws kinesis get-records \
    --shard-iterator `aws kinesis get-shard-iterator \
    --stream-name kinesis-1d --shard-id 0 \
    --shard-iterator-type AT_SEQUENCE_NUMBER \
    --starting-sequence-number ${SEQ1d} \
    | jq  .ShardIterator` \
    | jq .
{
  "Records": [
    {
      "Data": "eyJkYXRlIjoiMTQ0NDQ5NzczOS4yMzEifQ==",
      "PartitionKey": "pk",
      "ApproximateArrivalTimestamp": 1444497740.182,
      "SequenceNumber": "49555271187326158350874797120990587673074204737766686722"
    }
  ],
  "NextShardIterator": "AAAAAAAAAAHscqxEcLaRh4SPWG3p1C34/5hWL4IDPfSjh71CWXExCDTpJaZtYSdEaTPY7YzN9Z2+bkPFXPy/SCnXtGVR3Nf7k/viaMq84x/t/a5dLJ9LE08W6tgFZsfVFnLAUKLZztkynbCrXqJ28cgcw9xDd9EKhmHfjYIQLDZRkwI0GH70abhcJWvUhc4YWAqN9ciUCqiEsvvNCbvMW61zUg3R27BO",
  "MillisBehindLatest": 327000
}

拡張済

SEQ7d='49555271186969346427698307174103014754599916679353860098'
aws kinesis get-records \
    --shard-iterator `aws kinesis get-shard-iterator \
    --stream-name kinesis-7d --shard-id 0 \
    --shard-iterator-type AT_SEQUENCE_NUMBER \
    --starting-sequence-number ${SEQ7d} \
    | jq  .ShardIterator` \
    | jq .
{
  "Records": [
    {
      "Data": "eyJkYXRlIjoiMTQ0NDQ5Nzc1OC42ODYifQ==",
      "PartitionKey": "pk",
      "ApproximateArrivalTimestamp": 1444497759.658,
      "SequenceNumber": "49555271186969346427698307174103014754599916679353860098"
    }
  ],
  "NextShardIterator": "AAAAAAAAAAFVeIawHl3yBU98E9xnfQFPtI1UYCvKW2Z3m037Pt51XM0/NJVMtHpWfHWCX3NBd7qEutZqOE1RJHKHVhP330u94PAYIVxE6rMoJ0Xe+lIKCDLJSYqNQ/JqrS7djgjX4XdMSk62QR45qjZzz+O3WYLMpgeUgZrKd78CjquaVBewGGiTUNEQbS8bMo4utbgyoPTrnqZOs8+VQ2NAPxD/0E60",
  "MillisBehindLatest": 406000
}

レコード参照(33時間経過後)

デフォルト

  • 有効期間未拡張のKinesisストリームでは、24時間経過したレコードは自動削除されるため、33時間前に登録されたレコードは取得不能(Recordsが空)でした。
SEQ1d='49555271187326158350874797120990587673074204737766686722'
aws kinesis get-records \
    --shard-iterator `aws kinesis get-shard-iterator \
    --stream-name kinesis-1d --shard-id 0 \
    --shard-iterator-type AT_SEQUENCE_NUMBER \
    --starting-sequence-number ${SEQ1d} \
    | jq  .ShardIterator` \
    | jq .
{
  "Records": [],
  "NextShardIterator": "AAAAAAAAAAGPIi10N4HlQrBvuORFZHB9mLekm5LVGUZvyMWiPz9NeLJE8c0pygLaomHj7bn9eNcs08VLtG9qR75ePuy8UrbYTdCbrD2ZeKAVnX1B5pI1R6c/EKpNx7fV8g1NwpLIm0g8wNnRF6173vhQyJBCCayqxn7AVEDIRs3ny+seTBzWHS9LWStu/Z149R0GrgUiarp0wySir8SLe2EvrB1mdsV8",
  "MillisBehindLatest": 86400000
}

拡張済

  • 有効期間を168時間に延長したKinesisストリームでは、33時間前に登録したレコードの取得は可能でした
SEQ7d='49555271186969346427698307174103014754599916679353860098'
aws kinesis get-records \
    --shard-iterator `aws kinesis get-shard-iterator \
    --stream-name kinesis-7d --shard-id 0 \
    --shard-iterator-type AT_SEQUENCE_NUMBER \
    --starting-sequence-number ${SEQ7d} \
    | jq  .ShardIterator` \
    | jq .
{
  "Records": [
    {
      "Data": "eyJkYXRlIjoiMTQ0NDQ5Nzc1OC42ODYifQ==",
      "PartitionKey": "pk",
      "ApproximateArrivalTimestamp": 0,
      "SequenceNumber": "49555271186969346427698307174103014754599916679353860098"
    }
  ],
  "NextShardIterator": "AAAAAAAAAAEI3XyakVYxIs6jkQnemJZrIAwsU4qjSIKHaMFuPOxT0L/n57vFAeRByBhVnB7/51P7bLT407IxAol/cryo/k9Vk5D9A5drFWbxQ/GQOrAw5gkAzbwZPH+DcLdCT7j3QDaMkjEh8m2Xp6+YvBNK8jhWyfiuQcoA/iYYvGPJivn6yIW4/Y9R2W0DS6TiO8XErI4csUkXygGsZi5mIvK+op8lknQzr2h3p6zObn4DSFmxmgbu/q9J8kWB0cWTjA/76I9J6bqJunETTUssJ1Wr8uI1",
  "MillisBehindLatest": 162182000
}

有効期間を短縮

  • 「decrease-stream-retention-period」APIを利用し、有効期間を「168」→「30」時間に短縮すると、33時間前に投入したレコードの参照が不能となりました。
aws kinesis decrease-stream-retention-period  --stream-name kinesis-7d --retention-period-hours 30
aws kinesis get-records     --shard-iterator `aws kinesis get-shard-iterator \    --stream-name kinesis-7d --shard-id 0 \
   --shard-iterator-type AT_SEQUENCE_NUMBER \
   --starting-sequence-number ${SEQ7d} \
   | jq  .ShardIterator`     | jq .
{
 "Records": [],
 "NextShardIterator": "AAAAAAAAAAHueoi6/xjiy96tHTC5b8CKinjpGC5I3gT/rXCsmGe3XDbq9Ld5wnVQVIl0+kColRIXItiGDgPwKb94x/okTYlY+uXGlZxnJzovLWB+ynfO5paV+2s9miGX+Nknk1UKrU1C1pTlqTaBfNaRWYxusueCGI/pkod7o0T5JgdOvKk8rEQtPuI0fpFCDAvIbutpyNpVOPOiIdoreifkgig8r3hE",
 "MillisBehindLatest": 86400000
}

有効期間の再延長

  • 期限切れによりレコード参照不能となったKinesisストリーム、その有効期間を「168」まで延長しても、33時間前のレコードは参照出来ませんでした。

  • 既にKinesisストリームから削除済となったレコードの救出には、「increase-stream-retention-period」は使えない模様です。

  • Kinesisストリームの有効期間の短縮「decrease-stream-retention-period」は、有効期限切れのレコードの削除が即時に実施される模様です。必要なレコードが処理済みである事について確証を得た上で実施する事をお奨めします。

 aws kinesis increase-stream-retention-period  --stream-name kinesis-7d --retention-period-hours 168
aws kinesis get-records     --shard-iterator `aws kinesis get-shard-iterator \    --stream-name kinesis-7d --shard-id 0 \
   --shard-iterator-type AT_SEQUENCE_NUMBER \
   --starting-sequence-number ${SEQ7d} \
   | jq  .ShardIterator`     | jq .
{
 "Records": [],
 "NextShardIterator": "AAAAAAAAAAHueoi6/xjiy96tHTC5b8CKinjpGC5I3gT/rXCsmGe3XDbq9Ld5wnVQVIl0+kColRIXItiGDgPwKb94x/okTYlY+uXGlZxnJzovLWB+ynfO5paV+2s9miGX+Nknk1UKrU1C1pTlqTaBfNaRWYxusueCGI/pkod7o0T5JgdOvKk8rEQtPuI0fpFCDAvIbutpyNpVOPOiIdoreifkgig8r3hE",
 "MillisBehindLatest": 86400000
}

検証用ストリームの撤去

  • 検証用に用いたKinesisストリームを削除し、不要な課金回避します。
aws kinesis delete-stream --stream-name kinesis-1d
aws kinesis delete-stream --stream-name kinesis-7d

まとめ

従来、Kinesisに投入されたデータは、24時間以内にEC2(KinesisApp)、Lambdaなどを利用し 各種データストアに展開する必要がありましたが、 高い可用性、信頼性が担保されないデータストアやアプリケーションをEC2上で稼働させている場合、 障害対応の遅れがデータロスなど大きな問題となる場合がありました。

今回のアップデートにより、一週間の猶予期間が生まれた事で、Kinesisで組まれたアプリケーション環境に より高い冪等性を確保する事が可能となりました。

Kinesisストリーム、1シャードを稼働させた月額費用は以下のとおり。 数日のジョブの遅延は許容されるが、データロスが許容できないシステムでは、 休日運用シフトを組んだり準待機体制を取る場合と比較すれば格段に低コストと考えられます。

また、多くのシャードが稼働する場合でも、 re:Invent期間中に発表されたLambdaのスケジュールファンクションを利用し、 シャードの増減と合わせ、レコードの有効期間を制御する事で、より高いコストパフォーマンスを 実現できると思われます。

確実な進化を続けるAmazon Kinesis、ぜひお試しください。

Kinesis料金試算表

  • 費用はUS$
  • Put課金(1,000,000件毎、$0.0215(東京)、$0.014(US)は集計外
- 東京(1時間) オレゴン(1時間) 東京(30日) オレゴン(30日)
1シャード 0.0195 0.015 14.04 10.8
1シャード延長費 0.026 0.02 18.72 14.4
1シャード(延長費込) 0.0455 0.035 32.76 25.2