[AWS IoT Greengrass V2] ストリームマネージャーを使用してWebカメラの画像を毎秒2フレームでS3に送信してみました

2021.09.14

1 はじめに

IoT事業部の平内(SIN)です。

AWS IoT Greengrass V2では、AWSから提供されるいくつかの事前構築済みコンポーネントがありますが、その中のストリームマネージャー (aws.greengrass.StreamManager) を使用すると、ストリームのための共通インタフェースが利用可能になります。

以前に、このストリームマネージャーを使用して、Kinesis Data Streamsへの送信を試してみました。

今回は、同じくストリームマネージャーを使用していますが、宛先をS3バケットとし、デバイスに接続されたWebカメラの画像をリアルタイムに送信してみました。

2 構成

構成は、以下の通りです。

Greengrassコアでは、2つのカスタムコンポーネントが動作しています。

  • 画像取得
  • 画像送信

1つ目(画像取得)は、Webカメラの画像をOpenCVで取得し、特定のフォルダに、タイムスタンプ名で、どんどん保存しています。そして2つ目(画像送信)のコンポーネントは、フォルダに置かれた画像を、順次ストリームマネージャーに送信しています。送信が完了した画像は、その時点で削除されます。

後は、ストリームマネージャーが、指定されたS3バケットへの送信をマネージドで行ってくれるという按配です。

3 動作のようす

  • Webカメラの前には、トマト羊とアヒルが置かれています。

  • 1秒に2フレーム、次々画像がS3バケットに溜まっていきます。

4 画像取得

画像取得を担当しているコンポーネントのコードです。

OpenCVでWebカメラの画像を取得し、2FPSでフォルダ(/tmp/s3_stream)に保存しているだけです。

web_cam.py

import cv2
import datetime
import os

# Webカメラ
DEVICE_ID = 0
WIDTH = 640
HEIGHT = 480
FPS = 20

folder_name = "/tmp/s3_stream"
os.makedirs(folder_name, exist_ok=True)

def main():
    cap = cv2.VideoCapture(DEVICE_ID)
    # フォーマット・解像度・FPSの設定
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
    cap.set(cv2.CAP_PROP_FPS, FPS)

    # フォーマット・解像度・FPSの取得
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
    fps = cap.get(cv2.CAP_PROP_FPS)
    print("fps:{} width:{} height:{}".format(fps, width, height))

    counter = 0
    while True:
        # カメラ画像取得
        _, frame = cap.read()
        if(frame is None):
            continue

        # 10フレームに1回保存
        if(counter%10==0):
            dt = datetime.datetime.now()
            file_name = dt.strftime('%Y_%m_%d_%H.%M.%S.jpg')
            print(file_name)
            cv2.imwrite("{}/{}".format(folder_name, file_name), frame)
        counter += 1

if __name__ == '__main__':
        main()

そして、レシピです。

---
RecipeFormatVersion: "2020-01-25"
ComponentName: "com.example.WebCam"
ComponentVersion: "1.0.0"
ComponentType: "aws.greengrass.generic"
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Run: |
        export LD_PRELOAD=/usr/lib/arm-linux-gnueabihf/libatomic.so.1
        python3 -u {artifacts:path}/web_cam.py
    Artifacts:
      - URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.WebCam/1.0.0/web_cam.py

5 画像送信

画像送信を担当しているコンポーネントのコードです。

ストリームマネージャーを使用して、send_file()で指定されたファイルを送信するクラスが中心となっています。
参考:https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_s3.py

import os
import time
import glob

from stream_manager import (
    ExportDefinition,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    S3ExportTaskDefinition,
    S3ExportTaskExecutorConfig,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StrategyOnFull,
    StreamManagerClient,
    StreamManagerException,
)
from stream_manager.util import Util

class S3Stream:
    def __init__(self, stream_name, bucket_name, folder_name):
        
        self.stream_name = stream_name
        self.status_stream_name = "status" + self.stream_name
        self.bucket_name = bucket_name
        self.folder_name = folder_name

        os.makedirs(self.folder_name, exist_ok=True)

        self.client = StreamManagerClient()

        # ストリームが既に存在する場合は、一旦削除する
        try:
            self.client.delete_message_stream(stream_name=self.status_stream_name)
        except ResourceNotFoundException:
            pass
        try:
            self.client.delete_message_stream(stream_name=self.stream_name)
        except ResourceNotFoundException:
            pass
        
        # ストリーム作成(S3)
        exports = ExportDefinition(
            s3_task_executor=[
                S3ExportTaskExecutorConfig(
                    identifier="S3TaskExecutor" + stream_name,
                    status_config=StatusConfig(
                        status_level=StatusLevel.INFO, 
                        status_stream_name=self.status_stream_name,
                    ),
                )
            ]
        )
        self.client.create_message_stream(
            MessageStreamDefinition(
                name=self.status_stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData)
        )
        self.client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
            )
        )

    # folder_name/file_name を bucket_name/key_name に送信する
    # 送信完了後、folder_name/file_nameは、削除される
    def send_file(self, file_name, key_name):

        print("😍 send_file: {}/{} to s3://{}/{}".format(self.folder_name, file_name, self.bucket_name, key_name))
        
        # S3タスク定義を追加し、シーケンス番号を出力
        input_url = "file:{}/{}".format(self.folder_name, file_name)
        s3_export_task_definition = S3ExportTaskDefinition(input_url=input_url, bucket=self.bucket_name, key=key_name)

        ret = self.client.append_message(self.stream_name, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
        print("sequence number {}".format(ret))
        stop_checking = False
        next_seq = 0
        while not stop_checking:
            print("next_seq:{}".format(next_seq))
            try:
                messages_list = self.client.read_messages(
                    self.status_stream_name,
                    ReadMessagesOptions(
                        desired_start_sequence_number=next_seq, min_message_count=1, read_timeout_millis=1000
                    ),
                )
                for message in messages_list:
                    status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)
                    if status_message.status == Status.Success:
                        print("Status.Success")
                        # 送信完了したのファイルを削除する
                        file_path = "{}/{}".format(self.folder_name, file_name)
                        os.remove(file_path)
                        print("{} removed.".format(file_path))
                        stop_checking = True
                    elif status_message.status == Status.InProgress:
                        print("Status.InProgress.")
                        next_seq = message.sequence_number + 1
                    elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                        print("Status.Failure or Canceled")
                        stop_checking = True
                if not stop_checking:
                    print("not stop_checking. sleep(5)")
                    time.sleep(5)
            except StreamManagerException:
                print("Exception while running. sleep(5)")
                time.sleep(5)
        print("while out.")

def main():

    stream_name = "SomeStream"
    bucket_name = "stream-manager-sample-2021-09-03"
    folder_name = "/tmp/s3_stream"

    s3_stream = S3Stream(stream_name, bucket_name, folder_name)

    # folder_nameの下に、置かれたファイルを、順次S3に送信する
    while(True):
        files = glob.glob("{}/*".format(folder_name))
        for file in files:
            file_name = os.path.basename(file)
            key_name = file_name.replace("_","/")
            s3_stream.send_file(file_name, key_name)
        time.sleep(1)

main()

レシピです。ストームマネージャーのクライアントSDKも一所に送っていますが、以前、Kinesis Data Streamsで送信した時と同じ要領です。

参考:[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントからKinesis Data Streamsへデータを送ってみました

---
RecipeFormatVersion: "2020-01-25"
ComponentName: "com.example.S3Sample"
ComponentVersion: "1.0.0"
ComponentType: "aws.greengrass.generic"
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: "^2.0.0"
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:path}/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 -u {artifacts:path}/s3_sample.py
    Artifacts:
      - URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.S3Sample/1.0.0/stream_manager_sdk.zip
        Unarchive: ZIP
      - URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.S3Sample/1.0.0/s3_sample.py
      - URI: s3://gg-artifacts-2021-08-11/artifacts/com.example.S3Sample/1.0.0/requirements.txt

6 デプロイ

2つのコンポーネントをデプロイしています。

デバイス上では、以下のように見えます。

$ sudo /greengrass/v2/bin/greengrass-cli component list

・・・略・・・

Component Name: aws.greengrass.StreamManager
    Version: 2.0.12
    State: RUNNING
    Configuration: {"JVM_ARGS":"","LOG_LEVEL":"INFO","port":"8088","STREAM_MANAGER_AUTHENTICATE_CLIENT":"true","STREAM_MANAGER_ENABLE_LOCK_ON_METADATA_STORE":"false","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH":"2147483647","STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES":"5242880","STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE":"5","STREAM_MANAGER_SERVER_PORT":"8088","STREAM_MANAGER_STORE_ROOT_DIR":"."}

・・・略・・・

Component Name: com.example.WebCam
    Version: 1.0.0
    State: RUNNING
    Configuration: {}

・・・略・・・

Component Name: com.example.S3Sample
    Version: 1.2.3
    State: RUNNING
    Configuration: {}

・・・略・・・

また、下図は、LocalDebugConsoleで、見ている様子です。この画面から、コンポーネントの状態確認や、起動停止が簡単に行えるので、ほんと捗ります。
参考:[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました

7 ポリシー

ストリームマネージャー経由でS3へ保存する場合も、バケットに対するPutObject権限が必要です。

ここでは、下記のポリシーを GreengrassV2TokenExchangeRole ロールに追加しています。

stream-manager-sample-2021-09-03

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::stream-manager-sample-2021-09-03/*"
        }
    ]
}

8 デバイス上のアクセス権

Greengrassで動作するコンポーネントは、デフォルトでユーザ(ggc_user)、グループ(ggc_user)となっています。 コンポーネントからデバイス上のリソースを使用する場合、Greengrassが動作するユーザーで利用可能になるように構成しておくことが必要です。

今回設定したのは、以下の2点です。

(1) 画像を保管するフォルダのオーナーをggc_userとする

$ ls -la /tmp/s3_stream/
total 764
drwxr-xr-x  2 ggc_user ggc_user   4096 Sep 14 02:26 .

(2) videoのグループにggc_userを追加

カメラデバイスへのアクセスのため

 $ cat /etc/group | grep video
video:x:44:pi

$ sudo usermod -a -G video ggc_user
$ cat /etc/group | grep video
video:x:44:pi,ggc_user

9 最後に

今回は、「Raspberry Pi でWebカメラの画像をS3へ送信」というような、よく見るパターンのサンプルをGreengrassでやってみました。

ストリームマネージャーを使用することで、ネットワーク切断時の対応、送信スピード(書き込み頻度)、他のストリームとの優先などの実装が必要なくなります。

もう少し、要件を複雑にしたり、環境をいじめると、この辺の効果を実感できるかもしれません。

10 参考リンク


[AWS IoT Greengrass V2] RaspberryPIにインストールしてみました
[AWS IoT Greengrass V2] RaspberryPIでコンポーネントを作成してみました
[AWS IoT Greengrass V2] クラウド側から複数のコアデバイスにコンポーネントをデプロイしてみました
[AWS IoT Greengrass V2] クラウド側からコンポーネントを削除してみました
[AWS IoT Greengrass V2] ローカルデバッグコンソール(aws.greengrass.LocalDebugConsole)を使用してみました
[AWS IoT Greengrass V2] Lambda関数(コンポーネント)をデプロイしてみました
[AWS IoT Greengrass V2] コンポーネントからIoT CoreのメッセージブローカーにPublish/Subscribeしてみました
[AWS IoT Greengrass V2] コンポーネントからシークレットマネージャにアクセスしてみました
[AWS IoT Greengrass V2] コンポーネントでコアデバイス間のPublish/Subscribeを試してみました
[AWS IoT Greengrass V2] ログマネージャでコンポーネントのログをCloudWatch Logsに送ってみました
[AWS IoT Greengrass V2] トークン交換サービスでコンポーネントからDynamoDBにアクセスしてみました
[AWS IoT Greengrass V2] ストリームマネージャーを使用してコンポーネントからKinesis Data Streamsへデータを送ってみました
[AWS IoT Greengrass V2] プロセス間通信 (IPC) を使用してコンポーネントの設定値を使用してみました