[AWS IoT Greengrass V2] Amazon CloudWatch メトリクスコンポーネント (aws.greengrass.Cloudwatch) を使用してカスタムメトリクスをパブリッシュしてみました

2023.01.06

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX 事業本部のデリバリー部の平内(SIN)です。

AWS IoT Greengrass V2 では、AWS から提供されるいくつかの事前構築済みコンポーネントが利用可能ですが、その中の Amazon CloudWatch メトリクス (aws.greengrass.Cloudwatch) を使用すると、コンポーネントから CloudWatch メトリクスをパブリッシュできるようになります。
デベロッパーガイド - CloudWatch メトリクス

今回は、RaspberryPi で動作する Greengrass V2 のカスタムコンポーネントからメトリクスを送信する手順を確認してみました。

2 aws.greengrass.Cloudwatch

Amazon CloudWatch メトリクス (aws.greengrass.Cloudwatch) は、パブリックコンポーネントの一覧から選択できます。

特に何も設定しないでデプロイすると、デフォルトの設定は以下のようになっています。(確認しやすいように、Configration を整形しています)

アクセスコントロールで、ipc.pubsub及び、ipc.mqttproxyが設定されており、メトリクス送信用のトピック(cloudwatch/metric/put)と、結果を受け取るトピック(cloudwatch/metric/put/status)が確認できます。

このトピックをそのまま利用する場合は、特に作業はありませんが、変更する場合は、このアクセスコントロールを修正する必要があります。

sudo /greengrass/v2/bin/greengrass-cli component list

・・・略・・・

Component Name: aws.greengrass.TokenExchangeService
    Version: 2.0.3
    State: RUNNING
    Configuration: {"activePort":39869.0,"port":0.0}

・・・略・・・

Component Name: aws.greengrass.Cloudwatch
    Version: 3.1.0
    State: RUNNING
    Configuration: {
        "accessControl": {
            "aws.greengrass.ipc.mqttproxy": {
                "aws.greengrass.Cloudwatch:mqttproxy:1": {
                    "operations": [
                        "aws.greengrass#SubscribeToIoTCore"
                    ],
                    "policyDescription": "Allows access to subscribe to input topics.",
                    "resources": [
                        "cloudwatch/metric/put"
                    ]
                },
                "aws.greengrass.Cloudwatch:mqttproxy:2": {
                    "operations": [
                        "aws.greengrass#PublishToIoTCore"
                    ],
                    "policyDescription": "Allows access to publish to output topics.",
                    "resources": [
                        "cloudwatch/metric/put/status"
                    ]
                }
            },
            "aws.greengrass.ipc.pubsub": {
                "aws.greengrass.Cloudwatch:pubsub:1": {
                    "operations": [
                        "aws.greengrass#SubscribeToTopic"
                    ],
                    "policyDescription": "Allows access to subscribe to input topics.",
                    "resources": [
                        "cloudwatch/metric/put"
                    ]
                },
                "aws.greengrass.Cloudwatch:pubsub:2": {
                    "operations": [
                        "aws.greengrass#PublishToTopic"
                    ],
                    "policyDescription": "Allows access to publish to output topics.",
                    "resources": [
                        "cloudwatch/metric/put/status"
                    ]
                }
            }
        },
        "InputTopic": "cloudwatch/metric/put",
        "LogLevel": "INFO",
        "MaxMetricsToRetain": "5000",
        "OutputTopic": "cloudwatch/metric/put/status",
        "PublishInterval": "10",
        "PublishRegion": "ap-northeast-1",
        "PubSubToIoTCore": "false",
        "UseInstaller": "true"
    }

・・・略・・・

3 権限追加

Amazon CloudWatch メトリクス (aws.greengrass.Cloudwatch) を利用可能にするためには、Greengrass デバイスに AWS へのアクセス権の付与が必要です。

GreengrassV2CloudwatchPolicy

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": ["cloudwatch:PutMetricData"],
      "Effect": "Allow",
      "Resource": "*"
    }
  ]
}

上記のポリシーを作成し、ロールエイリアス(GreengrassV2TokenExchangeRoleAlias)経由で設定されるロール(GreengrassV2TokenExchangeRole)に追加します。

4 カスタムコンポーネント作成

ここまでの作業で、メトリクス送信の準備は完了です。

続いて、メトリクスを送信するカスタムコンポーネントを作成します。

参考:[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました

(1) コード

作成したコードは以下の通りです。

10 秒毎にメトリクス を送信しています。また、その結果を確認するために、Subscribe も行っています。

import time
import datetime
import random
import os

import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    PublishToTopicRequest,
    PublishMessage,
    JsonMessage,
    SubscribeToTopicRequest,
    SubscriptionResponseMessage
)

def log(message):
    dt_now = datetime.datetime.now()
    with open('/tmp/Cloudwatch.log', 'a') as f:
        print("{} {}".format(dt_now, message), file=f)

class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: SubscriptionResponseMessage) -> None:
        log("recv: {}".format(event.json_message.message))

    def on_stream_error(self, error: Exception) -> bool:
        return True

    def on_stream_closed(self) -> None:
        pass

def main():
    TIMEOUT = 10

    publishTopic = "cloudwatch/metric/put"
    subscribeTopic = "cloudwatch/metric/put/status"

    place = "front"
    namespace = "CloudwatchSample"
    metric_name = "latency"
    unit = "Seconds"
    dimensions = [
        { "name": "place", "value": place}
    ]

    log("start.")
    ipc_client = awsiot.greengrasscoreipc.connect()

    request = SubscribeToTopicRequest()
    request.topic = subscribeTopic
    handler = StreamHandler()
    operation = ipc_client.new_subscribe_to_topic(handler)
    future = operation.activate(request)
    future.result(TIMEOUT)

    for i in range(100):

        timestamp = int(time.time())
        value = random.randint(0, 60)
        message = {
            "request": {
                "namespace": namespace,
                "metricData": {
                    "metricName": metric_name,
                    "dimensions": dimensions,
                    "timestamp": timestamp,
                    "value": value,
                    "unit": unit
                }
            }
        }
        publish_message = PublishMessage()
        publish_message.json_message = JsonMessage()
        publish_message.json_message.message = message

        request = PublishToTopicRequest()
        request.topic = publishTopic
        request.publish_message = publish_message

        operation = ipc_client.new_publish_to_topic()
        operation.activate(request)
        future = operation.get_response()
        future.result(TIMEOUT)
        log("publish :{}".format(message))
        time.sleep(10)

    log("finish.")

if __name__ == '__main__':
    main()

(2) レシピ

コードを S3 に配置し、コンポーネントを作成します。 accessControl で、メトリクス送信のためのPublish 及び、その結果を受け取れるようにSubscribeが追加されています。

{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.CloudwatchSample",
  "ComponentVersion": "1.0.0",
  "ComponentType": "aws.greengrass.generic",
  "ComponentDescription": "CloudwatchSample",
  "ComponentPublisher": "Me",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.pubsub": {
          "com.example.LocalPublisher:pubsub:1": {
            "operations": ["aws.greengrass#PublishToTopic"],
            "resources": ["cloudwatch/metric/put"]
          },
          "com.example.LocalSubscriber:pubsub:2": {
            "operations": ["aws.greengrass#SubscribeToTopic"],
            "resources": ["cloudwatch/metric/put/status"]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux"
      },
      "Name": "Linux",
      "Lifecycle": {
        "Run": "python3 {artifacts:path}/cloudwatch_sample.py"
      },
      "Artifacts": [
        {
          "Uri": "s3://greengrass-artifacts/artifacts/com.example.CloudwatchSample/1.0.0/cloudwatch_sample.py",
          "Digest": "HKW9phUsvCJmfyjuc4y6gkACt6SEDekV1mfjqAWn6Yc=",
          "Algorithm": "SHA-256",
          "Unarchive": "NONE",
          "Permission": {
            "Read": "OWNER",
            "Execute": "NONE"
          }
        }
      ]
    }
  ],
  "Lifecycle": {}
}

(3) デプロイ

作成したコンポーネントをマイコンポーネントとして追加し、デプロイします。

デプロイ後にデバイス上で確認すると、下記のようにRUNNINGとなっていることが確認できます。

$ sudo /greengrass/v2/bin/greengrass-cli component list

・・・略・・・

Component Name: com.example.CloudwatchSample
    Version: 1.0.0
    State: RUNNING
    Configuration: {"accessControl":{"aws.greengrass.ipc.pubsub":{"com.example.LocalPublisher:pubsub:1":{"operations":["aws.greengrass#PublishToTopic"],"resources":["cloudwatch/metric/put"]},"com.example.LocalSubscriber:pubsub:2":{"operations":["aws.greengrass#SubscribeToTopic"],"resources":["cloudwatch/metric/put/status"]}}}}

5 結果

(1) ログ

カスタムコンポーネントのログは、以下のようになりました。Publish した内容と、その結果(recv)が表示されています。

※ 確認しやすいように、タイムスタンプを編集(削除)しています。

$ tail -f /tmp/Cloudwatch.log
Start.
publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 1672950627, 'value': 50, 'unit': 'Seconds'}}}
recv: {'response': {'status': 'success', 'cloudwatch_rid': 'befb7d6a-20e5-4b92-b65e-bd8c85094459', 'namespace': 'CloudwatchSample'}, 'id': ''}
publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 1672950637, 'value': 58, 'unit': 'Seconds'}}}
recv: {'response': {'status': 'success', 'cloudwatch_rid': '52829e68-0541-42e9-81ee-59b9fd652926', 'namespace': 'CloudwatchSample'}, 'id': ''}
publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 1672950647, 'value': 3, 'unit': 'Seconds'}}}
recv: {'response': {'status': 'success', 'cloudwatch_rid': 'cec1af0d-ae1f-4488-8db7-2ba00bbf6dcc', 'namespace': 'CloudwatchSample'}, 'id': ''}

・・・略・・・

(2) メトリクス

Cloudwatch のコンソールで、送信されたメトリクスを確認している様子です。

(3) エラー

Subscribe した cloudwatch/metric/put/statusからは、問題なく送信できた場合、response': {'status': 'success'} が返されますが、また、エラーとなった場合に、その理由が確認できます。

  • 例)Seconds の value に文字列が設定されている ('value': '100')
publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 1672952563, 'value': '100', 'unit': 'Seconds'}}}
recv: {'response': {'status': 'fail', 'error_message': '', 'error': 'mandatory field (value) is not a number'}, 'id': "<class 'ValueError'>"}
  • 例)timestamp の時間に矛盾がある ('timestamp': 100000000 )
publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 100000000, 'value': 56, 'unit': 'Seconds'}}}
recv: {'response': {'status': 'fail', 'error_message': 'An error occurred (InvalidParameterValue) when calling the PutMetricData operation: The parameter MetricData.member.1.Timestamp must specify a time within the past two weeks.', 'error': "<class 'botocore.errorfactory.InvalidParameterValueException'>", 'namespace': 'CloudwatchSample'}, 'id': ''}

6 最後に

今回は、Greengrass V2 のカスタムコンポーネントからメトリクスを送信する手順を確認してみました。

カスタムメトリクスを使用すると、デバイスの状態を監視したり、分析するのに非常に便利に利用できます。MQTT Publish するだけで、簡単にカスタムメトリクス が送信できるのは、非常にありがたいと思います。

7 参考リンク


[AWS IoT Greengrass V2] RaspberryPI にインストールしてみました
[AWS IoT Greengrass V2] RaspberryPI でコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda 関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントから IoT Core のメッセージブローカーに Publish/Subscribe してみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました
[AWS IoT Greengrass V2] コンポーネントでコアデバイス間の Publish/Subscribe を試してみました
[AWS IoT Greengrass V2] ログマネージャでコンポーネントのログを CloudWatch Logs に送ってみました
[AWS IoT Greengrass V2] トークン交換サービスでコンポーネントから DynamoDB にアクセスしてみました
[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントから Kinesis Data Streams へデータを送ってみました
[AWS IoT Greengrass V2] プロセス間通信 (IPC) を使用してコンポーネントの設定値を使用してみました
[AWS IoT Greengrass V2] ストリームマネージャーを使用して Web カメラの画像を毎秒 2 フレームで S3 に送信してみました
[AWS IoT Greenglass V2] 100 円ショップの Bluetooth リモコン シャッターでパトランプ回してみました
[AWS IoT Greemgrass V2] MQTT のメッセージで再起動やシャットダウンを行うコンポーネントを作成してみました。