この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
IoT事業部の平内(SIN)です。
コアデバイス上で実行されているコンポーネントは、AWS IoT Device SDKに含まれるAWS IoT Greengrass Core interprocess communication (IPC) を使用して、他のコンポーネントとのプロセス間通信が可能です。
import awsiot.greengrasscoreipc
ipc_client = awsiot.greengrasscoreipc.connect()
今回は、このawsiot.greengrasscoreipc.connectで、AWS IoT Coreのメッセージブローカーに対してPublish/Subscribeするコンポーネントを作成してみました。
参考:Use the AWS IoT Device SDK for interprocess communication (IPC)
2 Publisher
Publishするコンポーネントは、以下の通りです。
(1) artifact
コードです。下記を参考に作成してみました。
publisher.py
import time
import json
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
QOS,
PublishToIoTCoreRequest
)
TIMEOUT = 10
topic = "topic"
qos = QOS.AT_LEAST_ONCE
ipc_client = awsiot.greengrasscoreipc.connect()
print("? publisher start.")
for i in range(3):
message = {
"counter": i
}
request = PublishToIoTCoreRequest()
request.topic_name = topic
request.payload = json.dumps(message).encode('utf-8')
request.qos = qos
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
future = operation.get_response()
future.result(TIMEOUT)
print("? publish :{}".format(message))
time.sleep(1)
print("? publisher finish.")
(2) recipe
レシピでは、accessControlでIoT CoreへのPublishを許可する設定が必要です。
com.example.Publisher-1.0.0.yaml
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.Publisher
ComponentVersion: '1.0.0'
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.Publisher:publish:1:
operations:
- "aws.greengrass#PublishToIoTCore"
resources:
- "topic"
Manifests:
- Lifecycle:
Install: pip3 install awsiotsdk
Run: python3 {artifacts:path}/publisher.py
(3) 動作確認
ローカルデバッグコンソールで、コンポーネントを再起動しています。
IoT Coreのコンソールで、TopicをSubscribeすると、メッセージの到着を確認できます。
また、コンポーネントのログでも、動作が確認できます。
※ 確認しやすいように一部編集しています
$ sudo tail -f /greengrass/v2/logs/com.example.Publisher.log
com.example.Publisher: stdout. ? publisher start.. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: stdout. ? publish :{'counter': 0}. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: stdout. ? publish :{'counter': 1}. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: stdout. ? publish :{'counter': 2}. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: stdout. ? publisher finish.. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: Run script exited. {exitCode=0, serviceName=com.example.Publisher, currentState=RUNNING}
3 Subscriber
コードです。下記を参考にしました。
(1) artifact
subscriber.py
import time
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
IoTCoreMessage,
QOS,
SubscribeToIoTCoreRequest
)
TIMEOUT = 10
topic = "topic"
qos = QOS.AT_MOST_ONCE
ipc_client = awsiot.greengrasscoreipc.connect()
print("? subscriber start.")
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: IoTCoreMessage) -> None:
message = str(event.message.payload, "utf-8")
print("? payload:{}".format(message))
def on_stream_error(self, error: Exception) -> bool:
return True
def on_stream_closed(self) -> None:
pass
request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)
while True:
time.sleep(1)
operation.close()
print("? subscriber finish.")
(2) recipe
レシピでは、IoT CoreへのSubscribeを許可する設定が必要です。なお、LifecycleのRunで設定するPythonのパラメータに、-u が無いと、whileループでログが上手く確認できないので注意が必要です。
com.example.Subscriber-1.0.0.yaml
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.Subscriber
ComponentVersion: '1.0.0'
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.Subscriber:publish:1:
operations:
- "aws.greengrass#SubscribeToIoTCore"
resources:
- "topic"
Manifests:
- Lifecycle:
Install: pip3 install awsiotsdk
Run: python3 -u {artifacts:path}/subscriber.py
(3) 動作確認
IoT CoreのコンソールからtopicへのPublishを行いました。
コンポーネントのログでは、メッセージを受信できていることが確認できます。
※ 確認しやすいように一部編集しています
$ sudo tail -f /greengrass/v2/logs/com.example.Subscriber.log
com.example.Subscriber: stdout. ? subscriber start.. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. ? payload:{. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. "message": "AWS IoT コンソールからの挨拶". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. ? payload:{. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. "message": "AWS IoT コンソールからの挨拶". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. ? payload:{. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. "message": "AWS IoT コンソールからの挨拶". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
4 最後に
今回は、このawsiot.greengrasscoreipc.connectを使用して、AWS IoT Coreのメッセージブローカーに対してPublish/Subscribeするコンポーネントを作成してみました。
当然ですが・・・Greengrassをセットアップした時点で、証明書やポリシーが設定されるため、コンポーネントを作成する段階では、別途、証明書や認証情報を考える必要はありません。
5 参考リンク
[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました