[AWS IoT Greengrass V2] Amazon CloudWatch メトリクスコンポーネント (aws.greengrass.Cloudwatch) を使用してカスタムメトリクスをパブリッシュしてみました
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
AWS IoT Greengrass V2 では、AWS から提供されるいくつかの事前構築済みコンポーネントが利用可能ですが、その中の Amazon CloudWatch メトリクス (aws.greengrass.Cloudwatch) を使用すると、コンポーネントから CloudWatch メトリクスをパブリッシュできるようになります。
デベロッパーガイド - CloudWatch メトリクス
今回は、RaspberryPi で動作する Greengrass V2 のカスタムコンポーネントからメトリクスを送信する手順を確認してみました。
2 aws.greengrass.Cloudwatch
Amazon CloudWatch メトリクス (aws.greengrass.Cloudwatch) は、パブリックコンポーネントの一覧から選択できます。
特に何も設定しないでデプロイすると、デフォルトの設定は以下のようになっています。(確認しやすいように、Configration を整形しています)
アクセスコントロールで、ipc.pubsub及び、ipc.mqttproxyが設定されており、メトリクス送信用のトピック(cloudwatch/metric/put)と、結果を受け取るトピック(cloudwatch/metric/put/status)が確認できます。
このトピックをそのまま利用する場合は、特に作業はありませんが、変更する場合は、このアクセスコントロールを修正する必要があります。
sudo /greengrass/v2/bin/greengrass-cli component list ・・・略・・・ Component Name: aws.greengrass.TokenExchangeService Version: 2.0.3 State: RUNNING Configuration: {"activePort":39869.0,"port":0.0} ・・・略・・・ Component Name: aws.greengrass.Cloudwatch Version: 3.1.0 State: RUNNING Configuration: { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "aws.greengrass.Cloudwatch:mqttproxy:1": { "operations": [ "aws.greengrass#SubscribeToIoTCore" ], "policyDescription": "Allows access to subscribe to input topics.", "resources": [ "cloudwatch/metric/put" ] }, "aws.greengrass.Cloudwatch:mqttproxy:2": { "operations": [ "aws.greengrass#PublishToIoTCore" ], "policyDescription": "Allows access to publish to output topics.", "resources": [ "cloudwatch/metric/put/status" ] } }, "aws.greengrass.ipc.pubsub": { "aws.greengrass.Cloudwatch:pubsub:1": { "operations": [ "aws.greengrass#SubscribeToTopic" ], "policyDescription": "Allows access to subscribe to input topics.", "resources": [ "cloudwatch/metric/put" ] }, "aws.greengrass.Cloudwatch:pubsub:2": { "operations": [ "aws.greengrass#PublishToTopic" ], "policyDescription": "Allows access to publish to output topics.", "resources": [ "cloudwatch/metric/put/status" ] } } }, "InputTopic": "cloudwatch/metric/put", "LogLevel": "INFO", "MaxMetricsToRetain": "5000", "OutputTopic": "cloudwatch/metric/put/status", "PublishInterval": "10", "PublishRegion": "ap-northeast-1", "PubSubToIoTCore": "false", "UseInstaller": "true" } ・・・略・・・
3 権限追加
Amazon CloudWatch メトリクス (aws.greengrass.Cloudwatch) を利用可能にするためには、Greengrass デバイスに AWS へのアクセス権の付与が必要です。
GreengrassV2CloudwatchPolicy
{ "Version": "2012-10-17", "Statement": [ { "Action": ["cloudwatch:PutMetricData"], "Effect": "Allow", "Resource": "*" } ] }
上記のポリシーを作成し、ロールエイリアス(GreengrassV2TokenExchangeRoleAlias)経由で設定されるロール(GreengrassV2TokenExchangeRole)に追加します。
4 カスタムコンポーネント作成
ここまでの作業で、メトリクス送信の準備は完了です。
続いて、メトリクスを送信するカスタムコンポーネントを作成します。
参考:[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
(1) コード
作成したコードは以下の通りです。
10 秒毎にメトリクス を送信しています。また、その結果を確認するために、Subscribe も行っています。
import time import datetime import random import os import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, JsonMessage, SubscribeToTopicRequest, SubscriptionResponseMessage ) def log(message): dt_now = datetime.datetime.now() with open('/tmp/Cloudwatch.log', 'a') as f: print("{} {}".format(dt_now, message), file=f) class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: log("recv: {}".format(event.json_message.message)) def on_stream_error(self, error: Exception) -> bool: return True def on_stream_closed(self) -> None: pass def main(): TIMEOUT = 10 publishTopic = "cloudwatch/metric/put" subscribeTopic = "cloudwatch/metric/put/status" place = "front" namespace = "CloudwatchSample" metric_name = "latency" unit = "Seconds" dimensions = [ { "name": "place", "value": place} ] log("start.") ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = subscribeTopic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) future = operation.activate(request) future.result(TIMEOUT) for i in range(100): timestamp = int(time.time()) value = random.randint(0, 60) message = { "request": { "namespace": namespace, "metricData": { "metricName": metric_name, "dimensions": dimensions, "timestamp": timestamp, "value": value, "unit": unit } } } publish_message = PublishMessage() publish_message.json_message = JsonMessage() publish_message.json_message.message = message request = PublishToTopicRequest() request.topic = publishTopic request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future = operation.get_response() future.result(TIMEOUT) log("publish :{}".format(message)) time.sleep(10) log("finish.") if __name__ == '__main__': main()
(2) レシピ
コードを S3 に配置し、コンポーネントを作成します。 accessControl で、メトリクス送信のためのPublish 及び、その結果を受け取れるようにSubscribeが追加されています。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.CloudwatchSample", "ComponentVersion": "1.0.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "CloudwatchSample", "ComponentPublisher": "Me", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.LocalPublisher:pubsub:1": { "operations": ["aws.greengrass#PublishToTopic"], "resources": ["cloudwatch/metric/put"] }, "com.example.LocalSubscriber:pubsub:2": { "operations": ["aws.greengrass#SubscribeToTopic"], "resources": ["cloudwatch/metric/put/status"] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Name": "Linux", "Lifecycle": { "Run": "python3 {artifacts:path}/cloudwatch_sample.py" }, "Artifacts": [ { "Uri": "s3://greengrass-artifacts/artifacts/com.example.CloudwatchSample/1.0.0/cloudwatch_sample.py", "Digest": "HKW9phUsvCJmfyjuc4y6gkACt6SEDekV1mfjqAWn6Yc=", "Algorithm": "SHA-256", "Unarchive": "NONE", "Permission": { "Read": "OWNER", "Execute": "NONE" } } ] } ], "Lifecycle": {} }
(3) デプロイ
作成したコンポーネントをマイコンポーネントとして追加し、デプロイします。
デプロイ後にデバイス上で確認すると、下記のようにRUNNINGとなっていることが確認できます。
$ sudo /greengrass/v2/bin/greengrass-cli component list ・・・略・・・ Component Name: com.example.CloudwatchSample Version: 1.0.0 State: RUNNING Configuration: {"accessControl":{"aws.greengrass.ipc.pubsub":{"com.example.LocalPublisher:pubsub:1":{"operations":["aws.greengrass#PublishToTopic"],"resources":["cloudwatch/metric/put"]},"com.example.LocalSubscriber:pubsub:2":{"operations":["aws.greengrass#SubscribeToTopic"],"resources":["cloudwatch/metric/put/status"]}}}}
5 結果
(1) ログ
カスタムコンポーネントのログは、以下のようになりました。Publish した内容と、その結果(recv)が表示されています。
※ 確認しやすいように、タイムスタンプを編集(削除)しています。
$ tail -f /tmp/Cloudwatch.log Start. publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 1672950627, 'value': 50, 'unit': 'Seconds'}}} recv: {'response': {'status': 'success', 'cloudwatch_rid': 'befb7d6a-20e5-4b92-b65e-bd8c85094459', 'namespace': 'CloudwatchSample'}, 'id': ''} publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 1672950637, 'value': 58, 'unit': 'Seconds'}}} recv: {'response': {'status': 'success', 'cloudwatch_rid': '52829e68-0541-42e9-81ee-59b9fd652926', 'namespace': 'CloudwatchSample'}, 'id': ''} publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 1672950647, 'value': 3, 'unit': 'Seconds'}}} recv: {'response': {'status': 'success', 'cloudwatch_rid': 'cec1af0d-ae1f-4488-8db7-2ba00bbf6dcc', 'namespace': 'CloudwatchSample'}, 'id': ''} ・・・略・・・
(2) メトリクス
Cloudwatch のコンソールで、送信されたメトリクスを確認している様子です。
(3) エラー
Subscribe した cloudwatch/metric/put/statusからは、問題なく送信できた場合、response': {'status': 'success'} が返されますが、また、エラーとなった場合に、その理由が確認できます。
- 例)Seconds の value に文字列が設定されている ('value': '100')
publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 1672952563, 'value': '100', 'unit': 'Seconds'}}} recv: {'response': {'status': 'fail', 'error_message': '', 'error': 'mandatory field (value) is not a number'}, 'id': "<class 'ValueError'>"}
- 例)timestamp の時間に矛盾がある ('timestamp': 100000000 )
publish :{'request': {'namespace': 'CloudwatchSample', 'metricData': {'metricName': 'latency', 'dimensions': [{'name': 'place', 'value': 'front'}], 'timestamp': 100000000, 'value': 56, 'unit': 'Seconds'}}} recv: {'response': {'status': 'fail', 'error_message': 'An error occurred (InvalidParameterValue) when calling the PutMetricData operation: The parameter MetricData.member.1.Timestamp must specify a time within the past two weeks.', 'error': "<class 'botocore.errorfactory.InvalidParameterValueException'>", 'namespace': 'CloudwatchSample'}, 'id': ''}
6 最後に
今回は、Greengrass V2 のカスタムコンポーネントからメトリクスを送信する手順を確認してみました。
カスタムメトリクスを使用すると、デバイスの状態を監視したり、分析するのに非常に便利に利用できます。MQTT Publish するだけで、簡単にカスタムメトリクス が送信できるのは、非常にありがたいと思います。
7 参考リンク
[AWS IoT Greengrass V2] RaspberryPI にインストールしてみました
[AWS IoT Greengrass V2] RaspberryPI でコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda 関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントから IoT Core のメッセージブローカーに Publish/Subscribe してみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました
[AWS IoT Greengrass V2] コンポーネントでコアデバイス間の Publish/Subscribe を試してみました
[AWS IoT Greengrass V2] ログマネージャでコンポーネントのログを CloudWatch Logs に送ってみました
[AWS IoT Greengrass V2] トークン交換サービスでコンポーネントから DynamoDB にアクセスしてみました
[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントから Kinesis Data Streams へデータを送ってみました
[AWS IoT Greengrass V2] プロセス間通信 (IPC) を使用してコンポーネントの設定値を使用してみました
[AWS IoT Greengrass V2] ストリームマネージャーを使用して Web カメラの画像を毎秒 2 フレームで S3 に送信してみました
[AWS IoT Greenglass V2] 100 円ショップの Bluetooth リモコン シャッターでパトランプ回してみました
[AWS IoT Greemgrass V2] MQTT のメッセージで再起動やシャットダウンを行うコンポーネントを作成してみました。