設備機器のデータを AWS IoT SiteWise ゲートウェイから AWS IoT Analytics に送ってみた

前回に引き続き、製造業 IoT の為の AWS アーキテクチャパターンの紹介です。今回は BI ツールの活用を前提に IoT Analytics へデータを送る方法を紹介します。
2022.06.13

前回の記事では、SiteWise ゲートウェイから Kinesis Data Streams にデータを送りました。今回は送り先を変えて IoT Analytics に送ってみたいと思います。
IoT Analytics は QuickSight のデータソースにすることができるので、BI ツールとしての可視化も容易になります。

全体の構成

デバイス側の構成は前回のものと変わりません。ストリームマネージャーで利用するコンポーネントが変わるだけです。クラウド側も送り先が IoT Analytics になっています。

00-iot-analytics-diagram

AWS IoT Analytics リソースの作成

最初に IoT Analytics の環境を作成しておきます。今回はテストなのでシングルステップセットアップで作成します。シングルステップセットアップについては下記の記事を参照してください。

MQTT トピックは使わないので適当なものを設定します。

01-build-iot-analytics

作成できた「チャンネル名」を控えておいてください。後で作成する Greengrass コンポーネントのコードの中で送信先に指定します。

02-iot-analytics-channel

ストリーム管理用のカスタムコンポーネントの作成

前回と同様に、IoT Analytics にデータ送信するカスタムコンポーネントは GDK (Greengrass Development Kit) で作成します。そのためパラメータ及びコンポーネントのコードが異なるだけで、手順自体は前回と同じです。

GDK 用の各種ファイルを用意

GDK がインストールされた PC の任意の場所で作業用ディレクトリを作成します。
--name で指定した名前で作業ディレクトリが作成されます。今回は StreamManager2IoTAnalytics という名前にしました。

$ gdk component init \
  -l python \
  --name StreamManager2IoTAnalytics \
  -t HelloWorld

次に各種ファイルを用意していきます。まずは gdk-config.json です。
コンポーネント名は適当なものを付けます。コンポーネント名は作業ディレクトリにちなんで com.example.StreamManager2IoTAnalytics としました。 また、bucket で指定した名前の S3 バケットにコンポーネントのアーティファクトが保存されますが、コンポーネントのパブリッシュ時にオプションでバケットを指定すると、そちらが優先されます。

gdk-config.json

{
  "component": {
    "com.example.StreamManager2IoTAnalytics": {
      "author": "CM-ICHIDA",
      "version": "NEXT_PATCH",
      "build": {
        "build_system": "zip"
      },
      "publish": {
        "bucket": "gdk-my-bucket",
        "region": "ap-northeast-1"
      }
    }
  },
  "gdk_version": "1.1.0"
}
  • recipe.yaml
    • 依存関係でストリームマネージャーを指定します。
    • Lifecycle では、GDK が Zip 化したアーティファクトをデプロイ時に展開する想定のパスを指定します。

recipe.yaml

---
RecipeFormatVersion: "2020-01-25"
ComponentName: "{COMPONENT_NAME}"
ComponentVersion: "{COMPONENT_VERSION}"
ComponentType: "aws.greengrass.generic"
ComponentDescription: "Stream Manager to IoT Analytics"
ComponentPublisher: "{COMPONENT_AUTHOR}"
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: "^2.0.0"
Manifests:
  - Platform:
      os: all
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/StreamManager2IoTAnalytics/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/StreamManager2IoTAnalytics/stream_manager
        python3 -u {artifacts:decompressedPath}/StreamManager2IoTAnalytics/main.py
    Artifacts:
      - URI: "s3://BUCKET_NAME/COMPONENT_NAME/COMPONENT_VERSION/StreamManager2IoTAnalytics.zip"
        Unarchive: ZIP
  • main.py
    • ストリームマネージャー用コンポーネントのコードです。
    • IoT Analytics 向けのカスタムのストリームを作るだけのコンポーネントです。
    • 17行目は、このコンポーネントが作成するストリームマネージャーのストリーム名です。分かりやすい一意の名前を付けます。
    • 18行目では、実際に作成した IoT Analytics のチャンネル名を指定しています。
    • 19行目では、各メッセージ ID の前に付けられるプレフィックス名を指定します。適当な一意のものを付けます。
import asyncio
import logging
import time

from stream_manager import (
    ExportDefinition,
    IoTAnalyticsConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

def main():
    try:
        logger=logging.getLogger()
        stream_name = "IoTAnalyticsStream"
        iot_channel = "ggcstreammanager_channel"
        iot_msg_id_prefix = "samplePrefix"
        client = StreamManagerClient()

        # ストリームが存在する場合は、一旦削除する
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        # ストリーム作成(IoT Analytics)
        exports = ExportDefinition(
            iot_analytics=[IoTAnalyticsConfig(
                identifier="IoTAnalyticsExport" + stream_name,
                iot_channel=iot_channel,
                iot_msg_id_prefix=iot_msg_id_prefix,
                batch_size=10 # 確認しやすいように、10件溜まったら送信するようにする
            )]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )


    except asyncio.TimeoutError:
        print("Timed out")
    except Exception as e:
        print(e)
        print(type(e))
    finally:
        if client:
            client.close()

logging.basicConfig(level=logging.INFO)
# Start up this sample code
main()
  • 参考ドキュメント

ストリーム作成時のパラメータについては下記ドキュメントが詳しいです。

requirements.txt と SDK の ZIP ファイルを用意

次に、requirements.txtと SDK の ZIP ファイルを用意します。具体的には下記ドキュメントにある「Use the Stream Manager SDK for Python」 の内に従って作業しますが、今回は GDK に合わせた内容に変更しています。

必要なファイル郡はストリームマネージャーの SDK のソースに含まれているので、適当なディレクトリで Python の SDK をダウンロードします。

$ git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git

ダウンロードしたディレクトリに移動します。

$ cd aws-greengrass-stream-manager-sdk-python

aws-greengrass-stream-manager-sdk-python ディレクトリには下記の通り requirements.txtstream_manager ディレクトリがあるので、これを GDK のソースがある作業ディレクトリにコピーします。

$ ls -1a

./
../
.git/
CHANGELOG.rst
LICENSE
MANIFEST.in
NOTICE
README.md
docs/
requirements.txt
samples/
setup.cfg
setup.py
stream_manager/
stream_manager_sdk.zip
$ cp -R stream_manager ~/YOUR-GDK-CODE-PATH/StreamManager2IoTAnalytics/
$ cp requirements.txt ~/YOUR-GDK-CODE-PATH/StreamManager2IoTAnalytics/

コンポーネントのビルド & アーティファクトのパブリッシュ

これでコンポーネントをビルドする準備ができたので、GDK の作業用ディレクトリに戻りビルドします。

$ gdk component build

特に問題なければアーティファクトとなる ZIP ファイルが zip-build ディレクトリに作成されているはずなので、そのままパブリッシュします。
gdk component publish を実行する場合は、S3 と Greengrass に対して IAM 権限が必要になるので事前に作業環境に対して付与しておくようにしましょう。権限の内容については、こちらの記事で紹介しています。

$ gdk component publish --bucket gdk-my-bucket

SiteWise ゲートウェイに IAM 権限を追加

ストリームマネージャーコンポーネントをデプロイした際は、このコンポーネントが IoT Analytics にデータを送るので、デバイス側にその権限が必要になります。
追加する対象の IAM Role はゲートウェイ(Greengrass V2デバイス)の GreengrassV2TokenExchangeRole になります。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iotanalytics:BatchPutMessage"
            ],
            "Resource": "arn:aws:iotanalytics:ap-northeast-1:*:channel/ggcstreammanager_channel"
        }
    ]
}
  • 参考ページ

カスタムコンポーネントのデプロイ

GDK で問題なくパブリッシュができていれば、Greengrass のコンソールでコンポーネントを確認することができます。左上の「デプロイ」ボタンをクリックしてデプロイしましょう。

06-iot-analytics-component

OPC UA サーバの起動

下記の記事で紹介しているコードを元に ダミーの OPC UA サーバを起動しておきます。

SiteWise ゲートウェイの設定変更

最後に SiteWise ゲートウェイの設定で、ゲートウェイからのデータ送信先を変更します。
SiteWise のコンソールで「ゲートウェイ」のページを開いて、該当するデータソースを選択します。

03-edit-sitewise-gateway-data-source

「送信先」を「AWS IoT Greengrass ストリームマネージャー」に変更します。次に「Greengrass ストリーム名」にコンポーネントのコード内で指定した「IoTAnalyticsStream」を入力します。

04-config-stream-name

しばらくするとゲートウェイに変更が反映されます。これで変更作業はすべて完了です。

動作確認

では動作確認してみましょう。無事に IoT Analytics のチャンネルにデータが届いていればデータセットに格納されているはずなので、データセットのコンテンツにクエリを実行して確認してみましょう。

05-iot-analytics-dataset

次のような結果が確認できれば OK です。確かに OPC UA サーバ上のデータを確認することができました。

10-iot-analytics-dataset

最後に

今回は SiteWise ゲートウェイから IoT Analytics にデータを送ってみました。
これで、リアルタイムに可視化したいものは SiteWise Monitor で表示して、BI としてデータ分析したい場合は QuickSight に送る、といったような構成も採れるようになります。

IoT データの場合、蓄積したデータを分析して業務改善や予知保全に活用したいことが多いと思うので、この組み合わせについてもう少し深堀りしていこうと思います。

以上です。