AWS SDKでIoT Coreに Publish() したデータがあるとします。このデータをMQTTでSubscribeできるのか気になったので、試してみました。
おすすめの方
- AWS SDK(boto3)でIoT CoreにPublish()したい方
- aws-iot-device-sdk-python-v2でMQTT5を利用したい方
- AWS IoT Coreでデバイス作成の参考を探している方
まずは、Subscribe側を準備する
ポリシーを作成する
IoT Thing用のポリシーを作成します。
3つの権限を付与します。
- iot:Connect
- iot:Subscribe
- iot:Receive
実験のためポリシーリソースは*
にします。実際には指定することを強く推奨します。
IoT Thingを作成する
IoT Thingを作成します。
証明書は、自動生成します。
さきほど作成したポリシーを選択します。
作成後、「デバイス証明書」と「キーファイル」と「ルートCA証明書(Amazon ルート CA 1)」をダウンロードします。
IoT Coreエンドポイントを把握する
aws iot describe-endpoint \
--endpoint-type iot:Data-ATS
Subscribeするスクリプトを作成する
事前にAWSIoTPythonSDK
を導入します。
pip install awsiotsdk
コードは下記です。ほぼ公式サンプルのままです。
subscribe.py
from awsiot import mqtt5_client_builder
from awscrt import mqtt5
from time import sleep
IOT_CORE_ENDPOINT = "xxx-ats.iot.ap-northeast-1.amazonaws.com"
PORT = 8883
TOPIC_NAME = "any-topic-name"
QOS = 1
TIMEOUT = 5
ROOT_CA_FILE = "AmazonRootCA1.pem"
PRIVATE_KEY_FILE = "xxx-private.pem.key"
CERTIFICATE_FILE = "xxx-certificate.pem.crt"
def main():
# https://github.com/aws/aws-iot-device-sdk-python-v2
# https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/pubsub.py
client = mqtt5_client_builder.mtls_from_path(
endpoint=IOT_CORE_ENDPOINT,
port=PORT,
cert_filepath=CERTIFICATE_FILE,
pri_key_filepath=PRIVATE_KEY_FILE,
ca_filepath=ROOT_CA_FILE,
on_publish_received=on_publish_received,
on_lifecycle_stopped=on_lifecycle_stopped,
on_lifecycle_connection_success=on_lifecycle_connection_success,
on_lifecycle_connection_failure=on_lifecycle_connection_failure,
client_id="any-client-id",
)
print("MQTT5 Client Created")
client.start()
subscribe_future = client.subscribe(
subscribe_packet=mqtt5.SubscribePacket(
subscriptions=[mqtt5.Subscription(topic_filter=TOPIC_NAME, qos=1)]
)
)
suback = subscribe_future.result(TIMEOUT)
print("Subscribed with {}".format(suback.reason_codes))
while True:
# finish: Control + C
sleep(1)
def on_publish_received(publish_packet_data):
publish_packet = publish_packet_data.publish_packet
assert isinstance(publish_packet, mqtt5.PublishPacket)
print(
"Received message from topic'{}':{}".format(
publish_packet.topic, publish_packet.payload
)
)
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
print("Lifecycle Stopped")
def on_lifecycle_connection_success(
lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData,
):
print("Lifecycle Connection Success")
def on_lifecycle_connection_failure(
lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData,
):
print("Lifecycle Connection Failure")
print(
"Connection failed with exception:{}".format(
lifecycle_connection_failure.exception
)
)
if __name__ == "__main__":
main()
実行する
さきほどダウンロードした「デバイス証明書」と「キーファイル(プライベートキーファイル)」と「ルートCA証明書(Amazon ルート CA 1)」をスクリプトと同じ場所に格納したあと、実行します。
python subscribe.py
次に、publish()側を実行する
publish()するスクリプトを作成する
publish.py
import boto3
client = boto3.client("iot-data")
TOPIC_NAME = "any-topic-name"
def main():
response = client.publish(
topic=TOPIC_NAME,
qos=1,
payload=b"This is a pen.",
)
if __name__ == "__main__":
main()
実行する
python publish.py
結果
subscribe側にデータが来ました。バッチリですね。
MQTT5 Client Created
Lifecycle Connection Success
Subscribed with [<SubackReasonCode.GRANTED_QOS_1: 1>]
Received message from topic'any-topic-name':b'This is a pen.'
さいごに
掲題について試してみました。参考になれば幸いです。