AWS SDKでIoT Coreに Publish() したデータを MQTT5でSubscribeする

できました。
2023.08.15

AWS SDKでIoT Coreに Publish() したデータがあるとします。このデータをMQTTでSubscribeできるのか気になったので、試してみました。

概要図(AWS SDKでpublishして、MQTTでsubscribeする)

おすすめの方

  • AWS SDK(boto3)でIoT CoreにPublish()したい方
  • aws-iot-device-sdk-python-v2でMQTT5を利用したい方
  • AWS IoT Coreでデバイス作成の参考を探している方

まずは、Subscribe側を準備する

ポリシーを作成する

IoT Thing用のポリシーを作成します。

IoTポリシーを作成する

3つの権限を付与します。

  • iot:Connect
  • iot:Subscribe
  • iot:Receive

実験のためポリシーリソースは*にします。実際には指定することを強く推奨します。

IoTポリシーに3つの権限を付与する

IoT Thingを作成する

IoT Thingを作成します。

モノを作成する

証明書は、自動生成します。

デバイス証明書を作成する

さきほど作成したポリシーを選択します。

デバイス証明書にポリシーをアタッチする

作成後、「デバイス証明書」と「キーファイル」と「ルートCA証明書(Amazon ルート CA 1)」をダウンロードします。

デバイス証明書などをダウンロードする

IoT Coreエンドポイントを把握する

aws iot describe-endpoint \
    --endpoint-type iot:Data-ATS

Subscribeするスクリプトを作成する

事前にAWSIoTPythonSDKを導入します。

pip install awsiotsdk

コードは下記です。ほぼ公式サンプルのままです。

subscribe.py

from awsiot import mqtt5_client_builder

from awscrt import mqtt5
from time import sleep

IOT_CORE_ENDPOINT = "xxx-ats.iot.ap-northeast-1.amazonaws.com"
PORT = 8883
TOPIC_NAME = "any-topic-name"
QOS = 1

TIMEOUT = 5

ROOT_CA_FILE = "AmazonRootCA1.pem"
PRIVATE_KEY_FILE = "xxx-private.pem.key"
CERTIFICATE_FILE = "xxx-certificate.pem.crt"


def main():
    # https://github.com/aws/aws-iot-device-sdk-python-v2
    # https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/pubsub.py
    client = mqtt5_client_builder.mtls_from_path(
        endpoint=IOT_CORE_ENDPOINT,
        port=PORT,
        cert_filepath=CERTIFICATE_FILE,
        pri_key_filepath=PRIVATE_KEY_FILE,
        ca_filepath=ROOT_CA_FILE,
        on_publish_received=on_publish_received,
        on_lifecycle_stopped=on_lifecycle_stopped,
        on_lifecycle_connection_success=on_lifecycle_connection_success,
        on_lifecycle_connection_failure=on_lifecycle_connection_failure,
        client_id="any-client-id",
    )
    print("MQTT5 Client Created")

    client.start()

    subscribe_future = client.subscribe(
        subscribe_packet=mqtt5.SubscribePacket(
            subscriptions=[mqtt5.Subscription(topic_filter=TOPIC_NAME, qos=1)]
        )
    )
    suback = subscribe_future.result(TIMEOUT)
    print("Subscribed with {}".format(suback.reason_codes))

    while True:
        # finish: Control + C
        sleep(1)


def on_publish_received(publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    assert isinstance(publish_packet, mqtt5.PublishPacket)
    print(
        "Received message from topic'{}':{}".format(
            publish_packet.topic, publish_packet.payload
        )
    )


def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
    print("Lifecycle Stopped")


def on_lifecycle_connection_success(
    lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
):
    print("Lifecycle Connection Success")


def on_lifecycle_connection_failure(
    lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData,
):
    print("Lifecycle Connection Failure")
    print(
        "Connection failed with exception:{}".format(
            lifecycle_connection_failure.exception
        )
    )


if __name__ == "__main__":
    main()

実行する

さきほどダウンロードした「デバイス証明書」と「キーファイル(プライベートキーファイル)」と「ルートCA証明書(Amazon ルート CA 1)」をスクリプトと同じ場所に格納したあと、実行します。

python subscribe.py

次に、publish()側を実行する

publish()するスクリプトを作成する

publish.py

import boto3

client = boto3.client("iot-data")

TOPIC_NAME = "any-topic-name"


def main():
    response = client.publish(
        topic=TOPIC_NAME,
        qos=1,
        payload=b"This is a pen.",
    )


if __name__ == "__main__":
    main()

実行する

python publish.py

結果

subscribe側にデータが来ました。バッチリですね。

MQTT5 Client Created
Lifecycle Connection Success
Subscribed with [<SubackReasonCode.GRANTED_QOS_1: 1>]
Received message from topic'any-topic-name':b'This is a pen.'

さいごに

掲題について試してみました。参考になれば幸いです。

参考