この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
AWS IoT Core から Amazon Timestream にデータを書き込む場合、IoT Core のルールアクションを使うことができますが、2022 年 09 月 15 日現在ではマルチメジャーレコードによる書き込みがサポートされていません。
今回は、単純な構成として Lambda アクションを使って マルチメジャーレコードを書き込んでみました。
構成図
構成は至ってシンプルです。
サンプルデータ
今回は、デバイスから以下のようなデータが送られてくる想定とします。
デバイス ID 、時間、デバイスの設置場所、環境情報(温度と湿度)といった要素から構成される単純な JSON データです。
{
"deviceid": "d003",
"command": "report",
"datetime": 1663236383943,
"place": "Osaka",
"temperature": 28.8,
"humidity": 65.8
}
Lambda 関数のコード
検証に使ったコードは下記になります。
12 行目〜 13 行目にある Timestream のデータベースとテーブルはご利用中のものに適宜変更してください。
import boto3
from botocore.config import Config
import json
session = boto3.Session()
write_client = session.client('timestream-write', region_name='ap-northeast-1',
config=Config(read_timeout=20, # リクエストタイムアウト(秒)
max_pool_connections=5000, # 最大接続数
retries={'max_attempts': 10})) # 最大試行回数
# Amazon Timestream データベースとテーブル
DatabaseName='<YOUR TIMESTREAM DATABASE NAME>'
TableName='<YOUR TIMESTREAM DATABASE TABLE NAME>'
# ディメンションの作成
def gen_dimensions(event):
device_id = event['deviceid']
device_location = event['place']
dimensions = [
{'Name': 'Location', 'Value': device_location},
{'Name': 'DeviceID', 'Value': device_id}
]
return dimensions
def gen_record(event):
MultiMeasureValue = []
records = []
value_num = 0
records_X = []
for item_name, item_value in event.items():
if item_name == 'command':
myitem = {
'Name': str(item_name),
'Value': str(item_value),
'Type': 'VARCHAR'
},
# datetimeはTimestreamのタイムスタンプとして、deviceid と place は dimension として使うので、この3つはmeasureに載せない
elif item_name == 'deviceid' or item_name == 'datetime' or item_name == 'place':
continue
else:
myitem = {
'Name': str(item_name),
'Value': str(item_value),
'Type': 'DOUBLE'
},
records.append(myitem)
# recordsに追加された最新の要素だけを抽出 (ex. temperatureのキーと値)
t = records[value_num]
#抽出した最新の要素だけを MultiMeasureValue に追加して、挿入するレコードの MeasureValues を生成
MultiMeasureValue.append(t[0])
value_num += 1
#レコード生成
dummy_measure = {
'Dimensions': gen_dimensions(event),
'MeasureName': 'device_metrics',
'MeasureValueType': 'MULTI',
'MeasureValues': MultiMeasureValue,
'Time': str(event['datetime']),
'TimeUnit': 'MILLISECONDS'
}
records_X.append(dummy_measure)
print("records_X: " + str(records_X))
return records_X
def lambda_handler(event, context):
print ("start write_records...: ")
result = write_client.write_records(DatabaseName=DatabaseName,
TableName=TableName,
Records=gen_record(event), CommonAttributes={})
return
Timestream に格納するにあたり、今回は次のような仕様としました。
datetime
は Timestream のタイムスタンプに利用deviceid
,place
は Timestream のディメンションとして利用- その他のデータは、Timestream のメジャーとして利用
そのため 32 行目〜46 行目でメジャーに入れる項目を整理しています。
なお、ディメンション、メジャーの説明については下記の Blackbelt の資料が参考になります。
IoT Core ルール設定
IoT Core のルールは次のとおりです。
トピックは適当なものを指定します。ルールアクションには、先程のコードで作成した Lambda 関数を指定します。
Lambda のアクセス権限
今回は Lambda から直接 Timestream に書き込むので、Lambda 関数に必要な権限を追加します。
下記は緩めの権限内容になっているので、必要に応じて Resource
句にて制限をかけてください。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"timestream:DescribeEndpoints",
"timestream:WriteRecords"
],
"Resource": "*"
}
]
}
ポリシー設定の詳細は下記をご確認ください。
動作確認
準備ができたらデータを送って確認してみます。
データを Publish するクライアントは何でも構いません。下記は AWS IoT コンソールにあるテストクライアントから送信しています。
Timestream の画面にあるクエリエディタからデータを確認してみましょう。
先程送信したデータがテーブルに入っていることが分かります。
また、データ中にあった Unixtime の時刻が time カラムとして(UTC で)取り込まれていることも分かります。
データの内容を少し変えて、続けて 2 件ほど送信してみました。問題なく書き込みできています。
最後に
今回は元のデータが単純な構造でした。次回は実際の環境に即したデータを想定して検証してみたいと思います。
以上です。