1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
IoT 機器としてのデバイスでは、通信回線が、比較的贅沢に確保できない場合が有ります。
この場合、「通信量は、できるだけ小さくしたい」というニーズとなり、必要なデータをバイナリで送信するものも少なくないように思います。
例えば、「タイムスタンプ」「温度」と「湿度」といった内容のデータを考えてみた場合、キー名や、精度にもよりますが、下記の場合、文字列で 96byte となっています。
data = {
"timestamp":1672186603.354603,
"temperature": 8.13476276397705,
"humidity": 53.37971496582031
}
payload = json.dumps(data)
そして、同じ内容を固定長バイナリで表現すると、精度を失うことなく、僅か 16 バイトとなります。
この差は、属性が増えるほど大きくなります。また、「温度」「湿度」のようなデータだと、小数点以下は、2桁ぐらいあれば十分かもしれませんが、データの種類のよっては、大きな有効桁数が必要な場合もあり、表現桁数が増えるとそれだけデータも大きくなることになります。
単純にサイズという意味では、殆どの場合、固定長バイナリデータに軍配が上がると思います。
今回は、固定長バイナリで送信された MQTT データを、Lambda で処理する要領を確認してみました。
2 Could not parse request body into json
AWS IoT Code のルールで、アクションに Lambda を指定した場合、データ部分がバイナリのままだと、以下のようなエラーとなってしまいます。
details:
Could not parse request body into json: Could not parse payload into json: Invalid UTF-8 middle byte 0xea\n at [Source: (byte[])"xxxxxx"; line: 1, column: 7] (Service: AWSLambdaInternal;
Status Code: 400;
Error Code: InvalidRequestContentException;
Request ID: 9c9cec50-3858-4d11-8a01-08a3fea95a0b; Proxy: null)
eventType: RuleExecution
logLevel: ERROR
これは、リクエストボディを JSON 解析することに失敗している事が原因です。
Lambda へ送る前に、Kinesis Data Streams を挟むことなどで対応は可能ですが、今回は、ここをルールでエンコードすることで対応する要領を試してみました。
3 ルール
ルールで、payload 本体を Base64 でエンコードして、JSON 形式に変換する SQL ステートメントは、以下のようになります。
SELECT encode(*, 'base64') AS base64 FROM "sensor/test"
アクションに Lambda を指定すると、次のようなパラメータで Lambda が起動されることになります。
{ "base64": "zVx7+OHq2EH9JwJB1IRVQg==" }
4 動作確認
(1) 送信コード
動作確認のため、固定長バイナリのデータを送信するクライアントを作成してみました。
最初の例のとおり、「タイムスタンプ」「温度」「湿度」を 16 バイトの固定長にパックして、5件のデータを送信しています。
import os
import time
import datetime
import struct
import random
from concurrent.futures import Future
from awsiot import mqtt5_client_builder
from awscrt import mqtt5
endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883
dir = os.path.dirname(os.path.abspath(__file__))
certs = {
"cafile": "{}/certs/AmazonRootCA1.pem".format(dir),
"certfile": "{}/certs/cert.pem".format(dir),
"keyfile": "{}/certs/private-key.pem".format(dir),
}
client_id = "sample_device"
TIMEOUT = 100
future_connection_success = Future()
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
print("on_connect_success")
global future_connection_success
future_connection_success.set_result(lifecycle_connect_success_data)
def disp(dt, data):
print("{} {} type:{} len:{}".format(dt, data, type(data), len(data)))
if __name__ == '__main__':
client = mqtt5_client_builder.mtls_from_path(
endpoint = endpoint,
port = port,
cert_filepath = certs["certfile"],
pri_key_filepath = certs["keyfile"],
ca_filepath = certs["cafile"],
on_lifecycle_connection_success = on_lifecycle_connection_success,
client_id = client_id)
client.start()
lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
connack_packet = lifecycle_connect_success_data.connack_packet
negotiated_settings = lifecycle_connect_success_data.negotiated_settings
for i in range(5):
# ダミーデータの生成
now = datetime.datetime.now()
temperature = random.uniform(0, 30)
humidity = random.uniform(30, 60)
print("date:{} temperature:{} humidity:{}".format(now, temperature, humidity))
# 変換と結合
ts = struct.pack('<d', now.timestamp()) # 8btye
temp = struct.pack('<f', temperature) # 4byte
hum = struct.pack('<f', humidity) # 4byte
transfer_data = ts + temp + hum
print("transfer_data type:{} len:{}".format(type(transfer_data), len(transfer_data)))
# Publish
publish_future = client.publish(mqtt5.PublishPacket(
topic = "sensor/test",
payload = transfer_data
))
publish_completion_data = publish_future.result(TIMEOUT)
time.sleep(0.1)
# Disconnect
client.stop()
(2) Lambda
起動された Lambda で、データを取得しているコードです。
ルールでエンコードされた Base64 をデコードし、パックの逆順でアンパックしています。
import json
import base64
import datetime
import struct
def lambda_handler(event, context):
print("event:{}".format(event))
# Base64のデコード
transfer_data = base64.b64decode(event["base64"].encode())
print("transfer_data type:{} len:{}".format(type(transfer_data), len(transfer_data)))
# データの分離
ts = transfer_data[:8] # 0〜7(8byte)
temp = transfer_data[8:12] # 8〜11(4byte)
hum = transfer_data[12:16] # 12〜15(4byte)
# 変換
now = datetime.datetime.fromtimestamp(struct.unpack('<d', ts)[0])
temperature = struct.unpack('<f', temp)[0]
humidity = struct.unpack('<f', hum)[0]
# 表示
payload = {
"datetime": now,
"temperature": temperature,
"humidity": humidity
}
print("payload:{}".format(payload))
(3) 結果
送信用のプログラムを実行している様子です。
ランダムに生成された、「温度」と「湿度」が、「タイムスタンプ」と共に表示されています。
% python3 index.py
on_connect_success
date:2022-12-28 09:03:45.927539 temperature:8.13476276397705 humidity:53.37971496582031
transfer_data type:<class 'bytes'> len:16
date:2022-12-28 09:03:46.042790 temperature:11.671396255493164 humidity:31.489978790283203
transfer_data type:<class 'bytes'> len:16
date:2022-12-28 09:03:46.143824 temperature:9.486563682556152 humidity:38.76728820800781
transfer_data type:<class 'bytes'> len:16
date:2022-12-28 09:03:46.244779 temperature:8.634918212890625 humidity:46.43960189819336
transfer_data type:<class 'bytes'> len:16
date:2022-12-28 09:03:46.350665 temperature:13.452837944030762 humidity:46.664886474609375
transfer_data type:<class 'bytes'> len:16
MQTT テストクライアントでは、以下のように見えています。
最後に、Lambda の実行ログです。
「タイムスタンプ」「温度」「湿度」が、再現されていることを確認できます。
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 45, 927539), 'temperature': 8.13476276397705, 'humidity': 53.37971496582031}
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 42790), 'temperature': 11.671396255493164, 'humidity': 31.489978790283203}
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 143824), 'temperature': 9.486563682556152, 'humidity': 38.76728820800781}
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 244779), 'temperature': 8.634918212890625, 'humidity': 46.43960189819336}
payload:{'datetime': datetime.datetime(2022, 12, 28, 0, 3, 46, 350665), 'temperature': 13.452837944030762, 'humidity': 46.664886474609375}
5 最後に
今回は、固定長バイナリで表現された Payload を、ルールでエンコードして、Lambda デコードする要領を試してみました。
固定長バイナリで送信されるデータも、色々と対応可能です。
例)
また、ルールでは、aws_lambda(functionArn, inputJson)で、Lambda をコールすることも可能ですので、ルールで JSON まで変換してしまって、アクションで直接 DynamoDB に保存するというような実装も可能でしょう。