[AWS IoT Core] AWS IoT Device SDK v2 for Python を使用して MQTT v5 で新たに拡張されたプロパティを送受信してみました

2022.12.06

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

re:Invent 2022 で AWS IoT Core の MQTT v5 対応が発表され、引き続き、Python の SDK でも「MQTT v 5対応」がリリースされました。
https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/documents/MQTT5.md

今回は、AWS IoT Device SDK v2 for Python を使用して、 MQTT v5 で新たに拡張されたプロパティを Publish/Subscribe してみました。

注:2022/12/6 現在  AWS IoT Device SDK v2 for Python は、Developer Preview となっています。

2 mqtt5.PublishPacket

Python SDK での publish は、以下のような形で実装されますが、ここでは、送信内容を表現するオブジェクトとして、mqtt5.PublishPacket クラスが使用されています。

publish_future = client.publish(mqtt5.PublishPacket(
        topic=topic,
        payload=payload,
        qos=mqtt5.QoS.AT_LEAST_ONCE
    ))

SDK のawscrt/mqtt5.pyを確認すると、PublishPacketクラスは、以下のようになっていました。 拡張プロパティには、それぞれ名前が付与され、クラス変数として定義されています。

class PublishPacket:
    """Data model of an `MQTT5 PUBLISH `_ packet

    Args:
        payload (Any): The payload of the publish message.
        qos (QoS): The MQTT quality of service associated with this PUBLISH packet.
        retain (bool): True if this is a retained message, false otherwise.
        topic (str): The topic associated with this PUBLISH packet.
        payload_format_indicator (PayloadFormatIndicator): Property specifying the format of the payload data. The mqtt5 client does not enforce or use this value in a meaningful way.
        message_expiry_interval_sec (int): Sent publishes - indicates the maximum amount of time allowed to elapse for message delivery before the server should instead delete the message (relative to a recipient). Received publishes - indicates the remaining amount of time (from the server's perspective) before the message would have been deleted relative to the subscribing client. If left None, indicates no expiration timeout.
        topic_alias (int): An integer value that is used to identify the Topic instead of using the Topic Name.
        response_topic (str): Opaque topic string intended to assist with request/response implementations.  Not internally meaningful to MQTT5 or this client.
        correlation_data (Any): Opaque binary data used to correlate between publish messages, as a potential method for request-response implementation.  Not internally meaningful to MQTT5.
        subscription_identifiers (Sequence[int]): The subscription identifiers of all the subscriptions this message matched.
        content_type (str): Property specifying the content type of the payload.  Not internally meaningful to MQTT5.
        user_properties (Sequence[UserProperty]): List of MQTT5 user properties included with the packet.
    """
    payload: Any = ""  # Unicode objects are converted to C strings using 'utf-8' encoding
    qos: QoS = QoS.AT_MOST_ONCE
    retain: bool = False
    topic: str = ""
    payload_format_indicator: PayloadFormatIndicator = None
    message_expiry_interval_sec: int = None
    topic_alias: int = None
    response_topic: str = None
    correlation_data: Any = None   # Unicode objects are converted to C strings using 'utf-8' encoding
    subscription_identifiers: 'Sequence[int]' = None  # ignore attempts to set but provide in received packets
    content_type: str = None
    user_properties: 'Sequence[UserProperty]' = None

また、user_propertiesは、UserPropertyクラスの配列となっています。

class UserProperty:
    """MQTT5 User Property

    Args:
        name (str): Property name
        value (str): Property value
    """

    name: str = None
    value: str = None

3 コード

サンプルとして書いてみたコードです。

接続後に、各種のプロパティを設定したメッセージを1個送信し、同時に Subscribe で受け取ってデコードして表示しています。

import os
import time
import json
from concurrent.futures import Future
from awsiot import mqtt5_client_builder
from awscrt import mqtt5

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883

dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certificates/client-cert.pem".format(dir),
    "keyfile": "{}/certificates/private-key.pem".format(dir),
}
client_id = "client_id"
topic = "sensor/device01"
payload = "message"
TIMEOUT = 100

future_connection_success = Future()

def on_publish_received(publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    print("on_message topic:{} payload:{}".format(publish_packet.topic, publish_packet.payload))
    #print("{}".format(publish_packet_data)) DEBUG用
    print("response_topic:{}".format(publish_packet.response_topic))
    print("content_type:{}".format(publish_packet.content_type))
    print("message_expiry_interval_sec:{}".format(publish_packet.message_expiry_interval_sec))
    print("correlation_data:{}".format(publish_packet.correlation_data))
    print("payload_format_indicator:{}".format(publish_packet.payload_format_indicator))
    print("user_properties")
    for  user_property in publish_packet.user_properties:
         print("    name:{} value:{}".format(user_property.name, user_property.value))

def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
    print("on_connect_success")
    global future_connection_success
    future_connection_success.set_result(lifecycle_connect_success_data)

if __name__ == '__main__':

    client = mqtt5_client_builder.mtls_from_path(
        endpoint=endpoint,
        port=port,
        cert_filepath=certs["certfile"],
        pri_key_filepath=certs["keyfile"],
        ca_filepath=certs["cafile"],
        on_publish_received=on_publish_received,
        on_lifecycle_connection_success=on_lifecycle_connection_success,
        client_id=client_id)

    client.start()
    lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
    connack_packet = lifecycle_connect_success_data.connack_packet
    negotiated_settings = lifecycle_connect_success_data.negotiated_settings

    # Subscribe
    subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
        subscriptions=[
            mqtt5.Subscription(
                topic_filter=topic,
                qos=mqtt5.QoS.AT_LEAST_ONCE)
            ]
    ))
    subscribe_future.result(TIMEOUT)

    # Publish
    publish_future = client.publish(mqtt5.PublishPacket(
        topic=topic,
        payload=payload,
        user_properties=[
            mqtt5.UserProperty(name="key1",value="value1"),
            mqtt5.UserProperty(name="key2",value="value2"),
            mqtt5.UserProperty(name="key3",value="value3")
        ],
        qos=mqtt5.QoS.AT_LEAST_ONCE,
        response_topic="response/device01",
        content_type="test/plain",
        message_expiry_interval_sec=120,
        correlation_data =  json.dumps({"seq":100}).encode('utf-8'),
        payload_format_indicator = 1
    ))
    publish_completion_data = publish_future.result(TIMEOUT)
    time.sleep(1)

    # Disconnect
    client.stop()

4 実行結果

実行すると、以下のようになります。

% python3 index.py
on_connect_success
on_message topic:sensor/device01 payload:b'message'
response_topic:response/device01
content_type:test/plain
message_expiry_interval_sec:119
correlation_data:{"seq": 100}
payload_format_indicator:1
user_properties
    name:key1 value:value1
    name:key2 value:value2
    name:key3 value:value3

このメッセージを MQTT テストクライアントでサブスクライブした場合も、Properties で内容を確認できます。

5 最後に

今回は、AWS IoT Device SDK v2 for Python を使用して、 MQTT v5 で新たに拡張されたプロパティを Publish/Subscribe してみました。

昨日まで使ってきた、phao.mqttとは、ちょっと違った形になりますが、クラス定義が分かりやすいと感じました。 ドキュメントは、まだ、現時点でそこまで詳しくないですが、コードを見れば特に問題なく実装できそうです。

6 参考リンク


[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
[AWS IoT Core] MQTT v5 を使用してレスポンスコードの確認を実装して見ました
[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
[AWS IoT Core] MQTT v5 に対応した 「MQTT テストクライアント」の動作を確認して見ました
[AWS IoT Core] MQTT v5 で追加されたユーザープロパティを ルール で取得して Lambda で使用してみました
[AWS IoT Core] AWS IoT Device SDK v2 for Python で MQTT5 のサポートが始まりました (Developer Preview)
[AWS IoT Core] MQTT v5 で新たに追加されたプロパティ値を Republish で追加してみました