設備機器のデータを AWS IoT SiteWise ゲートウェイから Amazon Kinesis Data Streams に送ってみた

製造業 IoT の為の AWS アーキテクチャパターンの紹介です。今回は簡単に Kinesis Data Streams へデータを送る方法を紹介します。
2022.06.13

はじめに

SiteWise ゲートウェイを使うと、OPC UA プロトコルで設備機器からデータを収集して SiteWise にデータを送ることができます。

また、SiteWise Monitor を使うとリアルタイムにデータを可視化することも可能ですが、データ収集の対象が多い場合、利用料金が大きくなる場合があります。

そのため SiteWise にデータを送る場合はリアルタイムに可視化したいデータのみを送り、その他のデータは QuickSight などでバッチ的に可視化するという方法が考えられます。

今回はこのような前提の上で、SiteWise ゲートウェイで収集したデータを Kineis Data Streams に送る構成を検証してみました。

全体の構成

今回の構成は下記のようになります。各要素について簡単に説明します。

00-kinesis-diagram

OPC UA Server

ゲートウェイとは別のデバイス(Raspberry Pi)上に Python で ダミーサーバを構築しました。検証環境なのでゲートウェイデバイス上に動かしても問題ありません。

SiteWise ゲートウェイ 〜 データ送信の仕組み

冒頭で紹介した記事の内容と同じ手順でゲートウェイを構築しています。SiteWise ゲートウェイは Greengrass V2 のコンポーネントとして動作するので、初期状態では「SiteWise ゲーウェイ用のコンポーネント」が起動しています。
また、デフォルトでは SiteWise ゲートウェイは 収集したデータを SiteWise に送ります(初期構築時に選択)。SiteWise へのデータ送信には Greengrass V2 のストリームマネージャーが使われるので、ゲートウェイのインストール時にストリームマネージャーのコンポーネントも依存関係でデプロイされます。

このときデータ送信の経路は次のようになります。

  • SiteWise コンポーネントが OPC UA サーバからデータを収集
  • SiteWise コンポーネントが収集したデータを SiteWise サービス管理のストリームマネージャーに送信
    • ストリーム名は「SiteWise_Stream」
  • ストリーム「SiteWise_Stream」が SiteWise にデータ送信

00-default-diagram

一方で SiteWise コンポーネントによるデータの送信先は SiteWise 以外に「ユーザーが作成した Stream Manager」を選択することも可能です。
今回は送信先を変えて Kinesis Data Streams に送っています。なお、ストリームマネージャーがデータを送ることができるのは次のサービスになります。

  • Amazon Kinesis Data Streams
  • Amazon S3
  • AWS IoT Analytics
  • AWS IoT SiteWise

SORACOM

今回は別の要件で SORACOM の SIM 経由でデータを送信していますが、普通に宅内ルーターから送っても問題ありません。

Kinesis Data Streams と AWS Lambda

Kinesis Data Streams で受けたデータを確認するために Lambda を使っています。
本格的に使う場合は、Lambda で必要な処理をして他のサービスに連携していくことになりますが、今回は動作確認が目的なのでこれ以上の構成は作りません。

ストリーム管理用のカスタムコンポーネントの作成

GDK 用の各種ファイルを用意

それでは環境を作っていきます。まずは Kinesis へデータを送るためのストリームマネージャーを定義・作成するコンポーネントを作成します。作成には GDK (Greengrass Development Kit) を使います。

GDK がインストールされた PC の任意の場所で作業用ディレクトリを作成します。
--name で指定した名前で作業ディレクトリが作成されます。今回は StreamManager2Kinesis という名前にしました。

$ gdk component init \
  -l python \
  --name StreamManager2Kinesis \
  -t HelloWorld

次に各種ファイルを用意していきます。まずは gdk-config.json です。
コンポーネント名は適当なものを付けます。コンポーネント名は作業ディレクトリにちなんで com.example.StreamManager2Kinesis としました。 また、bucket で指定した名前の S3 バケットにコンポーネントのアーティファクトが保存されますが、コンポーネントのパブリッシュ時にオプションでバケットを指定すると、そちらが優先されます。

gdk-config.json

{
  "component": {
    "com.example.StreamManager2Kinesis": {
      "author": "CM-ICHIDA",
      "version": "NEXT_PATCH",
      "build": {
        "build_system": "zip"
      },
      "publish": {
        "bucket": "gdk-my-bucket",
        "region": "ap-northeast-1"
      }
    }
  },
  "gdk_version": "1.1.0"
}
  • recipe.yaml
    • 依存関係でストリームマネージャーを指定します。
    • Lifecycle では、GDK が Zip 化したアーティファクトをデプロイ時に展開する想定のパスを指定します。

recipe.yaml

---
RecipeFormatVersion: "2020-01-25"
ComponentName: "{COMPONENT_NAME}"
ComponentVersion: "{COMPONENT_VERSION}"
ComponentType: "aws.greengrass.generic"
ComponentDescription: "Stream Manager to Kinesis"
ComponentPublisher: "{COMPONENT_AUTHOR}"
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: "^2.0.0"
Manifests:
  - Platform:
      os: all
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/StreamManager2Kinesis/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/StreamManager2Kinesis/stream_manager
        python3 -u {artifacts:decompressedPath}/StreamManager2Kinesis/main.py
    Artifacts:
      - URI: "s3://BUCKET_NAME/COMPONENT_NAME/COMPONENT_VERSION/StreamManager2Kinesis.zip"
        Unarchive: ZIP
  • main.py
    • ストリームマネージャー用コンポーネントのコードです。
    • Kinesis Data Stream 向けのカスタムのストリームを作るだけのコンポーネントです。
    • 17行目は、このコンポーネントが作成するストリームマネージャーのストリーム名です。分かりやすい一意の名前を付けます。
    • 18行目では実際に作成した Kinesis のストリーム名を指定しています。
import asyncio
import logging
import time

from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

def main():
    try:
        logger=logging.getLogger()
        stream_name = "KinesisStream"
        kinesis_stream_name = "stream-sample"
        client = StreamManagerClient()

        # ストリームが存在する場合は、一旦削除する
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        # ストリーム作成(Kinesis Data Streams)
        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=10 # 確認しやすいように、10件溜まったら送信するようにする
            )]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

    except asyncio.TimeoutError:
        print("Timed out")
    except Exception as e:
        print(e)
        print(type(e))
    finally:
        if client:
            client.close()

logging.basicConfig(level=logging.INFO)
main()
  • 参考ドキュメント

ストリーム作成時のパラメータについては下記ドキュメントが詳しいです。

requirements.txt と SDK の ZIP ファイルを用意

次に、requirements.txtと SDK の ZIP ファイルを用意します。具体的には下記ドキュメントにある「Use the Stream Manager SDK for Python」 の内に従って作業しますが、今回は GDK に合わせた内容に変更しています。

必要なファイル郡はストリームマネージャーの SDK のソースに含まれているので、適当なディレクトリで Python の SDK をダウンロードします。

$ git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git

ダウンロードしたディレクトリに移動します。

$ cd aws-greengrass-stream-manager-sdk-python

aws-greengrass-stream-manager-sdk-python ディレクトリには下記の通り requirements.txtstream_manager ディレクトリがあるので、これを GDK のソースがある作業ディレクトリにコピーします。

$ ls -1a

./
../
.git/
CHANGELOG.rst
LICENSE
MANIFEST.in
NOTICE
README.md
docs/
requirements.txt
samples/
setup.cfg
setup.py
stream_manager/
stream_manager_sdk.zip
$ cp -R stream_manager ~/YOUR-GDK-CODE-PATH/StreamManager2IoTAnalytics/
$ cp requirements.txt ~/YOUR-GDK-CODE-PATH/StreamManager2IoTAnalytics/

コンポーネントのビルド & アーティファクトのパブリッシュ

これでコンポーネントをビルドする準備ができたので、GDK の作業用ディレクトリに戻りビルドします。

$ gdk component build

特に問題なければアーティファクトとなる ZIP ファイルが zip-build ディレクトリに作成されているはずなので、そのままパブリッシュします。
gdk component publish を実行する場合は、S3 と Greengrass に対して IAM 権限が必要になるので事前に作業環境に対して付与しておくようにしましょう。権限の内容については、こちらの記事で紹介しています。

$ gdk component publish --bucket gdk-my-bucket

SiteWise ゲートウェイに IAM 権限を追加

ストリームマネージャーコンポーネントをデプロイした際は、このコンポーネントが Kinesis Data Streams にデータを送るので、デバイス側にその権限が必要になります。
追加する対象の IAM Role はゲートウェイ(Greengrass V2デバイス)の GreengrassV2TokenExchangeRole になります。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:ap-northeast-1:*:stream/stream-sample"
            ]
        }
    ]
}
  • 参考ページ

Kinesis Data Stream の作成

データの送り先となる Kinesis Data Streams のリソースを作成します。

01-kinesis-stream

Lambdaの作成

Kinesis で受けたデータを確認する為の Lambda 関数を作成します。コードは下記です。

import json
import base64

def lambda_handler(event, context):
    if("Records" in event):
        records = event["Records"]
        print("records len:{}".format(len(records)))
        for i, record in enumerate(records):
            decoded_data = base64.b64decode(record["kinesis"]["data"])
            print(decoded_data)

Kinesis に合わせてトリガーも設定しておきます。

02-lambda-trigger

Kinesis からデータを取得できるように関数に権限を追加しておきます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:DescribeStreamSummary",
                "kinesis:Get*"
            ],
            "Resource": "arn:aws:kinesis:ap-northeast-1:*:stream/stream-sample"
        }
    ]
}
  • 参考ドキュメント

カスタムコンポーネントのデプロイ

GDK で問題なくパブリッシュができていれば、Greengrass のコンソールでコンポーネントを確認することができます。左上の「デプロイ」ボタンをクリックしてデプロイしましょう。

04-custom-component-to-kinesis

OPC UA サーバの起動

下記の記事で紹介しているコードを元に ダミーの OPC UA サーバを起動しておきます。

SiteWise ゲートウェイの設定変更

最後に SiteWise ゲートウェイの設定で、ゲートウェイからのデータ送信先を変更します。
SiteWise のコンソールで「ゲートウェイ」のページを開いて、該当するデータソースを選択します。

05-edit-data-source

「送信先」を「AWS IoT Greengrass ストリームマネージャー」に変更します。次に「Greengrass ストリーム名」にコンポーネントのコード内で指定した「KinesisStream」を入力します。

06-change-stream

しばらくするとゲートウェイに変更が反映されます。これで変更作業はすべて完了です。

動作確認

では動作確認してみましょう。無事に Kinesis Data Streams にデータが届いていれば Lambda でデータを受け取れているはずです。次のように CloudWatch Logs でその様子が確認できれば OK です。

03-cwlogs-from-stream-manager

最後に

今回は Kinesis Data Streams にデータを送ってみました。
リアルタイムに可視化したいデータだけ SiteWise に送って、他のデータは Kinesis に送るという構成も取れるので、要件に応じて組み合わせて使っていきたいですね。

次回は IoT Analytics に送ってみようと思います。

以上です。