設備機器のデータを AWS IoT SiteWise ゲートウェイから Amazon Kinesis Data Streams に送ってみた
はじめに
SiteWise ゲートウェイを使うと、OPC UA プロトコルで設備機器からデータを収集して SiteWise にデータを送ることができます。
また、SiteWise Monitor を使うとリアルタイムにデータを可視化することも可能ですが、データ収集の対象が多い場合、利用料金が大きくなる場合があります。
そのため SiteWise にデータを送る場合はリアルタイムに可視化したいデータのみを送り、その他のデータは QuickSight などでバッチ的に可視化するという方法が考えられます。
今回はこのような前提の上で、SiteWise ゲートウェイで収集したデータを Kineis Data Streams に送る構成を検証してみました。
全体の構成
今回の構成は下記のようになります。各要素について簡単に説明します。
OPC UA Server
ゲートウェイとは別のデバイス(Raspberry Pi)上に Python で ダミーサーバを構築しました。検証環境なのでゲートウェイデバイス上に動かしても問題ありません。
SiteWise ゲートウェイ 〜 データ送信の仕組み
冒頭で紹介した記事の内容と同じ手順でゲートウェイを構築しています。SiteWise ゲートウェイは Greengrass V2 のコンポーネントとして動作するので、初期状態では「SiteWise ゲーウェイ用のコンポーネント」が起動しています。
また、デフォルトでは SiteWise ゲートウェイは 収集したデータを SiteWise に送ります(初期構築時に選択)。SiteWise へのデータ送信には Greengrass V2 のストリームマネージャーが使われるので、ゲートウェイのインストール時にストリームマネージャーのコンポーネントも依存関係でデプロイされます。
このときデータ送信の経路は次のようになります。
- SiteWise コンポーネントが OPC UA サーバからデータを収集
- SiteWise コンポーネントが収集したデータを SiteWise サービス管理のストリームマネージャーに送信
- ストリーム名は「SiteWise_Stream」
- ストリーム「SiteWise_Stream」が SiteWise にデータ送信
一方で SiteWise コンポーネントによるデータの送信先は SiteWise 以外に「ユーザーが作成した Stream Manager」を選択することも可能です。
今回は送信先を変えて Kinesis Data Streams に送っています。なお、ストリームマネージャーがデータを送ることができるのは次のサービスになります。
- Amazon Kinesis Data Streams
- Amazon S3
- AWS IoT Analytics
- AWS IoT SiteWise
SORACOM
今回は別の要件で SORACOM の SIM 経由でデータを送信していますが、普通に宅内ルーターから送っても問題ありません。
Kinesis Data Streams と AWS Lambda
Kinesis Data Streams で受けたデータを確認するために Lambda を使っています。
本格的に使う場合は、Lambda で必要な処理をして他のサービスに連携していくことになりますが、今回は動作確認が目的なのでこれ以上の構成は作りません。
ストリーム管理用のカスタムコンポーネントの作成
GDK 用の各種ファイルを用意
それでは環境を作っていきます。まずは Kinesis へデータを送るためのストリームマネージャーを定義・作成するコンポーネントを作成します。作成には GDK (Greengrass Development Kit) を使います。
GDK がインストールされた PC の任意の場所で作業用ディレクトリを作成します。
--name
で指定した名前で作業ディレクトリが作成されます。今回は StreamManager2Kinesis
という名前にしました。
$ gdk component init \ -l python \ --name StreamManager2Kinesis \ -t HelloWorld
次に各種ファイルを用意していきます。まずは gdk-config.json
です。
コンポーネント名は適当なものを付けます。コンポーネント名は作業ディレクトリにちなんで com.example.StreamManager2Kinesis
としました。
また、bucket
で指定した名前の S3 バケットにコンポーネントのアーティファクトが保存されますが、コンポーネントのパブリッシュ時にオプションでバケットを指定すると、そちらが優先されます。
{ "component": { "com.example.StreamManager2Kinesis": { "author": "CM-ICHIDA", "version": "NEXT_PATCH", "build": { "build_system": "zip" }, "publish": { "bucket": "gdk-my-bucket", "region": "ap-northeast-1" } } }, "gdk_version": "1.1.0" }
recipe.yaml
- 依存関係でストリームマネージャーを指定します。
Lifecycle
では、GDK が Zip 化したアーティファクトをデプロイ時に展開する想定のパスを指定します。
--- RecipeFormatVersion: "2020-01-25" ComponentName: "{COMPONENT_NAME}" ComponentVersion: "{COMPONENT_VERSION}" ComponentType: "aws.greengrass.generic" ComponentDescription: "Stream Manager to Kinesis" ComponentPublisher: "{COMPONENT_AUTHOR}" ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: all Lifecycle: Install: pip3 install --user -r {artifacts:decompressedPath}/StreamManager2Kinesis/requirements.txt Run: | export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/StreamManager2Kinesis/stream_manager python3 -u {artifacts:decompressedPath}/StreamManager2Kinesis/main.py Artifacts: - URI: "s3://BUCKET_NAME/COMPONENT_NAME/COMPONENT_VERSION/StreamManager2Kinesis.zip" Unarchive: ZIP
main.py
- ストリームマネージャー用コンポーネントのコードです。
- Kinesis Data Stream 向けのカスタムのストリームを作るだけのコンポーネントです。
- 17行目は、このコンポーネントが作成するストリームマネージャーのストリーム名です。分かりやすい一意の名前を付けます。
- 18行目では実際に作成した Kinesis のストリーム名を指定しています。
import asyncio import logging import time from stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) def main(): try: logger=logging.getLogger() stream_name = "KinesisStream" kinesis_stream_name = "stream-sample" client = StreamManagerClient() # ストリームが存在する場合は、一旦削除する try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass # ストリーム作成(Kinesis Data Streams) exports = ExportDefinition( kinesis=[KinesisConfig( identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name, batch_size=10 # 確認しやすいように、10件溜まったら送信するようにする )] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) except asyncio.TimeoutError: print("Timed out") except Exception as e: print(e) print(type(e)) finally: if client: client.close() logging.basicConfig(level=logging.INFO) main()
- 参考ドキュメント
ストリーム作成時のパラメータについては下記ドキュメントが詳しいです。
requirements.txt
と SDK の ZIP ファイルを用意
次に、requirements.txt
と SDK の ZIP ファイルを用意します。具体的には下記ドキュメントにある「Use the Stream Manager SDK for Python」 の内に従って作業しますが、今回は GDK に合わせた内容に変更しています。
必要なファイル郡はストリームマネージャーの SDK のソースに含まれているので、適当なディレクトリで Python の SDK をダウンロードします。
$ git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
ダウンロードしたディレクトリに移動します。
$ cd aws-greengrass-stream-manager-sdk-python
aws-greengrass-stream-manager-sdk-python
ディレクトリには下記の通り requirements.txt
と stream_manager
ディレクトリがあるので、これを GDK のソースがある作業ディレクトリにコピーします。
$ ls -1a ./ ../ .git/ CHANGELOG.rst LICENSE MANIFEST.in NOTICE README.md docs/ requirements.txt samples/ setup.cfg setup.py stream_manager/ stream_manager_sdk.zip
$ cp -R stream_manager ~/YOUR-GDK-CODE-PATH/StreamManager2Kinesis/ $ cp requirements.txt ~/YOUR-GDK-CODE-PATH/StreamManager2Kinesis/
コンポーネントのビルド & アーティファクトのパブリッシュ
これでコンポーネントをビルドする準備ができたので、GDK の作業用ディレクトリに戻りビルドします。
$ gdk component build
特に問題なければアーティファクトとなる ZIP ファイルが zip-build
ディレクトリに作成されているはずなので、そのままパブリッシュします。
gdk component publish
を実行する場合は、S3 と Greengrass に対して IAM 権限が必要になるので事前に作業環境に対して付与しておくようにしましょう。権限の内容については、こちらの記事で紹介しています。
$ gdk component publish --bucket gdk-my-bucket
SiteWise ゲートウェイに IAM 権限を追加
ストリームマネージャーコンポーネントをデプロイした際は、このコンポーネントが Kinesis Data Streams にデータを送るので、デバイス側にその権限が必要になります。
追加する対象の IAM Role はゲートウェイ(Greengrass V2デバイス)の GreengrassV2TokenExchangeRole
になります。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:ap-northeast-1:*:stream/stream-sample" ] } ] }
- 参考ページ
Kinesis Data Stream の作成
データの送り先となる Kinesis Data Streams のリソースを作成します。
Lambdaの作成
Kinesis で受けたデータを確認する為の Lambda 関数を作成します。コードは下記です。
import json import base64 def lambda_handler(event, context): if("Records" in event): records = event["Records"] print("records len:{}".format(len(records))) for i, record in enumerate(records): decoded_data = base64.b64decode(record["kinesis"]["data"]) print(decoded_data)
Kinesis に合わせてトリガーも設定しておきます。
Kinesis からデータを取得できるように関数に権限を追加しておきます。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "kinesis:ListStreams", "kinesis:DescribeStreamSummary", "kinesis:Get*" ], "Resource": "arn:aws:kinesis:ap-northeast-1:*:stream/stream-sample" } ] }
- 参考ドキュメント
カスタムコンポーネントのデプロイ
GDK で問題なくパブリッシュができていれば、Greengrass のコンソールでコンポーネントを確認することができます。左上の「デプロイ」ボタンをクリックしてデプロイしましょう。
OPC UA サーバの起動
下記の記事で紹介しているコードを元に ダミーの OPC UA サーバを起動しておきます。
SiteWise ゲートウェイの設定変更
最後に SiteWise ゲートウェイの設定で、ゲートウェイからのデータ送信先を変更します。
SiteWise のコンソールで「ゲートウェイ」のページを開いて、該当するデータソースを選択します。
「送信先」を「AWS IoT Greengrass ストリームマネージャー」に変更します。次に「Greengrass ストリーム名」にコンポーネントのコード内で指定した「KinesisStream」を入力します。
しばらくするとゲートウェイに変更が反映されます。これで変更作業はすべて完了です。
動作確認
では動作確認してみましょう。無事に Kinesis Data Streams にデータが届いていれば Lambda でデータを受け取れているはずです。次のように CloudWatch Logs でその様子が確認できれば OK です。
最後に
今回は Kinesis Data Streams にデータを送ってみました。
リアルタイムに可視化したいデータだけ SiteWise に送って、他のデータは Kinesis に送るという構成も取れるので、要件に応じて組み合わせて使っていきたいですね。
次回は IoT Analytics に送ってみようと思います。
以上です。