Amazon Timestreamに過去データを投入してみる
AWSには、マネージド型の時系列データベースとして、Amazon Timestreamがあります。PLCやセンサーなどの時系列データに向いています。
今まで時系列データをDynamoDBに格納していましたが、Timestreamへのデータ移行を検証する目的で、過去データの投入を試してみました。
おすすめの方
- Amazon Timestreamに過去データを投入したい方
- Amazon Timestreamにboto3でデータを書き込みたい方
- Amazon Timestreamに保存期間を超えたデータを書き込もうとした際の動作を知りたい方
Amazon TimestreamをCloudFormationで作成する
テンプレートファイル
テーブルは次の設定にします。
- メモリストア: 7日
- マグネティックストア: 365日
また、過去データ投入のため、マグネティックストアへの書き込みを有効にしています。
AWSTemplateFormatVersion: "2010-09-09"
Description: Amazon Timestream Sample
Resources:
TimestreamDatabase:
Type: AWS::Timestream::Database
Properties:
DatabaseName: CfnSampleDatabase
TimestreamTable:
Type: AWS::Timestream::Table
Properties:
DatabaseName: !Ref TimestreamDatabase
TableName: cfn-sample-table
RetentionProperties:
MemoryStoreRetentionPeriodInHours: 168 # 7 days
MagneticStoreRetentionPeriodInDays: 365 # 1 year
MagneticStoreWriteProperties:
EnableMagneticStoreWrites: true # マグネティックストアへの書き込みを有効にする
デプロイ
aws cloudformation deploy \
--template-file cfn.yaml \
--stack-name Amazon-Timestream-Sample-Stack
過去データを投入する
スクリプト
1日ごとに過去365日分のデータを作成し、Amazon Timestreamに書き込むスクリプトです。
import boto3
import random
import json
from datetime import datetime, timedelta, timezone
from botocore.exceptions import ClientError
TIMESTREAM_DATABASE_NAME = "CfnSampleDatabase"
TIMESTREAM_TABLE_NAME = "cfn-sample-table"
DEVICE_ID = "d0001"
client = boto3.client("timestream-write")
def main():
records = []
now = datetime.now(timezone.utc)
# 過去365日のデータを作成
for i in range(365):
records.append(
make_record(
int((now - timedelta(days=i)).timestamp() * 1000),
DEVICE_ID,
round(random.uniform(0, 40), 1), # 温度の乱数
round(random.uniform(0, 100), 1), # 湿度の乱数
)
)
# Timestreamに書き込む(書き込み制限が最大100件のため、分割している)
for i in range(0, len(records), 100):
write_records(
TIMESTREAM_DATABASE_NAME,
TIMESTREAM_TABLE_NAME,
records[i : i + 100],
)
def make_record(unixtime, device_id, temperature, humidity):
return {
"Dimensions": [
{"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
],
"MeasureValueType": "MULTI", # マルチメジャーレコード
"MeasureName": "sensorData",
"Time": str(unixtime),
"TimeUnit": "MILLISECONDS",
"MeasureValues": [
{
"Name": "temperature",
"Value": str(temperature),
"Type": "DOUBLE",
},
{
"Name": "humidity",
"Value": str(humidity),
"Type": "DOUBLE",
},
],
}
def write_records(database_name, table_name, records):
try:
resp = client.write_records(
DatabaseName=database_name,
TableName=table_name,
Records=records,
)
except ClientError as e:
if e.response["Error"]["Code"] == "RejectedRecordsException":
print(e.response["Error"]["Message"])
print(json.dumps(e.response["RejectedRecords"], indent=2))
else:
print(f"error: {e}")
raise e
print(resp)
if __name__ == "__main__":
main()
Amazon Timestreamでクエリを実行する
最新10件を取得する
SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
ORDER BY time DESC
LIMIT 10
古い5件を取得する
約1年前の過去データを確認できました。
SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
ORDER BY time ASC
LIMIT 5
マグネティックストアの保存期間より過去のデータは、保存できない
厳密には、メモリストアとマグネティックストアの保存期間より過去のデータです。
- メモリストア: 7日
- マグネティックストア: 365日
上記の場合、372日より前のデータは保存できません。たとえば、スクリプトの一部を次のように変更します。
for i in range(7):
records.append(
make_record(
int((now - timedelta(days=7 + 365 + i)).timestamp() * 1000),
DEVICE_ID,
round(random.uniform(0, 40), 1), # 温度の乱数
round(random.uniform(0, 100), 1), # 湿度の乱数
)
)
実行すると、次のエラーメッセージが表示されます。
One or more records have been rejected. See RejectedRecords for details.
「RejectedRecords」の内容は、下記です。(i=0開始なので、RecordIndex=0は書き込み成功しています。)
[
{
"RecordIndex": 1,
"Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
},
{
"RecordIndex": 2,
"Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
},
{
"RecordIndex": 3,
"Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
},
{
"RecordIndex": 4,
"Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
},
{
"RecordIndex": 5,
"Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
},
{
"RecordIndex": 6,
"Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
}
]
さいごに
Amazon Timestreamに過去データを投入できました。データ移行やデータベース選定の参考になれば幸いです。