AWS IoT Core で受信したデータを Amazon Timestream にマルチメジャーレコードで書き込む
AWS IoT Core から Amazon Timestream にデータを書き込む場合、IoT Core のルールアクションを使うことができますが、2022 年 09 月 15 日現在ではマルチメジャーレコードによる書き込みがサポートされていません。
今回は、単純な構成として Lambda アクションを使って マルチメジャーレコードを書き込んでみました。
構成図
構成は至ってシンプルです。
サンプルデータ
今回は、デバイスから以下のようなデータが送られてくる想定とします。
デバイス ID 、時間、デバイスの設置場所、環境情報(温度と湿度)といった要素から構成される単純な JSON データです。
{ "deviceid": "d003", "command": "report", "datetime": 1663236383943, "place": "Osaka", "temperature": 28.8, "humidity": 65.8 }
Lambda 関数のコード
検証に使ったコードは下記になります。
12 行目〜 13 行目にある Timestream のデータベースとテーブルはご利用中のものに適宜変更してください。
import boto3 from botocore.config import Config import json session = boto3.Session() write_client = session.client('timestream-write', region_name='ap-northeast-1', config=Config(read_timeout=20, # リクエストタイムアウト(秒) max_pool_connections=5000, # 最大接続数 retries={'max_attempts': 10})) # 最大試行回数 # Amazon Timestream データベースとテーブル DatabaseName='<YOUR TIMESTREAM DATABASE NAME>' TableName='<YOUR TIMESTREAM DATABASE TABLE NAME>' # ディメンションの作成 def gen_dimensions(event): device_id = event['deviceid'] device_location = event['place'] dimensions = [ {'Name': 'Location', 'Value': device_location}, {'Name': 'DeviceID', 'Value': device_id} ] return dimensions def gen_record(event): MultiMeasureValue = [] records = [] value_num = 0 records_X = [] for item_name, item_value in event.items(): if item_name == 'command': myitem = { 'Name': str(item_name), 'Value': str(item_value), 'Type': 'VARCHAR' }, # datetimeはTimestreamのタイムスタンプとして、deviceid と place は dimension として使うので、この3つはmeasureに載せない elif item_name == 'deviceid' or item_name == 'datetime' or item_name == 'place': continue else: myitem = { 'Name': str(item_name), 'Value': str(item_value), 'Type': 'DOUBLE' }, records.append(myitem) # recordsに追加された最新の要素だけを抽出 (ex. temperatureのキーと値) t = records[value_num] #抽出した最新の要素だけを MultiMeasureValue に追加して、挿入するレコードの MeasureValues を生成 MultiMeasureValue.append(t[0]) value_num += 1 #レコード生成 dummy_measure = { 'Dimensions': gen_dimensions(event), 'MeasureName': 'device_metrics', 'MeasureValueType': 'MULTI', 'MeasureValues': MultiMeasureValue, 'Time': str(event['datetime']), 'TimeUnit': 'MILLISECONDS' } records_X.append(dummy_measure) print("records_X: " + str(records_X)) return records_X def lambda_handler(event, context): print ("start write_records...: ") result = write_client.write_records(DatabaseName=DatabaseName, TableName=TableName, Records=gen_record(event), CommonAttributes={}) return
Timestream に格納するにあたり、今回は次のような仕様としました。
datetime
は Timestream のタイムスタンプに利用deviceid
,place
は Timestream のディメンションとして利用- その他のデータは、Timestream のメジャーとして利用
そのため 32 行目〜46 行目でメジャーに入れる項目を整理しています。
なお、ディメンション、メジャーの説明については下記の Blackbelt の資料が参考になります。
IoT Core ルール設定
IoT Core のルールは次のとおりです。
トピックは適当なものを指定します。ルールアクションには、先程のコードで作成した Lambda 関数を指定します。
Lambda のアクセス権限
今回は Lambda から直接 Timestream に書き込むので、Lambda 関数に必要な権限を追加します。
下記は緩めの権限内容になっているので、必要に応じて Resource
句にて制限をかけてください。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "timestream:DescribeEndpoints", "timestream:WriteRecords" ], "Resource": "*" } ] }
ポリシー設定の詳細は下記をご確認ください。
動作確認
準備ができたらデータを送って確認してみます。
データを Publish するクライアントは何でも構いません。下記は AWS IoT コンソールにあるテストクライアントから送信しています。
Timestream の画面にあるクエリエディタからデータを確認してみましょう。
先程送信したデータがテーブルに入っていることが分かります。
また、データ中にあった Unixtime の時刻が time カラムとして(UTC で)取り込まれていることも分かります。
データの内容を少し変えて、続けて 2 件ほど送信してみました。問題なく書き込みできています。
最後に
今回は元のデータが単純な構造でした。次回は実際の環境に即したデータを想定して検証してみたいと思います。
以上です。