Greengrassストリームマネージャーを使ってコアデバイス上でストリームデータを処理する

2020.01.13

CX事業本部@大阪の岩田です。Greengrass Core1.10から利用可能になったストリームマネージャーを試してみました。

環境

今回利用した環境です

  • OS: Amazon Linux2 (ami-011facbea5ec0363b)
  • Greengrass Core: 1.10
  • Javaランタイム: Openjdk 1.8.0
  • Lambdaのランタイム: Python3.7
  • Greengrass Core Python SDK: 1.5.0

Greengrassストリームマネージャーとは?

Greengrassのコアデバイス上で動作するデータストリームを処理するためのサービスです。

  • 受信したストリームデータの保存ポリシー管理
  • IoT AnalyticsもしくはKinesis Data Streamsに対するデータの自動エクスポート
  • エクスポートするデータの優先順位付け
  • データのエクスポートに使用できる帯域幅制御
  • データエクスポート時のバッチサイズ設定

等の機能を備えています。

やってみる

実際にストリームマネージャーを使ってみましょう。今回はGreengrass Lambdaからストリームマネージャへの読み書きと、ストリームマネージャーからKinesis Data Streamsへのデータエクスポートを試してみます。

コアデバイスのセットアップ

今回はEC2をコアデバイスとして利用します。こちらのブログを参考にセットアップを行います。

デバイスがなくても大丈夫!EC2でAWS IoT Greengrassを動かす #reinvent

今回はGreengrass LambdaのランタイムにPython3.7を利用するので、Python3.7をインストールします。

sudo yum install python3

ついでにgreengrasssdkもインストールしておきます。これでLambdaのパッケージにいちいちgreengrasssdkを詰め込まなくてもgreengrasssdkが利用できるようになります。

sudo pip3 install greengrasssdk

コアデバイスでストリームマネージャーを稼働させるにはJavaのランタイムが必要になるので、Open JDKを追加でインストールします。もしコアデバイスにJavaのランタイムが入っていないと、...failed to initialize with reason Unable to find java or java8 executablesといったエラーでGreengrassグループのデプロイが失敗します。

sudo yum install java-1.8.0-openjdk

ここまでできたら、対象のGreengrassグループの設定でストリームマネージャーが有効になっていることを確認し、Greengrassグループをデプロイしてみましょう。

正常にデプロイが完了すると、コアデバイス上でストリームマネージャーのプロセスが稼働していることが確認できます。

$ ps ax | grep stream
 9958 ?        Ssl    0:02 java -cp /lambda/stream_manager/AWSGreengrassStreamManager.jar:/runtime/java8/* com.amazonaws.iot.greengrass.streammanager.StreamManagerService

Kinesis Data Streamsの準備

ストリームマネージャーからデータをエクスポートするためにKinesisのストリームを作成しておきます。

aws kinesis create-stream --stream-name <適当なストリーム名> --shard-count 1

Greengrassロールの準備

GG Coreを実行するためのIAMロールを作成し、先程作成したKinesisのストリームにアクセスできるように以下のアクセス許可を追加します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:<リージョン>:<AWSアカウントID>:stream/<作成したストリーム名>",
            ]
        }
    ]
}

準備ができたら作成したIAMロールをGreengrassグループに割り当てておきます。

Lambda Functionの作成

Lambdaを作成して、ストリームへのデータ書き込みとストリームからKinesis Data Streamsへのエクスポートを試してみます。公式ドキュメントで紹介されている以下のコードでLambdaを作成します。

import asyncio
import logging
import random
import time

from greengrasssdk.stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)


# This example creates a local stream named "SomeStream".
# It starts writing data into that stream and then stream manager automatically exports  
# the data to a customer-created Kinesis data stream named "MyKinesisStream". 
# This example runs forever until the program is stopped.

# The size of the local stream on disk will not exceed the default (which is 256 MB).
# Any data appended after the stream reaches the size limit continues to be appended, and
# stream manager deletes the oldest data until the total stream size is back under 256 MB.
# The Kinesis data stream in the cloud has no such bound, so all the data from this script is
# uploaded to Kinesis and you will be charged for that usage.


def main(logger):
    try:
        stream_name = "SomeStream"
        kinesis_stream_name = "<先程作成したKinesisストリームの名前>"

        # Create a client for the StreamManager
        client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
            )
        )

        # Append two messages and print their sequence numbers
        logger.info(
            "Successfully appended message to stream with sequence number %d",
            client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
        )
        logger.info(
            "Successfully appended message to stream with sequence number %d",
            client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
        )

        # Try reading the two messages we just appended and print them out
        logger.info(
            "Successfully read 2 messages: %s",
            client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
        )

        logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
        # Now start putting in random data between 0 and 1000 to emulate device sensor input
        while True:
            logger.debug("Appending new random integer to stream")
            client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
            time.sleep(1)

    except asyncio.TimeoutError:
        logger.exception("Timed out while executing")
    except Exception:
        logger.exception("Exception while running")


def function_handler(event, context):
    return


logging.basicConfig(level=logging.INFO)
# Start up this sample code
main(logger=logging.getLogger())

こちらのLambdaは後ほどLong-Lived LambdaとしてGreengrassグループにデプロイします。

ついでなので、ストリームから読み込みを行うLambdaも作ってみます。

import logging

from greengrasssdk.stream_manager import (
    StreamManagerClient,
    ReadMessagesOptions
)

client = StreamManagerClient()
logger=logging.getLogger() 
last_seq_no = 0

def lambda_handler(event, context):
    global last_seq_no
    message_list = client.read_messages(
        stream_name="SomeStream", #書き込み用Lambdaで指定したストリーム名と揃える
        options=ReadMessagesOptions(
            desired_start_sequence_number=last_seq_no + 1,
            min_message_count=1,
            max_message_count=100,
            read_timeout_millis=5000
        )
    )

    for msg in message_list:
        logger.info(msg)
    last_seq_no = message_list[-1].sequence_number

動作確認

準備ができたらGreengrassグループのデプロイを行い、Lambdaの動作を確認します。Greengrassグループは以下のように設定してデプロイして下さい。

  • データ書き込み用のLambda: Long-Lived Lambdaとして指定
  • 読み込み用のLambda適当なMQTTトピックから起動するようにサブスクリプションを設定

デプロイ完了後にMQTTトピック経由で読み込み用のLambdaを起動すると、以下のようなログが出力されます。

[2020-01-13T07:55:45.856Z][INFO]-ipc_client.py:202,Got work item with invocation id [1f157dc2-8832-4411-4fb8-69fcf84c6f63]
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 31, ingest_time: 1578902127264, payload: b'\x00\x00\x01\xca'>
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 32, ingest_time: 1578902128268, payload: b'\x00\x00\x00\x94'>
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 33, ingest_time: 1578902129271, payload: b'\x00\x00\x01K'>
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 34, ingest_time: 1578902130274, payload: b'\x00\x00\x00\x8f'>
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 35, ingest_time: 1578902131278, payload: b'\x00\x00\x00\xa1'>

また、マネコンからKinesisストリームのモニタリングタブを確認するとストリームマネージャーからKinesis Data Streamsまでデータがエクスポートされていることが確認できます。

まとめ

Greengrassストリームマネージャーのご紹介でした。エッジ側で気軽にストリームサービスを実行できるようになったので、エッジ側で集計等のデータ加工処理を行ってからクラウドに連携するといったアーキテクチャが組みやすくなったのではないでしょうか?現在クラウド側だけでストリームデータの加工を行っているようなユースケースではストリームマネージャーを活用することでコストの削減も見込めるかもしれません。