この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
IoT事業部の平内(SIN)です。
AWS IoT Greengrass V2では、AWSから提供されるいくつかの事前構築済みコンポーネントがありますが、その中のストリームマネージャー (aws.greengrass.StreamManager) を使用すると、ストリームのための共通インタフェースが利用可能になります。
ストリームマネージャーでは、次の送信先に対応しています。
- AWS IoT Analytics
- Amazon Kinesis Data Streams
- AWS IoT SiteWise
- Amazon S3
今回は、上記のうち、Kienesis Data Streamsへのデータ送信を試してみました。
2 構成
試したみた構成は以下のような感じです。
カスタムのコンポネントでは、ストリームマネージャーSDKを使用してデータをストリームに送ります。 ストリームの送信先は、Kinesis Data Streams(stream-sample)に設定されており、同ストリームをトリガとしたLambdaを設置し、そのログでデータの到着を確認しています。
3 Kinesis Data Streams
シャード1で、ストリーム(stream-sample)を作成しています。
4 レシピ
ストリームマネージャーを使用する場合、依存関係で aws.greengrass.StreamManager を設定する必要があります。
また、ストリームマネージャ SDKを必要とするために、Artifactsで追加しています。
参考:ストリームマネージャーを使用するコンポーネントレシピの定義
---
RecipeFormatVersion: "2020-01-25"
ComponentName: "com.example.KinesisSample"
ComponentVersion: "1.0.9"
ComponentType: "aws.greengrass.generic"
ComponentDependencies:
aws.greengrass.StreamManager:
VersionRequirement: "^2.0.0"
Manifests:
- Platform:
os: linux
Lifecycle:
Install: pip3 install --user -r {artifacts:path}/requirements.txt
Run: |
export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
python3 -u {artifacts:path}/kinesis_sample.py
Artifacts:
- URI: s3://bucketname/artifacts/com.example.KinesisSample/1.0.0/stream_manager_sdk.zip
Unarchive: ZIP
- URI: s3://bucketname/artifacts/com.example.KinesisSample/1.0.0/kinesis_sample.py
- URI: s3://bucketname/artifacts/com.example.KinesisSample/1.0.0/requirements.txt
5 コード
コンポーネントのコードは、以下のとおりです。
kinesis_sample.py
import datetime
import time
from stream_manager import (
ExportDefinition,
KinesisConfig,
MessageStreamDefinition,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
)
stream_name = "MyStream"
kinesis_stream_name = "stream-sample"
client = StreamManagerClient()
# ストリームが存在する場合は、一旦削除する
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
# ストリーム作成(Kinesis Data Streams)
exports = ExportDefinition(
kinesis=[KinesisConfig(
identifier="KinesisExport" + stream_name,
kinesis_stream_name=kinesis_stream_name,
batch_size=10 # 確認しやすいように、10件溜まったら送信するようにする
)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name,
strategy_on_full=StrategyOnFull.OverwriteOldestData,
export_definition = exports
)
)
while(True):
# 1秒間隔でタイムスタンプを送る
timestamp = (datetime.datetime.now()).strftime('%Y/%m/%d %H:%M:%S')
record = "timestamp: {}".format(timestamp)
result = client.append_message(stream_name, record.encode("utf-8"))
print(result)
time.sleep(1)
client.close()
ExportDefinition() で、ストリームの送信先としてKinesisを指定していますが、ここで指定するKinesisConfigが、Kinesisのストリームに対する細部の設定となります。
(一部抜粋)
- kinesis_stream_name: Kinesis Data Stremasのストリーム名
- batch_size: 蓄積サイズ(1〜500 デフォルト:500)
- batch_interval_millis: 蓄積時間(60000〜9223372036854)
- priority: アップロードの優先順位
batch_sizeと、batch_interval_millisの両方が設定されている場合、とちらかが条件にヒットした時、アップロードされます。デフォルトでは、データ500件毎にアップロードされますが、上記サンプルでは、動作確認がしやすいように、batch_size = 10としています。
6 ポリシー
IoT Greengrassが、Kinesis Data Stremasのストリームに書き込むためには、PutRecordsが必要です。
作成したポリシーです。
KinesisStreamPolicy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecords"
],
"Resource": [
"arn:aws:kinesis:ap-northeast-1:*:stream/stream-sample"
]
}
]
}
そして、上記ポリシーをGreengrassV2TokenExchangeRoleに追加しています。
7 動作確認
動作確認のためにストリームをトリガとしたLambdaです。
import json
import base64
def lambda_handler(event, context):
if("Records" in event):
records = event["Records"]
print("records len:{}".format(len(records)))
for i, record in enumerate(records):
decoded_data = base64.b64decode(record["kinesis"]["data"])
print(decoded_data)
Lambdaのログから1秒毎のレコードが、10件溜まる毎に送られてきている様子を確認できます。
8 最後に
ストリームマネージャーを使用して、Kienesis Data Streamsへのデータ送信を試してみました。
単純にKinesisにデータを送りたいだけであれば、トークンマネージャーなどで権限を付与して、直接送ってもいいのですが、ストリームマネージャーを使用することが、以下のような効果が期待できます。
- 他のストリーム(S3や、AWS IoT Analyticsなど)との共通インターフェース
- ストレージタイプ、サイズ、データ保持に関するポリシーを定義できる
- エクスポート先、優先度、永続性などが定義できる
- ネットワーク切断時のリカバリなどの考慮が必要なくなる(ファイルにバッファリングされている)
言わば、「ストリームに関するマネージドサービスが、エッジ側で軽易に利用可能になる」と言えると思います。
9 参考リンク
[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました
[AWS IoT Greengrass V2] コンポーネントでコアデバイス間のPublish/Subscribeを試してみました
[AWS IoT Greengrass V2] ログマネージャでコンポーネントのログをCloudWatch Logsに送ってみました
[AWS IoT Greengrass V2] トークン交換サービスでコンポーネントからDynamoDBにアクセスしてみました