[AWS IoT Core] AWS IoT Device SDK v2 for Python でMQTT5 のサポートが始まりました (Developer Preview)

2022.12.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX 事業本部のデリバリー部の平内(SIN)です。

re:Invent 2022 で AWS IoT Core の MQTT v5 対応が発表された。

AWS IoT announces general availability for version 5 of MQTT message broker (MQTT5)

発表当初、AWS SDK は、MQTT v5 未対応で、公式の Blog でも、paho-mqtt を使用した例が紹介されていました。

Introducing new MQTTv5 features for AWS IoT Core to help build flexible architecture patterns

しかし、先ほど、何気なく確認すると、AWS IoT Device SDK v2 for Python で、開発者プレビューですが、MQTT v 5対応がリリースされていました。

https://github.com/aws/aws-iot-device-sdk-python-v2/releases

今回は、簡単ですが、Python SDK での MQTT v5 接続を試してみました。

2 AWS IoT Device SDK v2 for Python

MQTT v5 に関するドキュメントは、以下です。

https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/documents/MQTT5.md

まだ、サンプルコードを見ると、MQTT v5 での実装方法が、一発で把握できます。

https://github.com/aws/aws-iot-device-sdk-python-v2/tree/main/samples

3 最小サンプル

aws-iot-device-sdk-python-v2/samples/mqtt5_pubsub.py を参考にさせて頂いて、X509 証明書による TLS 接続のみの最小のサンプルを作成してみました。

最初に、awsiotsdkをアップデートしましたが、1.12.0となっていました。

% pip3 install -U  awsiotsdk
% pip3 freeze | grep aws
awscrt==0.16.0
awsiotsdk==1.12.0

コードです。 接続後、1 回メッセージを送受信し、3 秒後に終了します。

import os
import time
import threading
from concurrent.futures import Future
from awsiot import mqtt5_client_builder
from awscrt import mqtt5

endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
port = 8883

dir = os.path.dirname(os.path.abspath(__file__))
certs = {
    "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir),
    "certfile": "{}/certificates/client-cert.pem".format(dir),
    "keyfile": "{}/certificates/private-key.pem".format(dir),
}
client_id = "client_id"
topic = "sensor/device01"
payload = "message"
TIMEOUT = 100

received_all_event = threading.Event()
future_stopped = Future()
future_connection_success = Future()

def on_publish_received(publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    print("on_publish_received topic:{} payload:{}".format(publish_packet.topic, publish_packet.payload))

def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
    print("on_lifecycle_stopped")
    global future_stopped
    future_stopped.set_result(lifecycle_stopped_data)

def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
    print("on_lifecycle_connection_success")
    global future_connection_success
    future_connection_success.set_result(lifecycle_connect_success_data)

def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
    print("Lifecycle Connection Failure")
    print("Connection failed with exception:{}".format(lifecycle_connection_failure.exception))

if __name__ == '__main__':

    client = mqtt5_client_builder.mtls_from_path(
        endpoint=endpoint,
        port=port,
        cert_filepath=certs["certfile"],
        pri_key_filepath=certs["keyfile"],
        ca_filepath=certs["cafile"],
        on_publish_received=on_publish_received,
        on_lifecycle_stopped=on_lifecycle_stopped,
        on_lifecycle_connection_success=on_lifecycle_connection_success,
        on_lifecycle_connection_failure=on_lifecycle_connection_failure,
        client_id=client_id)

    client.start()
    lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
    connack_packet = lifecycle_connect_success_data.connack_packet
    negotiated_settings = lifecycle_connect_success_data.negotiated_settings

    # Subscribe
    print("Subscribing to topic '{}'...".format(topic))
    subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
        subscriptions=[mqtt5.Subscription(
            topic_filter=topic,
            qos=mqtt5.QoS.AT_LEAST_ONCE)]
    ))
    suback = subscribe_future.result(TIMEOUT)
    print("Subscribed with {}".format(suback.reason_codes))

    print("Publishing message to topic '{}': {}".format(topic,payload))
    publish_future = client.publish(mqtt5.PublishPacket(
        topic=topic,
        payload=payload,
        qos=mqtt5.QoS.AT_LEAST_ONCE
    ))

    publish_completion_data = publish_future.result(TIMEOUT)
    print("PubAck received with {}".format(repr(publish_completion_data.puback.reason_code)))

    # wait 3 sec
    print("wait 3 sec")
    time.sleep(3)

    # Unsubscribe
    print("Unsubscribing from topic '{}'".format(topic))
    unsubscribe_future = client.unsubscribe(unsubscribe_packet=mqtt5.UnsubscribePacket(
        topic_filters=[topic]))
    unsuback = unsubscribe_future.result(TIMEOUT)
    print("Unsubscribed with {}".format(unsuback.reason_codes))

    print("Stopping Client")
    client.stop()

    future_stopped.result(TIMEOUT)
    print("Client Stopped!")

実行結果は、以下のようになります。

% python3 index.py
on_lifecycle_connection_success
Subscribing to topic 'sensor/device01'...
Subscribed with [<SubackReasonCode.GRANTED_QOS_1: 1>]
Publishing message to topic 'sensor/device01': message
PubAck received with <PubackReasonCode.SUCCESS: 0>
wait 3 sec
on_publish_received topic:sensor/device01 payload:b'message'
Unsubscribing from topic 'sensor/device01'
Unsubscribed with [<UnsubackReasonCode.SUCCESS: 0>]
Stopping Client
on_lifecycle_stopped
Client Stopped!

4 最後に

今回は、簡単ですが、Python SDK での MQTT v5 接続を試してみました。

SDK も対応して、いよいよ、MQTT v5 使っていこう!ってところですが・・・まだ、開発者プレビューであることにご注意ください。

5 参考リンク


[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
[AWS IoT Core] MQTT v5 を使用してレスポンスコードの確認を実装して見ました
[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
[AWS IoT Core] MQTT v5 に対応した 「MQTT テストクライアント」の動作を確認して見ました
[AWS IoT Core] MQTT v5 で追加されたユーザープロパティを ルール で取得して Lambda で使用してみました