この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
CX事業本部@大阪の岩田です。Greengrass Core1.10から利用可能になったストリームマネージャーを試してみました。
環境
今回利用した環境です
- OS: Amazon Linux2 (ami-011facbea5ec0363b)
- Greengrass Core: 1.10
- Javaランタイム: Openjdk 1.8.0
- Lambdaのランタイム: Python3.7
- Greengrass Core Python SDK: 1.5.0
Greengrassストリームマネージャーとは?
Greengrassのコアデバイス上で動作するデータストリームを処理するためのサービスです。
- 受信したストリームデータの保存ポリシー管理
- IoT AnalyticsもしくはKinesis Data Streamsに対するデータの自動エクスポート
- エクスポートするデータの優先順位付け
- データのエクスポートに使用できる帯域幅制御
- データエクスポート時のバッチサイズ設定
等の機能を備えています。
やってみる
実際にストリームマネージャーを使ってみましょう。今回はGreengrass Lambdaからストリームマネージャへの読み書きと、ストリームマネージャーからKinesis Data Streamsへのデータエクスポートを試してみます。
コアデバイスのセットアップ
今回はEC2をコアデバイスとして利用します。こちらのブログを参考にセットアップを行います。
今回はGreengrass LambdaのランタイムにPython3.7を利用するので、Python3.7をインストールします。
sudo yum install python3
ついでにgreengrasssdkもインストールしておきます。これでLambdaのパッケージにいちいちgreengrasssdkを詰め込まなくてもgreengrasssdkが利用できるようになります。
sudo pip3 install greengrasssdk
コアデバイスでストリームマネージャーを稼働させるにはJavaのランタイムが必要になるので、Open JDKを追加でインストールします。もしコアデバイスにJavaのランタイムが入っていないと、...failed to initialize with reason Unable to find java or java8 executables
といったエラーでGreengrassグループのデプロイが失敗します。
sudo yum install java-1.8.0-openjdk
ここまでできたら、対象のGreengrassグループの設定でストリームマネージャーが有効になっていることを確認し、Greengrassグループをデプロイしてみましょう。
正常にデプロイが完了すると、コアデバイス上でストリームマネージャーのプロセスが稼働していることが確認できます。
$ ps ax | grep stream
9958 ? Ssl 0:02 java -cp /lambda/stream_manager/AWSGreengrassStreamManager.jar:/runtime/java8/* com.amazonaws.iot.greengrass.streammanager.StreamManagerService
Kinesis Data Streamsの準備
ストリームマネージャーからデータをエクスポートするためにKinesisのストリームを作成しておきます。
aws kinesis create-stream --stream-name <適当なストリーム名> --shard-count 1
Greengrassロールの準備
GG Coreを実行するためのIAMロールを作成し、先程作成したKinesisのストリームにアクセスできるように以下のアクセス許可を追加します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecords"
],
"Resource": [
"arn:aws:kinesis:<リージョン>:<AWSアカウントID>:stream/<作成したストリーム名>",
]
}
]
}
準備ができたら作成したIAMロールをGreengrassグループに割り当てておきます。
Lambda Functionの作成
Lambdaを作成して、ストリームへのデータ書き込みとストリームからKinesis Data Streamsへのエクスポートを試してみます。公式ドキュメントで紹介されている以下のコードでLambdaを作成します。
import asyncio
import logging
import random
import time
from greengrasssdk.stream_manager import (
ExportDefinition,
KinesisConfig,
MessageStreamDefinition,
ReadMessagesOptions,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
)
# This example creates a local stream named "SomeStream".
# It starts writing data into that stream and then stream manager automatically exports
# the data to a customer-created Kinesis data stream named "MyKinesisStream".
# This example runs forever until the program is stopped.
# The size of the local stream on disk will not exceed the default (which is 256 MB).
# Any data appended after the stream reaches the size limit continues to be appended, and
# stream manager deletes the oldest data until the total stream size is back under 256 MB.
# The Kinesis data stream in the cloud has no such bound, so all the data from this script is
# uploaded to Kinesis and you will be charged for that usage.
def main(logger):
try:
stream_name = "SomeStream"
kinesis_stream_name = "<先程作成したKinesisストリームの名前>"
# Create a client for the StreamManager
client = StreamManagerClient()
# Try deleting the stream (if it exists) so that we have a fresh start
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
exports = ExportDefinition(
kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
)
)
# Append two messages and print their sequence numbers
logger.info(
"Successfully appended message to stream with sequence number %d",
client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
)
logger.info(
"Successfully appended message to stream with sequence number %d",
client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
)
# Try reading the two messages we just appended and print them out
logger.info(
"Successfully read 2 messages: %s",
client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
)
logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
# Now start putting in random data between 0 and 1000 to emulate device sensor input
while True:
logger.debug("Appending new random integer to stream")
client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
time.sleep(1)
except asyncio.TimeoutError:
logger.exception("Timed out while executing")
except Exception:
logger.exception("Exception while running")
def function_handler(event, context):
return
logging.basicConfig(level=logging.INFO)
# Start up this sample code
main(logger=logging.getLogger())
こちらのLambdaは後ほどLong-Lived LambdaとしてGreengrassグループにデプロイします。
ついでなので、ストリームから読み込みを行うLambdaも作ってみます。
import logging
from greengrasssdk.stream_manager import (
StreamManagerClient,
ReadMessagesOptions
)
client = StreamManagerClient()
logger=logging.getLogger()
last_seq_no = 0
def lambda_handler(event, context):
global last_seq_no
message_list = client.read_messages(
stream_name="SomeStream", #書き込み用Lambdaで指定したストリーム名と揃える
options=ReadMessagesOptions(
desired_start_sequence_number=last_seq_no + 1,
min_message_count=1,
max_message_count=100,
read_timeout_millis=5000
)
)
for msg in message_list:
logger.info(msg)
last_seq_no = message_list[-1].sequence_number
動作確認
準備ができたらGreengrassグループのデプロイを行い、Lambdaの動作を確認します。Greengrassグループは以下のように設定してデプロイして下さい。
- データ書き込み用のLambda: Long-Lived Lambdaとして指定
- 読み込み用のLambda適当なMQTTトピックから起動するようにサブスクリプションを設定
デプロイ完了後にMQTTトピック経由で読み込み用のLambdaを起動すると、以下のようなログが出力されます。
[2020-01-13T07:55:45.856Z][INFO]-ipc_client.py:202,Got work item with invocation id [1f157dc2-8832-4411-4fb8-69fcf84c6f63]
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 31, ingest_time: 1578902127264, payload: b'\x00\x00\x01\xca'>
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 32, ingest_time: 1578902128268, payload: b'\x00\x00\x00\x94'>
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 33, ingest_time: 1578902129271, payload: b'\x00\x00\x01K'>
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 34, ingest_time: 1578902130274, payload: b'\x00\x00\x00\x8f'>
[2020-01-13T07:55:45.862Z][INFO]-lambda_function.py:25,<Class Message. stream_name: SomeStream, sequence_number: 35, ingest_time: 1578902131278, payload: b'\x00\x00\x00\xa1'>
また、マネコンからKinesisストリームのモニタリングタブを確認するとストリームマネージャーからKinesis Data Streamsまでデータがエクスポートされていることが確認できます。
まとめ
Greengrassストリームマネージャーのご紹介でした。エッジ側で気軽にストリームサービスを実行できるようになったので、エッジ側で集計等のデータ加工処理を行ってからクラウドに連携するといったアーキテクチャが組みやすくなったのではないでしょうか?現在クラウド側だけでストリームデータの加工を行っているようなユースケースではストリームマネージャーを活用することでコストの削減も見込めるかもしれません。