Amazon Timestreamに約473万件のデータを書き込んだり、検索をしてみた
AWSには、マネージド型の時系列データベースとして、Amazon Timestreamがあります。PLCやセンサーなどの時系列データに向いています。
たとえばDynamoDBテーブルからのデータ移行や、大量データの検索などを想定して試してみました。
おすすめの方
- タイトルについて知りたい方
- Amazon Timestreamにboto3でデータを書き込みたい方
- Amazon Timestreamのクエリサンプルを知りたい方
Amazon TimestreamをCloudFormationで作成する
テンプレートファイル
AWSTemplateFormatVersion: "2010-09-09"
Description: Amazon Timestream Many Data Sample
Resources:
TimestreamDatabase:
Type: AWS::Timestream::Database
Properties:
DatabaseName: ManyDataSampleDatabase
TimestreamTable:
Type: AWS::Timestream::Table
Properties:
DatabaseName: !Ref TimestreamDatabase
TableName: many-data-sample-table
RetentionProperties:
MemoryStoreRetentionPeriodInHours: 168 # 7 days
MagneticStoreRetentionPeriodInDays: 1825 # 5 year
MagneticStoreWriteProperties:
EnableMagneticStoreWrites: true # マグネティックストアへの書き込みを有効にする
デプロイ
aws cloudformation deploy \
--template-file cfn.yaml \
--stack-name Amazon-Timestream-Many-Data-Sample-Stack
Amazon Timestreamに約473万件のデータを書き込む
本記事のお試しデータ
3つのデバイスがあり、それぞれに6つのセンサーが存在する構成とします。各センサーは、温度と湿度を計測できます。
- デバイス1
- センサー1
- センサー2
- センサー3
- センサー4
- センサー5
- センサー6
- デバイス2
- センサー1
- センサー2
- センサー3
- センサー4
- センサー5
- センサー6
- デバイス3
- センサー1
- センサー2
- センサー3
- センサー4
- センサー5
- センサー6
それぞれのセンサーから、10分毎に5年間、データを受信した想定です。つまり、データ件数は4730400です。
- 3 x 6 x 6 x 24 x 365 x 5 = 4730400
データのサンプル
{
"Dimensions": [
{
"Name": "deviceId",
"Value": "device0001",
"DimensionValueType": "VARCHAR"
},
{
"Name": "sensorId",
"Value": "sensor0001",
"DimensionValueType": "VARCHAR"
}
],
"MeasureValueType": "MULTI",
"MeasureName": "sensorData",
"Time": "1745798400000",
"TimeUnit": "MILLISECONDS",
"MeasureValues": [
{
"Name": "temperature",
"Value": "25.8",
"Type": "DOUBLE"
},
{
"Name": "humidity",
"Value": "14.1",
"Type": "DOUBLE"
}
]
}
スクリプト
それぞれのセンサーごとに5年分のデータを作成し、Amazon Timestreamに書き込みます。
import boto3
import random
import json
from datetime import datetime, timedelta, timezone
from botocore.exceptions import ClientError
TIMESTREAM_DATABASE_NAME = "ManyDataSampleDatabase"
TIMESTREAM_TABLE_NAME = "many-data-sample-table"
client = boto3.client("timestream-write")
def main():
JST = timezone(timedelta(hours=9), "JST")
base_datatime = datetime(2025, 4, 28, 9, 0, 0, tzinfo=JST)
base_unixtime = int(base_datatime.timestamp() * 1000)
records = []
# デバイス1〜3
for device_number in range(1, 4):
# センサー1〜6
for sensor_number in range(1, 7):
# 5年分のデータ(10分毎)
for count in range(6 * 24 * 365 * 5):
records.append(
make_record(
base_unixtime - count * 10 * 60 * 1000, # 10分毎にする
f"device{device_number:04d}",
f"sensor{sensor_number:04d}",
round(random.uniform(0, 40), 1), # 温度の乱数
round(random.uniform(0, 100), 1), # 湿度の乱数
)
)
# Timestreamに書き込む(書き込み制限が最大100件のため、分割している)
for i in range(0, len(records), 100):
write_records(
TIMESTREAM_DATABASE_NAME,
TIMESTREAM_TABLE_NAME,
records[i : i + 100],
)
def make_record(unixtime, device_id, sensor_id, temperature, humidity):
return {
"Dimensions": [
{"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
{"Name": "sensorId", "Value": sensor_id, "DimensionValueType": "VARCHAR"},
],
"MeasureValueType": "MULTI", # マルチメジャーレコード
"MeasureName": "sensorData",
"Time": str(unixtime),
"TimeUnit": "MILLISECONDS",
"MeasureValues": [
{
"Name": "temperature",
"Value": str(temperature),
"Type": "DOUBLE",
},
{
"Name": "humidity",
"Value": str(humidity),
"Type": "DOUBLE",
},
],
}
def write_records(database_name, table_name, records):
try:
resp = client.write_records(
DatabaseName=database_name,
TableName=table_name,
Records=records,
)
except ClientError as e:
if e.response["Error"]["Code"] == "RejectedRecordsException":
print(e.response["Error"]["Message"])
print(json.dumps(e.response["RejectedRecords"], indent=2))
else:
print(f"error: {e}")
raise e
# print(resp)
if __name__ == "__main__":
main()
スクリプトを実行する
約1時間12分ほど掛かりました。IAMロールの最大セッション時間が1時間の場合は、変更しておきましょう。元の値に戻すのも忘れずに。
$ time python app.py
python app.py 678.10s user 102.04s system 17% cpu 1:12:53.22 total
Amazon Timestreamで検索する
参考までに3回実施して持続時間とスキャンされたバイト数も記載します。あくまでも参考です。
すべてのデータ件数を取得する
SELECT COUNT(*) AS total_records
FROM "ManyDataSampleDatabase"."many-data-sample-table"
- 持続時間
- 5.129 秒
- 5.346 秒
- 2.619 秒
- スキャンされたバイト数
- 198.50 MB
期待通りの件数でした。
デバイス1のデータ件数を取得する
SELECT COUNT(*) AS total_records
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE deviceId='device0001'
- 持続時間
- 1.190 秒
- 2.469 秒
- 1.284 秒
- スキャンされたバイト数
- 66.17 MB
期待通りの件数(全件の1/3)です。
デバイスごとに最新データの日時を取得する
SELECT deviceId, MAX(time) AS latest_timestamp
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId
ORDER BY latest_timestamp DESC, deviceId ASC
- 持続時間
- 5.494 秒
- 6.587 秒
- 4.357 秒
- スキャンされたバイト数
- 198.50 MB
デバイスのセンサーごとに最新データの値を取得する
SELECT deviceId, sensorId,
MAX_BY(time, time) AS latest_timestamp,
MAX_BY(temperature, time) AS latest_temperature,
MAX_BY(humidity, time) AS latest_humidity
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData'
GROUP BY deviceId, sensorId
ORDER BY latest_timestamp DESC, deviceId ASC, sensorId ASC
- 持続時間
- 9.214 秒
- 9.692 秒
- 8.399 秒
- スキャンされたバイト数
- 198.50 MB
デバイスのセンサーごとに、過去30日間における最大値・最小値・平均値を取得する
SELECT deviceId, sensorId,
MAX(temperature) AS temperature_max,
MIN(temperature) AS temperature_min,
ROUND(AVG(temperature), 1) AS temperature_avg,
MAX(humidity) AS humidity_max,
MIN(humidity) AS humidity_min,
ROUND(AVG(humidity), 1) AS humidity_avg
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData' AND time >= ago(30day)
GROUP BY deviceId, sensorId
ORDER BY deviceId ASC, sensorId ASC
- 持続時間
- 1.301 秒
- 1.571 秒
- 0.3590 秒
- スキャンされたバイト数
- 3.21 MB
ランダム値とはいえ、データ量が多いため、ほぼ最大値・ほぼ最小値になりました。
デバイスのセンサーごとに、過去365日間のうち、1日ごとの最大値・最小値・平均値を取得する
SELECT deviceId, sensorId,
bin(time, 1day) AS time_bin,
MIN(time) AS bin_begin_time,
MAX(time) AS bin_end_time,
MAX(temperature) AS temperature_max,
MIN(temperature) AS temperature_min,
ROUND(AVG(temperature), 1) AS temperature_avg,
MAX(humidity) AS humidity_max,
MIN(humidity) AS humidity_min,
ROUND(AVG(humidity), 1) AS humidity_avg
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData' AND time >= ago(365day)
GROUP BY deviceId, sensorId, bin(time, 1day)
ORDER BY deviceId ASC, sensorId ASC, bin(time, 1day) ASC
- 持続時間
- 4.453 秒
- 4.433 秒
- 2.937 秒
- スキャンされたバイト数
- 39.65 MB
デバイスのセンサーごとに、過去365日間のうち、30日ごとの最大値・最小値・平均値を取得する
SELECT deviceId, sensorId,
bin(time, 30day) AS time_bin,
MIN(time) AS bin_begin_time,
MAX(time) AS bin_end_time,
MAX(temperature) AS temperature_max,
MIN(temperature) AS temperature_min,
ROUND(AVG(temperature), 1) AS temperature_avg,
MAX(humidity) AS humidity_max,
MIN(humidity) AS humidity_min,
ROUND(AVG(humidity), 1) AS humidity_avg
FROM "ManyDataSampleDatabase"."many-data-sample-table"
WHERE measure_name='sensorData' AND time >= ago(365day)
GROUP BY deviceId, sensorId, bin(time, 30day)
ORDER BY deviceId ASC, sensorId ASC, bin(time, 30day) ASC
- 持続時間
- 1.903 秒
- 3.100 秒
- 1.975 秒
- スキャンされたバイト数
- 39.65 MB
さいごに
大量データを想定して、いくつかの検索を実行してみました。初心者なので、もっと良いSQLがあるかもしれません。参考になれば幸いです。