この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
IoT事業部の平内(SIN)です。
前回、AWS IoT Greengrass Core interprocess communication (IPC) を使用して、IoT Coreのメッセージブローカーにアクセスしてみました。
今回は、メッセージブローカーではなく、コアデバイス間でのPub/Subを試してみました。
参考:Publish/subscribe local messages
下記の動画は、作成したサプンルの動作を確認している様子です。左のコンソールは、PublishとSubscribeのコンポーネントのログをtailしているもので、右は開発デバイス上のローカルデバッグコンソールでコンポーネントの再起動などを行なっているものです。
操作している内容は以下のとおりです。
- Subscriberコンポーネントを再起動(メッセージ待機に入る)
- Publisherコンポーネントを再起動(メッセージを3回送って終了)
- Publisherコンポーネントを起動(メッセージを3回送って終了)
2 Publisher
Publishするコンポーネントは、以下の通りです。
(1) artifact
コードです。IoT Coreに対する操作と殆ど同じです。
PublishMessageには、message(バイナリ用)及び、json_message(JSON用)の2種類のオブジェクトがありますが、今回は、JSON用を使用しています。
publisher.py
import time
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
PublishToTopicRequest,
PublishMessage,
JsonMessage
)
TIMEOUT = 10
topic = "topic"
print("? local_publisher start.")
ipc_client = awsiot.greengrasscoreipc.connect()
for i in range(3):
message = {
"counter": i
}
request = PublishToTopicRequest()
request.topic = topic
publish_message = PublishMessage()
publish_message.json_message = JsonMessage()
publish_message.json_message.message = message
request.publish_message = publish_message
operation = ipc_client.new_publish_to_topic()
operation.activate(request)
future = operation.get_response()
future.result(TIMEOUT)
print("? publish :{}".format(message))
time.sleep(1)
print("? local_publisher finish.")
(2) recipe
レシピでは、accessControlでaws.greengrass.ipc.pubsubへのaws.greengrass#PublishToTopicが許可されています。
com.example.LocalPublisher-1.0.0.yaml
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.LocalPublisher
ComponentVersion: '1.0.0'
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.LocalPublisher:pubsub:1:
operations:
- "aws.greengrass#PublishToTopic"
resources:
- "*"
Manifests:
- Lifecycle:
Install: pip3 install awsiotsdk
Run: python3 -u {artifacts:path}/publisher.py
(3) 動作確認
コンポーネントのログは、以下のようになります。
※ 確認しやすいように一部編集しています
$ sudo tail -f /greengrass/v2/logs/com.example.LocalPublisher.log
com.example.LocalPublisher: stdout. ? local_publisher start.. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
com.example.LocalPublisher: stdout. ? publish :{'counter': 0}. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
com.example.LocalPublisher: stdout. ? publish :{'counter': 1}. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
com.example.LocalPublisher: stdout. ? publish :{'counter': 2}. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
com.example.LocalPublisher: stdout. ? local_publisher finish.. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
3 Subscriber
サブスクライブ側のコードです。到着したメッセージの内容もJSON用(event.json_message.message)を指定する事に注意が必要です。
(1) artifact
subscriber.py
import time
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
SubscribeToTopicRequest,
SubscriptionResponseMessage
)
TIMEOUT = 10
topic = "topic"
print("? local_subscriber start.")
ipc_client = awsiot.greengrasscoreipc.connect()
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
print("? payload:{}".format(event.json_message.message))
def on_stream_error(self, error: Exception) -> bool:
return True
def on_stream_closed(self) -> None:
pass
request = SubscribeToTopicRequest()
request.topic = topic
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_topic(handler)
future = operation.activate(request)
future.result(TIMEOUT)
while True:
time.sleep(1)
operation.close()
print("? local_subscriber finish.")
(2) recipe
aws.greengrass.ipc.pubsubで、aws.greengrass#SubscribeToTopicを許可しています。なお、LifecycleのRunで設定するPythonのパラメータに、-u が無いと、whileループでログが上手く確認できないので注意が必要です。
com.example.LocalSubscriber-1.0.0.yaml
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.LocalSubscriber
ComponentVersion: '1.0.0'
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
com.example.LocalSubscriber:pubsub:1:
operations:
- "aws.greengrass#SubscribeToTopic"
resources:
- "*"
Manifests:
- Lifecycle:
Install: pip3 install awsiotsdk
Run: |
python3 -u {artifacts:path}/subscriber.py
(3) 動作確認
コンポーネントのログでは、メッセージを受信できていることが確認できます。
※ 確認しやすいように一部編集しています
com.example.LocalSubscriber: stdout. ? local_subscriber start.. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. ? payload:{'counter': 0.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. ? payload:{'counter': 1.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. ? payload:{'counter': 2.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. ? payload:{'counter': 0.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. ? payload:{'counter': 1.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. ? payload:{'counter': 2.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
4 最後に
今回は、コアデバイス間でのPub/Subを試してみました。
コード自体が全然別なので、アクセス制御だけで、IoT Coreのメッセージブローカーと共有することはできません。 また、メッセージオブジェクトにバイナリ用とJSON用があって、どちらを使用しているかを意識することが大事だと思いました。
5 参考リンク
[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました