[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
1 はじめに
IoT事業部の平内(SIN)です。
コアデバイス上で実行されているコンポーネントは、AWS IoT Device SDKに含まれるAWS IoT Greengrass Core interprocess communication (IPC) を使用して、他のコンポーネントとのプロセス間通信が可能です。
import awsiot.greengrasscoreipc ipc_client = awsiot.greengrasscoreipc.connect()
今回は、このawsiot.greengrasscoreipc.connectで、AWS IoT Coreのメッセージブローカーに対してPublish/Subscribeするコンポーネントを作成してみました。
参考:Use the AWS IoT Device SDK for interprocess communication (IPC)
2 Publisher
Publishするコンポーネントは、以下の通りです。
(1) artifact
コードです。下記を参考に作成してみました。
publisher.py
import time import json import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( QOS, PublishToIoTCoreRequest ) TIMEOUT = 10 topic = "topic" qos = QOS.AT_LEAST_ONCE ipc_client = awsiot.greengrasscoreipc.connect() print("? publisher start.") for i in range(3): message = { "counter": i } request = PublishToIoTCoreRequest() request.topic_name = topic request.payload = json.dumps(message).encode('utf-8') request.qos = qos operation = ipc_client.new_publish_to_iot_core() operation.activate(request) future = operation.get_response() future.result(TIMEOUT) print("? publish :{}".format(message)) time.sleep(1) print("? publisher finish.")
(2) recipe
レシピでは、accessControlでIoT CoreへのPublishを許可する設定が必要です。
com.example.Publisher-1.0.0.yaml
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.Publisher ComponentVersion: '1.0.0' ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.Publisher:publish:1: operations: - "aws.greengrass#PublishToIoTCore" resources: - "topic" Manifests: - Lifecycle: Install: pip3 install awsiotsdk Run: python3 {artifacts:path}/publisher.py
(3) 動作確認
ローカルデバッグコンソールで、コンポーネントを再起動しています。
IoT Coreのコンソールで、TopicをSubscribeすると、メッセージの到着を確認できます。
また、コンポーネントのログでも、動作が確認できます。
※ 確認しやすいように一部編集しています
$ sudo tail -f /greengrass/v2/logs/com.example.Publisher.log com.example.Publisher: stdout. ? publisher start.. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING} com.example.Publisher: stdout. ? publish :{'counter': 0}. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING} com.example.Publisher: stdout. ? publish :{'counter': 1}. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING} com.example.Publisher: stdout. ? publish :{'counter': 2}. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING} com.example.Publisher: stdout. ? publisher finish.. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING} com.example.Publisher: Run script exited. {exitCode=0, serviceName=com.example.Publisher, currentState=RUNNING}
3 Subscriber
コードです。下記を参考にしました。
(1) artifact
subscriber.py
import time import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( IoTCoreMessage, QOS, SubscribeToIoTCoreRequest ) TIMEOUT = 10 topic = "topic" qos = QOS.AT_MOST_ONCE ipc_client = awsiot.greengrasscoreipc.connect() print("? subscriber start.") class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: IoTCoreMessage) -> None: message = str(event.message.payload, "utf-8") print("? payload:{}".format(message)) def on_stream_error(self, error: Exception) -> bool: return True def on_stream_closed(self) -> None: pass request = SubscribeToIoTCoreRequest() request.topic_name = topic request.qos = qos handler = StreamHandler() operation = ipc_client.new_subscribe_to_iot_core(handler) future = operation.activate(request) future.result(TIMEOUT) while True: time.sleep(1) operation.close() print("? subscriber finish.")
(2) recipe
レシピでは、IoT CoreへのSubscribeを許可する設定が必要です。なお、LifecycleのRunで設定するPythonのパラメータに、-u が無いと、whileループでログが上手く確認できないので注意が必要です。
com.example.Subscriber-1.0.0.yaml
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.Subscriber ComponentVersion: '1.0.0' ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.mqttproxy: com.example.Subscriber:publish:1: operations: - "aws.greengrass#SubscribeToIoTCore" resources: - "topic" Manifests: - Lifecycle: Install: pip3 install awsiotsdk Run: python3 -u {artifacts:path}/subscriber.py
(3) 動作確認
IoT CoreのコンソールからtopicへのPublishを行いました。
コンポーネントのログでは、メッセージを受信できていることが確認できます。
※ 確認しやすいように一部編集しています
$ sudo tail -f /greengrass/v2/logs/com.example.Subscriber.log com.example.Subscriber: stdout. ? subscriber start.. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING} com.example.Subscriber: stdout. ? payload:{. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING} com.example.Subscriber: stdout. "message": "AWS IoT コンソールからの挨拶". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING} com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING} com.example.Subscriber: stdout. ? payload:{. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING} com.example.Subscriber: stdout. "message": "AWS IoT コンソールからの挨拶". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING} com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING} com.example.Subscriber: stdout. ? payload:{. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING} com.example.Subscriber: stdout. "message": "AWS IoT コンソールからの挨拶". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING} com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
4 最後に
今回は、このawsiot.greengrasscoreipc.connectを使用して、AWS IoT Coreのメッセージブローカーに対してPublish/Subscribeするコンポーネントを作成してみました。
当然ですが・・・Greengrassをセットアップした時点で、証明書やポリシーが設定されるため、コンポーネントを作成する段階では、別途、証明書や認証情報を考える必要はありません。
5 参考リンク
[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました