Amazon Timestreamのアップサート動作を確認してみる
AWSには、マネージド型の時系列データベースとして、Amazon Timestreamがあります。PLCやセンサーなどの時系列データに向いています。
Amazon Timestreamは、データの削除はできませんが、データの更新はできます。本記事では、データの追加・更新(アップサート)の動作を確認してみました。
おすすめの方
- Amazon TimestreamをCloudFormationで作成したい方
- Amazon Timestreamにboto3でデータを書き込みたい方
- Amazon Timestreamのアップサート動作を知りたい方
Amazon TimestreamをCloudFormationで作成する
テンプレートファイル
AWSTemplateFormatVersion: "2010-09-09"
Description: Amazon Timestream Sample
Resources:
TimestreamDatabase:
Type: AWS::Timestream::Database
Properties:
DatabaseName: UpsertSampleDatabase
TimestreamTable:
Type: AWS::Timestream::Table
Properties:
DatabaseName: !Ref TimestreamDatabase
TableName: upsert-sample-table
RetentionProperties:
MemoryStoreRetentionPeriodInHours: 168 # 7 days
MagneticStoreRetentionPeriodInDays: 365 # 1 year
MagneticStoreWriteProperties:
EnableMagneticStoreWrites: true # マグネティックストアへの書き込みを有効にする
デプロイ
aws cloudformation deploy \
--template-file cfn.yaml \
--stack-name Amazon-Timestream-Upsert-Sample-Stack
アップサート動作を確認する
まずは、新規データを書き込む
次のスクリプトを実行し、1件のデータを書き込みます。
import boto3
import json
from datetime import datetime, timedelta, timezone
from botocore.exceptions import ClientError
TIMESTREAM_DATABASE_NAME = "UpsertSampleDatabase"
TIMESTREAM_TABLE_NAME = "upsert-sample-table"
client = boto3.client("timestream-write")
def main():
JST = timezone(timedelta(hours=9), "JST")
base_datatime = datetime(2025, 7, 3, 16, 0, 0, tzinfo=JST)
base_unixtime = int(base_datatime.timestamp() * 1000)
records = []
records.append(
make_record(
base_unixtime,
f"device0001",
11.1, # 温度の乱数
22.1, # 湿度の乱数
)
)
# Timestreamに書き込む(書き込み制限が最大100件のため、分割している)
for i in range(0, len(records), 100):
write_records(
TIMESTREAM_DATABASE_NAME,
TIMESTREAM_TABLE_NAME,
records[i : i + 100],
)
def make_record(unixtime, device_id, temperature, humidity):
return {
"Dimensions": [
{"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
],
"MeasureValueType": "MULTI", # マルチメジャーレコード
"MeasureName": "sensorData",
"Time": str(unixtime),
"TimeUnit": "MILLISECONDS",
"MeasureValues": [
{
"Name": "temperature",
"Value": str(temperature),
"Type": "DOUBLE",
},
{
"Name": "humidity",
"Value": str(humidity),
"Type": "DOUBLE",
},
],
}
def write_records(database_name, table_name, records):
try:
client.write_records(
DatabaseName=database_name,
TableName=table_name,
Records=records,
)
except ClientError as e:
if e.response["Error"]["Code"] == "RejectedRecordsException":
print(e.response["Error"]["Message"])
print(json.dumps(e.response["RejectedRecords"], indent=2))
else:
print(f"error: {e}")
raise e
if __name__ == "__main__":
main()
全件取得すると、1件のデータを取得できました。
SELECT *
FROM "UpsertSampleDatabase"."upsert-sample-table"
完全に同じデータを書き込むと、何も変化しない(エラーも発生しない)
先ほどのスクリプトを再び実行します。つまり、完全に同じデータを書き込む動作です。スクリプトは正常に実行され、Amazon Timestreamのデータに変化はありませんでした。
SELECT *
FROM "UpsertSampleDatabase"."upsert-sample-table"
温度と湿度の値を変更すると、データの書き込みに失敗する(更新できない)
温度と湿度の値のみを変更してみました。
records.append(
make_record(
base_unixtime,
f"device0001",
911.1,
922.1,
)
)
スクリプトを実行すると、「RejectedRecordsException」が発生しました。
botocore.errorfactory.RejectedRecordsException: An error occurred (RejectedRecordsException) when calling the WriteRecords operation: One or more records have been rejected. See RejectedRecords for details.
One or more records have been rejected. See RejectedRecords for details.
[
{
"RecordIndex": 0,
"Reason": "A record already exists with the same time, dimensions, measure name, and record version. A higher record version must be specified in order to update the measure value. Specifying record version is supported by the latest SDKs."
}
]
下記が同じ場合は、既に存在しているものとして書き込みができません。
- time
- dimensions
- measure name
- record version (未指定の場合は、同じバージョンとして扱われる)
MeasureNameのみを変更すると、データの書き込みに成功する
温度と湿度の値を戻して、MeasureNameのみを変更してみました。
records.append(
make_record(
base_unixtime,
f"device0001",
11.1,
22.1,
)
)
...
def make_record(unixtime, device_id, temperature, humidity):
return {
"Dimensions": [
{"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
],
"MeasureValueType": "MULTI", # マルチメジャーレコード
"MeasureName": "sensorData777",
"Time": str(unixtime),
"TimeUnit": "MILLISECONDS",
"MeasureValues": [
{
"Name": "temperature",
"Value": str(temperature),
"Type": "DOUBLE",
},
{
"Name": "humidity",
"Value": str(humidity),
"Type": "DOUBLE",
},
],
}
スクリプトを実行すると、正常終了しました。全件取得すると、2件のデータを取得できました。
SELECT *
FROM "UpsertSampleDatabase"."upsert-sample-table"
温度と湿度を変更し、Version=1を指定すると、書き込みに失敗する
Version=1を指定してみました。
records.append(
make_record(
base_unixtime,
f"device0001",
11.9,
22.9,
)
)
...
def make_record(unixtime, device_id, temperature, humidity):
return {
"Dimensions": [
{"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
],
"MeasureValueType": "MULTI", # マルチメジャーレコード
"MeasureName": "sensorData777",
"Time": str(unixtime),
"TimeUnit": "MILLISECONDS",
"MeasureValues": [
{
"Name": "temperature",
"Value": str(temperature),
"Type": "DOUBLE",
},
{
"Name": "humidity",
"Value": str(humidity),
"Type": "DOUBLE",
},
],
"Version": 1,
}
スクリプトを実行すると、「RejectedRecordsException」が発生しました。
botocore.errorfactory.RejectedRecordsException: An error occurred (RejectedRecordsException) when calling the WriteRecords operation: One or more records have been rejected. See RejectedRecords for details.
One or more records have been rejected. See RejectedRecords for details.
[
{
"RecordIndex": 0,
"Reason": "A record already exists with the same time, dimensions, measure name, and version 1. A higher version is required to update the measure value.",
"ExistingVersion": 1
}
]
今のバージョンと同じだから失敗していますね。
温度と湿度を変更し、Version=2を指定すると、書き込みに成功する
Version=1を指定してみました。
records.append(
make_record(
base_unixtime,
f"device0001",
11.9,
22.9,
)
)
...
def make_record(unixtime, device_id, temperature, humidity):
return {
"Dimensions": [
{"Name": "deviceId", "Value": device_id, "DimensionValueType": "VARCHAR"},
],
"MeasureValueType": "MULTI", # マルチメジャーレコード
"MeasureName": "sensorData777",
"Time": str(unixtime),
"TimeUnit": "MILLISECONDS",
"MeasureValues": [
{
"Name": "temperature",
"Value": str(temperature),
"Type": "DOUBLE",
},
{
"Name": "humidity",
"Value": str(humidity),
"Type": "DOUBLE",
},
],
"Version": 2,
}
スクリプトを実行すると、正常終了しました。全件取得すると、2件のデータが取得でき、期待通りに更新されました。
SELECT *
FROM "UpsertSampleDatabase"."upsert-sample-table"
さいごに
Amazon Timestreamのアップサート動作を確認してみました。参考になれば幸いです。