[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました

2021.08.14

1 はじめに

IoT事業部の平内(SIN)です。

コアデバイス上で実行されているコンポーネントは、AWS IoT Device SDKに含まれるAWS IoT Greengrass Core interprocess communication (IPC) を使用して、他のコンポーネントとのプロセス間通信が可能です。

import awsiot.greengrasscoreipc
ipc_client = awsiot.greengrasscoreipc.connect()

今回は、このawsiot.greengrasscoreipc.connectで、AWS IoT Coreのメッセージブローカーに対してPublish/Subscribeするコンポーネントを作成してみました。

参考:Use the AWS IoT Device SDK for interprocess communication (IPC)

2 Publisher

Publishするコンポーネントは、以下の通りです。

(1) artifact

コードです。下記を参考に作成してみました。

参考:https://docs.aws.amazon.com/ja_jp/greengrass/v2/developerguide/ipc-iot-core-mqtt.html#ipc-operation-publishtoiotcore

publisher.py

import time
import json
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
    QOS,
    PublishToIoTCoreRequest
)

TIMEOUT = 10
topic = "topic"
qos = QOS.AT_LEAST_ONCE

ipc_client = awsiot.greengrasscoreipc.connect()
                    
print("😍 publisher start.")

for i in range(3):
    message = {
        "counter": i
    }

    request = PublishToIoTCoreRequest()
    request.topic_name = topic
    request.payload = json.dumps(message).encode('utf-8')
    request.qos = qos

    operation = ipc_client.new_publish_to_iot_core()
    operation.activate(request)
    future = operation.get_response()
    future.result(TIMEOUT)
    
    print("😍 publish :{}".format(message))
    time.sleep(1)

print("😍 publisher finish.")

(2) recipe

レシピでは、accessControlでIoT CoreへのPublishを許可する設定が必要です。

com.example.Publisher-1.0.0.yaml

---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.Publisher
ComponentVersion: '1.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.mqttproxy:
        com.example.Publisher:publish:1:
          operations:
          - "aws.greengrass#PublishToIoTCore"
          resources:
          - "topic"
Manifests:
  - Lifecycle:
      Install: pip3 install awsiotsdk
      Run: python3 {artifacts:path}/publisher.py

(3) 動作確認

ローカルデバッグコンソールで、コンポーネントを再起動しています。

IoT Coreのコンソールで、TopicをSubscribeすると、メッセージの到着を確認できます。

また、コンポーネントのログでも、動作が確認できます。

※ 確認しやすいように一部編集しています

$ sudo tail -f  /greengrass/v2/logs/com.example.Publisher.log
com.example.Publisher: stdout. 😍 publisher start.. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: stdout. 😍 publish :{'counter': 0}. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: stdout. 😍 publish :{'counter': 1}. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: stdout. 😍 publish :{'counter': 2}. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: stdout. 😍 publisher finish.. {scriptName=services.com.example.Publisher.lifecycle.Run, serviceName=com.example.Publisher, currentState=RUNNING}
com.example.Publisher: Run script exited. {exitCode=0, serviceName=com.example.Publisher, currentState=RUNNING}

3 Subscriber

コードです。下記を参考にしました。

参考:https://docs.aws.amazon.com/ja_jp/greengrass/v2/developerguide/ipc-iot-core-mqtt.html#ipc-operation-subscribetoiotcore

(1) artifact

subscriber.py

import time
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    IoTCoreMessage,
    QOS,
    SubscribeToIoTCoreRequest
)

TIMEOUT = 10
topic = "topic"
qos = QOS.AT_MOST_ONCE

ipc_client = awsiot.greengrasscoreipc.connect()

print("😍 subscriber start.")

class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: IoTCoreMessage) -> None:
        message = str(event.message.payload, "utf-8")
        print("😍 payload:{}".format(message))

    def on_stream_error(self, error: Exception) -> bool:
        return True

    def on_stream_closed(self) -> None:
        pass

request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)

while True:
    time.sleep(1)

operation.close()
print("😍 subscriber finish.")

(2) recipe

レシピでは、IoT CoreへのSubscribeを許可する設定が必要です。なお、LifecycleRunで設定するPythonのパラメータに、-u が無いと、whileループでログが上手く確認できないので注意が必要です。

com.example.Subscriber-1.0.0.yaml

---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.Subscriber
ComponentVersion: '1.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    accessControl:
      aws.greengrass.ipc.mqttproxy:
        com.example.Subscriber:publish:1:
          operations:
          - "aws.greengrass#SubscribeToIoTCore"
          resources:
          - "topic"
Manifests:
  - Lifecycle:
      Install: pip3 install awsiotsdk
      Run: python3 -u {artifacts:path}/subscriber.py

(3) 動作確認

IoT CoreのコンソールからtopicへのPublishを行いました。

コンポーネントのログでは、メッセージを受信できていることが確認できます。

※ 確認しやすいように一部編集しています

$ sudo tail -f  /greengrass/v2/logs/com.example.Subscriber.log
com.example.Subscriber: stdout. 😍 subscriber start.. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. 😍 payload:{. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. "message": "AWS IoT コンソールからの挨拶". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. 😍 payload:{. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. "message": "AWS IoT コンソールからの挨拶". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. 😍 payload:{. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. "message": "AWS IoT コンソールからの挨拶". {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}
com.example.Subscriber: stdout. }. {scriptName=services.com.example.Subscriber.lifecycle.Run, serviceName=com.example.Subscriber, currentState=RUNNING}

4 最後に

今回は、このawsiot.greengrasscoreipc.connectを使用して、AWS IoT Coreのメッセージブローカーに対してPublish/Subscribeするコンポーネントを作成してみました。

当然ですが・・・Greengrassをセットアップした時点で、証明書やポリシーが設定されるため、コンポーネントを作成する段階では、別途、証明書や認証情報を考える必要はありません。

5 参考リンク


[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました