Amazon Timestream に シングルメジャーレコードで 100 件以上のセンサー値があるデータを書き込んでみる

2022.02.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

背景

今回、Amazon Timestream を使うことがあり、「データ項目が100件以上あるデータをテーブルに書き込みたい」という要件がありました。その際の対応について分かったことを紹介したいと思います。

本記事の注意点

今回は従来からある「シングルメジャーレコード」による書き込みを想定しています。より効率的な書き込みが可能な「マルチメジャーレコード」については次回の記事でご紹介したいと思います。

簡単なスクリプトでデータを書き込んでみる

まずは、公式ドキュメントに掲載されているドキュメントを元に、SDK(boto3)を使って書き込みを行います。

ここでは、デバイスから送られてくるデータとして下記のようなものを想定しています。

{
  "timestamp": "2021-04-28 01:14:49",
  "device_name": "host1",
  "az": "az1",
  "region": "us-east-1"
  "cpu_utilization": "67.93225830354002",
  "memory_utilization": "88.71518322520387"
  "proc_numbers": 91
}

サンプルコードは下記です。ドキュメントのものを少し修正して cpu_utilizationmemory_utilizationproc_numbers というデータ項目を書き込んでみます。

import boto3
from botocore.config import Config
import time
import random

DatabaseName = 'mytestdb'
TableName = 'mytesttbl'


def current_milli_time():
    return round(time.time() * 1000)

session = boto3.Session()
write_client = session.client('timestream-write', region_name='us-east-1',
                              config=Config(read_timeout=20,    # リクエストタイムアウト(秒)
                              max_pool_connections=5000,        # 最大接続数
                              retries={'max_attempts': 10}))    # 最大試行回数

current_time = str(current_milli_time())

dimensions = [
    {'Name': 'region', 'Value': 'us-east-1'},
    {'Name': 'az', 'Value': 'az1'},
    {'Name': 'hostname', 'Value': 'host1'}
]

cpu_utilization = {
    'Dimensions': dimensions,
    'MeasureName': 'cpu_utilization',
    'MeasureValue': str(random.uniform(1, 90)),
    'MeasureValueType': 'DOUBLE',
    'Time': current_time
}

memory_utilization = {
    'Dimensions': dimensions,
    'MeasureName': 'memory_utilization',
    'MeasureValue': str(random.uniform(1, 90)),
    'MeasureValueType': 'DOUBLE',
    'Time': current_time
}

proc_numbers = {
    'Dimensions': dimensions,
    'MeasureName': 'proc_numbers',
    'MeasureValue': str(random.randint(1, 200)),
    'MeasureValueType': 'BIGINT',
    'Time': current_time
}

records = [cpu_utilization, memory_utilization, proc_numbers]

result = write_client.write_records(DatabaseName=DatabaseName,
                                   TableName=TableName,
                                   Records=records, CommonAttributes={})

コンソールで確認すると、下記のように同じタイムスタンプでcpu_utilizationmemory_utilizationproc_numbers の3項目が3レコードで格納されていることが分かります。

02-single-measure-record-test-2

データ項目が101件以上の場合

先程のサンプルコードで Amazon Timestream にどのようにデータが格納されるか基本的なことを確認できました。サンプルコードでは 3件のデータ項目でしたが下記のように項目が 101 個以上ある場合はどうなるでしょうか?

{
  "timestamp": "2021-04-28 01:14:49",
  "device_name": "hoge_device",
  "data_1": 13.453,
  "data_2": 20.6124,
  .
(中略)
  .
  "data_99": 11.723,
  "data_100": 81.153,
  "data_101": 12.267,
  "data_101": 29.694, 
  .
(中略)
  .
  "data_199": 26.632,
  "data_200": 91.124
}

Amazon Timestream の仕様では、「一度のリクエストで書き込めるのは最大100行まで」という制限があるので、シングルメジャーレコードで 100 件以上のを書き込みたい場合は、複数回に分けて書く必要があります。

01-single-measure-record

  • Records per WriteRecords API request
    • The maximum number of records in a WriteRecords API request. : 100

気になったこと

さて、サービスの仕様から「101 件以上のレコードを書き込みたい」場合は 2 回以上に分けて書き込めばいいことは分かりました。このとき次のようなことが気になりました。

「100件書き込みを行う場合」と「200件書き込みを行う場合」で、書き込み時間が倍にならないか?

検証してみた

調べてもよく分からなかったので、実際に書き込みを行って簡単な検証をしてみることにしました。

200 件書き込みするサンプルコード

先程のコードを元に下記のような検証スクリプト(write_single_measure.py)を用意しました。

import boto3
from botocore.config import Config
import time
import random

session = boto3.Session()
write_client = session.client('timestream-write', region_name='us-east-1',
                              config=Config(read_timeout=20,    # リクエストタイムアウト(秒)
                              max_pool_connections=5000,        # 最大接続数
                              retries={'max_attempts': 10}))    # 最大試行回数

DatabaseName='mytestdb'
TableName='mytesttbl'

def current_milli_time():
    return round(time.time() * 1000)

def gen_dimensions():
    municipalities = random.choice(['Shinjuku', 'Toshima', 'Nakano', 'Ota', 'Chiyoda'])
    gateway_id = random.choice(['gateway_1', 'gateway_2', 'gateway_3'])
    device_name = str(municipalities) + '_' + str(gateway_id)
    dimensions = [
        {'Name': 'Location', 'Value': 'Tokyo'},
        {'Name': 'Municipalities', 'Value': municipalities},
        {'Name': 'DeviceName', 'Value': device_name}
    ]
    return dimensions

def gen_dummy_measure():
    records_X = []
    for measure_num in range(0,100): # 何件のレコードを作成するか ;100件 (0,100) , 最大100 
        MeasureValue = str(random.uniform(1, 90))
        dummy_measure_name = 'data_' + str(measure_num)
        dummy_measure = {
            'Dimensions': gen_dimensions(),
            'MeasureName': dummy_measure_name,
            'MeasureValue': MeasureValue,
            'MeasureValueType': 'DOUBLE',
            'Time': current_time
        }
        records_X.append(dummy_measure)
    return records_X

for write_num in range(0,2): # 生成したレコードを何回書き込むか
    current_time = str(current_milli_time())
    print ("start write_records...: " + str(write_num))
    result = write_client.write_records(DatabaseName=DatabaseName,
                                        TableName=TableName,
                                        Records=gen_dummy_measure(), CommonAttributes={})

想定する環境は下記のとおりです。

  • 都内にある IoT ゲートウェイからのデータを想定
  • 一度に送られてくるデータ項目は 100 件以上

上記のコードでは、31 行目で 100 件のデータ(レコード)を作成しています。また 44 行目で 100 レコードの作成を 2 回行い、100 件ずつ 2 回に分けてデータを書き込みを行います。これにより「 100 件以上( 200 件 )のセンサーデータを書き込む」状況を再現しています。

AWS Cloudshell で書き込み

Amazon Timestream はまだ東京リージョンにないので「N.Virginia」リージョンを使います。事前に適当なデータベース、テーブルを作っておきます。
また、今回は同じ「N.Virginia」リージョンの AWS Cloudshell を使って先程のコードを実行しようと思うので、その準備を行います。(手元の PC から書き込むと距離的なレイテンシが発生するため)

AWS Cloudshell を開いて boto3 をアップデートしておきます。

$ curl -kL https://bootstrap.pypa.io/get-pip.py | python3
$ pip install -U boto3

アップデート後のバージョンは下記の通りです。

$ pip show boto3

Name: boto3
Version: 1.20.48
Summary: The AWS SDK for Python
Home-page: https://github.com/boto/boto3
Author: Amazon Web Services
Author-email: 
License: Apache License 2.0
Location: /home/cloudshell-user/.local/lib/python3.7/site-packages
Requires: botocore, jmespath, s3transfer
Required-by: aws-sam-cli, aws-sam-translator, serverlessrepo

次に、先程のコードを適当な名前(write_single_measure.py)を付けて AWS Cloudshell に保存します。ファイルをアップロードする方法や、vi 等で直接書き込むなど好きな形で対応してください。
vi でコピペするとインデントがずれるので、コピペする前にコマンドモードで:set pasteを実行しておきます。)

コードを実行してみる

単純にスクリプトを実行するだけではつまらないので、下記のパターンで書き込みを試してみたいと思います。
書き込むレコード数を変えて、それぞれのリクエストのレイテンシがどのように変わるか確認してみます。

  • ① 2回のリクエストで 100 行ずつのレコードを書き込む処理を 10 回繰り返す。
    • これを 1 分間隔で 30 分間実行する
  • ② 1回のリクエストで 100 行のレコードを書き込む処理を 10 回繰り返す。
    • これを 1 分間隔で 30 分間実行する

最初に①を行います。下記のシェルスクリプト(write_record.sh)を使って30分間の書き込みを行うので、 これも AWS Cloudshell に保存しておきます。

#!/bin/bash
for count in {1..30}
do
    # write single measure record
    for i in {1..10}
    do
        python3 write_single_measure.py
    done
    echo "sleep 60..."
    sleep 60
done

準備ができたらシェルスクリプトを実行して 30 分待機します。

$ sh write_record.sh

次に先程のサンプルコード(write_single_measure.py)を下記の様に修正して②を実行します。

for write_num in range(0,2): 
↓
for write_num in range(0,1):

修正できたら再度シェルスクリプトを実行します。

$ sh write_record.sh

CloudWatch で確認してみる

CloudWatch では「SuccessfulRequestLatency」というメトリクスを確認してみます。

下記はそのグラフです。今回はスクリプトの実行に 30分かかるので、時間短縮のために別データベースに同時に書き込みを行いました。それぞれの色の意味は下記です。

  • オレンジ色:「① 2回のリクエストで 100 レコードずつ書き込む処理を10回」行ったグラフ
  • 青色:「② 1回のリクエストで 100 レコードを書き込む処理を10回」行ったグラフ

また、上側のグラフはプロットする間隔を「1分」にしたもので、下側は「5分」にしたものになります。(統計はいずれも「平均」です)

03-single-measure-record-cloudwatch-1m5m

次のグラフは初回書き込み以降の部分だけ抽出したものです。

05-single-measure-record-after-first-write

テーブルを作り直したり、一晩何もしないで翌朝同じ書き込みを試すなど、何度か同じことを繰り返してみた所、下記のような動きになりました。

  • 初回書き込み時は SuccessfulRequestLatency の値が高い
  • 2回目以降は SuccessfulRequestLatency の値が徐々に下がり安定した
  • 200 レコード書き込む方が SuccessfulRequestLatencyの値が高いが最大でも 5ms 程度の違い

今回検証したボリュームでは、ほとんど書き込み速度などに違いは見られませんでしたが、書き込むクライアントの実装やボリューム次第で結果は変わるかと思いますので、ご利用される際は事前に自分のユースケースに沿って検証されるのがよいかと思います。

1回のリクエストで101件以上のレコード書き込みをしてみる

仕様として「1回のリクエストで101件以上のレコード書き込み」はエラーになりますが、実際に試してみました。200 レコードを書き込んだサンプルコードの31 行目を for measure_num in range(0,101) として保存します。

修正したスクリプトを実行してみると下記のエラーが返ってきました。仕様どおり「1回のリクエストで書き込めるレコード数の最大値は 100」であることが分かりました。
(見やすさのため改行しています。)

version=null, measureValues=null)]' at 'records' failed to satisfy constraint: 
Member must have length less than or equal to 100

最後に

今回は、従来からある「シングルメジャーレコード」として書き込みを行いました。これは「1レコードに1つのデータ項目」を書く方法になります。そのため、今回のように項目が多い場合その分レコード数も増える形となります。

これを改善する方法として、2021年11月29日リリースされた「マルチメジャーレコード」を使う方法があります。マルチメジャーレコードを使うと「複数のデータ項目を1つのレコード」として書き込めるようになり、コスト効率も上がります。

次回は、「マルチメジャーレコード」による書き込みを試してみたいと思います。