AWS IoT SiteWise ゲートウェイで収集した設備機器の稼働データを AWS IoT Analytics で変換して Amazon QuickSight で可視化してみた

製造業の設備機器のデータは収集して可視化するだけでは宝の持ち腐れです。BI ツールで業務改善に活用していきましょう! Let's 製造業 IoT !!
2022.06.24

はじめに

以前の記事で OPC UA サーバから設備機器の稼働データを SiteWise ゲートウェイで収集し、IoT Analytics に送る方法をご紹介しました。

今回は、下記のように IoT Analytics に送った機器の稼働データを QuickSight で可視化して BI として活用できる基盤を作ってみたいと思います。

01-sitewise-iot-analytics-quicksight

上記の構成図では、AWS IoT Analytics の部分が簡素化されていますが、詳細な構成は下記になります。(AWS IoT Analytics の箇所だけ詳細に記載した図になります。)

02-iot-analytics-quicksight-3

前回の構成では、IoT Analytics で受け取ったデータに対して何も処理をせずにデータセットに保存していたので下記のような CSV データになっています。

16-iot-analytics-dataset

このデータ構造だと QuickSight から扱うのが難しいので、パイプラインのアクティビティで Lambda 関数を呼び出して分析しやすい形にデータを変換することにします。

なお、本記事では前回の記事の構成がすでに存在している前提となります。

Lambda 関数の作成

今回は Lambda 関数を AWS SAM で作ります。

sam init

$ sam init \
    --runtime python3.9 \
    --name iot-analytics-lambda-sitewise-data \
    --app-template hello-world \
    --package-type Zip

$ cd iot-analytics-lambda-sitewise-data

テンプレートの作成

22行目で IoT Analytics から Lambda 関数を実行するためのリソースベースポリシーを設定しています。

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  iot-analytics-lambda-sitewise-data

  Sample SAM Template for iot-analytics-lambda-sitewise-data

Globals:
  Function:
    Timeout: 3

Resources:
  IoTAnalyticsPipelineSiteWiseFunction:
    Type: AWS::Serverless::Function 
    Properties:
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      AutoPublishAlias: dev
  LambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt IoTAnalyticsPipelineSiteWiseFunction.Arn
      Principal: iotanalytics.amazonaws.com

Outputs:
  FunctionARN:
    Description: "IoT Analytics Pipeline SiteWise Function ARN"
    Value: !GetAtt IoTAnalyticsPipelineSiteWiseFunction.Arn
  FunctionIamRole:
    Description: "Implicit IAM Role created for IoT Analytics Pipeline SiteWise function"
    Value: !GetAtt IoTAnalyticsPipelineSiteWiseFunctionRole.Arn

Lambda 関数のコード

app.py

import json
import logging
import sys
import time
from datetime import datetime

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)


def lambda_handler(event, context):
    logger.info("event: {}".format(event))
    transformed = []
    
    for e in event:
        # e に単一アセットのデータが全て入る
        logger.info("e: {}".format(json.dumps(e, indent=2)))
        property_alias = e['propertyAlias']
        propertyValuesList = e['propertyValues'] 
        property_value = propertyValuesList[0]

        # 設備機器データからデータ値を取り出す
        value = ""
        if 'doubleValue' in property_value['value']:
            value = property_value['value']['doubleValue']
            logger.info("value_type: doubleValue")
        if 'integerValue' in property_value['value']:
            value = property_value['value']['integerValue']
            logger.info("value_type: integerValue")
        if 'booleanValue' in property_value['value']:
            value = property_value['value']['booleanValue']
            logger.info("value_type: booleanValue")
        if 'stringValue' in property_value['value']:
            value = property_value['value']['stringValue']
            logger.info("value_type: stringValue")

        # 設備機器データからQualityを取り出す
        quality = ""
        if 'quality' in property_value:
            quality = property_value['quality']
            logger.debug("quality in payload")
        
        # 設備機器データからTimestampを取り出す(ナノ秒の有無でパターン分け)
        timestamp = ""
        unixtime = ""
        nanoseconds = ""
        devicetime = ""
        if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and 'offsetInNanos' in property_value['timestamp']:
            unixtime = property_value['timestamp']['timeInSeconds']
            devicetime = datetime.fromtimestamp(unixtime)
            milliseconds = int(property_value['timestamp']['offsetInNanos'] / 1000000)
            timestamp = str(devicetime) + str(".") + str(milliseconds)

        if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and not 'offsetInNanos' in property_value['timestamp']:
            unixtime = property_value['timestamp']['timeInSeconds']
            devicetime = datetime.fromtimestamp(unixtime)
            timestamp = str(devicetime) + str(".") + str(000)

        # 変換したデータの生成
        row = {}
        row['propertyAlias'] = property_alias
        row['value'] = value
        row['timestamp'] = timestamp
        row['quality'] = quality
        logger.debug("row: {}".format(row))
        transformed.append(row)

    logger.info("transformed: {}\n".format(json.dumps(transformed, indent=2)))
    return transformed

ビルド & デプロイ

$ sam build

$ sam package \
    --output-template-file packaged.yaml \
    --s3-bucket ichida-aws-sam-artifact

$ sam deploy \
    --template-file packaged.yaml \
    --stack-name iot-analytics-lambda-sitewise \
    --s3-bucket ichida-aws-sam-artifact \
    --capabilities CAPABILITY_NAMED_IAM \
    --no-fail-on-empty-changeset

AWS IoT Analytics のパイプライン設定

次に IoT Analytics のパイプラインの設定を行います。
AWS のマネージメントコンソールで、前回の記事で作成したパイプラインの画面を開きます。
対象のパイプラインの画面で「アクティビティ」タブを選択して、「アクティビティ」の設定箇所で「編集」をクリックします。

03-pipeline-activity

前回の記事で SiteWise ゲートウェイ (StreamManager経由)からデータが IoT Analytics に送られている場合、下記のように受信したメッセージから読み取られた「属性」が表示されますが、特に何もせず「次へ」をクリックします。

04-message-attribute

「パイプラインアクティビティ」では「Lambda 関数でメッセージを変換」を選択します。

05-select-activity

変換処理を行う Lambda 関数をプルダウンより選択します。
バッチサイズは Lambda による処理内容や Lambda のメモリ量などに応じて適宜変更してください。

06-select-lambda-function

また、「Lambda 関数でメッセージを変換」を選択すると下記のように変換前のメッセージが表示されます。この状態で「プレビューを更新」ボタンをクリックすると、この Lambda 関数により変換されたメッセージをプレビューとして表示できるので、正しく変換できているか事前に確認することができます。

今回は、送られてきた設備機器の稼働データをネストの無い JSON に変換して、QuickSight で扱いやすい形にしています。

(※すでに前回の記事で IoT Analytics にデータが届いている場合に受信メッセージが表示されます)

07-function-preview

注意点としては、 Python で書いた Lambda 関数の場合、event で受け取るデータは上記の「受信メッセージ」をリストに格納したものになるという点です(下記参照)。辞書型ではない点に注意してください。

[{'propertyAlias': '/Factory/…(中略)…'quality': 'GOOD'}]}]

動作確認できたら「次へ」をクリックして進みます。

08-next

最後に「更新」ボタンをクリックしてアクティビティの設定が完了です。

09-finish-modify

AWS IoT Analytics のデータセット設定

シングルステップセットアップで IoT Analytics の環境を作成した場合、SQL データセットのクエリは下記のようなシンプルなものがセットされます。

SELECT * FROM ggcstreammanager_datastore

このクエリを QuickSight で扱いやすい CSV を出力するように変更します。
パイプラインの Lambda で変換したデータ形式に合わせて次のようなクエリに変更します。

SELECT 
    propertyAlias,
    value,
    timestamp,
    quality
FROM ggcstreammanager_datastore

17-sql-query

Amazon QuickSight の設定

次に QuickSight を設定していきます。QuickSight の画面で「新しいデータセット」をクリックします。

10-home-quicksight

一覧から「AWS IoT Analytics」を選択します。

11-select-iot-analytics

データソースにしたい IoT Analytics のデータセットを選択します。

12-select-new-data-source

データセットの作成直後はデータのインポート処理が走りますが、少し待つと完了します。

13-first-view-quicksight

それではグラフを作っていきたいと思います。ビジュアルタイプは「折れ線グラフ」を選択して、「フィールドウェル」には下記のように設定しました。

  • X軸:timesamp(分)
  • 値:value(最大)
  • 色:propertyAlias

また、全ての propertyAlias を表示させるとデータの大きさがバラバラでうまく可視化できないので、ここでは「フィルター」で温度(Temperature)だけ表示するようにしています。

14-temperature

同様に「RPM」のデータもグラフ化して次のような形にしてみました。

15-temperature-rpm

最後に

今回は取得したデータだけを使ってQuickSightで可視化してみました。 取得したデータだけを折れ線グラフで可視化するだけならば、SiteWise Monitor の方が簡単ですが、CSVやその他のシステムのデータと組み合わせた分析を行う場合は、QuickSight が有効なサービスになります。

また、QuickSightは最短で1時間ごとの更新になるので、リアルタイム可視化が必要なデータは SiteWise Monitor を使い、バッチ的な処理でよい場合は QuickSight を使うといった形で使い分ける形が良さそうです。

以上です。