[AWS IoT Core] MQTT v5 で追加されたユーザープロパティを ルール で取得して Lambda で使用してみました

2022.12.04

1 はじめに

CX 事業本部のデリバリー部の平内(SIN)です。

MQTT v5 では、拡張されたプロパティの1つとして「ユーザープロパティ」が定義されており、UTF-8 キーと値のペアで自由に定義することができます。

今回は、AWS IoT Core のルールで実行される Lambda に、このプロパティの内容を渡す要領を確認してみました。

使用したのは、ルールのファンクションとして新設された、get_user_properties(userPropertyKey) です。
https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-functions.html#iot-sql-function-get-user-properties

ルールの SQL ステートメントで、上記のファンクションを使用して、アクションに Lambda を設定しています。

なお、現時点(2022/12/03)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。

2022/12/05 追記 Python SDK が開発者プレビューで MQTT v5 対応となりました。
参考:https://dev.classmethod.jp/articles/aws-iot-core-mqtt-v5-sdk-for-python

2 サンプルコード

下記は、動作確認用に作成したクライアントのサンプルコードです。

接続後、ユーザープロパティ「[("key1", "value1"),("key2", "value2"),("key3", "value3")]」を設定したメッセージを1回 Publish するだけのものです。

import time
import os
import json
import ssl
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883

dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certificates/client-cert.pem".format(dir),
    "keyfile": "{}/certificates/private-key.pem".format(dir),
}

def main():

    topic = "sensor/device01"

    client = mqtt.Client("client_id", protocol = mqtt.MQTTv5)
    client.tls_set(certs["cafile"],
            certfile=certs["certfile"],
            keyfile=certs["keyfile"],
            cert_reqs=ssl.CERT_REQUIRED,
            tls_version=ssl.PROTOCOL_TLSv1_2,
            ciphers=None)

    client.connect(endpoint, port, properties = None)
    client.loop_start()

    time.sleep(1)

    payload = json.dumps({"message":"hello"})

    properties = Properties(PacketTypes.PUBLISH)
    properties.UserProperty = [("key1", "value1"),("key2", "value2"),("key3", "value3")]
    client.publish(topic, payload ,properties = properties)

    time.sleep(1)

if __name__ == "__main__":
    main()

※ 現時点(2022/12/04)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。

送信すると、MQTT テストクライアントでメッセージが到着していることを確認できます。

3 Rules

続いてルールの設定です。

get_user_properties(userPropertyKey) を使用した SQL ステートメントは、以下のようになっています。

SELECT *, get_user_properties() as userProperty FROM 'sensor/#'

アクションには、Lambda 関数を設定しています。

4 Lambda

ルール のアクションに指定された Lambda のコードです。

最初に event を print して、Lambda 実行時に渡されたパラメータを表示しています。 また、ルールでuserPropertyに設定したユーザープロパティの内容を、パースして表示しています。

import json

def lambda_handler(event, context):
    print("event: {}".format(event))
    for property in event["userProperty"]:
        for key in property.keys():
            value = property[key]
            print("{}:{}".format(key, value))

メッセージが到着して実行された Lambda のログです。

event: {'message': 'hello', 'userProperty': [{'key1': 'value1'}, {'key2': 'value2'}, {'key3': 'value3'}]}
key1:value1
key2:value2
key3:value3

クライアントが送ったユーザープロパティの内容が利用できていることを確認できます。

5 get_user_properties()

get_user_properties()については、以下にドキュメントがあります。


https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-functions.html#iot-sql-function-get-user-properties

ドキュメントに記載されているとおり、get_user_properties の引数にキーを指定することで、一部のプロパティ値を取り出すことも可能です。

SELECT *, get_user_properties('key1') as userProperty FROM 'sensor/#'
event: {'message': 'hello', 'userProperty': ['value1']}

6 最後に

ユーザープロパティを使用すると、既存の Payload に影響を及ぼすことなく、キーペアが自由に定義できます。

今回試したように、その内容を、後段の Lambda 等 に引き継げるとなると、応用範囲も少し広がってくるかも知れません。

なお、AWS のブログでは、get_user_property() でルール上でエンコードしている例が紹介されていました。

https://aws.amazon.com/jp/blogs/iot/introducing-new-mqttv5-features-for-aws-iot-core-to-help-build-flexible-architecture-patterns/

7 参考リンク


[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
[AWS IoT Core] MQTT v5 を使用してレスポンスコードの確認を実装して見ました
[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
[AWS IoT Core] MQTT v5 に対応した 「MQTT テストクライアント」の動作を確認して見ました