Amazon Timestreamでマルチメジャーレコードの検索をしてみた

Amazon Timestreamでマルチメジャーレコードの検索をしてみた

Amazon Timestreamでマルチメジャーレコードの検索をしてみました。
Clock Icon2025.04.09

AWSには、マネージド型の時系列データベースとして、Amazon Timestreamがあります。PLCやセンサーなどの時系列データに向いています。

Amazon Timestreamに保存された時系列データに対して、いくつかのクエリを投げて確認してみました。

おすすめの方

  • Amazon Timestreamにboto3でデータを書き込みたい方
  • Amazon Timestreamのクエリサンプルを知りたい方

本記事のお試しデータ

2つのデバイスがあり、それぞれに3つのセンサーが存在する構成とします。各センサーは、温度と湿度を計測できます。

  • デバイス1

    • センサー1
    • センサー2
    • センサー3
  • デバイス2

    • センサー1
    • センサー2
    • センサー3

それぞれのセンサーから、1日1回、5日間のデータが送信されたものとします。つまり、データ件数は、全30件です(2 x 3 x 5)。

データのサンプル

デバイス1のセンサー1の温度・湿度データのサンプルです。

{
  "Dimensions": [
    {
      "Name": "deviceId",
      "Value": "device0001",
      "DimensionValueType": "VARCHAR"
    },
    {
      "Name": "sensorId",
      "Value": "sensor0001",
      "DimensionValueType": "VARCHAR"
    }
  ],
  "MeasureValueType": "MULTI",
  "MeasureName": "sensorData",
  "Time": "1744070400000",
  "TimeUnit": "MILLISECONDS",
  "MeasureValues": [
    {
      "Name": "temperature",
      "Value": "11.0",
      "Type": "DOUBLE"
    },
    {
      "Name": "humidity",
      "Value": "21.0",
      "Type": "DOUBLE"
    }
  ]
}

Amazon TimestreamをCloudFormationで作成する

テンプレートファイル

cfn.yaml
AWSTemplateFormatVersion: "2010-09-09"
Description: Amazon Timestream Sample

Resources:
  TimestreamDatabase:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: CfnSampleDatabase

  TimestreamTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref TimestreamDatabase
      TableName: cfn-sample-table
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: 24  # 1 days
        MagneticStoreRetentionPeriodInDays: 30  # 30 days
      MagneticStoreWriteProperties:
        EnableMagneticStoreWrites: true # マグネティックストアへの書き込みを有効にする

デプロイ

aws cloudformation deploy \
    --template-file cfn.yaml \
    --stack-name Amazon-Timestream-Sample-Stack

Amazon Timestreamにマルチメジャーレコードを書き込む

スクリプト

センサーごとに5日分(1日1回)のデータを作成し、Amazon Timestreamに書き込みます。

import boto3
import random
import json

from datetime import datetime, timedelta, timezone

from botocore.exceptions import ClientError

TIMESTREAM_DATABASE_NAME = "CfnSampleDatabase"
TIMESTREAM_TABLE_NAME = "cfn-sample-table"

client = boto3.client("timestream-write")

def main():
    JST = timezone(timedelta(hours=9), "JST")
    base_datatime = datetime(2025, 4, 8, 9, 0, 0, tzinfo=JST)

    records = []

    # デバイス1〜2
    for device_number in range(1, 3):
        # センサー1〜3
        for sensor_number in range(1, 4):
            # 5日分のデータ
            for day in range(5):
                records.append(
                    make_record(
                        int((base_datatime - timedelta(days=day)).timestamp() * 1000),
                        f"device{device_number:04d}",
                        f"sensor{sensor_number:04d}",
                        device_number * 10 + sensor_number + day / 10,
                        device_number * 20 + sensor_number + day / 10,
                    )
                )

    # Timestreamに書き込む(書き込み制限が最大100件のため、分割している)
    for i in range(0, len(records), 100):
        write_records(
            TIMESTREAM_DATABASE_NAME,
            TIMESTREAM_TABLE_NAME,
            records[i : i + 100],
        )

def make_record(unixtime, device_id, sensor_id, temperature, humidity):
    return {
        "Dimensions": [
            {"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
            {"Name": "sensorId", "Value": sensor_id, "DimensionValueType": "VARCHAR"},
        ],
        "MeasureValueType": "MULTI",  # マルチメジャーレコード
        "MeasureName": "sensorData",
        "Time": str(unixtime),
        "TimeUnit": "MILLISECONDS",
        "MeasureValues": [
            {
                "Name": "temperature",
                "Value": str(temperature),
                "Type": "DOUBLE",
            },
            {
                "Name": "humidity",
                "Value": str(humidity),
                "Type": "DOUBLE",
            },
        ],
    }

def write_records(database_name, table_name, records):
    try:
        resp = client.write_records(
            DatabaseName=database_name,
            TableName=table_name,
            Records=records,
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "RejectedRecordsException":
            print(e.response["Error"]["Message"])
            print(json.dumps(e.response["RejectedRecords"], indent=2))
        else:
            print(f"error: {e}")
        raise e

    print(resp)

if __name__ == "__main__":
    main()

マルチメジャーレコードの検索をする

とりあえず全件取得する

SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
ORDER BY time ASC, deviceId ASC, sensorId ASC

30件のデータが得られました。

01_timestream_query

デバイス1のデータを取得する

SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE deviceId='device0001'
ORDER BY time ASC, deviceId ASC, sensorId ASC

02_timestream_query

デバイス1のセンサー1だけを取得する

SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE deviceId='device0001' AND sensorId='sensor0001'
ORDER BY time ASC, deviceId ASC, sensorId ASC

03_timestream_query

デバイスごとに最新データの日時を取得する

SELECT deviceId, MAX(time) AS latest_timestamp
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId
ORDER BY latest_timestamp DESC, deviceId ASC

04_timestream_query

デバイスのセンサーごとに最新データの日時を取得する

SELECT deviceId, sensorId, MAX(time) AS latest_timestamp
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId, sensorId
ORDER BY latest_timestamp DESC, deviceId ASC, sensorId ASC

05_timestream_query

デバイスのセンサーごとに最新データの値を取得する

SELECT deviceId, sensorId,
	MAX_BY(time, time) AS latest_timestamp,
	MAX_BY(temperature, time) AS latest_temperature,
	MAX_BY(humidity, time) AS latest_humidity
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId, sensorId
ORDER BY latest_timestamp DESC, deviceId ASC, sensorId ASC

06_timestream_query

デバイスのセンサーごとに最も古いデータの値を取得する

SELECT deviceId, sensorId,
	MIN_BY(time, time) AS oldest_timestanp,
	MIN_BY(temperature, time) AS oldest_temperature,
	MIN_BY(humidity, time) AS oldest_humidity
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId, sensorId
ORDER BY oldest_timestanp DESC, deviceId ASC, sensorId ASC

07_timestream_query

デバイスのセンサーごとに、過去7日間における最大値・最小値・平均値を取得する

SELECT deviceId, sensorId, 
	MAX(temperature) AS temperature_max,
	MIN(temperature) AS temperature_min,
	ROUND(AVG(temperature), 1) AS temperature_avg,
	MAX(humidity) AS humidity_max,
	MIN(humidity) AS humidity_min,
	ROUND(AVG(humidity), 1) AS humidity_avg
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData' AND time >= ago(7day)
GROUP BY deviceId, sensorId
ORDER BY deviceId ASC, sensorId ASC

08_timestream_query

デバイスのセンサーごとに、過去3日間における最大値・最小値・平均値を取得する

SELECT deviceId, sensorId, 
	MAX(temperature) AS temperature_max,
	MIN(temperature) AS temperature_min,
	ROUND(AVG(temperature), 1) AS temperature_avg,
	MAX(humidity) AS humidity_max,
	MIN(humidity) AS humidity_min,
	ROUND(AVG(humidity), 1) AS humidity_avg
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData' AND time >= ago(3day)
GROUP BY deviceId, sensorId
ORDER BY deviceId ASC, sensorId ASC

09_timestream_query

デバイスのセンサーごとに、過去7日間のうち、3日ごとの最大値・最小値・平均値を取得する

SELECT deviceId, sensorId, 
	bin(time, 3day) AS time_bin,
    MAX(time) AS bin_end_time,
    MIN(time) AS bin_begin_time,
	MAX(temperature) AS temperature_max,
	MIN(temperature) AS temperature_min,
	ROUND(AVG(temperature), 1) AS temperature_avg,
	MAX(humidity) AS humidity_max,
	MIN(humidity) AS humidity_min,
	ROUND(AVG(humidity), 1) AS humidity_avg
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData' AND time >= ago(7day)
GROUP BY deviceId, sensorId, bin(time, 3day)
ORDER BY deviceId ASC, sensorId ASC

10_timestream_query

さいごに

様々なSQL関数があるので、便利に利用できるシーンが多そうです。参考になれば幸いです。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.