Greengrassストリームマネージャーを使ってコアデバイス上でストリームデータを処理する
CX事業本部@大阪の岩田です。Greengrass Core1.10から利用可能になったストリームマネージャーを試してみました。
環境
今回利用した環境です
- OS: Amazon Linux2 (ami-011facbea5ec0363b)
- Greengrass Core: 1.10
- Javaランタイム: Openjdk 1.8.0
- Lambdaのランタイム: Python3.7
- Greengrass Core Python SDK: 1.5.0
Greengrassストリームマネージャーとは?
Greengrassのコアデバイス上で動作するデータストリームを処理するためのサービスです。
- 受信したストリームデータの保存ポリシー管理
- IoT AnalyticsもしくはKinesis Data Streamsに対するデータの自動エクスポート
- エクスポートするデータの優先順位付け
- データのエクスポートに使用できる帯域幅制御
- データエクスポート時のバッチサイズ設定
等の機能を備えています。
やってみる
実際にストリームマネージャーを使ってみましょう。今回はGreengrass Lambdaからストリームマネージャへの読み書きと、ストリームマネージャーからKinesis Data Streamsへのデータエクスポートを試してみます。
コアデバイスのセットアップ
今回はEC2をコアデバイスとして利用します。こちらのブログを参考にセットアップを行います。
今回はGreengrass LambdaのランタイムにPython3.7を利用するので、Python3.7をインストールします。
sudo yum install python3
ついでにgreengrasssdkもインストールしておきます。これでLambdaのパッケージにいちいちgreengrasssdkを詰め込まなくてもgreengrasssdkが利用できるようになります。
sudo pip3 install greengrasssdk
コアデバイスでストリームマネージャーを稼働させるにはJavaのランタイムが必要になるので、Open JDKを追加でインストールします。もしコアデバイスにJavaのランタイムが入っていないと、...failed to initialize with reason Unable to find java or java8 executables
といったエラーでGreengrassグループのデプロイが失敗します。
sudo yum install java-1.8.0-openjdk
ここまでできたら、対象のGreengrassグループの設定でストリームマネージャーが有効になっていることを確認し、Greengrassグループをデプロイしてみましょう。
正常にデプロイが完了すると、コアデバイス上でストリームマネージャーのプロセスが稼働していることが確認できます。
$ ps ax | grep stream 9958 ? Ssl 0:02 java -cp /lambda/stream_manager/AWSGreengrassStreamManager.jar:/runtime/java8/* com.amazonaws.iot.greengrass.streammanager.StreamManagerService
Kinesis Data Streamsの準備
ストリームマネージャーからデータをエクスポートするためにKinesisのストリームを作成しておきます。
aws kinesis create-stream --stream-name <適当なストリーム名> --shard-count 1
Greengrassロールの準備
GG Coreを実行するためのIAMロールを作成し、先程作成したKinesisのストリームにアクセスできるように以下のアクセス許可を追加します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:<リージョン>:<AWSアカウントID>:stream/<作成したストリーム名>", ] } ] }
準備ができたら作成したIAMロールをGreengrassグループに割り当てておきます。
Lambda Functionの作成
Lambdaを作成して、ストリームへのデータ書き込みとストリームからKinesis Data Streamsへのエクスポートを試してみます。公式ドキュメントで紹介されている以下のコードでLambdaを作成します。
import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "<先程作成したKinesisストリームの名前>" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
こちらのLambdaは後ほどLong-Lived LambdaとしてGreengrassグループにデプロイします。
ついでなので、ストリームから読み込みを行うLambdaも作ってみます。
import logging from greengrasssdk.stream_manager import ( StreamManagerClient, ReadMessagesOptions ) client = StreamManagerClient() logger=logging.getLogger() last_seq_no = 0 def lambda_handler(event, context): global last_seq_no message_list = client.read_messages( stream_name="SomeStream", #書き込み用Lambdaで指定したストリーム名と揃える options=ReadMessagesOptions( desired_start_sequence_number=last_seq_no + 1, min_message_count=1, max_message_count=100, read_timeout_millis=5000 ) ) for msg in message_list: logger.info(msg) last_seq_no = message_list[-1].sequence_number
動作確認
準備ができたらGreengrassグループのデプロイを行い、Lambdaの動作を確認します。Greengrassグループは以下のように設定してデプロイして下さい。
- データ書き込み用のLambda: Long-Lived Lambdaとして指定
- 読み込み用のLambda適当なMQTTトピックから起動するようにサブスクリプションを設定
デプロイ完了後にMQTTトピック経由で読み込み用のLambdaを起動すると、以下のようなログが出力されます。
[2020-01-13T07:55:45.856Z][INFO]-ipc_client.py:202,Got work item with invocation id [1f157dc2-8832-4411-4fb8-69fcf84c6f63] [2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 31, ingest_time: 1578902127264, payload: b'\x00\x00\x01\xca'> [2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 32, ingest_time: 1578902128268, payload: b'\x00\x00\x00\x94'> [2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 33, ingest_time: 1578902129271, payload: b'\x00\x00\x01K'> [2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 34, ingest_time: 1578902130274, payload: b'\x00\x00\x00\x8f'> [2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 35, ingest_time: 1578902131278, payload: b'\x00\x00\x00\xa1'>
また、マネコンからKinesisストリームのモニタリングタブを確認するとストリームマネージャーからKinesis Data Streamsまでデータがエクスポートされていることが確認できます。
まとめ
Greengrassストリームマネージャーのご紹介でした。エッジ側で気軽にストリームサービスを実行できるようになったので、エッジ側で集計等のデータ加工処理を行ってからクラウドに連携するといったアーキテクチャが組みやすくなったのではないでしょうか?現在クラウド側だけでストリームデータの加工を行っているようなユースケースではストリームマネージャーを活用することでコストの削減も見込めるかもしれません。