この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
MQTT v5 では、拡張されたプロパティの1つとして「ユーザープロパティ」が定義されており、UTF-8 キーと値のペアで自由に定義することができます。
今回は、AWS IoT Core のルールで実行される Lambda に、このプロパティの内容を渡す要領を確認してみました。
使用したのは、ルールのファンクションとして新設された、get_user_properties(userPropertyKey) です。
https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-functions.html#iot-sql-function-get-user-properties
ルールの SQL ステートメントで、上記のファンクションを使用して、アクションに Lambda を設定しています。
なお、現時点(2022/12/03)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。
参考:https://dev.classmethod.jp/articles/aws-iot-core-mqtt-v5-sdk-for-python
2 サンプルコード
下記は、動作確認用に作成したクライアントのサンプルコードです。
接続後、ユーザープロパティ「[("key1", "value1"),("key2", "value2"),("key3", "value3")]」を設定したメッセージを1回 Publish するだけのものです。
import time
import os
import json
import ssl
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883
dir = os.path.dirname(os.path.abspath(__file__))
certs = {
"cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
"certfile": "{}/certificates/client-cert.pem".format(dir),
"keyfile": "{}/certificates/private-key.pem".format(dir),
}
def main():
topic = "sensor/device01"
client = mqtt.Client("client_id", protocol = mqtt.MQTTv5)
client.tls_set(certs["cafile"],
certfile=certs["certfile"],
keyfile=certs["keyfile"],
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2,
ciphers=None)
client.connect(endpoint, port, properties = None)
client.loop_start()
time.sleep(1)
payload = json.dumps({"message":"hello"})
properties = Properties(PacketTypes.PUBLISH)
properties.UserProperty = [("key1", "value1"),("key2", "value2"),("key3", "value3")]
client.publish(topic, payload ,properties = properties)
time.sleep(1)
if __name__ == "__main__":
main()
※ 現時点(2022/12/04)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。
送信すると、MQTT テストクライアントでメッセージが到着していることを確認できます。
3 Rules
続いてルールの設定です。
get_user_properties(userPropertyKey) を使用した SQL ステートメントは、以下のようになっています。
SELECT *, get_user_properties() as userProperty FROM 'sensor/#'
アクションには、Lambda 関数を設定しています。
4 Lambda
ルール のアクションに指定された Lambda のコードです。
最初に event を print して、Lambda 実行時に渡されたパラメータを表示しています。 また、ルールでuserPropertyに設定したユーザープロパティの内容を、パースして表示しています。
import json
def lambda_handler(event, context):
print("event: {}".format(event))
for property in event["userProperty"]:
for key in property.keys():
value = property[key]
print("{}:{}".format(key, value))
メッセージが到着して実行された Lambda のログです。
event: {'message': 'hello', 'userProperty': [{'key1': 'value1'}, {'key2': 'value2'}, {'key3': 'value3'}]}
key1:value1
key2:value2
key3:value3
クライアントが送ったユーザープロパティの内容が利用できていることを確認できます。
5 get_user_properties()
get_user_properties()については、以下にドキュメントがあります。
ドキュメントに記載されているとおり、get_user_properties の引数にキーを指定することで、一部のプロパティ値を取り出すことも可能です。
SELECT *, get_user_properties('key1') as userProperty FROM 'sensor/#'
event: {'message': 'hello', 'userProperty': ['value1']}
6 最後に
ユーザープロパティを使用すると、既存の Payload に影響を及ぼすことなく、キーペアが自由に定義できます。
今回試したように、その内容を、後段の Lambda 等 に引き継げるとなると、応用範囲も少し広がってくるかも知れません。
なお、AWS のブログでは、get_user_property() でルール上でエンコードしている例が紹介されていました。
7 参考リンク
[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
[AWS IoT Core] MQTT v5 を使用してレスポンスコードの確認を実装して見ました
[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
[AWS IoT Core] MQTT v5 に対応した 「MQTT テストクライアント」の動作を確認して見ました