この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
前回の記事では、SiteWise ゲートウェイから Kinesis Data Streams にデータを送りました。今回は送り先を変えて IoT Analytics に送ってみたいと思います。
IoT Analytics は QuickSight のデータソースにすることができるので、BI ツールとしての可視化も容易になります。
全体の構成
デバイス側の構成は前回のものと変わりません。ストリームマネージャーで利用するコンポーネントが変わるだけです。クラウド側も送り先が IoT Analytics になっています。
AWS IoT Analytics リソースの作成
最初に IoT Analytics の環境を作成しておきます。今回はテストなのでシングルステップセットアップで作成します。シングルステップセットアップについては下記の記事を参照してください。
MQTT トピックは使わないので適当なものを設定します。
作成できた「チャンネル名」を控えておいてください。後で作成する Greengrass コンポーネントのコードの中で送信先に指定します。
ストリーム管理用のカスタムコンポーネントの作成
前回と同様に、IoT Analytics にデータ送信するカスタムコンポーネントは GDK (Greengrass Development Kit) で作成します。そのためパラメータ及びコンポーネントのコードが異なるだけで、手順自体は前回と同じです。
GDK 用の各種ファイルを用意
GDK がインストールされた PC の任意の場所で作業用ディレクトリを作成します。
--name
で指定した名前で作業ディレクトリが作成されます。今回は StreamManager2IoTAnalytics
という名前にしました。
$ gdk component init \
-l python \
--name StreamManager2IoTAnalytics \
-t HelloWorld
次に各種ファイルを用意していきます。まずは gdk-config.json
です。
コンポーネント名は適当なものを付けます。コンポーネント名は作業ディレクトリにちなんで com.example.StreamManager2IoTAnalytics
としました。
また、bucket
で指定した名前の S3 バケットにコンポーネントのアーティファクトが保存されますが、コンポーネントのパブリッシュ時にオプションでバケットを指定すると、そちらが優先されます。
gdk-config.json
{
"component": {
"com.example.StreamManager2IoTAnalytics": {
"author": "CM-ICHIDA",
"version": "NEXT_PATCH",
"build": {
"build_system": "zip"
},
"publish": {
"bucket": "gdk-my-bucket",
"region": "ap-northeast-1"
}
}
},
"gdk_version": "1.1.0"
}
recipe.yaml
- 依存関係でストリームマネージャーを指定します。
Lifecycle
では、GDK が Zip 化したアーティファクトをデプロイ時に展開する想定のパスを指定します。
recipe.yaml
---
RecipeFormatVersion: "2020-01-25"
ComponentName: "{COMPONENT_NAME}"
ComponentVersion: "{COMPONENT_VERSION}"
ComponentType: "aws.greengrass.generic"
ComponentDescription: "Stream Manager to IoT Analytics"
ComponentPublisher: "{COMPONENT_AUTHOR}"
ComponentDependencies:
aws.greengrass.StreamManager:
VersionRequirement: "^2.0.0"
Manifests:
- Platform:
os: all
Lifecycle:
Install: pip3 install --user -r {artifacts:decompressedPath}/StreamManager2IoTAnalytics/requirements.txt
Run: |
export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/StreamManager2IoTAnalytics/stream_manager
python3 -u {artifacts:decompressedPath}/StreamManager2IoTAnalytics/main.py
Artifacts:
- URI: "s3://BUCKET_NAME/COMPONENT_NAME/COMPONENT_VERSION/StreamManager2IoTAnalytics.zip"
Unarchive: ZIP
main.py
- ストリームマネージャー用コンポーネントのコードです。
- IoT Analytics 向けのカスタムのストリームを作るだけのコンポーネントです。
- 17行目は、このコンポーネントが作成するストリームマネージャーのストリーム名です。分かりやすい一意の名前を付けます。
- 18行目では、実際に作成した IoT Analytics のチャンネル名を指定しています。
- 19行目では、各メッセージ ID の前に付けられるプレフィックス名を指定します。適当な一意のものを付けます。
import asyncio
import logging
import time
from stream_manager import (
ExportDefinition,
IoTAnalyticsConfig,
MessageStreamDefinition,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
)
def main():
try:
logger=logging.getLogger()
stream_name = "IoTAnalyticsStream"
iot_channel = "ggcstreammanager_channel"
iot_msg_id_prefix = "samplePrefix"
client = StreamManagerClient()
# ストリームが存在する場合は、一旦削除する
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
# ストリーム作成(IoT Analytics)
exports = ExportDefinition(
iot_analytics=[IoTAnalyticsConfig(
identifier="IoTAnalyticsExport" + stream_name,
iot_channel=iot_channel,
iot_msg_id_prefix=iot_msg_id_prefix,
batch_size=10 # 確認しやすいように、10件溜まったら送信するようにする
)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name,
strategy_on_full=StrategyOnFull.OverwriteOldestData,
export_definition=exports
)
)
except asyncio.TimeoutError:
print("Timed out")
except Exception as e:
print(e)
print(type(e))
finally:
if client:
client.close()
logging.basicConfig(level=logging.INFO)
# Start up this sample code
main()
- 参考ドキュメント
ストリーム作成時のパラメータについては下記ドキュメントが詳しいです。
requirements.txt
と SDK の ZIP ファイルを用意
次に、requirements.txt
と SDK の ZIP ファイルを用意します。具体的には下記ドキュメントにある「Use the Stream Manager SDK for Python」 の内に従って作業しますが、今回は GDK に合わせた内容に変更しています。
必要なファイル郡はストリームマネージャーの SDK のソースに含まれているので、適当なディレクトリで Python の SDK をダウンロードします。
$ git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
ダウンロードしたディレクトリに移動します。
$ cd aws-greengrass-stream-manager-sdk-python
aws-greengrass-stream-manager-sdk-python
ディレクトリには下記の通り requirements.txt
と stream_manager
ディレクトリがあるので、これを GDK のソースがある作業ディレクトリにコピーします。
$ ls -1a
./
../
.git/
CHANGELOG.rst
LICENSE
MANIFEST.in
NOTICE
README.md
docs/
requirements.txt
samples/
setup.cfg
setup.py
stream_manager/
stream_manager_sdk.zip
$ cp -R stream_manager ~/YOUR-GDK-CODE-PATH/StreamManager2IoTAnalytics/
$ cp requirements.txt ~/YOUR-GDK-CODE-PATH/StreamManager2IoTAnalytics/
コンポーネントのビルド & アーティファクトのパブリッシュ
これでコンポーネントをビルドする準備ができたので、GDK の作業用ディレクトリに戻りビルドします。
$ gdk component build
特に問題なければアーティファクトとなる ZIP ファイルが zip-build
ディレクトリに作成されているはずなので、そのままパブリッシュします。
gdk component publish
を実行する場合は、S3 と Greengrass に対して IAM 権限が必要になるので事前に作業環境に対して付与しておくようにしましょう。権限の内容については、こちらの記事で紹介しています。
$ gdk component publish --bucket gdk-my-bucket
SiteWise ゲートウェイに IAM 権限を追加
ストリームマネージャーコンポーネントをデプロイした際は、このコンポーネントが IoT Analytics にデータを送るので、デバイス側にその権限が必要になります。
追加する対象の IAM Role はゲートウェイ(Greengrass V2デバイス)の GreengrassV2TokenExchangeRole
になります。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iotanalytics:BatchPutMessage"
],
"Resource": "arn:aws:iotanalytics:ap-northeast-1:*:channel/ggcstreammanager_channel"
}
]
}
- 参考ページ
カスタムコンポーネントのデプロイ
GDK で問題なくパブリッシュができていれば、Greengrass のコンソールでコンポーネントを確認することができます。左上の「デプロイ」ボタンをクリックしてデプロイしましょう。
OPC UA サーバの起動
下記の記事で紹介しているコードを元に ダミーの OPC UA サーバを起動しておきます。
SiteWise ゲートウェイの設定変更
最後に SiteWise ゲートウェイの設定で、ゲートウェイからのデータ送信先を変更します。
SiteWise のコンソールで「ゲートウェイ」のページを開いて、該当するデータソースを選択します。
「送信先」を「AWS IoT Greengrass ストリームマネージャー」に変更します。次に「Greengrass ストリーム名」にコンポーネントのコード内で指定した「IoTAnalyticsStream」を入力します。
しばらくするとゲートウェイに変更が反映されます。これで変更作業はすべて完了です。
動作確認
では動作確認してみましょう。無事に IoT Analytics のチャンネルにデータが届いていればデータセットに格納されているはずなので、データセットのコンテンツにクエリを実行して確認してみましょう。
次のような結果が確認できれば OK です。確かに OPC UA サーバ上のデータを確認することができました。
最後に
今回は SiteWise ゲートウェイから IoT Analytics にデータを送ってみました。
これで、リアルタイムに可視化したいものは SiteWise Monitor で表示して、BI としてデータ分析したい場合は QuickSight に送る、といったような構成も採れるようになります。
IoT データの場合、蓄積したデータを分析して業務改善や予知保全に活用したいことが多いと思うので、この組み合わせについてもう少し深堀りしていこうと思います。
以上です。