[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントからKinesis Data Streamsへデータを送ってみました

2021.09.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

IoT事業部の平内(SIN)です。

AWS IoT Greengrass V2では、AWSから提供されるいくつかの事前構築済みコンポーネントがありますが、その中のストリームマネージャー (aws.greengrass.StreamManager) を使用すると、ストリームのための共通インタフェースが利用可能になります。

ストリームマネージャーでは、次の送信先に対応しています。

  • AWS IoT Analytics
  • Amazon Kinesis Data Streams
  • AWS IoT SiteWise
  • Amazon S3

今回は、上記のうち、Kienesis Data Streamsへのデータ送信を試してみました。

2 構成

試したみた構成は以下のような感じです。

カスタムのコンポネントでは、ストリームマネージャーSDKを使用してデータをストリームに送ります。 ストリームの送信先は、Kinesis Data Streams(stream-sample)に設定されており、同ストリームをトリガとしたLambdaを設置し、そのログでデータの到着を確認しています。

3 Kinesis Data Streams

シャード1で、ストリーム(stream-sample)を作成しています。

4 レシピ

ストリームマネージャーを使用する場合、依存関係で aws.greengrass.StreamManager を設定する必要があります。

また、ストリームマネージャ SDKを必要とするために、Artifactsで追加しています。
参考:ストリームマネージャーを使用するコンポーネントレシピの定義

---
RecipeFormatVersion: "2020-01-25"
ComponentName: "com.example.KinesisSample"
ComponentVersion: "1.0.9"
ComponentType: "aws.greengrass.generic"
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: "^2.0.0"
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:path}/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 -u {artifacts:path}/kinesis_sample.py
    Artifacts:
      - URI: s3://bucketname/artifacts/com.example.KinesisSample/1.0.0/stream_manager_sdk.zip
        Unarchive: ZIP
      - URI: s3://bucketname/artifacts/com.example.KinesisSample/1.0.0/kinesis_sample.py
      - URI: s3://bucketname/artifacts/com.example.KinesisSample/1.0.0/requirements.txt

5 コード

コンポーネントのコードは、以下のとおりです。


参考:https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py

kinesis_sample.py

import datetime
import time
from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

stream_name = "MyStream"
kinesis_stream_name = "stream-sample"
client = StreamManagerClient()

# ストリームが存在する場合は、一旦削除する
try:
    client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
    pass

# ストリーム作成(Kinesis Data Streams)
exports = ExportDefinition(
    kinesis=[KinesisConfig(
        identifier="KinesisExport" + stream_name,
        kinesis_stream_name=kinesis_stream_name,
        batch_size=10 # 確認しやすいように、10件溜まったら送信するようにする
    )]
)
client.create_message_stream(
    MessageStreamDefinition(
        name=stream_name,
        strategy_on_full=StrategyOnFull.OverwriteOldestData,
        export_definition = exports
    )
)

while(True):
    # 1秒間隔でタイムスタンプを送る
    timestamp = (datetime.datetime.now()).strftime('%Y/%m/%d %H:%M:%S')
    record = "timestamp: {}".format(timestamp)
    result = client.append_message(stream_name, record.encode("utf-8"))
    print(result)
    time.sleep(1)

client.close()

ExportDefinition() で、ストリームの送信先としてKinesisを指定していますが、ここで指定するKinesisConfigが、Kinesisのストリームに対する細部の設定となります。

(一部抜粋)

  • kinesis_stream_name: Kinesis Data Stremasのストリーム名
  • batch_size: 蓄積サイズ(1〜500 デフォルト:500)
  • batch_interval_millis: 蓄積時間(60000〜9223372036854)
  • priority: アップロードの優先順位

batch_sizeと、batch_interval_millisの両方が設定されている場合、とちらかが条件にヒットした時、アップロードされます。デフォルトでは、データ500件毎にアップロードされますが、上記サンプルでは、動作確認がしやすいように、batch_size = 10としています。


参考:https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition

6 ポリシー

IoT Greengrassが、Kinesis Data Stremasのストリームに書き込むためには、PutRecordsが必要です。

作成したポリシーです。

KinesisStreamPolicy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:ap-northeast-1:*:stream/stream-sample"
            ]
        }
    ]
}

そして、上記ポリシーをGreengrassV2TokenExchangeRoleに追加しています。

7 動作確認

動作確認のためにストリームをトリガとしたLambdaです。

import json
import base64

def lambda_handler(event, context):
    if("Records" in event):
        records = event["Records"]
        print("records len:{}".format(len(records)))
        for i, record in enumerate(records):
            decoded_data = base64.b64decode(record["kinesis"]["data"])
            print(decoded_data)

Lambdaのログから1秒毎のレコードが、10件溜まる毎に送られてきている様子を確認できます。

8 最後に

ストリームマネージャーを使用して、Kienesis Data Streamsへのデータ送信を試してみました。

単純にKinesisにデータを送りたいだけであれば、トークンマネージャーなどで権限を付与して、直接送ってもいいのですが、ストリームマネージャーを使用することが、以下のような効果が期待できます。

  • 他のストリーム(S3や、AWS IoT Analyticsなど)との共通インターフェース
  • ストレージタイプ、サイズ、データ保持に関するポリシーを定義できる
  • エクスポート先、優先度、永続性などが定義できる
  • ネットワーク切断時のリカバリなどの考慮が必要なくなる(ファイルにバッファリングされている)

言わば、「ストリームに関するマネージドサービスが、エッジ側で軽易に利用可能になる」と言えると思います。

9 参考リンク


[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました
[AWS IoT Greengrass V2] コンポーネントでコアデバイス間のPublish/Subscribeを試してみました
[AWS IoT Greengrass V2] ログマネージャでコンポーネントのログをCloudWatch Logsに送ってみました
[AWS IoT Greengrass V2] トークン交換サービスでコンポーネントからDynamoDBにアクセスしてみました