物体検出モデルでシイタケの収穫時期を自動判定するまでにやったこと。〜②AWSデプロイ編〜

物体検出モデルでシイタケの収穫時期を自動判定するまでにやったこと。〜②AWSデプロイ編〜

2025.11.03

はじめに

皆様こんにちは、あかいけです。

先日、DevelopersIO 2025 TOKYOにてシイタケのLTをしたのですが、
そこに至るまでの過程をブログにしてみます。

https://dev.classmethod.jp/articles/developersio-2025-tokyo-devio2025-yolo-shitake/

このブログを読み終わることには、カスタムモデルで検出した動画をAWS上でリアルタイム配信する方法がわかるようになっているはずなので、ぜひ最後まで読んでもらえたら嬉しいです。

モチベーション

さて、前回のブログでは無事にシイタケの収穫時期を判定するための、カスタムモデルを作成するところまで実施しました。
検出精度に若干の問題はあるものの、一旦はシイタケを検出するという目的は果たせています。

スクリーンショット 2025-10-27 1.41.09

しかしこのままLTを迎えるには、以下の懸念材料が私の中で渦巻いていました。

  • 画像だけ見せられても、正直イメージしづらそう…?
  • インパクトが足りず、消化不良気味のLTになってしまう可能性がある
  • 実際に検出している様子をリアルタイムで見てほしい(これは個人的な願望)

というわけで上記の問題をまとめて解決するために、
シイタケを検出した動画をインターネット経由で見るためのシステムを作っていきます。

やりたいこと

  • シイタケを検出した動画をインターネット経由で見たい
  • 検出している様子をリアルタイムで配信したい
  • システムのデプロイ先としてAWSを利用したい

システム構成をどうするか

まずはシステム構成を考えていきます。

実装するシステムの特性として、
どこで「カスタムモデルを稼働させるか」という観点で大きく2つのパターンに分かれると考えました。

①ローカルでカスタムモデルを稼働させるパターン

ローカルでカスタムモデルを稼働させて、ローカルでシイタケ検出処理した動画をAWSへアップロードするパターンです。

  • メリット
    • ローカル側でシイタケ検出処理をするため、実装がシンプル
  • デメリット
    • 配信元に応じて、シイタケ検出処理用の端末が必要となる

スクリーンショット 2025-11-02 18.38.32

②AWS上でカスタムモデルを稼働させるパターン

ローカルで撮影した動画をそのままAWSへアップロードして、
AWS上でカスタムモデルを稼働させてシイタケ検出処理するパターンです。
なお例としてSageMakerとしていますが、カスタムモデルを稼働させられればなんでも大丈夫です。

スクリーンショット 2025-11-02 18.38.40

  • メリット
    • 複数の配信元に対して、AWS側で一括でシイタケ検出処理ができる
  • デメリット
    • シイタケ検出処理する量に応じて、AWSのランニング費用が高くなる

採用した実装

今回配信元は私の自宅のみのため、
簡単に実装できそうでなおかつAWSのランニングコストを抑えられる①の方式を選択しました。

具体的に今回作成したシステム構成は以下の通りです。

スクリーンショット 2025-11-02 18.38.50

これだけだとなんだかよくわからないので、以下2つの領域に分けてそれぞれ実装を説明します。
なお具体的なコードに関する言及は最小限とし、あくまで技術的な部分の解説となります。

  • AWSへ配信するための実装
  • ユーザーへ配信するための実装

実際に構築するためのコードは以下リポジトリに置いているので、
シイタケをモニタリングしたくなった際などに、ご自由にご利用ください。

https://github.com/Lamaglama39/shiitake-monitoring-system

AWSへ配信するための実装

まずは自宅からAWSへ動画を配信するための実装です。
以下赤枠の部分が該当します。

スクリーンショット 2025-11-02 18.39.02

ここでは大きく以下のことを行なっています。

    1. シイタケを撮影する
    1. 撮影した動画をカスタムモデルで物体検出する
    1. 物体検出した動画をKinesis Video Streamsへ配信する

0. Kinesis Video Streamsの作成

まずは物体検出した動画の配信先となるリソースをAWS上に作成します。
作成するリソースは以下の通りです。

(1) Kinesis Video Streams (KVS)

動画ストリーミングのためのマネージドサービスです。
リアルタイムの動画データを受信し、保存・配信する役割を担います。

主な設定項目:

  • ストリーム名: 配信先を識別するための名前
  • データ保持期間: 動画を保存する期間(時間単位で指定)
  • メディアタイプ: 今回はH.264形式の動画を扱うためvideo/h264を指定

(2) IAMユーザーとポリシー

ローカル環境からKVSへ動画を配信するために必要な認証情報と権限を設定します。

必要な権限:

  • kinesisvideo:PutMedia: KVSへ動画データを送信する権限
  • kinesisvideo:GetDataEndpoint: KVSのエンドポイント情報を取得する権限
  • kinesisvideo:DescribeStream: ストリームの情報を取得する権限

これらのリソースを作成後、以下の情報を取得しておきます。
これらはローカル環境からKVSへ配信する際に利用します。

  • 作成したKVSストリームの名前
  • 作成したIAMユーザーのアクセスキー
  • 作成したIAMユーザーのシークレットキー

1. シイタケを撮影する

シイタケの撮影はUSB接続できるWebカメラを使用しました。

hxgoosksm5agrgznoxfb

PCに接続して正常にカメラが動作することを確認して、
合わせて以下のコマンドでカメラが識別できていることを確認できればOKです。

$ v4l2-ctl --list-devices

C270 HD WEBCAM (usb-0000:04:00.3-3):
        /dev/video0
        /dev/video1
        /dev/media0

2. 撮影した動画をカスタムモデルで物体検出する

次に撮影した動画に対してYOLOカスタムモデルで物体検出処理を行います。

この実装で最も重要なのは、リアルタイム性を保ちながら物体検出処理を実行することです。
今回はPythonとGStreamerを組み合わせて実装しました。

(1) OpenCVによるカメラキャプチャ

まずOpenCVを使ってUSBカメラから映像を取得します。
カメラデバイスの初期化時に以下の設定を行います。

  • 解像度: 1280x720(720p)を設定
  • フォーマット: BGR形式で取得(OpenCVのデフォルト)

カメラからフレームを取得する際は、cv2.VideoCapture.read()メソッドでフレームごとに取得していきます。

(2) カスタムモデルによるリアルタイム物体検出

取得したフレームに対してカスタムモデルで物体検出を実行します。

ここでの技術的な課題はCPU/GPU負荷の軽減です。
全フレームで物体検出処理を行うと高負荷になり、処理が追いつかなくなる事象を確認しました。

そこで以下の工夫を実装しました。

フレーム間引き処理

N フレームごとにのみ物体検出処理を実行し、
それ以外のフレームでは前回の検出結果を再利用します。

# NフレームごとにYOLO処理を実行
if stream_frame_number % self.yolo_interval == 0:
    # YOLOモデルで検出実行
    results = self.model.predict(
        source=frame,
        conf=self.conf_threshold,
        verbose=False
    )
    # 検出結果をフレームに描画
    processed_frame = results[0].plot()
    # 結果をキャッシュ
    self.last_detection_frame = processed_frame.copy()
    self.last_detections = len(results[0].boxes)
else:
    # 前回の検出結果を使用
    processed_frame = self.last_detection_frame
    detections = self.last_detections

この方式により、例えば3フレームごとに処理する場合以下のようになります。

  • フレーム1: 物体検出処理実行 → 結果を描画
  • フレーム2: 前回の結果を再利用
  • フレーム3: 前回の結果を再利用
  • フレーム4: 物体検出処理実行 → 結果を描画
  • ...

実際の検証では、3フレームごとに処理することで処理負荷を大幅に軽減しつつ、
検出精度もほぼ劣化させずに動作させることができました。

(3) GStreamerによる動画エンコーディング

検出処理を施した動画をKVSに配信するため、GStreamerを使ってH.264形式にエンコードします。

GStreamerパイプラインの構成

GStreamerでは、複数の処理を「パイプライン」として繋げて動画処理を行います。
今回構築したパイプラインは以下の通りです。

  1. appsrc: 物体検出したフレームデータを指定
  2. videoconvert: 色空間変換やフォーマット変換
  3. x264enc: H.264形式へのエンコード
  4. gdppay: GDP(GStreamer Data Protocol)形式でペイロード化
  5. tcpserversink: TCP経由でデータを送信
pipeline_str = (
    f"appsrc name=source is-live=true format=time "
    f"caps=video/x-raw,format=BGR,width=1280,height=720,framerate=30/1 ! "
    f"videoconvert ! "
    f"x264enc tune=zerolatency speed-preset=ultrafast "
    f"key-int-max=90 bitrate=1500 bframes=0 ! "
    f"gdppay ! "
    f"tcpserversink host=0.0.0.0 port=5000"
)

エンコーディング設定の最適化

  • tune=zerolatency: 低遅延モード(ライブストリーミング向け)
  • speed-preset=ultrafast: エンコード速度を最優先(CPUリソースの節約)
  • key-int-max=90: キーフレーム間隔(90フレームごとにキーフレームを生成)
  • bitrate=1500: ビットレート1500kbps(720pで適切な画質)
  • bframes=0: Bフレームを使用しない(遅延を最小化)

キーフレーム間隔とKVSフラグメント

key-int-max=90という設定が特に重要です。
KVSはキーフレームを境界としてフラグメント(動画の断片)を生成します。

キーフレーム間隔を90フレームに設定することで、
定期的にフラグメントが生成されます。

このフラグメント単位を適切な間隔に設定することで、スムーズなストリーミング再生を実現できます。

タイムスタンプの管理

appsrcにフレームを送る際は、適切なタイムスタンプ(PTS/DTS)を設定します。

# フレーム番号からタイムスタンプを計算
# 実際のフレームレートに基づいて時刻情報を設定
timestamp = frame_number * self.frame_duration
buf.pts = timestamp  # Presentation Timestamp
buf.dts = timestamp  # Decode Timestamp
buf.duration = self.frame_duration

これにより、GStreamerのパイプライン全体で正確な時刻情報が保持され、
KVS側でも正しい時系列で動画が再生されるようになります。

3. 物体検出した動画をKinesis Video Streamsへ配信する

最後に、エンコードした動画をKinesis Video Streamsへ配信します。

ここではAWS公式のKinesis Video Streams Producer SDKを使用して、
GStreamerからの動画ストリームをKVSへ送信する仕組みを構築します。

(1) Dockerを使う

KVS Producer SDKはC++で実装されており、以下のような多くの依存ライブラリが必要です。

  • OpenSSL
  • libcurl
  • log4cplus
  • GStreamer開発ライブラリ
  • その他多数のシステムライブラリ

これらを手動でインストールして環境構築するのはかなり大変です。
特にバージョンの違いやOS依存の問題が発生しやすいです。

そこでAWS公式が提供しているDockerイメージを使用することで、
依存関係が解決された状態でProducer SDKを実行できます。

https://docs.aws.amazon.com/ja_jp/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html

(2) GStreamerとKVS Producer SDKの連携

GStreamerからKVS Producer SDKへのデータ転送は、TCP接続を介して行います。

データフローの仕組み

  1. GStreamerのtcpserversinkが0.0.0.0:5000でTCPサーバーを起動
  2. KVS Producer SDKがTCPクライアントとしてlocalhost:5000に接続
  3. GStreamerがGDP形式でエンコードされたH.264データを送信
  4. KVS Producer SDKがGDPをデコードしてH.264ストリームを取り出す
  5. 取り出したH.264ストリームをKVS APIでアップロード

(3) KVS Producer SDKの設定

Docker環境でKVS Producer SDKを実行する際、環境変数で以下の情報を設定します。

AWS認証情報

  • AWS_ACCESS_KEY_ID: 作成したIAMユーザーのアクセスキー
  • AWS_SECRET_ACCESS_KEY: 作成したIAMユーザーのシークレットキー

これらの認証情報を使ってKVS APIを呼び出し、動画データをアップロードします。

KVS接続情報

  • AWS_DEFAULT_REGION: KVSストリームが存在するリージョン(例: ap-northeast-1)
  • STREAM_NAME: 作成した配信先のKVSストリーム名

Producer SDK実行コマンド

Dockerコンテナ内で以下のようなコマンドを実行してKVSへの配信を開始します。

gst-launch-1.0 \
  tcpclientsrc host=host.docker.internal port=5000 ! \
  gdpdepay ! \
  h264parse ! \
  kvssink stream-name="${STREAM_NAME}" \
          storage-size=512 \
          aws-region="${AWS_DEFAULT_REGION}"
  • 各オプションの意味
    • tcpclientsrc: TCPクライアントとしてGStreamerに接続
    • gdpdepay: GDP形式をデコードしてH.264ストリームを取り出す
    • h264parse: H.264ストリームを解析してフレーム境界を検出
    • kvssink: KVS Producer SDKのGStreamerプラグイン(実際にKVSへアップロード)
    • storage-size=512: ローカルバッファサイズ(MB単位)
    • aws-region: KVSストリームのリージョン

(4) フラグメントの生成とアップロード

KVS Producer SDKは、受信したH.264ストリームを以下のように処理します。

  1. キーフレームの検出: h264parseがキーフレームを検出
  2. フラグメントの生成: キーフレームを境界としてフラグメントを作成
  3. バッファリング: storage-sizeで指定したサイズまでローカルにバッファ
  4. アップロード: PutMediaAPIを使ってKVSへ送信

このとき、先ほど設定したkey-int-max=90により、
90フレームごとにフラグメントが生成されてKVSへアップロードされます。

配信が開始されると、KVSのマネジメントコンソールからリアルタイムで動画を確認できるようになります。

画質がガビガビで申し訳ないですが、以下は実際に配信した例です。
(右側がシイタケたち、左側の画面がKVSのマネジメントコンソールから確認した様子)

pxl_20251017_112633831_720

以下はマネジメントコンソールのスクショです。
これはLT翌日の状態なので、みんな育ちすぎちゃってますね…。(反省)

Screenshot from 2025-10-19 12-13-08

ユーザーへ配信するための実装

次にユーザーへ配信するための実装です。
以下赤枠の部分が該当します。

スクリーンショット 2025-11-02 18.39.21

ここでは大きく以下のことを行なっています。

  • CloudFront経由の静的コンテンツの配信
  • CloudFront経由の動的コンテンツの配信

1. CloudFront経由の静的コンテンツの配信

静的なWebサイト(HTML/CSS/JavaScript)をユーザーに配信するための構成です。

構成要素

  • S3バケット: 静的ファイルの格納先
  • CloudFront: CDNとして静的コンテンツを配信
  • Origin Access Control (OAC): S3への直接アクセスを防ぎ、CloudFront経由のみアクセス可能にする

CloudFrontの設定ポイント

TerraformでCloudFrontディストリビューションを作成する際、以下を設定しています。

default_cache_behavior {
  allowed_methods        = ["GET", "HEAD", "OPTIONS"]
  cached_methods         = ["GET", "HEAD"]
  target_origin_id       = "S3-${aws_s3_bucket.frontend.id}"
  viewer_protocol_policy = "redirect-to-https"
  compress               = true

  min_ttl     = 0
  default_ttl = 3600
  max_ttl     = 86400
}

主なポイントは以下の通りです。

  • HTTPSへのリダイレクトを強制
  • Gzip圧縮を有効化してパフォーマンスを向上
  • 適切なキャッシュTTLを設定

2. CloudFront経由の動的コンテンツの配信

次に、KVSからHLS形式の動画ストリームURLを取得するためのAPI配信の構成です。

構成要素

  • API Gateway: REST APIのエンドポイント
  • Lambda: KVS APIを呼び出してHLS URLを生成
  • CloudFront: APIリクエストをキャッシュせずにプロキシ

API Gatewayの役割

Lambda関数へのトリガーとなるREST APIを提供します。

エンドポイント一覧:

  • /api/kvs/hls-url: HLSストリーミングURLを取得
  • /api/kvs/stream-status: ストリームの状態を確認

Lambda関数の処理フロー

Lambda関数(Python)で以下の処理を実行しています。

  1. KVS APIでデータエンド1. CloudFront経由の静的コンテンツの配信ポイントを取得
  2. Kinesis Video Archived Mediaクライアントを作成
  3. HLSストリーミングセッションURLを生成
  4. 生成したURLをJSON形式で返却
# 例: ライブモードでのHLS URL取得
def get_kvs_hls_url(query_params):
    stream_name = query_params.get('stream', KVS_STREAM_NAME)
    mode = query_params.get('mode', 'live')

    # Get HLS streaming endpoint
    endpoint_response = kvs_client.get_data_endpoint(
        StreamName=stream_name,
        APIName='GET_HLS_STREAMING_SESSION_URL'
    )

    # Create archived media client
    kvs_archived_media_client = boto3.client(
        'kinesis-video-archived-media',
        endpoint_url=endpoint_response['DataEndpoint']
    )

    # Get HLS URL
    hls_params = {
        'StreamName': stream_name,
        'PlaybackMode': 'LIVE',
        'Expires': 43200  # 12時間有効
    }

    hls_response = kvs_archived_media_client.get_hls_streaming_session_url(**hls_params)

    return hls_response['HLSStreamingSessionURL']

CloudFrontでのAPI配信設定

APIリクエストに対しては、キャッシュを無効化して常に最新のデータを取得します。

ordered_cache_behavior {
  path_pattern           = "/api/*"
  target_origin_id       = "API-Gateway"
  viewer_protocol_policy = "https-only"

  # キャッシュを無効化
  min_ttl     = 0
  default_ttl = 0
  max_ttl     = 0

  # クエリ文字列とヘッダーを転送
  forwarded_values {
    query_string = true
    headers      = ["Authorization", "Accept", "Content-Type"]
  }
}

3. フロントエンドでの動画再生

最後にWebブラウザでKVSストリームを再生する部分です。

HLS.jsによる再生

KVSから取得したHLS URLをHLS.jsライブラリで再生します。

// Lambda APIからHLS URLを取得
const response = await fetch(`${API_BASE_URL}/kvs/hls-url?stream=${streamName}`);
const data = await response.json();

// HLS.jsで再生
const hls = new Hls({
  enableWorker: true,
  lowLatencyMode: true,
  backBufferLength: 90
});

hls.loadSource(data.hlsUrl);
hls.attachMedia(videoElement);

hls.on(Hls.Events.MANIFEST_PARSED, () => {
  videoElement.play();
});

HLS URLの定期更新

KVSのHLS URLには有効期限(12時間)があるため、
定期的に新しいURLを取得して更新しています。

// 60秒ごとにURL更新
setInterval(async () => {
  const newHlsUrl = await getHLSStreamingEndpoint();
  if (newHlsUrl !== this.hlsUrl) {
    this.hlsUrl = newHlsUrl;
    hls.loadSource(newHlsUrl);
  }
}, 60000);

実際に配信している様子

今までのものすべてを組み合わせると、
以下のようにウェブサイトからシイタケをモニタリングできるようになります。

Screenshot from 2025-10-19 12-15-42

さいごに

以上、「物体検出モデルでシイタケの収穫時期を自動判定するまでにやったこと。〜②AWSデプロイ編〜」でした。
前後編に渡り読んでいただき、ありがとうございます。

振り返ってみると、カスタムモデルの作成からAWSへのデプロイまで、
想像以上に多くの技術要素が絡み合っていることに改めて気づきました。

特にKinesis Video Streamsを使ったライブストリーミングはGStreamerとの連携やHLS URLなど、思いのほか考慮する箇所が多々ありました。
ただその分、実際にLTでリアルタイムにシイタケの検出結果を見せることができた時は、
とても達成感がありました。

もちろん今回実装したシステムはシイタケ以外の物体検出に応用できるので、
皆様もぜひ好きなカスタムモデルを作ってAWSにデプロイしてみてください。

この記事をシェアする

FacebookHatena blogX

関連記事