Kinesisの複数PUT対応APIを試してみた

2014.12.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWSチームのすずきです。

AWSが提供するストリーミングデータの受口となるAmazon Kinesis。 2014年12月のアップデートにより、複数レコード(最大500件、1件50KBの合計容量上限は4.5MB)の登録を、1回のAPI実行で行う事が可能となりました。

Amazon Kinesis Update - New High-Throughput PutRecords API Function

Kinesisの用途が広がる可能性がある当機能、AWS CLIを利用し簡単に試す事が可能となっていますので、紹介させて頂きます。

準備

  • OSはAmaon Linux 201409を利用します。
  • EC2ロールとして、Kinesisを含むIAM権限を付与済とします。
  • AWS CLIは1.6.6以降を必要とするため、OS付属のものを入替えます。
sudo yum update -y
rpm -e aws-cli
sudo easy_install pip
sudo pip install awscli
sudo pip install --upgrade awscli

実行方法

ストリーム作成

  • データPUT先となる、Kinesisのストリームを作成します。
  • ストリーム名は「test1207」としました。
  • AWSコンソール、Web画面からの作成も可能です。
aws kinesis create-stream --stream-name test1207 --shard-count 1

JSONファイル作成

  • KinesisにPUTする複数レコードを反映したJSONファイルを用意します。
echo '{
  "Records": [
    {
      "Data": "hoge",
      "PartitionKey": "pk-1"
    },
    {
      "Data": "fuga",
      "PartitionKey": "pk-2"
    }
  ],
  "StreamName": "test1207"
}' > tmp.json

PUT実行

  • 先に用意したJSONファイルをAWS CLIの引数として指定し、実行します。
aws kinesis put-records --cli-input-json file://tmp.json

ストリーム撤去

  • 評価に利用したKinesisのストリームを撤去します。
  • AWSコンソール、Web画面からの操作も可能です。
  • Kinesisストリームは安価(東京では1時間1ストリーム毎に0.0195US$)で利用可能ですが、無料利用枠はありません。ご注意下さい。
aws kinesis delete-stream --stream-name test1207

比較

50バイト1000件のデータを用意し、KinesisへのPUT性能を比較を行いました。

今回リリースされた「PutRecords」による複数レコード処理により、従来の単一レコード処理から比較して約125倍、大幅な性能向上が確認されました。

50バイト×500件のレコードを「PutRecords」API 2回でPUT

$ time f_kinesis_put_500x2

real    0m2.597s
user    0m0.624s
sys     0m0.204s

50バイト×1件のレコードを「PutRecord」APIを 1000回PUT

$ time f_kinesis_put_x1000

real    5m24.014s
user    3m15.680s
sys     0m23.776s

まとめ

HTTPSを利用するKinesisのAPI、1回あたりの実行コストが問題となる事がありました。

Kinesisに対し、1台の端末から短時間に多数の情報が送信される様なケースでは、 端末側にバッファリングの仕組みを設け、今回リリースされた「PutRecords」APIを利用した一括送信を行う事で、 端末側のCPU、ネットワーク、バッテリなどへの負荷軽減効果が期待出来ます。

また、Kinesisへのデータ投入に対応したFluentプラグインや、HTTPより効率的なデータ転送を実現するMQTTブリッジなども、今後「PutRecords」に対応したバージョンがリリースされる事で、より実用性が増す事が期待されます。

参考

今回の性能比較に利用したスクリプトです。

export AWS_DEFAULT_REGION=ap-northeast-1


JSONx500='tmp.x500.json'
JSONx1='tmp.x1.json'

date +%N | base64

f_create_multiput_json () {
  echo '{"Records": [' > ${JSONx500}

  for i in {1..499}
    do
      echo "{\"Data\": \"`date +%N | base64`\"" >>  ${JSONx500}
      echo ',"PartitionKey": "pk-1"},' >>  ${JSONx500}
    done

  echo "{\"Data\": \"`date +%N | base64`\"" >>  ${JSONx500}
  echo ',"PartitionKey": "pk-1"}],' >>  ${JSONx500}
  echo '"StreamName": "test1207"}' >> ${JSONx500}
}

f_create_single_json () {
  echo "{\"Data\": \"`date +%N | base64`\"" >  ${JSONx1}
  echo ',"PartitionKey": "pk-1",' >>  ${JSONx1}
  echo '"StreamName": "test1207" }' >>${JSONx1}
}


f_kinesis_put_500x2 () {
  for i in {1..2}
    do
      f_create_multiput_json
      aws kinesis put-records --cli-input-json file://${JSONx500}
    done
}


f_kinesis_put_x1000 () {
  f_create_single_json

  for i in {1..1000}
    do
      aws kinesis put-record --cli-input-json file://${JSONx1}
    done
}
$ time f_kinesis_put_500x2

real    0m2.597s
user    0m0.624s
sys     0m0.204s
$ time f_kinesis_put_x1000

real    5m24.014s
user    3m15.680s
sys     0m23.776s