[AWS IoT Core] 固定長バイナリを送信して、Lambdaで処理してみました

2022.12.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX 事業本部のデリバリー部の平内(SIN)です。

IoT 機器としてのデバイスでは、通信回線が、比較的贅沢に確保できない場合が有ります。

この場合、「通信量は、できるだけ小さくしたい」というニーズとなり、必要なデータをバイナリで送信するものも少なくないように思います。

例えば、「タイムスタンプ」「温度」と「湿度」といった内容のデータを考えてみた場合、キー名や、精度にもよりますが、下記の場合、文字列で 96byte となっています。

data = {
    "timestamp":1672186603.354603,
    "temperature": 8.13476276397705,
    "humidity": 53.37971496582031
}

payload = json.dumps(data)

そして、同じ内容を固定長バイナリで表現すると、精度を失うことなく、僅か 16 バイトとなります。

この差は、属性が増えるほど大きくなります。また、「温度」「湿度」のようなデータだと、小数点以下は、2桁ぐらいあれば十分かもしれませんが、データの種類のよっては、大きな有効桁数が必要な場合もあり、表現桁数が増えるとそれだけデータも大きくなることになります。

単純にサイズという意味では、殆どの場合、固定長バイナリデータに軍配が上がると思います。

今回は、固定長バイナリで送信された MQTT データを、Lambda で処理する要領を確認してみました。

2 Could not parse request body into json

AWS IoT Code のルールで、アクションに Lambda を指定した場合、データ部分がバイナリのままだと、以下のようなエラーとなってしまいます。

details:
Could not parse request body into json: Could not parse payload into json: Invalid UTF-8 middle byte 0xea\n at [Source: (byte[])"xxxxxx"; line: 1, column: 7] (Service: AWSLambdaInternal;

Status Code: 400;
Error Code: InvalidRequestContentException;
Request ID: 9c9cec50-3858-4d11-8a01-08a3fea95a0b; Proxy: null)
eventType: RuleExecution
logLevel: ERROR

これは、リクエストボディを JSON 解析することに失敗している事が原因です。

Lambda へ送る前に、Kinesis Data Streams を挟むことなどで対応は可能ですが、今回は、ここをルールでエンコードすることで対応する要領を試してみました。

3 ルール

ルールで、payload 本体を Base64 でエンコードして、JSON 形式に変換する SQL ステートメントは、以下のようになります。

SELECT encode(*, 'base64') AS base64 FROM "sensor/test"

アクションに Lambda を指定すると、次のようなパラメータで Lambda が起動されることになります。

{ "base64": "zVx7+OHq2EH9JwJB1IRVQg==" }

4 動作確認

(1) 送信コード

動作確認のため、固定長バイナリのデータを送信するクライアントを作成してみました。

最初の例のとおり、「タイムスタンプ」「温度」「湿度」を 16 バイトの固定長にパックして、5件のデータを送信しています。

import os
import time
import datetime
import struct
import random
from concurrent.futures import Future
from awsiot import mqtt5_client_builder
from awscrt import mqtt5

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883

dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certs/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certs/cert.pem".format(dir),
    "keyfile": "{}/certs/private-key.pem".format(dir),
}
client_id = "sample_device"
TIMEOUT = 100

future_connection_success = Future()

def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
    print("on_connect_success")
    global future_connection_success
    future_connection_success.set_result(lifecycle_connect_success_data)

def disp(dt, data):
    print("{} {} type:{} len:{}".format(dt, data, type(data), len(data)))

if __name__ == '__main__':

    client = mqtt5_client_builder.mtls_from_path(
        endpoint = endpoint,
        port = port,
        cert_filepath = certs["certfile"],
        pri_key_filepath = certs["keyfile"],
        ca_filepath = certs["cafile"],
        on_lifecycle_connection_success = on_lifecycle_connection_success,
        client_id = client_id)

    client.start()
    lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
    connack_packet = lifecycle_connect_success_data.connack_packet
    negotiated_settings = lifecycle_connect_success_data.negotiated_settings

    for i in range(5):

        # ダミーデータの生成
        now = datetime.datetime.now()
        temperature = random.uniform(0, 30)
        humidity = random.uniform(30, 60)
        print("date:{} temperature:{} humidity:{}".format(now, temperature, humidity))

        # 変換と結合
        ts = struct.pack('<d', now.timestamp()) # 8btye
        temp = struct.pack('<f', temperature) # 4byte
        hum = struct.pack('<f', humidity) # 4byte
        transfer_data = ts + temp + hum
        print("transfer_data type:{} len:{}".format(type(transfer_data), len(transfer_data)))

        # Publish
        publish_future = client.publish(mqtt5.PublishPacket(
            topic = "sensor/test",
            payload = transfer_data
        ))
        publish_completion_data = publish_future.result(TIMEOUT)
        time.sleep(0.1)

    # Disconnect
    client.stop()

(2) Lambda

起動された Lambda で、データを取得しているコードです。

ルールでエンコードされた Base64 をデコードし、パックの逆順でアンパックしています。

import json
import base64
import datetime
import struct

def lambda_handler(event, context):
    print("event:{}".format(event))
    # Base64のデコード
    transfer_data = base64.b64decode(event["base64"].encode())
    print("transfer_data type:{} len:{}".format(type(transfer_data), len(transfer_data)))
    # データの分離
    ts = transfer_data[:8] # 0〜7(8byte)
    temp = transfer_data[8:12] # 8〜11(4byte)
    hum = transfer_data[12:16] # 12〜15(4byte)
    # 変換
    now = datetime.datetime.fromtimestamp(struct.unpack('<d', ts)[0])
    temperature = struct.unpack('<f', temp)[0]
    humidity = struct.unpack('<f', hum)[0]
    # 表示
    payload = {
        "datetime": now,
        "temperature": temperature,
        "humidity": humidity
    }
    print("payload:{}".format(payload))

(3) 結果

送信用のプログラムを実行している様子です。

ランダムに生成された、「温度」と「湿度」が、「タイムスタンプ」と共に表示されています。

% python3 index.py
on_connect_success
date:2022-12-28 09:03:45.927539 temperature:8.13476276397705 humidity:53.37971496582031
transfer_data type:<class 'bytes'> len:16
date:2022-12-28 09:03:46.042790 temperature:11.671396255493164 humidity:31.489978790283203
transfer_data type:<class 'bytes'> len:16
date:2022-12-28 09:03:46.143824 temperature:9.486563682556152 humidity:38.76728820800781
transfer_data type:<class 'bytes'> len:16
date:2022-12-28 09:03:46.244779 temperature:8.634918212890625 humidity:46.43960189819336
transfer_data type:<class 'bytes'> len:16
date:2022-12-28 09:03:46.350665 temperature:13.452837944030762 humidity:46.664886474609375
transfer_data type:<class 'bytes'> len:16

MQTT テストクライアントでは、以下のように見えています。

最後に、Lambda の実行ログです。

「タイムスタンプ」「温度」「湿度」が、再現されていることを確認できます。

payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 45, 927539), 'temperature': 8.13476276397705, 'humidity': 53.37971496582031}
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 42790), 'temperature': 11.671396255493164, 'humidity': 31.489978790283203}
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 143824), 'temperature': 9.486563682556152, 'humidity': 38.76728820800781}
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 244779), 'temperature': 8.634918212890625, 'humidity': 46.43960189819336}
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 350665), 'temperature': 13.452837944030762, 'humidity': 46.664886474609375}

5 最後に

今回は、固定長バイナリで表現された Payload を、ルールでエンコードして、Lambda デコードする要領を試してみました。

固定長バイナリで送信されるデータも、色々と対応可能です。

例)

また、ルールでは、aws_lambda(functionArn, inputJson)で、Lambda をコールすることも可能ですので、ルールで JSON まで変換してしまって、アクションで直接 DynamoDB に保存するというような実装も可能でしょう。

参考:aws_lambda(functionArn, inputJson)