Amazon Timestreamのアップサート動作を確認してみる

Amazon Timestreamのアップサート動作を確認してみる

Amazon Timestreamのアップサート動作を確認してみました。
Clock Icon2025.07.03

AWSには、マネージド型の時系列データベースとして、Amazon Timestreamがあります。PLCやセンサーなどの時系列データに向いています。

Amazon Timestreamは、データの削除はできませんが、データの更新はできます。本記事では、データの追加・更新(アップサート)の動作を確認してみました。

おすすめの方

  • Amazon TimestreamをCloudFormationで作成したい方
  • Amazon Timestreamにboto3でデータを書き込みたい方
  • Amazon Timestreamのアップサート動作を知りたい方

Amazon TimestreamをCloudFormationで作成する

テンプレートファイル

AWSTemplateFormatVersion: "2010-09-09"
Description: Amazon Timestream Sample

Resources:
  TimestreamDatabase:
    Type: AWS::Timestream::Database
    Properties:
      DatabaseName: UpsertSampleDatabase

  TimestreamTable:
    Type: AWS::Timestream::Table
    Properties:
      DatabaseName: !Ref TimestreamDatabase
      TableName: upsert-sample-table
      RetentionProperties:
        MemoryStoreRetentionPeriodInHours: 168  # 7 days
        MagneticStoreRetentionPeriodInDays: 365  # 1 year
      MagneticStoreWriteProperties:
        EnableMagneticStoreWrites: true # マグネティックストアへの書き込みを有効にする

デプロイ

aws cloudformation deploy \
    --template-file cfn.yaml \
    --stack-name Amazon-Timestream-Upsert-Sample-Stack

アップサート動作を確認する

まずは、新規データを書き込む

次のスクリプトを実行し、1件のデータを書き込みます。

import boto3
import json

from datetime import datetime, timedelta, timezone

from botocore.exceptions import ClientError

TIMESTREAM_DATABASE_NAME = "UpsertSampleDatabase"
TIMESTREAM_TABLE_NAME = "upsert-sample-table"

client = boto3.client("timestream-write")

def main():
    JST = timezone(timedelta(hours=9), "JST")
    base_datatime = datetime(2025, 7, 3, 16, 0, 0, tzinfo=JST)
    base_unixtime = int(base_datatime.timestamp() * 1000)

    records = []

    records.append(
        make_record(
            base_unixtime,
            f"device0001",
            11.1,  # 温度の乱数
            22.1,  # 湿度の乱数
        )
    )

    # Timestreamに書き込む(書き込み制限が最大100件のため、分割している)
    for i in range(0, len(records), 100):
        write_records(
            TIMESTREAM_DATABASE_NAME,
            TIMESTREAM_TABLE_NAME,
            records[i : i + 100],
        )

def make_record(unixtime, device_id, temperature, humidity):
    return {
        "Dimensions": [
            {"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
        ],
        "MeasureValueType": "MULTI",  # マルチメジャーレコード
        "MeasureName": "sensorData",
        "Time": str(unixtime),
        "TimeUnit": "MILLISECONDS",
        "MeasureValues": [
            {
                "Name": "temperature",
                "Value": str(temperature),
                "Type": "DOUBLE",
            },
            {
                "Name": "humidity",
                "Value": str(humidity),
                "Type": "DOUBLE",
            },
        ],
    }

def write_records(database_name, table_name, records):
    try:
        client.write_records(
            DatabaseName=database_name,
            TableName=table_name,
            Records=records,
        )
    except ClientError as e:
        if e.response["Error"]["Code"] == "RejectedRecordsException":
            print(e.response["Error"]["Message"])
            print(json.dumps(e.response["RejectedRecords"], indent=2))
        else:
            print(f"error: {e}")
        raise e

if __name__ == "__main__":
    main()

全件取得すると、1件のデータを取得できました。

SELECT * 
FROM "UpsertSampleDatabase"."upsert-sample-table"

01_timestream

完全に同じデータを書き込むと、何も変化しない(エラーも発生しない)

先ほどのスクリプトを再び実行します。つまり、完全に同じデータを書き込む動作です。スクリプトは正常に実行され、Amazon Timestreamのデータに変化はありませんでした。

SELECT * 
FROM "UpsertSampleDatabase"."upsert-sample-table"

01_timestream

温度と湿度の値を変更すると、データの書き込みに失敗する(更新できない)

温度と湿度の値のみを変更してみました。

    records.append(
        make_record(
            base_unixtime,
            f"device0001",
            911.1,
            922.1,
        )
    )

スクリプトを実行すると、「RejectedRecordsException」が発生しました。

botocore.errorfactory.RejectedRecordsException: An error occurred (RejectedRecordsException) when calling the WriteRecords operation: One or more records have been rejected. See RejectedRecords for details.

One or more records have been rejected. See RejectedRecords for details.
[
  {
    "RecordIndex": 0,
    "Reason": "A record already exists with the same time, dimensions, measure name, and record version. A higher record version must be specified in order to update the measure value. Specifying record version is supported by the latest SDKs."
  }
]

下記が同じ場合は、既に存在しているものとして書き込みができません。

  • time
  • dimensions
  • measure name
  • record version (未指定の場合は、同じバージョンとして扱われる)

MeasureNameのみを変更すると、データの書き込みに成功する

温度と湿度の値を戻して、MeasureNameのみを変更してみました。

    records.append(
        make_record(
            base_unixtime,
            f"device0001",
            11.1,
            22.1,
        )
    )

    ...

def make_record(unixtime, device_id, temperature, humidity):
    return {
        "Dimensions": [
            {"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
        ],
        "MeasureValueType": "MULTI",  # マルチメジャーレコード
        "MeasureName": "sensorData777",
        "Time": str(unixtime),
        "TimeUnit": "MILLISECONDS",
        "MeasureValues": [
            {
                "Name": "temperature",
                "Value": str(temperature),
                "Type": "DOUBLE",
            },
            {
                "Name": "humidity",
                "Value": str(humidity),
                "Type": "DOUBLE",
            },
        ],
    }

スクリプトを実行すると、正常終了しました。全件取得すると、2件のデータを取得できました。

SELECT * 
FROM "UpsertSampleDatabase"."upsert-sample-table"

02_timestream

温度と湿度を変更し、Version=1を指定すると、書き込みに失敗する

Version=1を指定してみました。

    records.append(
        make_record(
            base_unixtime,
            f"device0001",
            11.9,
            22.9,
        )
    )

    ...

def make_record(unixtime, device_id, temperature, humidity):
    return {
        "Dimensions": [
            {"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
        ],
        "MeasureValueType": "MULTI",  # マルチメジャーレコード
        "MeasureName": "sensorData777",
        "Time": str(unixtime),
        "TimeUnit": "MILLISECONDS",
        "MeasureValues": [
            {
                "Name": "temperature",
                "Value": str(temperature),
                "Type": "DOUBLE",
            },
            {
                "Name": "humidity",
                "Value": str(humidity),
                "Type": "DOUBLE",
            },
        ],
        "Version": 1,
    }

スクリプトを実行すると、「RejectedRecordsException」が発生しました。

botocore.errorfactory.RejectedRecordsException: An error occurred (RejectedRecordsException) when calling the WriteRecords operation: One or more records have been rejected. See RejectedRecords for details.

One or more records have been rejected. See RejectedRecords for details.
[
  {
    "RecordIndex": 0,
    "Reason": "A record already exists with the same time, dimensions, measure name, and version 1. A higher version is required to update the measure value.",
    "ExistingVersion": 1
  }
]

今のバージョンと同じだから失敗していますね。

温度と湿度を変更し、Version=2を指定すると、書き込みに成功する

Version=1を指定してみました。

    records.append(
        make_record(
            base_unixtime,
            f"device0001",
            11.9,
            22.9,
        )
    )

    ...

def make_record(unixtime, device_id, temperature, humidity):
    return {
        "Dimensions": [
            {"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
        ],
        "MeasureValueType": "MULTI",  # マルチメジャーレコード
        "MeasureName": "sensorData777",
        "Time": str(unixtime),
        "TimeUnit": "MILLISECONDS",
        "MeasureValues": [
            {
                "Name": "temperature",
                "Value": str(temperature),
                "Type": "DOUBLE",
            },
            {
                "Name": "humidity",
                "Value": str(humidity),
                "Type": "DOUBLE",
            },
        ],
        "Version": 2,
    }

スクリプトを実行すると、正常終了しました。全件取得すると、2件のデータが取得でき、期待通りに更新されました。

SELECT * 
FROM "UpsertSampleDatabase"."upsert-sample-table"

03_timestream

さいごに

Amazon Timestreamのアップサート動作を確認してみました。参考になれば幸いです。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.