Amazon Timestreamに約473万件のデータを書き込んだり、検索をしてみた

Amazon Timestreamに約473万件のデータを書き込んだり、検索をしてみた

データ移行や、大量データの検索などを想定して試してみました。
Clock Icon2025.04.28

AWSには、マネージド型の時系列データベースとして、Amazon Timestreamがあります。PLCやセンサーなどの時系列データに向いています。

たとえばDynamoDBテーブルからのデータ移行や、大量データの検索などを想定して試してみました。

おすすめの方

  • タイトルについて知りたい方
  • Amazon Timestreamにboto3でデータを書き込みたい方
  • Amazon Timestreamのクエリサンプルを知りたい方

Amazon TimestreamをCloudFormationで作成する

テンプレートファイル

cfn.yaml
AWSTemplateFormatVersion: "2010-09-09"
Description: Amazon Timestream Many Data Sample

Resources:
  TimestreamDatabase:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: ManyDataSampleDatabase

  TimestreamTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref TimestreamDatabase
      TableName: many-data-sample-table
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: 168  # 7 days
        MagneticStoreRetentionPeriodInDays: 1825  # 5 year
      MagneticStoreWriteProperties:
        EnableMagneticStoreWrites: true # マグネティックストアへの書き込みを有効にする

デプロイ

aws cloudformation deploy \
  --template-file cfn.yaml \
  --stack-name Amazon-Timestream-Many-Data-Sample-Stack

Amazon Timestreamに約473万件のデータを書き込む

本記事のお試しデータ

3つのデバイスがあり、それぞれに6つのセンサーが存在する構成とします。各センサーは、温度と湿度を計測できます。

  • デバイス1
    • センサー1
    • センサー2
    • センサー3
    • センサー4
    • センサー5
    • センサー6
  • デバイス2
    • センサー1
    • センサー2
    • センサー3
    • センサー4
    • センサー5
    • センサー6
  • デバイス3
    • センサー1
    • センサー2
    • センサー3
    • センサー4
    • センサー5
    • センサー6

それぞれのセンサーから、10分毎に5年間、データを受信した想定です。つまり、データ件数は4730400です。

  • 3 x 6 x 6 x 24 x 365 x 5 = 4730400

データのサンプル

{
  "Dimensions": [
    {
      "Name": "deviceId",
      "Value": "device0001",
      "DimensionValueType": "VARCHAR"
    },
    {
      "Name": "sensorId",
      "Value": "sensor0001",
      "DimensionValueType": "VARCHAR"
    }
  ],
  "MeasureValueType": "MULTI",
  "MeasureName": "sensorData",
  "Time": "1745798400000",
  "TimeUnit": "MILLISECONDS",
  "MeasureValues": [
    {
      "Name": "temperature",
      "Value": "25.8",
      "Type": "DOUBLE"
    },
    {
      "Name": "humidity",
      "Value": "14.1",
      "Type": "DOUBLE"
    }
  ]
}

スクリプト

それぞれのセンサーごとに5年分のデータを作成し、Amazon Timestreamに書き込みます。

import boto3
import random
import json

from datetime import datetime, timedelta, timezone

from botocore.exceptions import ClientError

TIMESTREAM_DATABASE_NAME = "ManyDataSampleDatabase"
TIMESTREAM_TABLE_NAME = "many-data-sample-table"

client = boto3.client("timestream-write")

def main():
    JST = timezone(timedelta(hours=9), "JST")
    base_datatime = datetime(2025, 4, 28, 9, 0, 0, tzinfo=JST)
    base_unixtime = int(base_datatime.timestamp() * 1000)

    records = []

    # デバイス1〜3
    for device_number in range(1, 4):
        # センサー1〜6
        for sensor_number in range(1, 7):
            # 5年分のデータ(10分毎)
            for count in range(6 * 24 * 365 * 5):
                records.append(
                    make_record(
                        base_unixtime - count * 10 * 60 * 1000,  # 10分毎にする
                        f"device{device_number:04d}",
                        f"sensor{sensor_number:04d}",
                        round(random.uniform(0, 40), 1),  # 温度の乱数
                        round(random.uniform(0, 100), 1),  # 湿度の乱数
                    )
                )

    # Timestreamに書き込む(書き込み制限が最大100件のため、分割している)
    for i in range(0, len(records), 100):
        write_records(
            TIMESTREAM_DATABASE_NAME,
            TIMESTREAM_TABLE_NAME,
            records[i : i + 100],
        )

def make_record(unixtime, device_id, sensor_id, temperature, humidity):
    return {
        "Dimensions": [
            {"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
            {"Name": "sensorId", "Value": sensor_id, "DimensionValueType": "VARCHAR"},
        ],
        "MeasureValueType": "MULTI",  # マルチメジャーレコード
        "MeasureName": "sensorData",
        "Time": str(unixtime),
        "TimeUnit": "MILLISECONDS",
        "MeasureValues": [
            {
                "Name": "temperature",
                "Value": str(temperature),
                "Type": "DOUBLE",
            },
            {
                "Name": "humidity",
                "Value": str(humidity),
                "Type": "DOUBLE",
            },
        ],
    }

def write_records(database_name, table_name, records):
    try:
        resp = client.write_records(
            DatabaseName=database_name,
            TableName=table_name,
            Records=records,
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "RejectedRecordsException":
            print(e.response["Error"]["Message"])
            print(json.dumps(e.response["RejectedRecords"], indent=2))
        else:
            print(f"error: {e}")
        raise e

    # print(resp)

if __name__ == "__main__":
    main()

スクリプトを実行する

約1時間12分ほど掛かりました。IAMロールの最大セッション時間が1時間の場合は、変更しておきましょう。元の値に戻すのも忘れずに。

$ time python app.py 

python app.py  678.10s user 102.04s system 17% cpu 1:12:53.22 total

Amazon Timestreamで検索する

参考までに3回実施して持続時間とスキャンされたバイト数も記載します。あくまでも参考です。

すべてのデータ件数を取得する

SELECT COUNT(*) AS total_records
FROM "ManyDataSampleDatabase"."many-data-sample-table"
  • 持続時間
    • 5.129 秒
    • 5.346 秒
    • 2.619 秒
  • スキャンされたバイト数
    • 198.50 MB

期待通りの件数でした。

01_timestream

デバイス1のデータ件数を取得する

SELECT COUNT(*) AS total_records
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE deviceId='device0001'
  • 持続時間
    • 1.190 秒
    • 2.469 秒
    • 1.284 秒
  • スキャンされたバイト数
    • 66.17 MB

期待通りの件数(全件の1/3)です。

02_timestream

デバイスごとに最新データの日時を取得する

SELECT deviceId, MAX(time) AS latest_timestamp
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId
ORDER BY latest_timestamp DESC, deviceId ASC
  • 持続時間
    • 5.494 秒
    • 6.587 秒
    • 4.357 秒
  • スキャンされたバイト数
    • 198.50 MB

03_timestream

デバイスのセンサーごとに最新データの値を取得する

SELECT deviceId, sensorId,
	MAX_BY(time, time) AS latest_timestamp,
	MAX_BY(temperature, time) AS latest_temperature,
	MAX_BY(humidity, time) AS latest_humidity
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId, sensorId
ORDER BY latest_timestamp DESC, deviceId ASC, sensorId ASC
  • 持続時間
    • 9.214 秒
    • 9.692 秒
    • 8.399 秒
  • スキャンされたバイト数
    • 198.50 MB

04_timestream

デバイスのセンサーごとに、過去30日間における最大値・最小値・平均値を取得する

SELECT deviceId, sensorId,
	MAX(temperature) AS temperature_max,
	MIN(temperature) AS temperature_min,
	ROUND(AVG(temperature), 1) AS temperature_avg,
	MAX(humidity) AS humidity_max,
	MIN(humidity) AS humidity_min,
	ROUND(AVG(humidity), 1) AS humidity_avg
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData' AND time >= ago(30day)
GROUP BY deviceId, sensorId
ORDER BY deviceId ASC, sensorId ASC
  • 持続時間
    • 1.301 秒
    • 1.571 秒
    • 0.3590 秒
  • スキャンされたバイト数
    • 3.21 MB

ランダム値とはいえ、データ量が多いため、ほぼ最大値・ほぼ最小値になりました。

05_timestream

デバイスのセンサーごとに、過去365日間のうち、1日ごとの最大値・最小値・平均値を取得する

SELECT deviceId, sensorId, 
	bin(time, 1day) AS time_bin,
    MIN(time) AS bin_begin_time,
    MAX(time) AS bin_end_time,
	MAX(temperature) AS temperature_max,
	MIN(temperature) AS temperature_min,
	ROUND(AVG(temperature), 1) AS temperature_avg,
	MAX(humidity) AS humidity_max,
	MIN(humidity) AS humidity_min,
	ROUND(AVG(humidity), 1) AS humidity_avg
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData' AND time >= ago(365day)
GROUP BY deviceId, sensorId, bin(time, 1day)
ORDER BY deviceId ASC, sensorId ASC, bin(time, 1day) ASC
  • 持続時間
    • 4.453 秒
    • 4.433 秒
    • 2.937 秒
  • スキャンされたバイト数
    • 39.65 MB

06_timestream

デバイスのセンサーごとに、過去365日間のうち、30日ごとの最大値・最小値・平均値を取得する

SELECT deviceId, sensorId, 
	bin(time, 30day) AS time_bin,
    MIN(time) AS bin_begin_time,
    MAX(time) AS bin_end_time,
	MAX(temperature) AS temperature_max,
	MIN(temperature) AS temperature_min,
	ROUND(AVG(temperature), 1) AS temperature_avg,
	MAX(humidity) AS humidity_max,
	MIN(humidity) AS humidity_min,
	ROUND(AVG(humidity), 1) AS humidity_avg
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData' AND time >= ago(365day)
GROUP BY deviceId, sensorId, bin(time, 30day)
ORDER BY deviceId ASC, sensorId ASC, bin(time, 30day) ASC
  • 持続時間
    • 1.903 秒
    • 3.100 秒
    • 1.975 秒
  • スキャンされたバイト数
    • 39.65 MB

07_timestream

さいごに

大量データを想定して、いくつかの検索を実行してみました。初心者なので、もっと良いSQLがあるかもしれません。参考になれば幸いです。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.