Amazon Timestreamに過去データを投入してみる

Amazon Timestreamに過去データを投入してみる

Amazon Timestreamに過去データを投入できました。
Clock Icon2025.04.08

AWSには、マネージド型の時系列データベースとして、Amazon Timestreamがあります。PLCやセンサーなどの時系列データに向いています。

今まで時系列データをDynamoDBに格納していましたが、Timestreamへのデータ移行を検証する目的で、過去データの投入を試してみました。

おすすめの方

  • Amazon Timestreamに過去データを投入したい方
  • Amazon Timestreamにboto3でデータを書き込みたい方
  • Amazon Timestreamに保存期間を超えたデータを書き込もうとした際の動作を知りたい方

Amazon TimestreamをCloudFormationで作成する

テンプレートファイル

テーブルは次の設定にします。

  • メモリストア: 7日
  • マグネティックストア: 365日

また、過去データ投入のため、マグネティックストアへの書き込みを有効にしています。

cfn.yaml
AWSTemplateFormatVersion: "2010-09-09"
Description: Amazon Timestream Sample

Resources:
  TimestreamDatabase:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: CfnSampleDatabase

  TimestreamTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref TimestreamDatabase
      TableName: cfn-sample-table
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: 168  # 7 days
        MagneticStoreRetentionPeriodInDays: 365  # 1 year
      MagneticStoreWriteProperties:
        EnableMagneticStoreWrites: true # マグネティックストアへの書き込みを有効にする

デプロイ

aws cloudformation deploy \
    --template-file cfn.yaml \
    --stack-name Amazon-Timestream-Sample-Stack

過去データを投入する

スクリプト

1日ごとに過去365日分のデータを作成し、Amazon Timestreamに書き込むスクリプトです。

import boto3
import random
import json

from datetime import datetime, timedelta, timezone

from botocore.exceptions import ClientError

TIMESTREAM_DATABASE_NAME = "CfnSampleDatabase"
TIMESTREAM_TABLE_NAME = "cfn-sample-table"

DEVICE_ID = "d0001"

client = boto3.client("timestream-write")

def main():
    records = []

    now = datetime.now(timezone.utc)

    # 過去365日のデータを作成
    for i in range(365):
        records.append(
            make_record(
                int((now - timedelta(days=i)).timestamp() * 1000),
                DEVICE_ID,
                round(random.uniform(0, 40), 1),  # 温度の乱数
                round(random.uniform(0, 100), 1),  # 湿度の乱数
            )
        )

    # Timestreamに書き込む(書き込み制限が最大100件のため、分割している)
    for i in range(0, len(records), 100):
        write_records(
            TIMESTREAM_DATABASE_NAME,
            TIMESTREAM_TABLE_NAME,
            records[i : i + 100],
        )

def make_record(unixtime, device_id, temperature, humidity):
    return {
        "Dimensions": [
            {"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
        ],
        "MeasureValueType": "MULTI",  # マルチメジャーレコード
        "MeasureName": "sensorData",
        "Time": str(unixtime),
        "TimeUnit": "MILLISECONDS",
        "MeasureValues": [
            {
                "Name": "temperature",
                "Value": str(temperature),
                "Type": "DOUBLE",
            },
            {
                "Name": "humidity",
                "Value": str(humidity),
                "Type": "DOUBLE",
            },
        ],
    }

def write_records(database_name, table_name, records):
    try:
        resp = client.write_records(
            DatabaseName=database_name,
            TableName=table_name,
            Records=records,
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "RejectedRecordsException":
            print(e.response["Error"]["Message"])
            print(json.dumps(e.response["RejectedRecords"], indent=2))
        else:
            print(f"error: {e}")
        raise e

    print(resp)

if __name__ == "__main__":
    main()

Amazon Timestreamでクエリを実行する

最新10件を取得する

SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
ORDER BY time DESC
LIMIT 10

01_timestream_query

古い5件を取得する

約1年前の過去データを確認できました。

SELECT *
FROM "CfnSampleDatabase"."cfn-sample-table"
ORDER BY time ASC
LIMIT 5

02_timestream_query

マグネティックストアの保存期間より過去のデータは、保存できない

厳密には、メモリストアとマグネティックストアの保存期間より過去のデータです。

  • メモリストア: 7日
  • マグネティックストア: 365日

上記の場合、372日より前のデータは保存できません。たとえば、スクリプトの一部を次のように変更します。

    for i in range(7):
        records.append(
            make_record(
                int((now - timedelta(days=7 + 365 + i)).timestamp() * 1000),
                DEVICE_ID,
                round(random.uniform(0, 40), 1),  # 温度の乱数
                round(random.uniform(0, 100), 1),  # 湿度の乱数
            )
        )

実行すると、次のエラーメッセージが表示されます。

One or more records have been rejected. See RejectedRecords for details.

「RejectedRecords」の内容は、下記です。(i=0開始なので、RecordIndex=0は書き込み成功しています。)

[
  {
    "RecordIndex": 1,
    "Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
  },
  {
    "RecordIndex": 2,
    "Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
  },
  {
    "RecordIndex": 3,
    "Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
  },
  {
    "RecordIndex": 4,
    "Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
  },
  {
    "RecordIndex": 5,
    "Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
  },
  {
    "RecordIndex": 6,
    "Reason": "The record timestamp is outside the time range [2024-04-01T02:29:53.236Z, 2025-04-08T03:09:53.236Z) of the data ingestion window."
  }
]

さいごに

Amazon Timestreamに過去データを投入できました。データ移行やデータベース選定の参考になれば幸いです。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.