【小ネタ】AWS IoTのアクション設定でKinesis Data Streamsにデータを送る時、パーティションキーに指定するnewuuid()について確認してみました

2020.01.30

1 はじめに

CX事業本部の平内(SIN)です。

AWS IoTでアクション設定を使用してKinesis Data Streamsにデータを送る場合、どのシャードに送るかを指定するためにパーティションキーを設定する箇所があります。

スケール目的でシャードの数を増やしても、パーティションキーを適切に設定しないと、期待通りの分散が出来ません。今回は、ここで一般に使用される newuuid() について、動作を確認してみました。

すいません、特にひねった内容ではありません。個人的に理解を深めるために、単純にドキュメントに記載されている内容をなぞっただけです。

2 検証環境

(1) Kinesis Video Streams

動作確認用のストリームは、シャード数を10に設定して作成しました。

(2) AWS IoT

AWS IoTでは、アクション設定で特定のトピックに来たデータを、先のストリームに送りますが、とりあえず、パーティションキーには、${newuuid()} を設定しています。

3 検証コード

newuuid()は、AWS IoTで利用可能な関数で、16バイトのランダムなUUIDを返すものです。

例: newuuid() = 123a4567-b89c-12d3-e456-789012345000
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/iot-sql-functions.html

一方、パーティションキーは、データをシャード別にグループ化するためのキーです。
Amazon Kinesis Data Streams の用語と概念

実際に、ランダムなUUIDをパーティションキーとして指定した場合に、どのように分散されているのかを下記のテスト用コードで確認してみました。

(1) mqttへの送信

時系列的に順番に、1から増加する数値を送っています。

const AWS = require("aws-sdk");

const endPoint = "xxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com";
const topicName = "topic_1";
var iotdata = new AWS.IotData({
    endpoint: endPoint
});

const max = 10;

async function main(){
    for (var i=0; i < max; i++) {
        var params = {
            topic: topicName,
            payload: JSON.stringify({value: i})
        };
        await iotdata.publish(params).promise();
    }
}
main();

(2) Kinesis Data Streamsのデータ列挙

Kinesis Data Streamsのデータ列挙のテスト用のコードです。 ストーリーム名を指定して、シャードを列挙し、それぞれのシャードの先頭から最後まで、データを取得しています。

また、最後には、各シャードのデータ数を一覧しています。

const AWS = require("aws-sdk");
const kinesis = new AWS.Kinesis();

const streamName = "testStream";

async function main(){

    // シャードIDの取得
    const stream = await kinesis.describeStream({StreamName:streamName}).promise();
    const results = stream.StreamDescription.Shards;
    const shardIds = results.map( r => r.ShardId );

    let logs = [];
    let buffer = [];

    // シェードごとのデータ数をカウントする
    const partition = Array(shardIds.length).fill(0);

    // 各シャードのデータ取得
    for(var i=0;i<shardIds.length;i++){
        const shardId = shardIds[i];

        console.log(`[${shardId}]`);

        // イテレータの取得
        const params = {
            ShardId: shardId,
            ShardIteratorType: 'TRIM_HORIZON', // データの先頭から
            StreamName: streamName,
        };
        const result = await kinesis.getShardIterator(params).promise();
        let shardIterator = result.ShardIterator;
        while(true) {
            const data = await kinesis.getRecords({
                ShardIterator: shardIterator, 
                Limit: 1000
            }).promise();

            if (data.Records.length==0) { 
                break;
            }
            data.Records.forEach( r => { 
                console.log(r.ApproximateArrivalTimestamp.getTime() + ',' + r.PartitionKey + ', ' + r.SequenceNumber + ', ' + r.Data);
                buffer.push(r);
                partition[i]++;
            } );
            shardIterator = data.NextShardIterator;
        }
    }
    console.log(`-------------------------------------`);
    partition.forEach( (n,i) => {
        console.log(`[${i}] : ${n}`)
    });

    console.log(`-------------------------------------`);
    buffer.sort( (a,b) => {
        return  a.ApproximateArrivalTimestamp.getTime() - b.ApproximateArrivalTimestamp.getTime();
        // return  a.SequenceNumber - b.SequenceNumber;
    });
    buffer.forEach( b => { 
        console.log(b.ApproximateArrivalTimestamp.getTime() + ',' + b.PartitionKey + ', ' + b.SequenceNumber + ', ' + b.Data);
    })
}
main();

4 10件の送信

先のコードで、10件のデータを送信し、Kinesis Data Streamsに溜まったデータを列挙したところ、以下のようになりました。

[shardId-000000000000]
1580345019079,d45977fb-18b9-43c3-b8fe-8f229f8953f2, 49603743243229486200936459371677281878417297127533182978, {"value":4}
[shardId-000000000001]
1580345018920,57c97b06-adfb-491a-aa7d-08b79cdc140e, 49603743243251786946134989994817608670870330859864457234, {"value":3}
1580345019304,fd7adf21-b38d-4888-b15d-f2fe2fc2d47e, 49603743243251786946134989994820026522509560186933346322, {"value":6}
[shardId-000000000002]
1580345018602,dbbba199-cb9e-4e31-8a4f-897924574a4f, 49603743243274087691333520617867266026852267404092768290, {"value":0}
[shardId-000000000003]
[shardId-000000000004]
[shardId-000000000005]
[shardId-000000000006]
1580345018836,bd1bbe91-34e6-4c23-b75b-8073f06ea364, 49603743243363290672127643110435826751582090108466102370, {"value":2}
1580345019568,e277abf6-f494-46fe-aeb9-d448e623a04a, 49603743243363290672127643110439453529040934064709697634, {"value":8}
1580345019688,613834cc-effb-436b-917e-26dd1a29268c, 49603743243363290672127643110440662454860548693884403810, {"value":9}
[shardId-000000000007]
1580345018686,90c67824-764b-4ac5-86c8-91021abd96d9, 49603743243385591417326173733576153544035123840797376626, {"value":1}
1580345019195,96bdbaaa-6927-4a09-bca4-77aa4f363934, 49603743243385591417326173733578571395674353099146788978, {"value":5}
[shardId-000000000008]
[shardId-000000000009]
1580345019453,b4b4d4d3-bda0-4157-b38f-686e59192074, 49603743243430192907723234979862851758039264520052932754, {"value":7}
-------------------------------------
[0] : 1
[1] : 2
[2] : 1
[3] : 0
[4] : 0
[5] : 0
[6] : 3
[7] : 2
[8] : 0
[9] : 1

送信した10件がすべて入っていますが、1件ずつに分散されている訳ではありません。また、その順番もバラバラです。

これは、ユニークに生成したUUIDが、パーティションキーとして評価される時に、こんな感じで分散されることを表していると思います。

5 パーティションキー

指定されたパーティションキーは、MD5ハッシュ関数で128ビットの整数値にマッピングして、格納先のシャードが決定されるとなっています。

Amazon Kinesis Data Streams の用語と概念

下記のコードで、パーティションキー(UUID)がシャードのグルーピングに使用されるようすを確認してみました。

const uuidList = [] は、先の出力の各データのUUIDをコピーしたものです。128bitのHash値の最大値である FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF をシャードの数(今回は、10個)で割り、グルーピングされる数値の範囲を算出して、それぞれのUUIDのハッシュ値が、どのグループに該当するかを確認しています。

var md5 = require('md5');

const uuidList = [
    "d45977fb-18b9-43c3-b8fe-8f229f8953f2",
    "57c97b06-adfb-491a-aa7d-08b79cdc140e",
    "fd7adf21-b38d-4888-b15d-f2fe2fc2d47e",
    "dbbba199-cb9e-4e31-8a4f-897924574a4f",
    "bd1bbe91-34e6-4c23-b75b-8073f06ea364",
    "e277abf6-f494-46fe-aeb9-d448e623a04a",
    "613834cc-effb-436b-917e-26dd1a29268c",
    "90c67824-764b-4ac5-86c8-91021abd96d9",
    "96bdbaaa-6927-4a09-bca4-77aa4f363934",
    "b4b4d4d3-bda0-4157-b38f-686e59192074"
];

// 128bitのHash値の最大値
const max= parseInt("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", 16);
// シャード数
const shard = 10;
// 境界値
const border = max/shard;
// パーティションへの配置のカウント
const partition = Array(shard).fill(0);

uuidList.forEach(uuid => {
    // UUIDのハッシュ値を数値で取得
    const hash = parseInt(md5(uuid), 16);
    // 境界値から何番目に配置されるかを計算する
    const index = parseInt(hash/(border));
    // カウントする
    partition[index]++;
})

partition.forEach( (n,i) => {
    console.log(`[${i}] : ${n}`)
});

結果は、下記のとおりで、最初の結果と一致している事が確認できます。

[0] : 1
[1] : 2
[2] : 1
[3] : 0
[4] : 0
[5] : 0
[6] : 3
[7] : 2
[8] : 0
[9] : 1

6 固定のパーティションキー

ちなみに、パーティションキーを固定値にしてしまうと、検証結果は、以下のとおりとなり、1つのシャードに全部入ってしまうことが確認できます。

[shardId-000000000000]
[shardId-000000000001]
[shardId-000000000002]
[shardId-000000000003]
[shardId-000000000004]
[shardId-000000000005]
[shardId-000000000006]
[shardId-000000000007]
1580344794097,1, 49603743154182610623203681167485264263685091409505812594, {"value":0}
1580344794232,1, 49603743154182610623203681167487682115324320667855224946, {"value":1}
1580344794399,1, 49603743154182610623203681167488891041143935297029931122, {"value":2}
1580344794513,1, 49603743154182610623203681167490099966963549926204637298, {"value":3}
1580344794676,1, 49603743154182610623203681167491308892783164555379343474, {"value":4}
1580344794781,1, 49603743154182610623203681167492517818602779184554049650, {"value":5}
1580344794938,1, 49603743154182610623203681167494935670242008442903462002, {"value":6}
1580344795094,1, 49603743154182610623203681167496144596061623140797644914, {"value":7}
1580344795201,1, 49603743154182610623203681167497353521881237769972351090, {"value":8}
1580344795317,1, 49603743154182610623203681167498562447700852399147057266, {"value":9}
[shardId-000000000008]
[shardId-000000000009]
-------------------------------------
[0] : 0
[1] : 0
[2] : 0
[3] : 0
[4] : 0
[5] : 0
[6] : 0
[7] : 10
[8] : 0
[9] : 0

7 ソート

時系列で生成したデータなので、各シャードのデータをまとめて、タイムスタンプでソートすると、順番に並んだデータが取得できます。

ソート

buffer.sort( (a,b) => {
    return  a.ApproximateArrivalTimestamp.getTime() - b.ApproximateArrivalTimestamp.getTime();
});
buffer.forEach( b => { 
    console.log(b.ApproximateArrivalTimestamp.getTime() + ',' + b.PartitionKey + ', ' + b.SequenceNumber + ', ' + b.Data);
})

結果出力

-------------------------------------
1580345018602,dbbba199-cb9e-4e31-8a4f-897924574a4f, 49603743243274087691333520617867266026852267404092768290, {"value":0}
1580345018686,90c67824-764b-4ac5-86c8-91021abd96d9, 49603743243385591417326173733576153544035123840797376626, {"value":1}
1580345018836,bd1bbe91-34e6-4c23-b75b-8073f06ea364, 49603743243363290672127643110435826751582090108466102370, {"value":2}
1580345018920,57c97b06-adfb-491a-aa7d-08b79cdc140e, 49603743243251786946134989994817608670870330859864457234, {"value":3}
1580345019079,d45977fb-18b9-43c3-b8fe-8f229f8953f2, 49603743243229486200936459371677281878417297127533182978, {"value":4}
1580345019195,96bdbaaa-6927-4a09-bca4-77aa4f363934, 49603743243385591417326173733578571395674353099146788978, {"value":5}
1580345019304,fd7adf21-b38d-4888-b15d-f2fe2fc2d47e, 49603743243251786946134989994820026522509560186933346322, {"value":6}
1580345019453,b4b4d4d3-bda0-4157-b38f-686e59192074, 49603743243430192907723234979862851758039264520052932754, {"value":7}
1580345019568,e277abf6-f494-46fe-aeb9-d448e623a04a, 49603743243363290672127643110439453529040934064709697634, {"value":8}
1580345019688,613834cc-effb-436b-917e-26dd1a29268c, 49603743243363290672127643110440662454860548693884403810, {"value":9}

ちなみに、シーケンス番号でソートした場合は、時系列の並びになっていませんでした。

ソート

buffer.sort( (a,b) => {
    return  a.SequenceNumber - b.SequenceNumber;
});
buffer.forEach( b => { 
    console.log(b.ApproximateArrivalTimestamp.getTime() + ',' + b.PartitionKey + ', ' + b.SequenceNumber + ', ' + b.Data);
})

結果出力

-------------------------------------
1580345019079,d45977fb-18b9-43c3-b8fe-8f229f8953f2, 49603743243229486200936459371677281878417297127533182978, {"value":4}
1580345018920,57c97b06-adfb-491a-aa7d-08b79cdc140e, 49603743243251786946134989994817608670870330859864457234, {"value":3}
1580345019304,fd7adf21-b38d-4888-b15d-f2fe2fc2d47e, 49603743243251786946134989994820026522509560186933346322, {"value":6}
1580345018602,dbbba199-cb9e-4e31-8a4f-897924574a4f, 49603743243274087691333520617867266026852267404092768290, {"value":0}
1580345018836,bd1bbe91-34e6-4c23-b75b-8073f06ea364, 49603743243363290672127643110435826751582090108466102370, {"value":2}
1580345019568,e277abf6-f494-46fe-aeb9-d448e623a04a, 49603743243363290672127643110439453529040934064709697634, {"value":8}
1580345019688,613834cc-effb-436b-917e-26dd1a29268c, 49603743243363290672127643110440662454860548693884403810, {"value":9}
1580345018686,90c67824-764b-4ac5-86c8-91021abd96d9, 49603743243385591417326173733576153544035123840797376626, {"value":1}
1580345019195,96bdbaaa-6927-4a09-bca4-77aa4f363934, 49603743243385591417326173733578571395674353099146788978, {"value":5}
1580345019453,b4b4d4d3-bda0-4157-b38f-686e59192074, 49603743243430192907723234979862851758039264520052932754, {"value":7}

8 大量データ

パーティションキーを ${newuuid()} に戻して、10000件のデータを送ってみた際の、シャードへの分散状況です。

各シャードに1000件づつ分散されるのが理想値ではありますが・・・充分に分散されているとは言えそうです。

const arr = Array(10000).fill(0);
await Promise.all(arr.map( async (_v,i) => {
    var params = {
        topic: topicName,
        payload: JSON.stringify({value: i})
    };
    await iotdata.publish(params).promise();
    console.log(`send ${i}`)
}))
console.log("done");
-------------------------------------
[0] : 1025
[1] : 990
[2] : 948
[3] : 1061
[4] : 984
[5] : 1012
[6] : 999
[7] : 972
[8] : 981
[9] : 1028
-------------------------------------

9 最後に

今回は、AWS IoTからKinesis Data Streamsにデータを送る場合の、パーティションキー( newuuid() )について、少し確認してみました。

実際に色々試しみて、ちょっと理解進んだような気がしました。