1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
re:Invent 2022 で AWS IoT Core の MQTT v5 対応が発表された。
AWS IoT announces general availability for version 5 of MQTT message broker (MQTT5)
発表当初、AWS SDK は、MQTT v5 未対応で、公式の Blog でも、paho-mqtt を使用した例が紹介されていました。
Introducing new MQTTv5 features for AWS IoT Core to help build flexible architecture patterns
しかし、先ほど、何気なく確認すると、AWS IoT Device SDK v2 for Python で、開発者プレビューですが、MQTT v 5対応がリリースされていました。
https://github.com/aws/aws-iot-device-sdk-python-v2/releases
今回は、簡単ですが、Python SDK での MQTT v5 接続を試してみました。
2 AWS IoT Device SDK v2 for Python
MQTT v5 に関するドキュメントは、以下です。
https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/documents/MQTT5.md
まだ、サンプルコードを見ると、MQTT v5 での実装方法が、一発で把握できます。
https://github.com/aws/aws-iot-device-sdk-python-v2/tree/main/samples
3 最小サンプル
aws-iot-device-sdk-python-v2/samples/mqtt5_pubsub.py を参考にさせて頂いて、X509 証明書による TLS 接続のみの最小のサンプルを作成してみました。
最初に、awsiotsdkをアップデートしましたが、1.12.0となっていました。
% pip3 install -U awsiotsdk
% pip3 freeze | grep aws
awscrt==0.16.0
awsiotsdk==1.12.0
コードです。 接続後、1 回メッセージを送受信し、3 秒後に終了します。
import os
import time
import threading
from concurrent.futures import Future
from awsiot import mqtt5_client_builder
from awscrt import mqtt5
endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883
dir = os.path.dirname(os.path.abspath(__file__))
certs = {
"cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
"certfile": "{}/certificates/client-cert.pem".format(dir),
"keyfile": "{}/certificates/private-key.pem".format(dir),
}
client_id = "client_id"
topic = "sensor/device01"
payload = "message"
TIMEOUT = 100
received_all_event = threading.Event()
future_stopped = Future()
future_connection_success = Future()
def on_publish_received(publish_packet_data):
publish_packet = publish_packet_data.publish_packet
print("on_publish_received topic:{} payload:{}".format(publish_packet.topic, publish_packet.payload))
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
print("on_lifecycle_stopped")
global future_stopped
future_stopped.set_result(lifecycle_stopped_data)
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
print("on_lifecycle_connection_success")
global future_connection_success
future_connection_success.set_result(lifecycle_connect_success_data)
def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
print("Lifecycle Connection Failure")
print("Connection failed with exception:{}".format(lifecycle_connection_failure.exception))
if __name__ == '__main__':
client = mqtt5_client_builder.mtls_from_path(
endpoint=endpoint,
port=port,
cert_filepath=certs["certfile"],
pri_key_filepath=certs["keyfile"],
ca_filepath=certs["cafile"],
on_publish_received=on_publish_received,
on_lifecycle_stopped=on_lifecycle_stopped,
on_lifecycle_connection_success=on_lifecycle_connection_success,
on_lifecycle_connection_failure=on_lifecycle_connection_failure,
client_id=client_id)
client.start()
lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
connack_packet = lifecycle_connect_success_data.connack_packet
negotiated_settings = lifecycle_connect_success_data.negotiated_settings
# Subscribe
print("Subscribing to topic '{}'...".format(topic))
subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
subscriptions=[mqtt5.Subscription(
topic_filter=topic,
qos=mqtt5.QoS.AT_LEAST_ONCE)]
))
suback = subscribe_future.result(TIMEOUT)
print("Subscribed with {}".format(suback.reason_codes))
print("Publishing message to topic '{}': {}".format(topic,payload))
publish_future = client.publish(mqtt5.PublishPacket(
topic=topic,
payload=payload,
qos=mqtt5.QoS.AT_LEAST_ONCE
))
publish_completion_data = publish_future.result(TIMEOUT)
print("PubAck received with {}".format(repr(publish_completion_data.puback.reason_code)))
# wait 3 sec
print("wait 3 sec")
time.sleep(3)
# Unsubscribe
print("Unsubscribing from topic '{}'".format(topic))
unsubscribe_future = client.unsubscribe(unsubscribe_packet=mqtt5.UnsubscribePacket(
topic_filters=[topic]))
unsuback = unsubscribe_future.result(TIMEOUT)
print("Unsubscribed with {}".format(unsuback.reason_codes))
print("Stopping Client")
client.stop()
future_stopped.result(TIMEOUT)
print("Client Stopped!")
実行結果は、以下のようになります。
% python3 index.py
on_lifecycle_connection_success
Subscribing to topic 'sensor/device01'...
Subscribed with [<SubackReasonCode.GRANTED_QOS_1: 1>]
Publishing message to topic 'sensor/device01': message
PubAck received with <PubackReasonCode.SUCCESS: 0>
wait 3 sec
on_publish_received topic:sensor/device01 payload:b'message'
Unsubscribing from topic 'sensor/device01'
Unsubscribed with [<UnsubackReasonCode.SUCCESS: 0>]
Stopping Client
on_lifecycle_stopped
Client Stopped!
4 最後に
今回は、簡単ですが、Python SDK での MQTT v5 接続を試してみました。
SDK も対応して、いよいよ、MQTT v5 使っていこう!ってところですが・・・まだ、開発者プレビューであることにご注意ください。
5 参考リンク
[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
[AWS IoT Core] MQTT v5 を使用してレスポンスコードの確認を実装して見ました
[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
[AWS IoT Core] MQTT v5 に対応した 「MQTT テストクライアント」の動作を確認して見ました
[AWS IoT Core] MQTT v5 で追加されたユーザープロパティを ルール で取得して Lambda で使用してみました