Amazon Timestream のマルチメジャーレコードを試してみる

Amazon Timestream のマルチメジャーレコードを試してみる

Clock Icon2022.02.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

前回の記事で、Amazon Timestream に対して「シングルメジャーレコード」による書き込みについてご紹介しました。

今回は、2021 年 11 月 29 日にリリースされた「マルチメジャーレコード」を試してみたいと思います。

マルチメジャーレコードとは?

従来の「シングルメジャーレコード」では「1項目のデータに対して1レコード」でしか書き込みができませんでした。また、Amazon Timestream は 「1度のリクエストで書き込めるレコードの最大数は 100」であることから、101件以上の項目を持つデータを書き込む時はリクエストを2回以上に分けて行う必要があり、実装上の手間にもなっていました。

01-single-measure-record

マルチメジャーレコードを使うことで「1つのレコードに複数の項目(メジャー値)を持たせる」ことが可能になりました。
シングルメジャーレコードの場合に比べて、下記のように「1レコードに複数項目を格納できる」ようになります。(下記では、実際には data_0data_9の10個のメジャー値を格納しています。)

200-multi-measure-record-sample

下記ドキュメントに「シングルメジャーレコード」と「マルチメジャーレコード」の詳細や Java のサンプルコードも掲載されています。

SDK(boto3)を使って書き込む

それでは実際に使ってみたいと思います。
前回の記事で使ったスクリプトを少し修正して書き込んでみます。下記を適当な名前(write_multi_measure.py)を付けて Amazon Timestream と同じリージョンの AWS Cloudshell に保存します。

import boto3
from botocore.config import Config
import time
import random

session = boto3.Session()
write_client = session.client('timestream-write', region_name='us-east-1',
                              config=Config(read_timeout=20,    # リクエストタイムアウト(秒)
                              max_pool_connections=5000,        # 最大接続数
                              retries={'max_attempts': 10}))    # 最大試行回数

# Amazon Timestream データベースとテーブル
DatabaseName='mytestdb'
TableName='mytesttbl'

# タイムスタンプの付与
def current_milli_time():
    return round(time.time() * 1000)

# ディメンションの作成
def gen_dimensions():
    municipalities = random.choice(['Shinjuku', 'Toshima', 'Nakano', 'Ota', 'Chiyoda'])
    gateway_id = random.choice(['gateway_1', 'gateway_2', 'gateway_3'])
    device_name = str(municipalities) + '_' + str(gateway_id)
    dimensions = [
        {'Name': 'Location', 'Value': 'Tokyo'},
        {'Name': 'Municipalities', 'Value': municipalities},
        {'Name': 'DeviceName', 'Value': device_name}
    ]
    return dimensions

# 何件のメジャー値(アイテム、RDBのカラムに相当するもの)を作成するか
# 1レコードあたりに入れるアイテムを複数作成する
def gen_item():
    records = []
    MultiMeasureValue = []
    start_item_num = 0 # 任意のアイテム番号からMeasureValueを作成
    end_item_num = 10 # start_item_numから 256を超えない範囲で指定

    for multi_measure_num in range(start_item_num,end_item_num): 
        #ランダムな数値データをメジャー値として作成
        MeasureValue = str(random.uniform(1, 90))
        dummy_multi_measure = 'item_' + str(multi_measure_num)
        myitem = {
            'Name': dummy_multi_measure,
            'Value': MeasureValue,
            'Type': 'DOUBLE'
        },
        records.append(myitem)
        t = records[multi_measure_num - start_item_num]
        MultiMeasureValue.append(t[0]) # 要素だけ抽出して、それを連結して変数に入れる
    return MultiMeasureValue

# レコードの生成
def gen_dummy_record():
    records_X = []
    # 何件のレコードを作成するか ; 100件:(0,100) 最大100
    for record_num in range(0,5):
        dummy_measure = {
            'Dimensions': gen_dimensions(),
            'MeasureName': 'dummy_metrics',
            'MeasureValueType': 'MULTI',
            'MeasureValues': gen_item(),
            'Time': str(current_milli_time()),
            'TimeUnit': 'MILLISECONDS'
        }
        records_X.append(dummy_measure)
        time.sleep(1/1000)
    return records_X

for write_num in range(0,1): # 生成したレコードを何回書き込むか
    print ("start write_records...: " + str(write_num))
    result = write_client.write_records(DatabaseName=DatabaseName,
                                        TableName=TableName,
                                        Records=gen_dummy_record(), CommonAttributes={})

スクリプトを実行します。

$ python3 write_multi_measure.py

問題なく実行できたら、マネージドコンソールのクエリーエディターで確認してみます。下記のように「1つのレコードで複数の項目(メジャー値)」が書き込めていることが分かります。

200-query-multi-measure-console

SDK でクエリも行ってみます。

import json
import boto3
from botocore.config import Config

config = Config(region_name = 'us-east-1')
config.endpoint_discovery_enabled = True
timestream_query_client = boto3.client('timestream-query', config=config)


#def lambda_handler(event, context):
result = timestream_query_client.query(
    QueryString='SELECT * FROM "mytestdb".mytesttbl ORDER BY time DESC LIMIT 10'
)

print(result['ColumnInfo'])
print(result['Rows'])

結果は下記のようになります。(見やすいように整形しています)
先程の書き込みで 5 行のレコードを書き込んでいるので、5 行のレコードが取得できていることが分かります。(Dataが 5 件ある)

[
    {'Name': 'Municipalities', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'DeviceName', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'Location', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}},
    {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}},
    {'Name': 'item_9', 'Type': {'ScalarType': 'DOUBLE'}},
    {'Name': 'item_8', 'Type': {'ScalarType': 'DOUBLE'}},
    {'Name': 'item_7', 'Type': {'ScalarType': 'DOUBLE'}},
    {'Name': 'item_6', 'Type': {'ScalarType': 'DOUBLE'}},
    {'Name': 'item_5', 'Type': {'ScalarType': 'DOUBLE'}},
    {'Name': 'item_4', 'Type': {'ScalarType': 'DOUBLE'}},
    {'Name': 'item_3', 'Type': {'ScalarType': 'DOUBLE'}},
    {'Name': 'item_2', 'Type': {'ScalarType': 'DOUBLE'}},
    {'Name': 'item_1', 'Type': {'ScalarType': 'DOUBLE'}},
    {'Name': 'item_0', 'Type': {'ScalarType': 'DOUBLE'}}
]
[
    {
        'Data': [
            {'ScalarValue': 'Nakano'},
            {'ScalarValue': 'Nakano_gateway_2'},
            {'ScalarValue': 'Tokyo'},
            {'ScalarValue': 'dummy_metrics'},
            {'ScalarValue': '2022-02-09 04:35:50.993000000'},
            {'ScalarValue': '13.50698384832771'},
            {'ScalarValue': '62.32520403126607'},
            {'ScalarValue': '50.38438006121369'},
            {'ScalarValue': '60.47911288436019'},
            {'ScalarValue': '47.916499037614855'},
            {'ScalarValue': '1.0762744982580215'},
            {'ScalarValue': '3.7927855674438677'},
            {'ScalarValue': '72.66258193613993'},
            {'ScalarValue': '68.32646113258168'},
            {'ScalarValue': '36.78812920437527'}
        ]
    },
    {
        'Data': [
            {'ScalarValue': 'Nakano'},
            {'ScalarValue': 'Nakano_gateway_2'},
            {'ScalarValue': 'Tokyo'},
            {'ScalarValue': 'dummy_metrics'},
            {'ScalarValue': '2022-02-09 04:35:50.992000000'},
            {'ScalarValue': '49.44764511225782'},
            {'ScalarValue': '45.59324588471825'},
            {'ScalarValue': '74.36151137653971'},
            {'ScalarValue': '77.81426709078417'},
            {'ScalarValue': '45.05477357771364'},
            {'ScalarValue': '17.934284039689093'},
            {'ScalarValue': '60.6791329264966'},
            {'ScalarValue': '5.366442659377906'},
            {'ScalarValue': '38.67095252168725'},
            {'ScalarValue': '2.925094564822059'}]},
        {'Data': [
            {'ScalarValue': 'Toshima'},
            {'ScalarValue': 'Toshima_gateway_3'},
            {'ScalarValue': 'Tokyo'},
            {'ScalarValue': 'dummy_metrics'},
            {'ScalarValue': '2022-02-09 04:35:50.991000000'},
            {'ScalarValue': '59.907529223180795'},
            {'ScalarValue': '83.27823413119977'},
            {'ScalarValue': '20.411899452679975'},
            {'ScalarValue': '61.461197174190914'},
            {'ScalarValue': '5.903499747455618'},
            {'ScalarValue': '23.459727095424405'},
            {'ScalarValue': '63.53081175503368'},
            {'ScalarValue': '29.542469182092596'},
            {'ScalarValue': '36.30519959843377'},
            {'ScalarValue': '39.738470642676965'}
            ]
        },
        {'Data': [
            {'ScalarValue': 'Ota'},
            {'ScalarValue': 'Ota_gateway_1'},
            {'ScalarValue': 'Tokyo'},
            {'ScalarValue': 'dummy_metrics'},
            {'ScalarValue': '2022-02-09 04:35:50.990000000'},
            {'ScalarValue': '62.536454418975055'},
            {'ScalarValue': '78.60200891758909'},
            {'ScalarValue': '81.28243048511581'},
            {'ScalarValue': '48.23886560138938'},
            {'ScalarValue': '21.995361088522625'},
            {'ScalarValue': '33.9646652804811'},
            {'ScalarValue': '64.15876158250896'},
            {'ScalarValue': '45.2563780993231'},
            {'ScalarValue': '3.122616178184904'},
            {'ScalarValue': '30.95097443666494'}
            ]
        },
        {'Data': [
            {'ScalarValue': 'Chiyoda'},
            {'ScalarValue': 'Chiyoda_gateway_3'},
            {'ScalarValue': 'Tokyo'},
            {'ScalarValue': 'dummy_metrics'},
            {'ScalarValue': '2022-02-09 04:35:50.989000000'},
            {'ScalarValue': '30.633073977415798'},
            {'ScalarValue': '79.05262979806567'},
            {'ScalarValue': '2.1765639177699914'},
            {'ScalarValue': '46.001269642205756'},
            {'ScalarValue': '87.2690313507515'},
            {'ScalarValue': '6.13416352380873'},
            {'ScalarValue': '79.85928126190133'},
            {'ScalarValue': '24.834849521789724'},
            {'ScalarValue': '48.895769632640665'},
            {'ScalarValue': '40.906454935172256'}
        ]
    }
]

書き込み速度の確認

下記のリリースニュースでは、マルチメジャーレコードを使うと「書き込み速度も早くなる」という記載があるので、上記のサンプルスクリプトを使って確認してみたいと思います。

コード中の下記の値を変えて「200 個の項目(バリュー値)を持つレコードを 1 行書き込む」ように変更します。

  • 37〜38 行目( 「1レコードに格納する項目数」を指定)
    • start_item_num = 0
    • end_item_num = 200
  • 58行目(「1 回のリクエストで書き込むレコード数」を指定)
    • for record_num in range(0,1):

修正できたら前回の記事と同様に、下記のシェルスクリプト(write_record.sh)を使って 30 分間断続的に書き込みを行います。

#!/bin/bash
for count in {1..30}
do
    # write single measure record
    for i in {1..10}
    do
        python3 write_multi_measure.py
    done
    echo "sleep 60..."
    sleep 60
done

AWS Cloudshell でこのシェルスクリプトを実行します。

$ sh write_record.sh

CloudWatch で確認

今回も、「マルチメジャーレコードの書き込み」と「シングルメジャーレコードの書き込み」をそれぞれ同時に別テーブルに対して行いました。

書き込みが終わったら前回と同様「SuccessfulRequestLatency」というメトリクスを確認してみます。グラフの色の意味は下記です。

  • 緑色:マルチメジャーレコードで 200 個のバリュー値を持つレコードを 1 レコード書き込み
  • オレンジ色:シングルメジャーレコードで 200 行のレコードを 100 行ずつ2回のリクエストで書き込み

100-cloudwatch_timestream

次のグラフは 2 回目の書き込み以後の時間で拡大したものです。

101-cloudwatch_timestream_after-2nd-write_2

(作業タイミングの関係で線が切れていますが)下記のような動きになりました。

  • 初回書き込み時は SuccessfulRequestLatency の値が高い(前回と同様)
  • 2回目以降は SuccessfulRequestLatency の値が徐々に下がり安定した
  • マルチメジャーレコード書き込み(緑色)の方が SuccessfulRequestLatencyの値が 4ms 程度小さい。

書き込み量を変えて書き込む

先程は「シングルメジャーレコード」と「マルチメジャーレコード」の書き込み速度について確認しました。最後に「マルチメジャーレコードで書き込む量を増やした場合」について確認してみたいと思います。

先程のコードを下記のように修正して「1 レコードに 200 項目を持つレコードを 100 行」書き込んでみます。(1 レコードの書き込みから100 レコードの書き込みに増やした)

  • 38 行目( 「1 レコードに格納する項目数」を指定)
    • end_item_num = 200
  • 58行目(「1 回のリクエストで書き込むレコード数」を指定)
    • for record_num in range(0,100):

この場合の「SuccessfulRequestLatency」メトリクスは下記のようになりました。
これまでと同様に「初回書き込み時」だけ値が高くなり、その後、安定化しています。
やはり、書き込み量が増えると「SuccessfulRequestLatency」も増加することが分かりました。(とはいえ最後の方は 190ms 以下ですが…)

201-write-200-record-multi-measure

今回の検証では上記のような結果になりましたが、実行速度がどの程度になるのかはユースケースやクライアントの実装などに依存するかと思います。実際に利用される場合は自分の要件に応じて事前に検証されることをおすすめいたします。

最後に

今回の検証を通して、従来のシングルメジャーレコードに比べてマルチメジャーレコードは直感的で分かりやすい形で書き込めることが分かりました。

ストレージの費用削減やクエリパフォーマンスも上がるので、これからどんどん活用していきたいと思います。東京リージョンで利用できるようになる日が待ち遠しいですね。

以上です。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.