AWS IoT Core で受信したデータを Amazon Timestream にマルチメジャーレコードで書き込む

2022.09.16

AWS IoT Core から Amazon Timestream にデータを書き込む場合、IoT Core のルールアクションを使うことができますが、2022 年 09 月 15 日現在ではマルチメジャーレコードによる書き込みがサポートされていません。

今回は、単純な構成として Lambda アクションを使って マルチメジャーレコードを書き込んでみました。

構成図

構成は至ってシンプルです。

00-diagram

サンプルデータ

今回は、デバイスから以下のようなデータが送られてくる想定とします。
デバイス ID 、時間、デバイスの設置場所、環境情報(温度と湿度)といった要素から構成される単純な JSON データです。

{
    "deviceid": "d003",
    "command": "report",
    "datetime": 1663236383943,
    "place": "Osaka",
    "temperature": 28.8,
    "humidity": 65.8
}

Lambda 関数のコード

検証に使ったコードは下記になります。
12 行目〜 13 行目にある Timestream のデータベースとテーブルはご利用中のものに適宜変更してください。

import boto3
from botocore.config import Config
import json

session = boto3.Session()
write_client = session.client('timestream-write', region_name='ap-northeast-1',
                              config=Config(read_timeout=20,    # リクエストタイムアウト(秒)
                              max_pool_connections=5000,        # 最大接続数
                              retries={'max_attempts': 10}))    # 最大試行回数

# Amazon Timestream データベースとテーブル
DatabaseName='<YOUR TIMESTREAM DATABASE NAME>'
TableName='<YOUR TIMESTREAM DATABASE TABLE NAME>'

# ディメンションの作成
def gen_dimensions(event):
    device_id = event['deviceid']
    device_location = event['place']
    dimensions = [
        {'Name': 'Location', 'Value': device_location},
        {'Name': 'DeviceID', 'Value': device_id}
    ]
    return dimensions

def gen_record(event):
    MultiMeasureValue = []
    records = []
    value_num = 0
    records_X = []

    for item_name, item_value in event.items():
        if item_name == 'command':
            myitem = {
                'Name': str(item_name),
                'Value': str(item_value),
                'Type': 'VARCHAR'
            },
        # datetimeはTimestreamのタイムスタンプとして、deviceid と place は dimension として使うので、この3つはmeasureに載せない
        elif item_name == 'deviceid' or item_name == 'datetime' or item_name == 'place':
            continue
        else:
            myitem = {
                'Name': str(item_name),
                'Value': str(item_value),
                'Type': 'DOUBLE'
            },
        records.append(myitem)
        # recordsに追加された最新の要素だけを抽出 (ex. temperatureのキーと値)
        t = records[value_num] 
        #抽出した最新の要素だけを MultiMeasureValue に追加して、挿入するレコードの MeasureValues を生成
        MultiMeasureValue.append(t[0])
        value_num += 1

    #レコード生成
    dummy_measure = {
        'Dimensions': gen_dimensions(event),
        'MeasureName': 'device_metrics',
        'MeasureValueType': 'MULTI',
        'MeasureValues': MultiMeasureValue,
        'Time': str(event['datetime']),
        'TimeUnit': 'MILLISECONDS'
    }
    records_X.append(dummy_measure)
    print("records_X: " + str(records_X))

    return records_X


def lambda_handler(event, context):
    print ("start write_records...: ")
    result = write_client.write_records(DatabaseName=DatabaseName,
                                        TableName=TableName,
                                        Records=gen_record(event), CommonAttributes={})
    return

Timestream に格納するにあたり、今回は次のような仕様としました。

  • datetimeは Timestream のタイムスタンプに利用
  • deviceid, place は Timestream のディメンションとして利用
  • その他のデータは、Timestream のメジャーとして利用

そのため 32 行目〜46 行目でメジャーに入れる項目を整理しています。
なお、ディメンション、メジャーの説明については下記の Blackbelt の資料が参考になります。

IoT Core ルール設定

IoT Core のルールは次のとおりです。
トピックは適当なものを指定します。ルールアクションには、先程のコードで作成した Lambda 関数を指定します。

01-iot-core-rule

Lambda のアクセス権限

今回は Lambda から直接 Timestream に書き込むので、Lambda 関数に必要な権限を追加します。
下記は緩めの権限内容になっているので、必要に応じて Resource 句にて制限をかけてください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "timestream:DescribeEndpoints",
                "timestream:WriteRecords"
            ],
            "Resource": "*"
        }
    ]
}

ポリシー設定の詳細は下記をご確認ください。

動作確認

準備ができたらデータを送って確認してみます。
データを Publish するクライアントは何でも構いません。下記は AWS IoT コンソールにあるテストクライアントから送信しています。

02-publish-data

Timestream の画面にあるクエリエディタからデータを確認してみましょう。
先程送信したデータがテーブルに入っていることが分かります。
また、データ中にあった Unixtime の時刻が time カラムとして(UTC で)取り込まれていることも分かります。

03-query-result

データの内容を少し変えて、続けて 2 件ほど送信してみました。問題なく書き込みできています。

04-query-result-2

最後に

今回は元のデータが単純な構造でした。次回は実際の環境に即したデータを想定して検証してみたいと思います。

以上です。