[新機能]Kinesis Streamsがシャード単位のメトリクスに対応しました

2016.05.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

2016/04/19のアップデートにより、Kinesisストリームはシャード単位のメトリクス(shard level metrics)に対応しました。

Amazon Kinesis アップデート – Amazon Elasticsearch Service との統合、シャード単位のメトリクス、時刻ベースのイテレーター | Amazon Web Services ブログ

従来はストリーム単位のメトリクスにしか対応していませんでしたが、今回の機能追加により

  • シャード間のレコードの偏り
  • データ処理の止まっているシャード
  • データ処理遅延の発生しているシャード

などを簡単に検出できるようになりました。

機能概要

シャード単位メトリクスを有効にするには?

シャード単位のメトリクスは RDS にもあるEnhanced Monitoring(拡張モニタリング)の形で提供され、標準では無効化されています。また、メトリクス単位で有効・無効を制御できます。

新規に提供されるメトリクス一覧

CloudWatchのKinesisストリームのストリーム・シャードそれぞれのメトリクスを確認します。

シャード単位のメトリクスが今回追加されたメトリクスです。

説明 Stream Level Shard Level
取得したバイト数 GetRecords.Bytes OutgoingBytes
レコード追加されてから、処理されるまでのラグ。 Deprecated GetRecords.IteratorAge N/A
 レコード追加されてから、処理されるまでのラグ。 単位はミリ秒 GetRecords.IteratorAgeMilliseconds IteratorAgeMilliseconds
GetRecords APIの処理時間 GetRecords.Latency N/A
GetRecords APIでストリームから取得したレコード数 GetRecords.Records OutgoingRecords
 GetRecords APIの成功数 GetRecords.Success N/A
追加したバイト数

  • PutRecord
  • PutRecords

APIの合計

IncomingBytes IncomingBytes
追加したレコード数

  • PutRecord
  • PutRecords

APIの合計

IncomingRecords IncomingRecords
PutRecord APIでストリームに追加したバイト数 PutRecord.Bytes N/A
 PutRecord APIの処理時間 PutRecord.Latency N/A
 PutRecord APIの成功数 PutRecord.Success N/A
PutRecords APIでストリームに追加したバイト数 PutRecords.Bytes N/A
PutRecords APIでストリームに追加したバイト数 PutRecords.Latency N/A
PutRecords APIの処理時間 PutRecords.Records N/A
PutRecords APIの成功数 PutRecords.Success N/A
レコード取得でスロットリングされたAPI呼び出し数 ReadProvisionedThroughputExceeded ReadProvisionedThroughputExceeded
レコード追加でスロットリングされたAPI呼び出し数 WriteProvisionedThroughputExceeded WriteProvisionedThroughputExceeded

メトリクスの見方

シャード間のレコードの偏っていないか確認するには、IncomingRecordsSUMを確認します。

データ処理の止まっているシャードやデータ処理遅延の発生しているシャードを確認するには、IteratorAgeMillisecondsAVERAGEを確認します。

メトリクスの詳細は次のURLから確認できます。

http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html

CloudWatchのグラフ

CloudWatchの管理画面からストリーム「foo」のシャード単位のIncomingRecordsメトリクスをグラフ化したのが以下です。

kinesis-cloudwatch-incoming-records-per-shard

ストリーム「foo」を構成する「0番」と「1番」のシャードそれぞれのメトリクスを取得出来ていますね。

2シャード間でレコード数はほぼ同じで偏りがないことがわかります。

Kinesis シャード単位メトリクスの操作例

Kinesisストリームの拡張モニタリング機能はAPI経由でしか操作できません。 以下では AWS CLI からシャード単位メトリクスを操作します。

準備

まずは準備としてKinesisストリームを作成します。

$ aws --version
aws-cli/1.10.25 Python/2.7.10 Linux/4.4.8-20.46.amzn1.x86_64 botocore/1.4.16

# Create Kinesis Stream
$ aws kinesis create-stream --stream-name bar --shard-count 2
$ aws kinesis wait stream-exists --stream-name bar
aws kinesis wait stream-exists --stream-name bar
# Describe Kinesis Stream
$ aws kinesis describe-stream --stream-name bar
{
    "StreamDescription": {
        "RetentionPeriodHours": 24,
        "StreamName": "bar",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "170141183460469231731687303715884105727",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49561278658050145864612465902958334562332714288264773634"
                }
            },
            {
                "ShardId": "shardId-000000000001",
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455",
                    "StartingHashKey": "170141183460469231731687303715884105728"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49561278658072446609810996526099870280605362649770754066"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/bar",
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "StreamStatus": "ACTIVE"
    }
}

ストリームの describe 結果に EnhancedMonitoring ブロックが追加されていますね。

シャード単位メトリクスを有効にする

シャード単位メトリクスは拡張モニタリングのため、標準では無効化されています。 enable-enchanced-monitoring API で有効化します。 メトリクス単位で有効・無効を制御出来るため、有効にするメトリクスを--shard-level-metrics で指定します。

# Enable Enchanced Monitoring
$ aws kinesis enable-enhanced-monitoring \
  --stream-name bar \
  --shard-level-metrics IncomingBytes IncomingRecords
{
    "StreamName": "bar",
    "CurrentShardLevelMetrics": [],
    "DesiredShardLevelMetrics": [
        "IncomingBytes",
        "IncomingRecords"
    ]
}
$ aws kinesis wait stream-exists --stream-name bar
# Describe Kinesis Stream
$ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.EnhancedMonitoring
[
  {
    "ShardLevelMetrics": [
      "IncomingBytes",
      "IncomingRecords"
    ]
  }
]

enable-enchanced-monitoring API のレスポンスには

  • 変更前に有効なメトリクス(CurrentShardLevelMetrics)
  • 変更後に有効なメトリクス(DesiredShardLevelMetrics)

がかえってきます。

すべてのシャード単位メトリクスを有効にする

有効にするメトリクスを逐一指定するのは手間ですね。 --shard-level-metrics ALL とすると、全てのシャード単位メトリクスが有効になります。

$ aws kinesis enable-enhanced-monitoring \
  --stream-name bar \
  --shard-level-metrics ALL
{
    "StreamName": "bar",
    "CurrentShardLevelMetrics": [
        "IncomingBytes",
        "IncomingRecords"
    ],
    "DesiredShardLevelMetrics": [
        "IncomingBytes",
        "OutgoingRecords",
        "IteratorAgeMilliseconds",
        "IncomingRecords",
        "ReadProvisionedThroughputExceeded",
        "WriteProvisionedThroughputExceeded",
        "OutgoingBytes"
    ]
}
$ aws kinesis wait stream-exists --stream-name bar
$ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.EnhancedMonitoring
[
  {
    "ShardLevelMetrics": [
      "IncomingBytes",
      "OutgoingRecords",
      "IteratorAgeMilliseconds",
      "IncomingRecords",
      "ReadProvisionedThroughputExceeded",
      "WriteProvisionedThroughputExceeded",
      "OutgoingBytes"
    ]
  }
]

シャード単位メトリクスを削減する

取得するメトリクスを削減したい場合、削除するメトリクスをdisable-enchanced-monitoring API で無効化します。

$ aws kinesis disable-enhanced-monitoring \
  --stream-name bar \
  --shard-level-metrics IncomingBytes OutgoingRecords
{
    "StreamName": "bar",
    "CurrentShardLevelMetrics": [
        "IncomingBytes",
        "OutgoingRecords",
        "IteratorAgeMilliseconds",
        "IncomingRecords",
        "ReadProvisionedThroughputExceeded",
        "WriteProvisionedThroughputExceeded",
        "OutgoingBytes"
    ],
    "DesiredShardLevelMetrics": [
        "IteratorAgeMilliseconds",
        "IncomingRecords",
        "ReadProvisionedThroughputExceeded",
        "WriteProvisionedThroughputExceeded",
        "OutgoingBytes"
    ]
}
$ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.EnhancedMonitoring
[
  {
    "ShardLevelMetrics": [
      "IteratorAgeMilliseconds",
      "IncomingRecords",
      "ReadProvisionedThroughputExceeded",
      "WriteProvisionedThroughputExceeded",
      "OutgoingBytes"
    ]
  }
]

enable-enchanced-monitoring API と同じく、disable-enchanced-monitoring API のレスポンスには

  • 変更前に有効なメトリクス(CurrentShardLevelMetrics)
  • 変更後に有効なメトリクス(DesiredShardLevelMetrics)

がかえってきます。

全部削除する

無効にするメトリクスを逐一指定するのは手間ですね。 --shard-level-metrics ALL とすると、全てのシャード単位メトリクスが無効になります。

$ aws kinesis disable-enhanced-monitoring \
  --stream-name bar \
  --shard-level-metrics ALL
{
    "StreamName": "bar",
    "CurrentShardLevelMetrics": [
        "IteratorAgeMilliseconds",
        "IncomingRecords",
        "ReadProvisionedThroughputExceeded",
        "WriteProvisionedThroughputExceeded",
        "OutgoingBytes"
    ],
    "DesiredShardLevelMetrics": []
}
$ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.EnhancedMonitoring
[
  {
    "ShardLevelMetrics": []
  }
]

CloudWatch からメトリクスを取得する

get-metric-statistics API で --dimensions として

  • ストリーム
  • シャード

を指定します。

$ aws cloudwatch get-metric-statistics \
  --namespace AWS/Kinesis \
  --metric-name IncomingRecords \
  --dimensions Name=ShardId,Value=shardId-000000000001 \
               Name=StreamName,Value=foo \
  --start-time 2016-04-20T21:00:00 \
  --end-time 2016-04-20T22:00:00 \
  --period 600 \
  --statistics Sum
{
    "Datapoints": [
        {
            "Timestamp": "2016-04-20T21:30:00Z",
            "Sum": 550.0,
            "Unit": "Count"
        },
        {
            "Timestamp": "2016-04-20T21:40:00Z",
            "Sum": 571.0,
            "Unit": "Count"
        },
        {
            "Timestamp": "2016-04-20T21:10:00Z",
            "Sum": 577.0,
            "Unit": "Count"
        },
        {
            "Timestamp": "2016-04-20T21:00:00Z",
            "Sum": 577.0,
            "Unit": "Count"
        },
        {
            "Timestamp": "2016-04-20T21:50:00Z",
            "Sum": 572.0,
            "Unit": "Count"
        },
        {
            "Timestamp": "2016-04-20T21:20:00Z",
            "Sum": 591.0,
            "Unit": "Count"
        }
    ],
    "Label": "IncomingRecords"
}

注意点

APIからしか拡張モニタリングは有効に出来ない

Kinesisストリームの拡張モニタリング機能はAPI経由でしか操作できません。 Kinesisストリームはデータ保持期間の変更やシャードの分割・統合などもAPI経由でしか操作できないので、Kinesisに慣れている人からすれば「またか」という感じですが、管理画面からも変更出来るようにしてほしいものです。

更新中のストリームは変更できない

Kinesis ストリームの更新操作(拡張モニタリング、シャードの分割/統合など)は "StreamStatus" が "ACTIVE" なストリームに対してしか操作できません。

"StreamStatus" が "UPDATING" なストリームに更新操作を行うと、以下の様なエラーが発生します。

$ aws kinesis enable-enhanced-monitoring --stream-name bar --shard-level-metrics ALL

A client error (ResourceInUseException) occurred when calling the EnableEnhancedMonitoring operation: Stream bar under account 123456789012 not ACTIVE, instead in state UPDATING

kinesis-stream-state-diagram

enable/disable-enchanced-monitoring APIによって取得するメトリクスに差分がない時

enable/disable-enchanced-monitoring API 実行によって CurrentShardLevelMetrics と DesiredShardLevelMetrics に差分がない場合、Kinesis ストリームの更新処理は走らず、"StreamStatus" は "ACTIVE" なままです。

$ aws kinesis enable-enhanced-monitoring \
  --stream-name bar \
  --shard-level-metrics IteratorAgeMilliseconds
{
    "StreamName": "bar",
    "CurrentShardLevelMetrics": [
        "IteratorAgeMilliseconds",
        "IncomingRecords",
        "ReadProvisionedThroughputExceeded",
        "OutgoingBytes"
    ],
    "DesiredShardLevelMetrics": [
        "IteratorAgeMilliseconds",
        "IncomingRecords",
        "ReadProvisionedThroughputExceeded",
        "OutgoingBytes"
    ]
}
$ aws kinesis describe-stream --stream-name bar | jq .StreamDescription.StreamStatus
"ACTIVE"

参考