この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
カフェチームの山本です。
前回の記事では、クラウド上で動画を処理するために、エッジデバイスから動画ファイルを送信する方法として、Pythonのプログラムを実装しました。(ここでは、予め動画ファイルが作成されていることが前提となっていました)
今回は、カメラで撮影した映像をすぐに送信するケースを考えます。送信する動画ファイルを作成するまでの時間(遅延)を短縮するために、撮影した画像をリアルタイムにエンコードする方法を調べました。この記事では、FFmpegを使用する方法を記載します。
実装したコード
早速結論ですが、以下のようなコードを実装することで、リアルタイムにエンコードできました。
video_writer.py
from enum import Enum
def video_filepath(device_id, producer_time):
return f"temp/{device_id}/{producer_time}.mkv"
class VideoWriterState(Enum):
OPEN = 0
RELEASED = 1
camera.py
OpenCVを使って動画を撮影するスクリプトです。
import cv2
import time
def estimate_capture_time(capture_time_post, time_offset):
capture_time_estimate = capture_time_post - time_offset
return capture_time_estimate
class Camera():
def __init__(self, idx_cam, width, height, fps, time_offset):
self._idx_cam = idx_cam
self._width = width
self._height = height
self._fps = fps
self._time_offset = time_offset
capture = cv2.VideoCapture(idx_cam)
capture.set(cv2.CAP_PROP_FRAME_WIDTH, width)
print(capture.get(cv2.CAP_PROP_FRAME_WIDTH), width)
assert capture.get(cv2.CAP_PROP_FRAME_WIDTH) == width
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
print(capture.get(cv2.CAP_PROP_FRAME_HEIGHT), height)
assert capture.get(cv2.CAP_PROP_FRAME_HEIGHT) == height
capture.set(cv2.CAP_PROP_FPS, fps)
print(capture.get(cv2.CAP_PROP_FPS), fps)
assert capture.get(cv2.CAP_PROP_FPS) == fps
self._capture = capture
def read(self):
# capture_time_pre = time.time()
ret, frame = self._capture.read()
capture_time_post = time.time()
capture_time_estimate = estimate_capture_time(capture_time_post, self._time_offset)
return ret, frame, capture_time_estimate
ffmpeg_video_writer.py
FFmpegを起動するコマンドを作成し、別プロセスとして立ち上げます。撮影した映像(画像)を、標準入力経由でそのプロセスに送信しつづけることで、リアルタイムにFFmpegで処理することができます。
コマンドは、「ffmpeg -y -f rawvideo -pixel_format bgr24 -video_size 640x480 -framerate 15 -i - -an -c:v libx264 -b:v 800k temp/0/1619508272.mkv」のようになり、それぞれ以下のような意味です。
- "-y":出力ファイルが存在する場合、上書きする
- "-f rawvideo":出力フォーマットを
- "-pixel_format bgr24":1ピクセルの画素の順番が青・緑・赤の順で8bitずつ(OpenCVの形式に合わせる)
- "-framerate 15":フレームレートが15fps
- "-i -":データ入力元を標準入力にする
- "-video_size 640x480":映像のサイズが横640・縦480
- "-an":音声なし
- "-c:v libx264":libx264でエンコードする(H264・ソフトウェアエンコーディング)
- "-b:v 800k":ビデオのビットレートを800kにする
- "temp/0/1619508272.mkv":出力先のファイル名カメラによっては対応していない解像度・FPSがあるので、CAPTURE_WIDTHなどを適宜変更してください。
import subprocess
import os
from video_writer import video_filepath, VideoWriterState
def create_command(codec, width, height, fps, pixel_format, filename, bitrate=None):
# pixel_format : ex) "bgr24"
# codec : ex) "h264"
# bitrate : ex) "800k"
dimension = f'{width}x{height}'
# filename_os = f'"{os.path.abspath(filename)}"'
if codec == "H264" or codec == "h264":
ffmpeg_codec = "libx264"
# ffmpeg_codec = "h264_omx" # hardware encoding if possible
else:
assert False
command = []
command.extend([
'ffmpeg',
'-y',
'-f', 'rawvideo',
'-pixel_format', pixel_format,
'-video_size', dimension,
'-framerate', str(fps),
'-i', '-',
'-an',
'-c:v', ffmpeg_codec,
])
if bitrate is not None:
command.extend([
'-b:v', bitrate,
])
command.extend([
filename
])
return command
class FFmpegVideoWriter():
def __init__(self, producer_time, device_id, codec_forcc, width, height, fps, pixel_format, bitrate=None):
self._producer_time = producer_time
self._codec_forcc = codec_forcc
self._width = width
self._height = height
self._fps = fps
filename = video_filepath(device_id, producer_time)
if os.path.dirname(filename) != "":
os.makedirs(os.path.dirname(filename), exist_ok=True)
self._filename = filename
command = create_command(codec_forcc, width, height, fps, pixel_format, filename, bitrate)
self._command = command
self._proc = subprocess.Popen(command, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
# print(" ".join(command))
self._frame_count = 0
self._state = VideoWriterState.OPEN
def write(self, frame):
if self._state == VideoWriterState.OPEN:
self._proc.stdin.write(frame.tobytes())
self._frame_count += 1
def release(self):
if not self._state == VideoWriterState.RELEASED:
# self._writer.release()
self._proc.stdin.close()
self._proc.stderr.close()
self._proc.wait()
self._state = VideoWriterState.RELEASED
return self._filename, self._frame_count
############################### (以下、実行処理) ####################################
if __name__ == "__main__":
CAMERA_DEVICE_INDEX = 0
CAPTURE_WIDTH, CAPTURE_HEIGHT = 640, 480
CAPTURE_FPS = 15
CAPTURE_TIME_OFFSET = 0.17
CAPTURE_PIXEL_FORMAT = "bgr24"
WRITER_CODEC_FOURCC = "H264"
WRITER_BITRATE = "800k"
import time
from camera import Camera
writer = FFmpegVideoWriter(int(time.time()), CAMERA_DEVICE_INDEX, WRITER_CODEC_FOURCC, CAPTURE_WIDTH, CAPTURE_HEIGHT, CAPTURE_FPS, CAPTURE_PIXEL_FORMAT, WRITER_BITRATE)
camera = Camera(CAMERA_DEVICE_INDEX, CAPTURE_WIDTH, CAPTURE_HEIGHT, CAPTURE_FPS, CAPTURE_TIME_OFFSET)
# 撮影&エンコード
n_frame_to_capture = 300
i_frame = 0
while True:
ret, frame, capture_time_estimate = camera.read()
writer.write(frame)
i_frame += 1
if i_frame >= n_frame_to_capture:
break
# 終了処理
filename, frame_count = writer.release()
# 結果表示・確認
import cv2
cap = cv2.VideoCapture(filename)
while True:
ret, frame = cap.read()
print(ret)
if not ret:
break
else:
cv2.imshow("frame", frame)
cv2.waitKey(int(1000/CAPTURE_FPS))
使用するカメラによっては対応していない解像度・FPSがあるので、CAPTURE_WIDTHなどを適宜変更してください。以下のコマンドなどで確認できます。
https://askubuntu.com/questions/214977/how-can-i-find-out-the-supported-webcam-resolutions
lsusb
lsusb -s 00x:00x -v | egrep "Width|Height"
v4l2-ctl --list-formats-ext
注意点
今回の方法で出力したファイルは、Windowsのエクスプローラや再生アプリでは、中身を表示できませんでした。
Windowsのエクスプローラ上での表示。サムネイルが表示されない。
Windowsの「動画 & テレビ」アプリで再生した際の表示。黒画面のまま進まない。
上記スクリプトの「# 結果表示・確認」部分のように、OpenCVを利用して読み込むと、正しく録画されていることが確認できました。また、このファイルを前回の記事のスクリプトでKinesis Video Streamsに送信すると、コンソール上で正しく表示されること確認できたので、クラウド側で処理するには問題なさそうです。
まとめ
撮影した映像をリアルタイムにエンコーディングするために、PythonからFFmpegのプロセスを起動し、画像を標準入力経由で入力するコードを実装しました。
参考にさせていただいたページ・サイト
https://stackoverflow.com/questions/34167691/pipe-opencv-images-to-ffmpeg-using-python