[AWS IoT Core] 固定長バイナリを送信して、Lambdaで処理してみました
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
IoT 機器としてのデバイスでは、通信回線が、比較的贅沢に確保できない場合が有ります。
この場合、「通信量は、できるだけ小さくしたい」というニーズとなり、必要なデータをバイナリで送信するものも少なくないように思います。
例えば、「タイムスタンプ」「温度」と「湿度」といった内容のデータを考えてみた場合、キー名や、精度にもよりますが、下記の場合、文字列で 96byte となっています。
data = { "timestamp":1672186603.354603, "temperature": 8.13476276397705, "humidity": 53.37971496582031 } payload = json.dumps(data)
そして、同じ内容を固定長バイナリで表現すると、精度を失うことなく、僅か 16 バイトとなります。
この差は、属性が増えるほど大きくなります。また、「温度」「湿度」のようなデータだと、小数点以下は、2桁ぐらいあれば十分かもしれませんが、データの種類のよっては、大きな有効桁数が必要な場合もあり、表現桁数が増えるとそれだけデータも大きくなることになります。
単純にサイズという意味では、殆どの場合、固定長バイナリデータに軍配が上がると思います。
今回は、固定長バイナリで送信された MQTT データを、Lambda で処理する要領を確認してみました。
2 Could not parse request body into json
AWS IoT Code のルールで、アクションに Lambda を指定した場合、データ部分がバイナリのままだと、以下のようなエラーとなってしまいます。
details: Could not parse request body into json: Could not parse payload into json: Invalid UTF-8 middle byte 0xea\n at [Source: (byte[])"xxxxxx"; line: 1, column: 7] (Service: AWSLambdaInternal; Status Code: 400; Error Code: InvalidRequestContentException; Request ID: 9c9cec50-3858-4d11-8a01-08a3fea95a0b; Proxy: null) eventType: RuleExecution logLevel: ERROR
これは、リクエストボディを JSON 解析することに失敗している事が原因です。
Lambda へ送る前に、Kinesis Data Streams を挟むことなどで対応は可能ですが、今回は、ここをルールでエンコードすることで対応する要領を試してみました。
3 ルール
ルールで、payload 本体を Base64 でエンコードして、JSON 形式に変換する SQL ステートメントは、以下のようになります。
SELECT encode(*, 'base64') AS base64 FROM "sensor/test"
アクションに Lambda を指定すると、次のようなパラメータで Lambda が起動されることになります。
{ "base64": "zVx7+OHq2EH9JwJB1IRVQg==" }
4 動作確認
(1) 送信コード
動作確認のため、固定長バイナリのデータを送信するクライアントを作成してみました。
最初の例のとおり、「タイムスタンプ」「温度」「湿度」を 16 バイトの固定長にパックして、5件のデータを送信しています。
import os import time import datetime import struct import random from concurrent.futures import Future from awsiot import mqtt5_client_builder from awscrt import mqtt5 endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certs/AmazonRootCA1.pem".format(dir), "certfile": "{}/certs/cert.pem".format(dir), "keyfile": "{}/certs/private-key.pem".format(dir), } client_id = "sample_device" TIMEOUT = 100 future_connection_success = Future() def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData): print("on_connect_success") global future_connection_success future_connection_success.set_result(lifecycle_connect_success_data) def disp(dt, data): print("{} {} type:{} len:{}".format(dt, data, type(data), len(data))) if __name__ == '__main__': client = mqtt5_client_builder.mtls_from_path( endpoint = endpoint, port = port, cert_filepath = certs["certfile"], pri_key_filepath = certs["keyfile"], ca_filepath = certs["cafile"], on_lifecycle_connection_success = on_lifecycle_connection_success, client_id = client_id) client.start() lifecycle_connect_success_data = future_connection_success.result(TIMEOUT) connack_packet = lifecycle_connect_success_data.connack_packet negotiated_settings = lifecycle_connect_success_data.negotiated_settings for i in range(5): # ダミーデータの生成 now = datetime.datetime.now() temperature = random.uniform(0, 30) humidity = random.uniform(30, 60) print("date:{} temperature:{} humidity:{}".format(now, temperature, humidity)) # 変換と結合 ts = struct.pack('<d', now.timestamp()) # 8btye temp = struct.pack('<f', temperature) # 4byte hum = struct.pack('<f', humidity) # 4byte transfer_data = ts + temp + hum print("transfer_data type:{} len:{}".format(type(transfer_data), len(transfer_data))) # Publish publish_future = client.publish(mqtt5.PublishPacket( topic = "sensor/test", payload = transfer_data )) publish_completion_data = publish_future.result(TIMEOUT) time.sleep(0.1) # Disconnect client.stop()
(2) Lambda
起動された Lambda で、データを取得しているコードです。
ルールでエンコードされた Base64 をデコードし、パックの逆順でアンパックしています。
import json import base64 import datetime import struct def lambda_handler(event, context): print("event:{}".format(event)) # Base64のデコード transfer_data = base64.b64decode(event["base64"].encode()) print("transfer_data type:{} len:{}".format(type(transfer_data), len(transfer_data))) # データの分離 ts = transfer_data[:8] # 0〜7(8byte) temp = transfer_data[8:12] # 8〜11(4byte) hum = transfer_data[12:16] # 12〜15(4byte) # 変換 now = datetime.datetime.fromtimestamp(struct.unpack('<d', ts)[0]) temperature = struct.unpack('<f', temp)[0] humidity = struct.unpack('<f', hum)[0] # 表示 payload = { "datetime": now, "temperature": temperature, "humidity": humidity } print("payload:{}".format(payload))
(3) 結果
送信用のプログラムを実行している様子です。
ランダムに生成された、「温度」と「湿度」が、「タイムスタンプ」と共に表示されています。
% python3 index.py on_connect_success date:2022-12-28 09:03:45.927539 temperature:8.13476276397705 humidity:53.37971496582031 transfer_data type:<class 'bytes'> len:16 date:2022-12-28 09:03:46.042790 temperature:11.671396255493164 humidity:31.489978790283203 transfer_data type:<class 'bytes'> len:16 date:2022-12-28 09:03:46.143824 temperature:9.486563682556152 humidity:38.76728820800781 transfer_data type:<class 'bytes'> len:16 date:2022-12-28 09:03:46.244779 temperature:8.634918212890625 humidity:46.43960189819336 transfer_data type:<class 'bytes'> len:16 date:2022-12-28 09:03:46.350665 temperature:13.452837944030762 humidity:46.664886474609375 transfer_data type:<class 'bytes'> len:16
MQTT テストクライアントでは、以下のように見えています。
最後に、Lambda の実行ログです。
「タイムスタンプ」「温度」「湿度」が、再現されていることを確認できます。
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 45, 927539), 'temperature': 8.13476276397705, 'humidity': 53.37971496582031} payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 42790), 'temperature': 11.671396255493164, 'humidity': 31.489978790283203} payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 143824), 'temperature': 9.486563682556152, 'humidity': 38.76728820800781} payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 244779), 'temperature': 8.634918212890625, 'humidity': 46.43960189819336} payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 350665), 'temperature': 13.452837944030762, 'humidity': 46.664886474609375}
5 最後に
今回は、固定長バイナリで表現された Payload を、ルールでエンコードして、Lambda デコードする要領を試してみました。
固定長バイナリで送信されるデータも、色々と対応可能です。
例)
また、ルールでは、aws_lambda(functionArn, inputJson)で、Lambda をコールすることも可能ですので、ルールで JSON まで変換してしまって、アクションで直接 DynamoDB に保存するというような実装も可能でしょう。