この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
IoT事業部の平内(SIN)です。
AWS IoT Greengrass V2では、AWSから提供されるいくつかの事前構築済みコンポーネントがありますが、その中のストリームマネージャー (aws.greengrass.StreamManager) を使用すると、ストリームのための共通インタフェースが利用可能になります。
以前に、このストリームマネージャーを使用して、Kinesis Data Streamsへの送信を試してみました。
今回は、同じくストリームマネージャーを使用していますが、宛先をS3バケットとし、デバイスに接続されたWebカメラの画像をリアルタイムに送信してみました。
2 構成
構成は、以下の通りです。
Greengrassコアでは、2つのカスタムコンポーネントが動作しています。
- 画像取得
- 画像送信
1つ目(画像取得)は、Webカメラの画像をOpenCVで取得し、特定のフォルダに、タイムスタンプ名で、どんどん保存しています。そして2つ目(画像送信)のコンポーネントは、フォルダに置かれた画像を、順次ストリームマネージャーに送信しています。送信が完了した画像は、その時点で削除されます。
後は、ストリームマネージャーが、指定されたS3バケットへの送信をマネージドで行ってくれるという按配です。
3 動作のようす
- Webカメラの前には、トマト羊とアヒルが置かれています。
- 1秒に2フレーム、次々画像がS3バケットに溜まっていきます。
4 画像取得
画像取得を担当しているコンポーネントのコードです。
OpenCVでWebカメラの画像を取得し、2FPSでフォルダ(/tmp/s3_stream)に保存しているだけです。
web_cam.py
import cv2
import datetime
import os
# Webカメラ
DEVICE_ID = 0
WIDTH = 640
HEIGHT = 480
FPS = 20
folder_name = "/tmp/s3_stream"
os.makedirs(folder_name, exist_ok=True)
def main():
cap = cv2.VideoCapture(DEVICE_ID)
# フォーマット・解像度・FPSの設定
cap.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
cap.set(cv2.CAP_PROP_FPS, FPS)
# フォーマット・解像度・FPSの取得
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
fps = cap.get(cv2.CAP_PROP_FPS)
print("fps:{} width:{} height:{}".format(fps, width, height))
counter = 0
while True:
# カメラ画像取得
_, frame = cap.read()
if(frame is None):
continue
# 10フレームに1回保存
if(counter%10==0):
dt = datetime.datetime.now()
file_name = dt.strftime('%Y_%m_%d_%H.%M.%S.jpg')
print(file_name)
cv2.imwrite("{}/{}".format(folder_name, file_name), frame)
counter += 1
if __name__ == '__main__':
main()
そして、レシピです。
---
RecipeFormatVersion: "2020-01-25"
ComponentName: "com.example.WebCam"
ComponentVersion: "1.0.0"
ComponentType: "aws.greengrass.generic"
Manifests:
- Platform:
os: linux
Lifecycle:
Run: |
export LD_PRELOAD=/usr/lib/arm-linux-gnueabihf/libatomic.so.1
python3 -u {artifacts:path}/web_cam.py
Artifacts:
- URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.WebCam/1.0.0/web_cam.py
5 画像送信
画像送信を担当しているコンポーネントのコードです。
ストリームマネージャーを使用して、send_file()で指定されたファイルを送信するクラスが中心となっています。
参考:https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_s3.py
import os
import time
import glob
from stream_manager import (
ExportDefinition,
MessageStreamDefinition,
ReadMessagesOptions,
ResourceNotFoundException,
S3ExportTaskDefinition,
S3ExportTaskExecutorConfig,
Status,
StatusConfig,
StatusLevel,
StatusMessage,
StrategyOnFull,
StreamManagerClient,
StreamManagerException,
)
from stream_manager.util import Util
class S3Stream:
def __init__(self, stream_name, bucket_name, folder_name):
self.stream_name = stream_name
self.status_stream_name = "status" + self.stream_name
self.bucket_name = bucket_name
self.folder_name = folder_name
os.makedirs(self.folder_name, exist_ok=True)
self.client = StreamManagerClient()
# ストリームが既に存在する場合は、一旦削除する
try:
self.client.delete_message_stream(stream_name=self.status_stream_name)
except ResourceNotFoundException:
pass
try:
self.client.delete_message_stream(stream_name=self.stream_name)
except ResourceNotFoundException:
pass
# ストリーム作成(S3)
exports = ExportDefinition(
s3_task_executor=[
S3ExportTaskExecutorConfig(
identifier="S3TaskExecutor" + stream_name,
status_config=StatusConfig(
status_level=StatusLevel.INFO,
status_stream_name=self.status_stream_name,
),
)
]
)
self.client.create_message_stream(
MessageStreamDefinition(
name=self.status_stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData)
)
self.client.create_message_stream(
MessageStreamDefinition(
name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
)
)
# folder_name/file_name を bucket_name/key_name に送信する
# 送信完了後、folder_name/file_nameは、削除される
def send_file(self, file_name, key_name):
print("? send_file: {}/{} to s3://{}/{}".format(self.folder_name, file_name, self.bucket_name, key_name))
# S3タスク定義を追加し、シーケンス番号を出力
input_url = "file:{}/{}".format(self.folder_name, file_name)
s3_export_task_definition = S3ExportTaskDefinition(input_url=input_url, bucket=self.bucket_name, key=key_name)
ret = self.client.append_message(self.stream_name, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
print("sequence number {}".format(ret))
stop_checking = False
next_seq = 0
while not stop_checking:
print("next_seq:{}".format(next_seq))
try:
messages_list = self.client.read_messages(
self.status_stream_name,
ReadMessagesOptions(
desired_start_sequence_number=next_seq, min_message_count=1, read_timeout_millis=1000
),
)
for message in messages_list:
status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)
if status_message.status == Status.Success:
print("Status.Success")
# 送信完了したのファイルを削除する
file_path = "{}/{}".format(self.folder_name, file_name)
os.remove(file_path)
print("{} removed.".format(file_path))
stop_checking = True
elif status_message.status == Status.InProgress:
print("Status.InProgress.")
next_seq = message.sequence_number + 1
elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
print("Status.Failure or Canceled")
stop_checking = True
if not stop_checking:
print("not stop_checking. sleep(5)")
time.sleep(5)
except StreamManagerException:
print("Exception while running. sleep(5)")
time.sleep(5)
print("while out.")
def main():
stream_name = "SomeStream"
bucket_name = "stream-manager-sample-2021-09-03"
folder_name = "/tmp/s3_stream"
s3_stream = S3Stream(stream_name, bucket_name, folder_name)
# folder_nameの下に、置かれたファイルを、順次S3に送信する
while(True):
files = glob.glob("{}/*".format(folder_name))
for file in files:
file_name = os.path.basename(file)
key_name = file_name.replace("_","/")
s3_stream.send_file(file_name, key_name)
time.sleep(1)
main()
レシピです。ストームマネージャーのクライアントSDKも一所に送っていますが、以前、Kinesis Data Streamsで送信した時と同じ要領です。
参考:[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントからKinesis Data Streamsへデータを送ってみました
---
RecipeFormatVersion: "2020-01-25"
ComponentName: "com.example.S3Sample"
ComponentVersion: "1.0.0"
ComponentType: "aws.greengrass.generic"
ComponentDependencies:
aws.greengrass.StreamManager:
VersionRequirement: "^2.0.0"
Manifests:
- Platform:
os: linux
Lifecycle:
Install: pip3 install --user -r {artifacts:path}/requirements.txt
Run: |
export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
python3 -u {artifacts:path}/s3_sample.py
Artifacts:
- URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.S3Sample/1.0.0/stream_manager_sdk.zip
Unarchive: ZIP
- URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.S3Sample/1.0.0/s3_sample.py
- URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.S3Sample/1.0.0/requirements.txt
6 デプロイ
2つのコンポーネントをデプロイしています。
デバイス上では、以下のように見えます。
$ sudo /greengrass/v2/bin/greengrass-cli component list
・・・略・・・
Component Name: aws.greengrass.StreamManager
Version: 2.0.12
State: RUNNING
Configuration: {"JVM_ARGS":"","LOG_LEVEL":"INFO","port":"8088","STREAM_MANAGER_AUTHENTICATE_CLIENT":"true","STREAM_MANAGER_ENABLE_LOCK_ON_METADATA_STORE":"false","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH":"2147483647","STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES":"5242880","STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE":"5","STREAM_MANAGER_SERVER_PORT":"8088","STREAM_MANAGER_STORE_ROOT_DIR":"."}
・・・略・・・
Component Name: com.example.WebCam
Version: 1.0.0
State: RUNNING
Configuration: {}
・・・略・・・
Component Name: com.example.S3Sample
Version: 1.2.3
State: RUNNING
Configuration: {}
・・・略・・・
また、下図は、LocalDebugConsoleで、見ている様子です。この画面から、コンポーネントの状態確認や、起動停止が簡単に行えるので、ほんと捗ります。
参考:[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
7 ポリシー
ストリームマネージャー経由でS3へ保存する場合も、バケットに対するPutObject権限が必要です。
ここでは、下記のポリシーを GreengrassV2TokenExchangeRole ロールに追加しています。
stream-manager-sample-2021-09-03
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject"
],
"Resource": "arn:aws:s3:::stream-manager-sample-2021-09-03/*"
}
]
}
8 デバイス上のアクセス権
Greengrassで動作するコンポーネントは、デフォルトでユーザ(ggc_user)、グループ(ggc_user)となっています。 コンポーネントからデバイス上のリソースを使用する場合、Greengrassが動作するユーザーで利用可能になるように構成しておくことが必要です。
今回設定したのは、以下の2点です。
(1) 画像を保管するフォルダのオーナーをggc_userとする
$ ls -la /tmp/s3_stream/
total 764
drwxr-xr-x 2 ggc_user ggc_user 4096 Sep 14 02:26 .
(2) videoのグループにggc_userを追加
カメラデバイスへのアクセスのため
$ cat /etc/group | grep video
video:x:44:pi
$ sudo usermod -a -G video ggc_user
$ cat /etc/group | grep video
video:x:44:pi,ggc_user
9 最後に
今回は、「Raspberry Pi でWebカメラの画像をS3へ送信」というような、よく見るパターンのサンプルをGreengrassでやってみました。
ストリームマネージャーを使用することで、ネットワーク切断時の対応、送信スピード(書き込み頻度)、他のストリームとの優先などの実装が必要なくなります。
もう少し、要件を複雑にしたり、環境をいじめると、この辺の効果を実感できるかもしれません。
10 参考リンク
[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました
[AWS IoT Greengrass V2] コンポーネントでコアデバイス間のPublish/Subscribeを試してみました
[AWS IoT Greengrass V2] ログマネージャでコンポーネントのログをCloudWatch Logsに送ってみました
[AWS IoT Greengrass V2] トークン交換サービスでコンポーネントからDynamoDBにアクセスしてみました
[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントからKinesis Data Streamsへデータを送ってみました
[AWS IoT Greengrass V2] プロセス間通信 (IPC) を使用してコンポーネントの設定値を使用してみました