この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
re:Invent 2022 で AWS IoT Core の MQTT v5 対応が発表され、引き続き、Python の SDK でも「MQTT v 5対応」がリリースされました。
https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/documents/MQTT5.md
今回は、AWS IoT Device SDK v2 for Python を使用して、 MQTT v5 で新たに拡張されたプロパティを Publish/Subscribe してみました。
注:2022/12/6 現在 AWS IoT Device SDK v2 for Python は、Developer Preview となっています。
2 mqtt5.PublishPacket
Python SDK での publish は、以下のような形で実装されますが、ここでは、送信内容を表現するオブジェクトとして、mqtt5.PublishPacket クラスが使用されています。
publish_future = client.publish(mqtt5.PublishPacket(
topic=topic,
payload=payload,
qos=mqtt5.QoS.AT_LEAST_ONCE
))
SDK のawscrt/mqtt5.pyを確認すると、PublishPacketクラスは、以下のようになっていました。 拡張プロパティには、それぞれ名前が付与され、クラス変数として定義されています。
class PublishPacket:
"""Data model of an `MQTT5 PUBLISH `_ packet
Args:
payload (Any): The payload of the publish message.
qos (QoS): The MQTT quality of service associated with this PUBLISH packet.
retain (bool): True if this is a retained message, false otherwise.
topic (str): The topic associated with this PUBLISH packet.
payload_format_indicator (PayloadFormatIndicator): Property specifying the format of the payload data. The mqtt5 client does not enforce or use this value in a meaningful way.
message_expiry_interval_sec (int): Sent publishes - indicates the maximum amount of time allowed to elapse for message delivery before the server should instead delete the message (relative to a recipient). Received publishes - indicates the remaining amount of time (from the server's perspective) before the message would have been deleted relative to the subscribing client. If left None, indicates no expiration timeout.
topic_alias (int): An integer value that is used to identify the Topic instead of using the Topic Name.
response_topic (str): Opaque topic string intended to assist with request/response implementations. Not internally meaningful to MQTT5 or this client.
correlation_data (Any): Opaque binary data used to correlate between publish messages, as a potential method for request-response implementation. Not internally meaningful to MQTT5.
subscription_identifiers (Sequence[int]): The subscription identifiers of all the subscriptions this message matched.
content_type (str): Property specifying the content type of the payload. Not internally meaningful to MQTT5.
user_properties (Sequence[UserProperty]): List of MQTT5 user properties included with the packet.
"""
payload: Any = "" # Unicode objects are converted to C strings using 'utf-8' encoding
qos: QoS = QoS.AT_MOST_ONCE
retain: bool = False
topic: str = ""
payload_format_indicator: PayloadFormatIndicator = None
message_expiry_interval_sec: int = None
topic_alias: int = None
response_topic: str = None
correlation_data: Any = None # Unicode objects are converted to C strings using 'utf-8' encoding
subscription_identifiers: 'Sequence[int]' = None # ignore attempts to set but provide in received packets
content_type: str = None
user_properties: 'Sequence[UserProperty]' = None
また、user_propertiesは、UserPropertyクラスの配列となっています。
class UserProperty:
"""MQTT5 User Property
Args:
name (str): Property name
value (str): Property value
"""
name: str = None
value: str = None
3 コード
サンプルとして書いてみたコードです。
接続後に、各種のプロパティを設定したメッセージを1個送信し、同時に Subscribe で受け取ってデコードして表示しています。
import os
import time
import json
from concurrent.futures import Future
from awsiot import mqtt5_client_builder
from awscrt import mqtt5
endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883
dir = os.path.dirname(os.path.abspath(__file__))
certs = {
"cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
"certfile": "{}/certificates/client-cert.pem".format(dir),
"keyfile": "{}/certificates/private-key.pem".format(dir),
}
client_id = "client_id"
topic = "sensor/device01"
payload = "message"
TIMEOUT = 100
future_connection_success = Future()
def on_publish_received(publish_packet_data):
publish_packet = publish_packet_data.publish_packet
print("on_message topic:{} payload:{}".format(publish_packet.topic, publish_packet.payload))
#print("{}".format(publish_packet_data)) DEBUG用
print("response_topic:{}".format(publish_packet.response_topic))
print("content_type:{}".format(publish_packet.content_type))
print("message_expiry_interval_sec:{}".format(publish_packet.message_expiry_interval_sec))
print("correlation_data:{}".format(publish_packet.correlation_data))
print("payload_format_indicator:{}".format(publish_packet.payload_format_indicator))
print("user_properties")
for user_property in publish_packet.user_properties:
print(" name:{} value:{}".format(user_property.name, user_property.value))
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
print("on_connect_success")
global future_connection_success
future_connection_success.set_result(lifecycle_connect_success_data)
if __name__ == '__main__':
client = mqtt5_client_builder.mtls_from_path(
endpoint=endpoint,
port=port,
cert_filepath=certs["certfile"],
pri_key_filepath=certs["keyfile"],
ca_filepath=certs["cafile"],
on_publish_received=on_publish_received,
on_lifecycle_connection_success=on_lifecycle_connection_success,
client_id=client_id)
client.start()
lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
connack_packet = lifecycle_connect_success_data.connack_packet
negotiated_settings = lifecycle_connect_success_data.negotiated_settings
# Subscribe
subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
subscriptions=[
mqtt5.Subscription(
topic_filter=topic,
qos=mqtt5.QoS.AT_LEAST_ONCE)
]
))
subscribe_future.result(TIMEOUT)
# Publish
publish_future = client.publish(mqtt5.PublishPacket(
topic=topic,
payload=payload,
user_properties=[
mqtt5.UserProperty(name="key1",value="value1"),
mqtt5.UserProperty(name="key2",value="value2"),
mqtt5.UserProperty(name="key3",value="value3")
],
qos=mqtt5.QoS.AT_LEAST_ONCE,
response_topic="response/device01",
content_type="test/plain",
message_expiry_interval_sec=120,
correlation_data = json.dumps({"seq":100}).encode('utf-8'),
payload_format_indicator = 1
))
publish_completion_data = publish_future.result(TIMEOUT)
time.sleep(1)
# Disconnect
client.stop()
4 実行結果
実行すると、以下のようになります。
% python3 index.py
on_connect_success
on_message topic:sensor/device01 payload:b'message'
response_topic:response/device01
content_type:test/plain
message_expiry_interval_sec:119
correlation_data:{"seq": 100}
payload_format_indicator:1
user_properties
name:key1 value:value1
name:key2 value:value2
name:key3 value:value3
このメッセージを MQTT テストクライアントでサブスクライブした場合も、Properties で内容を確認できます。
5 最後に
今回は、AWS IoT Device SDK v2 for Python を使用して、 MQTT v5 で新たに拡張されたプロパティを Publish/Subscribe してみました。
昨日まで使ってきた、phao.mqttとは、ちょっと違った形になりますが、クラス定義が分かりやすいと感じました。 ドキュメントは、まだ、現時点でそこまで詳しくないですが、コードを見れば特に問題なく実装できそうです。
6 参考リンク
[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
[AWS IoT Core] MQTT v5 を使用してレスポンスコードの確認を実装して見ました
[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
[AWS IoT Core] MQTT v5 に対応した 「MQTT テストクライアント」の動作を確認して見ました
[AWS IoT Core] MQTT v5 で追加されたユーザープロパティを ルール で取得して Lambda で使用してみました
[AWS IoT Core] AWS IoT Device SDK v2 for Python で MQTT5 のサポートが始まりました (Developer Preview)
[AWS IoT Core] MQTT v5 で新たに追加されたプロパティ値を Republish で追加してみました