
AWS SDKでIoT Coreに Publish() したデータを MQTT5でSubscribeする
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
AWS SDKでIoT Coreに Publish() したデータがあるとします。このデータをMQTTでSubscribeできるのか気になったので、試してみました。
おすすめの方
- AWS SDK(boto3)でIoT CoreにPublish()したい方
- aws-iot-device-sdk-python-v2でMQTT5を利用したい方
- AWS IoT Coreでデバイス作成の参考を探している方
まずは、Subscribe側を準備する
ポリシーを作成する
IoT Thing用のポリシーを作成します。
3つの権限を付与します。
- iot:Connect
- iot:Subscribe
- iot:Receive
実験のためポリシーリソースは*にします。実際には指定することを強く推奨します。
IoT Thingを作成する
IoT Thingを作成します。
証明書は、自動生成します。
さきほど作成したポリシーを選択します。
作成後、「デバイス証明書」と「キーファイル」と「ルートCA証明書(Amazon ルート CA 1)」をダウンロードします。
IoT Coreエンドポイントを把握する
aws iot describe-endpoint \
    --endpoint-type iot:Data-ATS
Subscribeするスクリプトを作成する
事前にAWSIoTPythonSDKを導入します。
pip install awsiotsdk
コードは下記です。ほぼ公式サンプルのままです。
from awsiot import mqtt5_client_builder
from awscrt import mqtt5
from time import sleep
IOT_CORE_ENDPOINT = "xxx-ats.iot.ap-northeast-1.amazonaws.com"
PORT = 8883
TOPIC_NAME = "any-topic-name"
QOS = 1
TIMEOUT = 5
ROOT_CA_FILE = "AmazonRootCA1.pem"
PRIVATE_KEY_FILE = "xxx-private.pem.key"
CERTIFICATE_FILE = "xxx-certificate.pem.crt"
def main():
    # https://github.com/aws/aws-iot-device-sdk-python-v2
    # https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/pubsub.py
    client = mqtt5_client_builder.mtls_from_path(
        endpoint=IOT_CORE_ENDPOINT,
        port=PORT,
        cert_filepath=CERTIFICATE_FILE,
        pri_key_filepath=PRIVATE_KEY_FILE,
        ca_filepath=ROOT_CA_FILE,
        on_publish_received=on_publish_received,
        on_lifecycle_stopped=on_lifecycle_stopped,
        on_lifecycle_connection_success=on_lifecycle_connection_success,
        on_lifecycle_connection_failure=on_lifecycle_connection_failure,
        client_id="any-client-id",
    )
    print("MQTT5 Client Created")
    client.start()
    subscribe_future = client.subscribe(
        subscribe_packet=mqtt5.SubscribePacket(
            subscriptions=[mqtt5.Subscription(topic_filter=TOPIC_NAME, qos=1)]
        )
    )
    suback = subscribe_future.result(TIMEOUT)
    print("Subscribed with {}".format(suback.reason_codes))
    while True:
        # finish: Control + C
        sleep(1)
def on_publish_received(publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    assert isinstance(publish_packet, mqtt5.PublishPacket)
    print(
        "Received message from topic'{}':{}".format(
            publish_packet.topic, publish_packet.payload
        )
    )
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
    print("Lifecycle Stopped")
def on_lifecycle_connection_success(
    lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
):
    print("Lifecycle Connection Success")
def on_lifecycle_connection_failure(
    lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData,
):
    print("Lifecycle Connection Failure")
    print(
        "Connection failed with exception:{}".format(
            lifecycle_connection_failure.exception
        )
    )
if __name__ == "__main__":
    main()
実行する
さきほどダウンロードした「デバイス証明書」と「キーファイル(プライベートキーファイル)」と「ルートCA証明書(Amazon ルート CA 1)」をスクリプトと同じ場所に格納したあと、実行します。
python subscribe.py
次に、publish()側を実行する
publish()するスクリプトを作成する
import boto3
client = boto3.client("iot-data")
TOPIC_NAME = "any-topic-name"
def main():
    response = client.publish(
        topic=TOPIC_NAME,
        qos=1,
        payload=b"This is a pen.",
    )
if __name__ == "__main__":
    main()
実行する
python publish.py
結果
subscribe側にデータが来ました。バッチリですね。
MQTT5 Client Created Lifecycle Connection Success Subscribed with [<SubackReasonCode.GRANTED_QOS_1: 1>] Received message from topic'any-topic-name':b'This is a pen.'
さいごに
掲題について試してみました。参考になれば幸いです。


















