この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX事業本部の平内(SIN)です。
AWS IoTでアクション設定を使用してKinesis Data Streamsにデータを送る場合、どのシャードに送るかを指定するためにパーティションキーを設定する箇所があります。
スケール目的でシャードの数を増やしても、パーティションキーを適切に設定しないと、期待通りの分散が出来ません。今回は、ここで一般に使用される newuuid() について、動作を確認してみました。
すいません、特にひねった内容ではありません。個人的に理解を深めるために、単純にドキュメントに記載されている内容をなぞっただけです。
2 検証環境
(1) Kinesis Data Streams
動作確認用のストリームは、シャード数を10に設定して作成しました。
(2) AWS IoT
AWS IoTでは、アクション設定で特定のトピックに来たデータを、先のストリームに送りますが、とりあえず、パーティションキーには、${newuuid()} を設定しています。
3 検証コード
newuuid()は、AWS IoTで利用可能な関数で、16バイトのランダムなUUIDを返すものです。
例: newuuid() = 123a4567-b89c-12d3-e456-789012345000
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/iot-sql-functions.html
一方、パーティションキーは、データをシャード別にグループ化するためのキーです。
Amazon Kinesis Data Streams の用語と概念
実際に、ランダムなUUIDをパーティションキーとして指定した場合に、どのように分散されているのかを下記のテスト用コードで確認してみました。
(1) mqttへの送信
時系列的に順番に、1から増加する数値を送っています。
const AWS = require("aws-sdk");
const endPoint = "xxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com";
const topicName = "topic_1";
var iotdata = new AWS.IotData({
endpoint: endPoint
});
const max = 10;
async function main(){
for (var i=0; i < max; i++) {
var params = {
topic: topicName,
payload: JSON.stringify({value: i})
};
await iotdata.publish(params).promise();
}
}
main();
(2) Kinesis Data Streamsのデータ列挙
Kinesis Data Streamsのデータ列挙のテスト用のコードです。 ストーリーム名を指定して、シャードを列挙し、それぞれのシャードの先頭から最後まで、データを取得しています。
また、最後には、各シャードのデータ数を一覧しています。
const AWS = require("aws-sdk");
const kinesis = new AWS.Kinesis();
const streamName = "testStream";
async function main(){
// シャードIDの取得
const stream = await kinesis.describeStream({StreamName:streamName}).promise();
const results = stream.StreamDescription.Shards;
const shardIds = results.map( r => r.ShardId );
let logs = [];
let buffer = [];
// シェードごとのデータ数をカウントする
const partition = Array(shardIds.length).fill(0);
// 各シャードのデータ取得
for(var i=0;i<shardIds.length;i++){
const shardId = shardIds[i];
console.log(`[${shardId}]`);
// イテレータの取得
const params = {
ShardId: shardId,
ShardIteratorType: 'TRIM_HORIZON', // データの先頭から
StreamName: streamName,
};
const result = await kinesis.getShardIterator(params).promise();
let shardIterator = result.ShardIterator;
while(true) {
const data = await kinesis.getRecords({
ShardIterator: shardIterator,
Limit: 1000
}).promise();
if (data.Records.length==0) {
break;
}
data.Records.forEach( r => {
console.log(r.ApproximateArrivalTimestamp.getTime() + ',' + r.PartitionKey + ', ' + r.SequenceNumber + ', ' + r.Data);
buffer.push(r);
partition[i]++;
} );
shardIterator = data.NextShardIterator;
}
}
console.log(`-------------------------------------`);
partition.forEach( (n,i) => {
console.log(`[${i}] : ${n}`)
});
console.log(`-------------------------------------`);
buffer.sort( (a,b) => {
return a.ApproximateArrivalTimestamp.getTime() - b.ApproximateArrivalTimestamp.getTime();
// return a.SequenceNumber - b.SequenceNumber;
});
buffer.forEach( b => {
console.log(b.ApproximateArrivalTimestamp.getTime() + ',' + b.PartitionKey + ', ' + b.SequenceNumber + ', ' + b.Data);
})
}
main();
4 10件の送信
先のコードで、10件のデータを送信し、Kinesis Data Streamsに溜まったデータを列挙したところ、以下のようになりました。
[shardId-000000000000]
1580345019079,d45977fb-18b9-43c3-b8fe-8f229f8953f2, 49603743243229486200936459371677281878417297127533182978, {"value":4}
[shardId-000000000001]
1580345018920,57c97b06-adfb-491a-aa7d-08b79cdc140e, 49603743243251786946134989994817608670870330859864457234, {"value":3}
1580345019304,fd7adf21-b38d-4888-b15d-f2fe2fc2d47e, 49603743243251786946134989994820026522509560186933346322, {"value":6}
[shardId-000000000002]
1580345018602,dbbba199-cb9e-4e31-8a4f-897924574a4f, 49603743243274087691333520617867266026852267404092768290, {"value":0}
[shardId-000000000003]
[shardId-000000000004]
[shardId-000000000005]
[shardId-000000000006]
1580345018836,bd1bbe91-34e6-4c23-b75b-8073f06ea364, 49603743243363290672127643110435826751582090108466102370, {"value":2}
1580345019568,e277abf6-f494-46fe-aeb9-d448e623a04a, 49603743243363290672127643110439453529040934064709697634, {"value":8}
1580345019688,613834cc-effb-436b-917e-26dd1a29268c, 49603743243363290672127643110440662454860548693884403810, {"value":9}
[shardId-000000000007]
1580345018686,90c67824-764b-4ac5-86c8-91021abd96d9, 49603743243385591417326173733576153544035123840797376626, {"value":1}
1580345019195,96bdbaaa-6927-4a09-bca4-77aa4f363934, 49603743243385591417326173733578571395674353099146788978, {"value":5}
[shardId-000000000008]
[shardId-000000000009]
1580345019453,b4b4d4d3-bda0-4157-b38f-686e59192074, 49603743243430192907723234979862851758039264520052932754, {"value":7}
-------------------------------------
[0] : 1
[1] : 2
[2] : 1
[3] : 0
[4] : 0
[5] : 0
[6] : 3
[7] : 2
[8] : 0
[9] : 1
送信した10件がすべて入っていますが、1件ずつに分散されている訳ではありません。また、その順番もバラバラです。
これは、ユニークに生成したUUIDが、パーティションキーとして評価される時に、こんな感じで分散されることを表していると思います。
5 パーティションキー
指定されたパーティションキーは、MD5ハッシュ関数で128ビットの整数値にマッピングして、格納先のシャードが決定されるとなっています。
Amazon Kinesis Data Streams の用語と概念
下記のコードで、パーティションキー(UUID)がシャードのグルーピングに使用されるようすを確認してみました。
const uuidList = [] は、先の出力の各データのUUIDをコピーしたものです。128bitのHash値の最大値である FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF をシャードの数(今回は、10個)で割り、グルーピングされる数値の範囲を算出して、それぞれのUUIDのハッシュ値が、どのグループに該当するかを確認しています。
var md5 = require('md5');
const uuidList = [
"d45977fb-18b9-43c3-b8fe-8f229f8953f2",
"57c97b06-adfb-491a-aa7d-08b79cdc140e",
"fd7adf21-b38d-4888-b15d-f2fe2fc2d47e",
"dbbba199-cb9e-4e31-8a4f-897924574a4f",
"bd1bbe91-34e6-4c23-b75b-8073f06ea364",
"e277abf6-f494-46fe-aeb9-d448e623a04a",
"613834cc-effb-436b-917e-26dd1a29268c",
"90c67824-764b-4ac5-86c8-91021abd96d9",
"96bdbaaa-6927-4a09-bca4-77aa4f363934",
"b4b4d4d3-bda0-4157-b38f-686e59192074"
];
// 128bitのHash値の最大値
const max= parseInt("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", 16);
// シャード数
const shard = 10;
// 境界値
const border = max/shard;
// パーティションへの配置のカウント
const partition = Array(shard).fill(0);
uuidList.forEach(uuid => {
// UUIDのハッシュ値を数値で取得
const hash = parseInt(md5(uuid), 16);
// 境界値から何番目に配置されるかを計算する
const index = parseInt(hash/(border));
// カウントする
partition[index]++;
})
partition.forEach( (n,i) => {
console.log(`[${i}] : ${n}`)
});
結果は、下記のとおりで、最初の結果と一致している事が確認できます。
[0] : 1
[1] : 2
[2] : 1
[3] : 0
[4] : 0
[5] : 0
[6] : 3
[7] : 2
[8] : 0
[9] : 1
6 固定のパーティションキー
ちなみに、パーティションキーを固定値にしてしまうと、検証結果は、以下のとおりとなり、1つのシャードに全部入ってしまうことが確認できます。
[shardId-000000000000]
[shardId-000000000001]
[shardId-000000000002]
[shardId-000000000003]
[shardId-000000000004]
[shardId-000000000005]
[shardId-000000000006]
[shardId-000000000007]
1580344794097,1, 49603743154182610623203681167485264263685091409505812594, {"value":0}
1580344794232,1, 49603743154182610623203681167487682115324320667855224946, {"value":1}
1580344794399,1, 49603743154182610623203681167488891041143935297029931122, {"value":2}
1580344794513,1, 49603743154182610623203681167490099966963549926204637298, {"value":3}
1580344794676,1, 49603743154182610623203681167491308892783164555379343474, {"value":4}
1580344794781,1, 49603743154182610623203681167492517818602779184554049650, {"value":5}
1580344794938,1, 49603743154182610623203681167494935670242008442903462002, {"value":6}
1580344795094,1, 49603743154182610623203681167496144596061623140797644914, {"value":7}
1580344795201,1, 49603743154182610623203681167497353521881237769972351090, {"value":8}
1580344795317,1, 49603743154182610623203681167498562447700852399147057266, {"value":9}
[shardId-000000000008]
[shardId-000000000009]
-------------------------------------
[0] : 0
[1] : 0
[2] : 0
[3] : 0
[4] : 0
[5] : 0
[6] : 0
[7] : 10
[8] : 0
[9] : 0
7 ソート
時系列で生成したデータなので、各シャードのデータをまとめて、タイムスタンプでソートすると、順番に並んだデータが取得できます。
ソート
buffer.sort( (a,b) => {
return a.ApproximateArrivalTimestamp.getTime() - b.ApproximateArrivalTimestamp.getTime();
});
buffer.forEach( b => {
console.log(b.ApproximateArrivalTimestamp.getTime() + ',' + b.PartitionKey + ', ' + b.SequenceNumber + ', ' + b.Data);
})
結果出力
-------------------------------------
1580345018602,dbbba199-cb9e-4e31-8a4f-897924574a4f, 49603743243274087691333520617867266026852267404092768290, {"value":0}
1580345018686,90c67824-764b-4ac5-86c8-91021abd96d9, 49603743243385591417326173733576153544035123840797376626, {"value":1}
1580345018836,bd1bbe91-34e6-4c23-b75b-8073f06ea364, 49603743243363290672127643110435826751582090108466102370, {"value":2}
1580345018920,57c97b06-adfb-491a-aa7d-08b79cdc140e, 49603743243251786946134989994817608670870330859864457234, {"value":3}
1580345019079,d45977fb-18b9-43c3-b8fe-8f229f8953f2, 49603743243229486200936459371677281878417297127533182978, {"value":4}
1580345019195,96bdbaaa-6927-4a09-bca4-77aa4f363934, 49603743243385591417326173733578571395674353099146788978, {"value":5}
1580345019304,fd7adf21-b38d-4888-b15d-f2fe2fc2d47e, 49603743243251786946134989994820026522509560186933346322, {"value":6}
1580345019453,b4b4d4d3-bda0-4157-b38f-686e59192074, 49603743243430192907723234979862851758039264520052932754, {"value":7}
1580345019568,e277abf6-f494-46fe-aeb9-d448e623a04a, 49603743243363290672127643110439453529040934064709697634, {"value":8}
1580345019688,613834cc-effb-436b-917e-26dd1a29268c, 49603743243363290672127643110440662454860548693884403810, {"value":9}
ちなみに、シーケンス番号でソートした場合は、時系列の並びになっていませんでした。
ソート
buffer.sort( (a,b) => {
return a.SequenceNumber - b.SequenceNumber;
});
buffer.forEach( b => {
console.log(b.ApproximateArrivalTimestamp.getTime() + ',' + b.PartitionKey + ', ' + b.SequenceNumber + ', ' + b.Data);
})
結果出力
-------------------------------------
1580345019079,d45977fb-18b9-43c3-b8fe-8f229f8953f2, 49603743243229486200936459371677281878417297127533182978, {"value":4}
1580345018920,57c97b06-adfb-491a-aa7d-08b79cdc140e, 49603743243251786946134989994817608670870330859864457234, {"value":3}
1580345019304,fd7adf21-b38d-4888-b15d-f2fe2fc2d47e, 49603743243251786946134989994820026522509560186933346322, {"value":6}
1580345018602,dbbba199-cb9e-4e31-8a4f-897924574a4f, 49603743243274087691333520617867266026852267404092768290, {"value":0}
1580345018836,bd1bbe91-34e6-4c23-b75b-8073f06ea364, 49603743243363290672127643110435826751582090108466102370, {"value":2}
1580345019568,e277abf6-f494-46fe-aeb9-d448e623a04a, 49603743243363290672127643110439453529040934064709697634, {"value":8}
1580345019688,613834cc-effb-436b-917e-26dd1a29268c, 49603743243363290672127643110440662454860548693884403810, {"value":9}
1580345018686,90c67824-764b-4ac5-86c8-91021abd96d9, 49603743243385591417326173733576153544035123840797376626, {"value":1}
1580345019195,96bdbaaa-6927-4a09-bca4-77aa4f363934, 49603743243385591417326173733578571395674353099146788978, {"value":5}
1580345019453,b4b4d4d3-bda0-4157-b38f-686e59192074, 49603743243430192907723234979862851758039264520052932754, {"value":7}
8 大量データ
パーティションキーを ${newuuid()} に戻して、10000件のデータを送ってみた際の、シャードへの分散状況です。
各シャードに1000件づつ分散されるのが理想値ではありますが・・・充分に分散されているとは言えそうです。
const arr = Array(10000).fill(0);
await Promise.all(arr.map( async (_v,i) => {
var params = {
topic: topicName,
payload: JSON.stringify({value: i})
};
await iotdata.publish(params).promise();
console.log(`send ${i}`)
}))
console.log("done");
-------------------------------------
[0] : 1025
[1] : 990
[2] : 948
[3] : 1061
[4] : 984
[5] : 1012
[6] : 999
[7] : 972
[8] : 981
[9] : 1028
-------------------------------------
9 最後に
今回は、AWS IoTからKinesis Data Streamsにデータを送る場合の、パーティションキー( newuuid() )について、少し確認してみました。
実際に色々試しみて、ちょっと理解進んだような気がしました。