Amazon Timestreamでマルチメジャーレコードの検索をしてみた
Amazon Timestreamでマルチメジャーレコードの検索をしてみました。
AWSには、マネージド型の時系列データベースとして、Amazon Timestreamがあります。PLCやセンサーなどの時系列データに向いています。
Amazon Timestreamに保存された時系列データに対して、いくつかのクエリを投げて確認してみました。
おすすめの方
- Amazon Timestreamにboto3でデータを書き込みたい方
- Amazon Timestreamのクエリサンプルを知りたい方
本記事のお試しデータ
2つのデバイスがあり、それぞれに3つのセンサーが存在する構成とします。各センサーは、温度と湿度を計測できます。
-
デバイス1
- センサー1
- センサー2
- センサー3
-
デバイス2
- センサー1
- センサー2
- センサー3
それぞれのセンサーから、1日1回、5日間のデータが送信されたものとします。つまり、データ件数は、全30件です(2 x 3 x 5)。
データのサンプル
デバイス1のセンサー1の温度・湿度データのサンプルです。
{
"Dimensions": [
{
"Name": "deviceId",
"Value": "device0001",
"DimensionValueType": "VARCHAR"
},
{
"Name": "sensorId",
"Value": "sensor0001",
"DimensionValueType": "VARCHAR"
}
],
"MeasureValueType": "MULTI",
"MeasureName": "sensorData",
"Time": "1744070400000",
"TimeUnit": "MILLISECONDS",
"MeasureValues": [
{
"Name": "temperature",
"Value": "11.0",
"Type": "DOUBLE"
},
{
"Name": "humidity",
"Value": "21.0",
"Type": "DOUBLE"
}
]
}
Amazon TimestreamをCloudFormationで作成する
テンプレートファイル
cfn.yaml
AWSTemplateFormatVersion: "2010-09-09"
Description: Amazon Timestream Sample
Resources:
TimestreamDatabase:
Type: AWS::Timestream::Database
Properties:
DatabaseName: CfnSampleDatabase
TimestreamTable:
Type: AWS::Timestream::Table
Properties:
DatabaseName: !Ref TimestreamDatabase
TableName: cfn-sample-table
RetentionProperties:
MemoryStoreRetentionPeriodInHours: 24 # 1 days
MagneticStoreRetentionPeriodInDays: 30 # 30 days
MagneticStoreWriteProperties:
EnableMagneticStoreWrites: true # マグネティックストアへの書き込みを有効にする
デプロイ
aws cloudformation deploy \
--template-file cfn.yaml \
--stack-name Amazon-Timestream-Sample-Stack
Amazon Timestreamにマルチメジャーレコードを書き込む
スクリプト
センサーごとに5日分(1日1回)のデータを作成し、Amazon Timestreamに書き込みます。
import boto3
import random
import json
from datetime import datetime, timedelta, timezone
from botocore.exceptions import ClientError
TIMESTREAM_DATABASE_NAME = "CfnSampleDatabase"
TIMESTREAM_TABLE_NAME = "cfn-sample-table"
client = boto3.client("timestream-write")
def main():
JST = timezone(timedelta(hours=9), "JST")
base_datatime = datetime(2025, 4, 8, 9, 0, 0, tzinfo=JST)
records = []
# デバイス1〜2
for device_number in range(1, 3):
# センサー1〜3
for sensor_number in range(1, 4):
# 5日分のデータ
for day in range(5):
records.append(
make_record(
int((base_datatime - timedelta(days=day)).timestamp() * 1000),
f"device{device_number:04d}",
f"sensor{sensor_number:04d}",
device_number * 10 + sensor_number + day / 10,
device_number * 20 + sensor_number + day / 10,
)
)
# Timestreamに書き込む(書き込み制限が最大100件のため、分割している)
for i in range(0, len(records), 100):
write_records(
TIMESTREAM_DATABASE_NAME,
TIMESTREAM_TABLE_NAME,
records[i : i + 100],
)
def make_record(unixtime, device_id, sensor_id, temperature, humidity):
return {
"Dimensions": [
{"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
{"Name": "sensorId", "Value": sensor_id, "DimensionValueType": "VARCHAR"},
],
"MeasureValueType": "MULTI", # マルチメジャーレコード
"MeasureName": "sensorData",
"Time": str(unixtime),
"TimeUnit": "MILLISECONDS",
"MeasureValues": [
{
"Name": "temperature",
"Value": str(temperature),
"Type": "DOUBLE",
},
{
"Name": "humidity",
"Value": str(humidity),
"Type": "DOUBLE",
},
],
}
def write_records(database_name, table_name, records):
try:
resp = client.write_records(
DatabaseName=database_name,
TableName=table_name,
Records=records,
)
except ClientError as e:
if e.response["Error"]["Code"] == "RejectedRecordsException":
print(e.response["Error"]["Message"])
print(json.dumps(e.response["RejectedRecords"], indent=2))
else:
print(f"error: {e}")
raise e
print(resp)
if __name__ == "__main__":
main()
マルチメジャーレコードの検索をする
とりあえず全件取得する
SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
ORDER BY time ASC, deviceId ASC, sensorId ASC
30件のデータが得られました。
デバイス1のデータを取得する
SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE deviceId='device0001'
ORDER BY time ASC, deviceId ASC, sensorId ASC
デバイス1のセンサー1だけを取得する
SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE deviceId='device0001' AND sensorId='sensor0001'
ORDER BY time ASC, deviceId ASC, sensorId ASC
デバイスごとに最新データの日時を取得する
SELECT deviceId, MAX(time) AS latest_timestamp
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId
ORDER BY latest_timestamp DESC, deviceId ASC
デバイスのセンサーごとに最新データの日時を取得する
SELECT deviceId, sensorId, MAX(time) AS latest_timestamp
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId, sensorId
ORDER BY latest_timestamp DESC, deviceId ASC, sensorId ASC
デバイスのセンサーごとに最新データの値を取得する
SELECT deviceId, sensorId,
MAX_BY(time, time) AS latest_timestamp,
MAX_BY(temperature, time) AS latest_temperature,
MAX_BY(humidity, time) AS latest_humidity
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId, sensorId
ORDER BY latest_timestamp DESC, deviceId ASC, sensorId ASC
デバイスのセンサーごとに最も古いデータの値を取得する
SELECT deviceId, sensorId,
MIN_BY(time, time) AS oldest_timestanp,
MIN_BY(temperature, time) AS oldest_temperature,
MIN_BY(humidity, time) AS oldest_humidity
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId, sensorId
ORDER BY oldest_timestanp DESC, deviceId ASC, sensorId ASC
デバイスのセンサーごとに、過去7日間における最大値・最小値・平均値を取得する
SELECT deviceId, sensorId,
MAX(temperature) AS temperature_max,
MIN(temperature) AS temperature_min,
ROUND(AVG(temperature), 1) AS temperature_avg,
MAX(humidity) AS humidity_max,
MIN(humidity) AS humidity_min,
ROUND(AVG(humidity), 1) AS humidity_avg
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData' AND time >= ago(7day)
GROUP BY deviceId, sensorId
ORDER BY deviceId ASC, sensorId ASC
デバイスのセンサーごとに、過去3日間における最大値・最小値・平均値を取得する
SELECT deviceId, sensorId,
MAX(temperature) AS temperature_max,
MIN(temperature) AS temperature_min,
ROUND(AVG(temperature), 1) AS temperature_avg,
MAX(humidity) AS humidity_max,
MIN(humidity) AS humidity_min,
ROUND(AVG(humidity), 1) AS humidity_avg
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData' AND time >= ago(3day)
GROUP BY deviceId, sensorId
ORDER BY deviceId ASC, sensorId ASC
デバイスのセンサーごとに、過去7日間のうち、3日ごとの最大値・最小値・平均値を取得する
SELECT deviceId, sensorId,
bin(time, 3day) AS time_bin,
MAX(time) AS bin_end_time,
MIN(time) AS bin_begin_time,
MAX(temperature) AS temperature_max,
MIN(temperature) AS temperature_min,
ROUND(AVG(temperature), 1) AS temperature_avg,
MAX(humidity) AS humidity_max,
MIN(humidity) AS humidity_min,
ROUND(AVG(humidity), 1) AS humidity_avg
FROM "CfnSampleDatabase"."cfn-sample-table"
WHERE measure_name='sensorData' AND time >= ago(7day)
GROUP BY deviceId, sensorId, bin(time, 3day)
ORDER BY deviceId ASC, sensorId ASC
さいごに
様々なSQL関数があるので、便利に利用できるシーンが多そうです。参考になれば幸いです。