Amazon Kinesis StreamsのリシャーディングがAPI一発で行えるようになりました

アイキャッチ

Amazon Kinesis Streamer の皆様こんにちは。

リシャーディングのあまりの面倒くささから、ピークに合わせてシャード数を固定してストリーム運用したり、シャード数変更の際は同じ名前でシャード数だけ変えて作り直してきたという方が多いのではないでしょうか。

2016/11/16のサービスアップデートにより、ストリームのリシャーディングが API 一発で実行出来るようになりました。 トラフィックに合わせてシャード数をダイナミックにオートスケールさせるなど、夢広がる API です。

早速試してみました。

旧方式のリシャーディング:SplitShard API 編

Amazon Kinesis Streams のストリームはシャードを束ねたものであり、シャード1本辺りの性能上限が決まっているため、ストリームのキャパシティを増やしたければ、シャード数を増やすことになります。 aws-black-belt-online-seminar-2016-amazon-kinesis-12-638

AWS Black Belt Online Seminar 2016 Amazon Kinesis から

ストリームにデータを投入するときは、データのパーティションキーからハッシュキーを求め、ハッシュキーに対応するシャードにデータが投入されます。

シャード数が1のときは、ハッシュキーレンジ全体を一つのシャードがカバーします。 シャードの分割は2分割しかできないため、分割したいシャードに割り振られたキーレインジを確認し、そのレインジを2分割して新しいシャードに割り振ります。

分割したいシャードのキーレインジが0〜3であれば

  • 0〜1
  • 2〜3

に2分割できるため、元のシャードIDが "shardId-00" だとすれば

$ aws kinesis split-shard \
  --stream-name ストリーム名 \
  --shard-to-split shardId-00 \
  --new-starting-hash-key 2

というようなAPIを叩くと、新しいシャード

  • キーレインジ 0 〜 1 の shardId-01
  • キーレインジ 2 〜 3 の shardId-02

が作成されます。

シャード数を1から2に変える程度であれば大した手間ではありませんが、シャード数が多いときに、各シャードのキーレインジを確認し、2分割するように各シャードに対して API を投げるのは、理屈はわかるのものの「もう少し楽にできないの?」と思ってしまいます。

新方式のリシャーディング:UpdteShardCount API 編

新しい UpdteShardCount API を使ったリシャーディングは何が嬉しいのかというと

  • リシャードしたいストリーム名
  • リシャード後のシャード数

を指定すると、キーレインジが均等になるようにシャード分割・統合を繰り返していい感じにリシャードしてくれます。

各シャードのキーレインジを確認したり、新しいキーレインジを計算したり、各シャードに分割・統合のAPIを投げるといっためんどくさい操作から開放されます。

ドキュメントでも、高レベル API の UpdteShardCount を使うように誘導するような書き方がされています。

kinesis-resharding-a-stream-docs

UpdteShardCount API を使ってみる

早速使ってみましょう。

API を叩く

シャード数が2のストリーム foo に対して、各シャードを2分割して シャード数4にしたい場合、次のコマンドを実行します。

$ aws kinesis update-shard-count \
  --stream-name foo \
  --target-shard-count 4 \
  --scaling-type UNIFORM_SCALING
{
    "TargetShardCount": 4,
    "StreamName": "foo",
    "CurrentShardCount": 2
}

TargetShardCount はリシャード後のシャード数、 StreamName はリシャード対象のストリーム名、CurrentShardCount はリシャード前のシャード数です。

ハッシュキーレインジの分割方針(--scaling-type) は現時点では UNIFORM_SCALING(均一) の一択です。

シャードの変化を確認

$ aws kinesis describe-stream --stream-name foo

を実行すると、リシャード後のシャード状況を確認できます。

分割前のシャードは

    {
      "ShardId": "shardId-000000000000",
      "HashKeyRange": {
        "EndingHashKey": "170141183460469231731687303715884105727",
        "StartingHashKey": "0"
      },
      "SequenceNumberRange": {
        "EndingSequenceNumber": "49567765304005334770507617359803006622437488738042904578",
        "StartingSequenceNumber": "49567765303994184397908352048233447689120770321652121602"
      }
    },

というように StartingSequenceNumber/EndingSequenceNumber の両方の値が入っています(新しいデータは受け付けないCLOSEDな状態)。

分割後に新規に作成されたシャードは

    {
      "ShardId": "shardId-000000000002",
      "HashKeyRange": {
        "EndingHashKey": "85070591730234615865843651857942052864",
        "StartingHashKey": "0"
      },
      "ParentShardId": "shardId-000000000000",
      "SequenceNumberRange": {
        "StartingSequenceNumber": "49567765350424335901249109428910813132774667910635323426"
      }
    },

というように、StartingSequenceNumber はありますが、EndingSequenceNumber はありません(新しいデータを受け付けるOPENな状態)。 また、分割元のシャードIDが ParentShardId 属性からわかります。

UpdteShardCount API の制限について

便利なAPIですが、いくつかの(一部はクリティカルな)制限も存在します。

24 時間毎に2回しか呼び出せない

24 時間以内にスケールアウト・スケールインをそれぞれ実行すると、リミットの2回を使い切ってしまいます。 3回目を呼び出すと

An error occurred (LimitExceededException) when calling the UpdateShardCount operation: Rate exceeded for stream test under account 123456789012.

と怒られてしまいます。 非常につらいですね。

この制限はシャード毎に課せられます。

リシャード後のシャード数は2倍まで

リシャード後のシャード数は2倍を超えてはいけません。

シャード数1のストリームを倍の2にすることはできますが、3, 4, 8 のように、2倍を超えたシャード数にすることはできません。

シャード数2のストリームを8にしてみましょう

$ aws kinesis update-shard-count --stream-name test --target-shard-count 8 --scaling-type UNIFORM_SCALING

An error occurred (InvalidArgumentException) when calling the UpdateShardCount operation: UpdateShardCount cannot scale up over double your current open shard count. Current open shard count: 2 Target shard count: 8

リシャード後のシャード数は1/2まで

リシャード後のシャード数は半分を割ってはいけません。

シャード数8のストリームを半分の4にすることはできますが、1, 2, 3 のように、半分を割ったシャード数にすることはできません。

シャード数4のストリームを1にしてみましょう。

$ aws kinesis update-shard-count --stream-name foo --target-shard-count 1 --scaling-type UNIFORM_SCALING

An error occurred (InvalidArgumentException) when calling the UpdateShardCount operation: UpdateShardCount cannot scale down below half your current open shard count. Current open shard count: 4 Target shard count: 1

200 シャードの壁

200シャードを超えてスケールアップできません。 スケールダウン後のシャード数が200を超えるときはどうようにスケールダウンできません。

これに引っかかることはレアでしょう。

シャード上限の壁

Kinesisではリージョン毎にシャード数の上限が存在します。 この上限を超えた値にスケールアップすることはできません。

シャード数操作がカジュアルに行える様になったからなのか、こっそりと DescribeLimits という API も同時に追加されています。

$ aws kinesis describe-limits
{
    "OpenShardCount": 6,
    "ShardLimit": 25
}

上限はデータを受け付けているシャード(Open Shard)だけが対象です。リシャード元になったシャード(Closed Shard)の数は対象外です。

イレギュラーなリシャードをやってみる

シャードの分割は2分割のみ、統合は隣り合った2シャードを統合するのみ、という制約があるため、シャード数の変更は2倍、または1/2倍というように2の冪乗を利用するのが一般的かと思います。

それではイレギュラーに、2シャードのストリームを3シャードにするとどうなるのでしょうか?

$ aws kinesis create-stream --stream-name bar --shard-count 2
$ aws kinesis update-shard-count --stream-name bar --target-shard-count 3 --scaling-type UNIFORM_SCALING
{
    "TargetShardCount": 3,
    "StreamName": "bar",
    "CurrentShardCount": 2
}

しばらくしてストリームを describe すると次のような結果になりました。

$ aws kinesis describe-stream --stream-name bar
{
    "StreamDescription": {
        "RetentionPeriodHours": 24,
        "StreamName": "bar",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                ...
            },
            {
                "ShardId": "shardId-000000000001",
                ...
            },
            {
                "ShardId": "shardId-000000000002",
                ...
                "ParentShardId": "shardId-000000000000",
                ...
            },
            {
                "ShardId": "shardId-000000000003",
                ...
                "ParentShardId": "shardId-000000000000",
                ...
            },
            {
                "ShardId": "shardId-000000000004",
                ...
                "ParentShardId": "shardId-000000000001",
                ...
            },
            {
                "ShardId": "shardId-000000000005",
                ...
                "ParentShardId": "shardId-000000000001",
                ...
            },
            {
                "ShardId": "shardId-000000000006",
                ...
                "ParentShardId": "shardId-000000000003",
                "AdjacentParentShardId": "shardId-000000000004",
                ...
            }
        ],
        ...
        "StreamStatus": "ACTIVE"
    }
}

2等分されたストリームを3等分するために、もとの2ストリームをそれぞれ

  • 2/3(ShardId-02), 1/3(ShardId-03)
  • 1/3(ShardId-04), 2/3(ShardId-05)

で分割し、隣り合った 1/3 x 2 をマージして、新しいシャード(ShardId-06)を作成しています。

図にすると、下図のような感じでしょうか。

update-stream-shard (1)

ややこしいですね。

Kinesis Streams はストリームに対する変更は一度に一つしか行えないため、SPLIT→SPLIT→MERGEがシーケンシャルに呼び出されます。

シャード数が大きいストリームに対して、このような操作をすると、均一にシャード分割するまでに何度もSPLIT/MERGEを繰り返すことになるため、特異な要件がない限りは2の冪乗以外のシャード数変更操作は控えましょう。

CloudWatch メトリクスと連携してダイナミックにリシャード

CloudWatch メトリクスにアラームを設定し、Lambda を呼び出して

  • Kinesis Streams のリシャード
  • Cloud Watch Alarm の閾値を新シャード数に合わせて更新

を行うユースケースが AWS Big Data Blog で紹介されています。

アーキテクチャー

処理の流れは下図の通りです

cloudwatch-lambda-kinesis-shard

ウォークスルー

以下の手順で、紹介されているユースケース環境を構築できます。

  1. リシャード、CloudWatch Alarm の閾値を変更する Lambda をデプロイ
  2. Kinesis Streams の作成
  3. CloudWatch Alarm にメトリクス(ストリームのIncomingRecordsなど)を設定
  4. CloudWatch Alarm から SNS に通知
  5. SNS の通知先に Lambda を設定

あとは、Kinesis Streams に設定した閾値を超えるようにデータを流し込むと、Lambda が呼び出されます。

Lambda 関数の注意点

この構成をやる上でハマるとすれば、Lambda 関数のデプロイでしょう。

UpdateShardCount API はリリースされたばかりのため、Lambda のコンテナーにデフォルトでインストールされているライブラリを使うと、API を呼び出せない可能性があります。

ブログで紹介されている Python(boto3) のサンプルコードを使いまわす場合、サンプルコードを lambda_function.py という名前で保存し、同じディレクトリで

$ pip install boto3 -t .
$ zip -r lambda.zip .

というように boto3 の最新版をインストールすると良いでしょう。 あとは アーカイブ lambda.zip を管理画面からアップロードするだけです。

boto3 のバージョンが古すぎる場合

Lambda に標準でインストールされている boto3(AWS SDK for Python)のバージョンが古すぎる場合
'Kinesis' object has no attribute 'update_shard_count': AttributeError
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 28, in lambda_handler
kinesis.update_shard_count(
AttributeError: 'Kinesis' object has no attribute 'update_shard_count'

というようなエラーが発生します。

まとめ

新たに追加された Amazon Kinesis Streams の UpdteShardCount API を使ってシャード数を一発で変更する方法を確認しました。 以前よりはリシャーディングの手間が大幅に軽減されましたが、この新しい API には

  • 1日2回のAPI 呼び出し制限
  • 1/2倍, 2倍 を超えるスケールイン・アウトはできない

といった大きな制約もあります。

特に前者の制限は大きく、現時点では一日に何回もスケールアウト・インを繰り返したり、シャード数の倍倍分割でシャード数を大きく変動させることもできません。 ウォークスルーにあるようにCloudWatch連携してカジュアルにオートスケールさせることは、現実的には厳しものがあります。 ただ、これら制限は近い将来には改善されていくと思われます。

まずは、リシャードを一発で行えるハイレベルAPIが提供されたことを喜ぶべきでしょう。

以前の Amazon Kinesis Streams は低レベルな API が提供されているだけの玄人匂がただようサービスでしたが、最近は管理画面が強化されたり、頭を使わずにリシャーディング出来るようになったりと、利用の敷居を下げて、ユーザーの裾野を広げる方向に力を入れているようです。

これまで Kinesis を敬遠してきたエンジニアが Kinesis を触るきっかけになれば幸いです。

参考