[AWS IoT Greengrass V2] コンポーネントでコアデバイス間のPublish/Subscribeを試してみました

2021.08.15

1 はじめに

IoT事業部の平内(SIN)です。

前回、AWS IoT Greengrass Core interprocess communication (IPC) を使用して、IoT Coreのメッセージブローカーにアクセスしてみました。

今回は、メッセージブローカーではなく、コアデバイス間でのPub/Subを試してみました。
参考:Publish/subscribe local messages

下記の動画は、作成したサプンルの動作を確認している様子です。左のコンソールは、PublishとSubscribeのコンポーネントのログをtailしているもので、右は開発デバイス上のローカルデバッグコンソールでコンポーネントの再起動などを行なっているものです。

操作している内容は以下のとおりです。

  • Subscriberコンポーネントを再起動(メッセージ待機に入る)
  • Publisherコンポーネントを再起動(メッセージを3回送って終了)
  • Publisherコンポーネントを起動(メッセージを3回送って終了)

2 Publisher

Publishするコンポーネントは、以下の通りです。

(1) artifact

コードです。IoT Coreに対する操作と殆ど同じです。

PublishMessageには、message(バイナリ用)及び、json_message(JSON用)の2種類のオブジェクトがありますが、今回は、JSON用を使用しています。

publisher.py

import time
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
    PublishToTopicRequest,
    PublishMessage,
    JsonMessage
)

TIMEOUT = 10
topic = "topic"

print("😍 local_publisher start.")

ipc_client = awsiot.greengrasscoreipc.connect()

for i in range(3):
    message = {
        "counter": i
    }
    request = PublishToTopicRequest()
    request.topic = topic
    publish_message = PublishMessage()
    publish_message.json_message = JsonMessage()
    publish_message.json_message.message = message
    request.publish_message = publish_message
    operation = ipc_client.new_publish_to_topic()
    operation.activate(request)
    future = operation.get_response()
    future.result(TIMEOUT)

    print("😍 publish :{}".format(message))
    time.sleep(1)

print("😍 local_publisher finish.")

(2) recipe

レシピでは、accessControlでaws.greengrass.ipc.pubsubへのaws.greengrass#PublishToTopicが許可されています。

com.example.LocalPublisher-1.0.0.yaml

---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.LocalPublisher
ComponentVersion: '1.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:
        com.example.LocalPublisher:pubsub:1:
          operations:
          - "aws.greengrass#PublishToTopic"
          resources:
          - "*"
Manifests:
  - Lifecycle:
      Install: pip3 install awsiotsdk
      Run: python3 -u {artifacts:path}/publisher.py

(3) 動作確認

コンポーネントのログは、以下のようになります。

※ 確認しやすいように一部編集しています

$ sudo tail -f  /greengrass/v2/logs/com.example.LocalPublisher.log
com.example.LocalPublisher: stdout. 😍 local_publisher start.. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
com.example.LocalPublisher: stdout. 😍 publish :{'counter': 0}. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
com.example.LocalPublisher: stdout. 😍 publish :{'counter': 1}. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
com.example.LocalPublisher: stdout. 😍 publish :{'counter': 2}. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}
com.example.LocalPublisher: stdout. 😍 local_publisher finish.. {scriptName=services.com.example.LocalPublisher.lifecycle.Run, serviceName=com.example.LocalPublisher, currentState=RUNNING}

3 Subscriber

サブスクライブ側のコードです。到着したメッセージの内容もJSON用(event.json_message.message)を指定する事に注意が必要です。

(1) artifact

subscriber.py

import time
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    SubscribeToTopicRequest,
    SubscriptionResponseMessage
)

TIMEOUT = 10
topic = "topic"

print("😍 local_subscriber start.")

ipc_client = awsiot.greengrasscoreipc.connect()

class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
        print("😍 payload:{}".format(event.json_message.message))

    def on_stream_error(self, error: Exception) -> bool:
        return True

    def on_stream_closed(self) -> None:
        pass

request = SubscribeToTopicRequest()
request.topic = topic
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_topic(handler)
future = operation.activate(request)
future.result(TIMEOUT)

while True:
    time.sleep(1)

operation.close()
print("😍 local_subscriber finish.")

(2) recipe

aws.greengrass.ipc.pubsubで、aws.greengrass#SubscribeToTopicを許可しています。なお、LifecycleRunで設定するPythonのパラメータに、-u が無いと、whileループでログが上手く確認できないので注意が必要です。

com.example.LocalSubscriber-1.0.0.yaml

---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.LocalSubscriber
ComponentVersion: '1.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.pubsub:
        com.example.LocalSubscriber:pubsub:1:
          operations:
          - "aws.greengrass#SubscribeToTopic"
          resources:
          - "*"
Manifests:
  - Lifecycle:
      Install: pip3 install awsiotsdk
      Run: |
        python3 -u {artifacts:path}/subscriber.py

(3) 動作確認

コンポーネントのログでは、メッセージを受信できていることが確認できます。

※ 確認しやすいように一部編集しています

com.example.LocalSubscriber: stdout. 😍 local_subscriber start.. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. 😍 payload:{'counter': 0.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. 😍 payload:{'counter': 1.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. 😍 payload:{'counter': 2.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. 😍 payload:{'counter': 0.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. 😍 payload:{'counter': 1.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}
com.example.LocalSubscriber: stdout. 😍 payload:{'counter': 2.0}. {scriptName=services.com.example.LocalSubscriber.lifecycle.Run, serviceName=com.example.LocalSubscriber, currentState=RUNNING}

4 最後に

今回は、コアデバイス間でのPub/Subを試してみました。

コード自体が全然別なので、アクセス制御だけで、IoT Coreのメッセージブローカーと共有することはできません。 また、メッセージオブジェクトにバイナリ用とJSON用があって、どちらを使用しているかを意識することが大事だと思いました。

5 参考リンク


[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました