AWS IoT SiteWise ゲートウェイで収集した設備機器の稼働データを AWS IoT Analytics で変換して Amazon QuickSight で可視化してみた
はじめに
以前の記事で OPC UA サーバから設備機器の稼働データを SiteWise ゲートウェイで収集し、IoT Analytics に送る方法をご紹介しました。
今回は、下記のように IoT Analytics に送った機器の稼働データを QuickSight で可視化して BI として活用できる基盤を作ってみたいと思います。
上記の構成図では、AWS IoT Analytics の部分が簡素化されていますが、詳細な構成は下記になります。(AWS IoT Analytics の箇所だけ詳細に記載した図になります。)
前回の構成では、IoT Analytics で受け取ったデータに対して何も処理をせずにデータセットに保存していたので下記のような CSV データになっています。
このデータ構造だと QuickSight から扱うのが難しいので、パイプラインのアクティビティで Lambda 関数を呼び出して分析しやすい形にデータを変換することにします。
なお、本記事では前回の記事の構成がすでに存在している前提となります。
Lambda 関数の作成
今回は Lambda 関数を AWS SAM で作ります。
sam init
$ sam init \ --runtime python3.9 \ --name iot-analytics-lambda-sitewise-data \ --app-template hello-world \ --package-type Zip $ cd iot-analytics-lambda-sitewise-data
テンプレートの作成
22行目で IoT Analytics から Lambda 関数を実行するためのリソースベースポリシーを設定しています。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: > iot-analytics-lambda-sitewise-data Sample SAM Template for iot-analytics-lambda-sitewise-data Globals: Function: Timeout: 3 Resources: IoTAnalyticsPipelineSiteWiseFunction: Type: AWS::Serverless::Function Properties: CodeUri: hello_world/ Handler: app.lambda_handler Runtime: python3.9 Architectures: - x86_64 AutoPublishAlias: dev LambdaPermission: Type: AWS::Lambda::Permission Properties: Action: lambda:InvokeFunction FunctionName: !GetAtt IoTAnalyticsPipelineSiteWiseFunction.Arn Principal: iotanalytics.amazonaws.com Outputs: FunctionARN: Description: "IoT Analytics Pipeline SiteWise Function ARN" Value: !GetAtt IoTAnalyticsPipelineSiteWiseFunction.Arn FunctionIamRole: Description: "Implicit IAM Role created for IoT Analytics Pipeline SiteWise function" Value: !GetAtt IoTAnalyticsPipelineSiteWiseFunctionRole.Arn
Lambda 関数のコード
import json import logging import sys import time from datetime import datetime # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def lambda_handler(event, context): logger.info("event: {}".format(event)) transformed = [] for e in event: # e に単一アセットのデータが全て入る logger.info("e: {}".format(json.dumps(e, indent=2))) property_alias = e['propertyAlias'] propertyValuesList = e['propertyValues'] property_value = propertyValuesList[0] # 設備機器データからデータ値を取り出す value = "" if 'doubleValue' in property_value['value']: value = property_value['value']['doubleValue'] logger.info("value_type: doubleValue") if 'integerValue' in property_value['value']: value = property_value['value']['integerValue'] logger.info("value_type: integerValue") if 'booleanValue' in property_value['value']: value = property_value['value']['booleanValue'] logger.info("value_type: booleanValue") if 'stringValue' in property_value['value']: value = property_value['value']['stringValue'] logger.info("value_type: stringValue") # 設備機器データからQualityを取り出す quality = "" if 'quality' in property_value: quality = property_value['quality'] logger.debug("quality in payload") # 設備機器データからTimestampを取り出す(ナノ秒の有無でパターン分け) timestamp = "" unixtime = "" nanoseconds = "" devicetime = "" if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and 'offsetInNanos' in property_value['timestamp']: unixtime = property_value['timestamp']['timeInSeconds'] devicetime = datetime.fromtimestamp(unixtime) milliseconds = int(property_value['timestamp']['offsetInNanos'] / 1000000) timestamp = str(devicetime) + str(".") + str(milliseconds) if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and not 'offsetInNanos' in property_value['timestamp']: unixtime = property_value['timestamp']['timeInSeconds'] devicetime = datetime.fromtimestamp(unixtime) timestamp = str(devicetime) + str(".") + str(000) # 変換したデータの生成 row = {} row['propertyAlias'] = property_alias row['value'] = value row['timestamp'] = timestamp row['quality'] = quality logger.debug("row: {}".format(row)) transformed.append(row) logger.info("transformed: {}\n".format(json.dumps(transformed, indent=2))) return transformed
ビルド & デプロイ
$ sam build $ sam package \ --output-template-file packaged.yaml \ --s3-bucket ichida-aws-sam-artifact $ sam deploy \ --template-file packaged.yaml \ --stack-name iot-analytics-lambda-sitewise \ --s3-bucket ichida-aws-sam-artifact \ --capabilities CAPABILITY_NAMED_IAM \ --no-fail-on-empty-changeset
AWS IoT Analytics のパイプライン設定
次に IoT Analytics のパイプラインの設定を行います。
AWS のマネージメントコンソールで、前回の記事で作成したパイプラインの画面を開きます。
対象のパイプラインの画面で「アクティビティ」タブを選択して、「アクティビティ」の設定箇所で「編集」をクリックします。
前回の記事で SiteWise ゲートウェイ (StreamManager経由)からデータが IoT Analytics に送られている場合、下記のように受信したメッセージから読み取られた「属性」が表示されますが、特に何もせず「次へ」をクリックします。
「パイプラインアクティビティ」では「Lambda 関数でメッセージを変換」を選択します。
変換処理を行う Lambda 関数をプルダウンより選択します。
バッチサイズは Lambda による処理内容や Lambda のメモリ量などに応じて適宜変更してください。
また、「Lambda 関数でメッセージを変換」を選択すると下記のように変換前のメッセージが表示されます。この状態で「プレビューを更新」ボタンをクリックすると、この Lambda 関数により変換されたメッセージをプレビューとして表示できるので、正しく変換できているか事前に確認することができます。
今回は、送られてきた設備機器の稼働データをネストの無い JSON に変換して、QuickSight で扱いやすい形にしています。
(※すでに前回の記事で IoT Analytics にデータが届いている場合に受信メッセージが表示されます)
注意点としては、 Python で書いた Lambda 関数の場合、event
で受け取るデータは上記の「受信メッセージ」をリストに格納したものになるという点です(下記参照)。辞書型ではない点に注意してください。
[{'propertyAlias': '/Factory/…(中略)…'quality': 'GOOD'}]}]
動作確認できたら「次へ」をクリックして進みます。
最後に「更新」ボタンをクリックしてアクティビティの設定が完了です。
AWS IoT Analytics のデータセット設定
シングルステップセットアップで IoT Analytics の環境を作成した場合、SQL データセットのクエリは下記のようなシンプルなものがセットされます。
SELECT * FROM ggcstreammanager_datastore
このクエリを QuickSight で扱いやすい CSV を出力するように変更します。
パイプラインの Lambda で変換したデータ形式に合わせて次のようなクエリに変更します。
SELECT propertyAlias, value, timestamp, quality FROM ggcstreammanager_datastore
Amazon QuickSight の設定
次に QuickSight を設定していきます。QuickSight の画面で「新しいデータセット」をクリックします。
一覧から「AWS IoT Analytics」を選択します。
データソースにしたい IoT Analytics のデータセットを選択します。
データセットの作成直後はデータのインポート処理が走りますが、少し待つと完了します。
それではグラフを作っていきたいと思います。ビジュアルタイプは「折れ線グラフ」を選択して、「フィールドウェル」には下記のように設定しました。
- X軸:timesamp(分)
- 値:value(最大)
- 色:propertyAlias
また、全ての propertyAlias を表示させるとデータの大きさがバラバラでうまく可視化できないので、ここでは「フィルター」で温度(Temperature)だけ表示するようにしています。
同様に「RPM」のデータもグラフ化して次のような形にしてみました。
最後に
今回は取得したデータだけを使ってQuickSightで可視化してみました。 取得したデータだけを折れ線グラフで可視化するだけならば、SiteWise Monitor の方が簡単ですが、CSVやその他のシステムのデータと組み合わせた分析を行う場合は、QuickSight が有効なサービスになります。
また、QuickSightは最短で1時間ごとの更新になるので、リアルタイム可視化が必要なデータは SiteWise Monitor を使い、バッチ的な処理でよい場合は QuickSight を使うといった形で使い分ける形が良さそうです。
以上です。