PoseNet Pythonで棒人間化した動画をKinesis Video Streamsに配信してみた

2021.07.07

CX事業本部@大阪の岩田です。以前このツイートを見た時に気になっていたPoseNetを使って、Macのカメラで撮影した動画の棒人間化に挑戦してみました。

単に棒人間化する記事は以前Zennに投稿しているので、棒人間化した動画をKinesis Video Streamsに配信するところまで実装してみます。

やること

今回やることはざっくり以下の通りです

  • Mac付属のカメラで動画を撮影する
  • 撮影した動画はPoseNet Pythonで棒人間化する
  • 棒人間化した動画はKinesis Video Streamsに配信する
  • Kinesis Video Streamsへの配信にはGStreamerを使わない
  • これを全部Pythonで実装する

環境

今回利用した環境です

  • OS Mac OS X 10.15
  • Python: 3.7.8
    • av: 8.0.3
    • boto3: 1.17.98
    • botocore: 1.20.98
    • numpy: 1.21.0
    • opencv-python: 4.5.2.54
    • posenet-python
    • PyYAML: 5.4.1
    • scipy: 1.7.0
    • tensorflow: 1.13.1

事前準備

今回動画の棒人間化にはPoseNetのPython実装であるPoseNet Pythonを利用します。まずはPoseNet Pythonのリポジトリをクローンします

$ git clone https://github.com/rwightman/posenet-python
Cloning into 'posenet-python'...
remote: Enumerating objects: 119, done.
remote: Total 119 (delta 0), reused 0 (delta 0), pack-reused 119
Receiving objects: 100% (119/119), 36.56 KiB | 4.57 MiB/s, done.
Resolving deltas: 100% (68/68), done.

続いて仮想環境を作成し、必要な依存ライブラリをインストールします。

$ cd posenet-python/
$ python -m venv .venv
$ source .venv/bin/activate
$ pip install numpy opencv-python scipy tensorflow==1.13.1 pyyaml

Collecting numpy
  Using cached numpy-1.21.0-cp37-cp37m-macosx_10_9_x86_64.whl (16.8 MB)
Collecting opencv-python
  Using cached opencv_python-4.5.2.54-cp37-cp37m-macosx_10_15_x86_64.whl (43.7 MB)
  ...略

ここまで準備できたらリポジトリに含まれているサンプルコードが動作することを確認します。

$ python webcam_demo.py

ここまでで棒人間化の準備が整いました。続いてKinesis Video Streamsに動画を配信するための準備をしていきます。Kinesis Video Streamのエンドポイント取得とSIGV4の署名を作成するためにboto3を、カメラで撮影した画像からMKVファイルを作成するためにPyAVをインストールします

$ pip install boto3 av

これで実装の準備が整いました。今回詳細は割愛しますが

  • Kinesis Video Streamsのストリーム作成
  • 作成したストリームにPutMedia可能なクレデンシャル情報

も準備しておいて下さい。

実装

ここからは実際に棒人間の動画をKinesis Video Streamsに配信するロジックを実装します。先程Posenet Pythonの動作確認に使用したサンプルファイルwebcam_demo.pyを流用しつつ実装していきます。最終的なコードは以下のようになりました。エラーハンドリングなどは省略していますし、関数の分け方等もあまりキレイではないので、もし参考にされる場合はあくまで参考程度として下さい。

また、前提としてKinesis Video Streamsのストリームは事前作成済み、ストリームに書き込むためのクレデンシャル情報はセットアップ済みとします。

import argparse
import concurrent.futures
import time
import io

import av
import boto3
import botocore
from botocore.auth import SigV4Auth
import cv2
import hashlib
import tensorflow as tf
import numpy as np

import posenet


class gen_request_parameters:
    def __init__(self, data):
        self._data = data
        self._pointer = 0
        self._size = len(self._data)
    def __iter__(self):
        return self
    def __next__(self):
        if self._pointer >= self._size:
            raise StopIteration  # signals "the end"
        left = self._size - self._pointer
        chunksz = 16000
        if left < chunksz:
            chunksz = left
        pointer_start = self._pointer
        self._pointer += chunksz
        return self._data[pointer_start:self._pointer]

def get_data_endpoint(stream_name, region):
    kvs_client = boto3.client('kinesisvideo', region_name=region)
    res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName='PUT_MEDIA')
    return res['DataEndpoint']

def put_media(stream_name, url, start_ts, data):

    sha256_digest = hashlib.sha256(data).hexdigest()
    headers= {
        'x-amzn-stream-name': stream_name,
        'x-amzn-fragment-timecode-type': 'RELATIVE',
        'x-amzn-producer-start-timestamp': str(int(start_ts)),
        'x-amz-content-sha256': sha256_digest
    }
    request = botocore.awsrequest.AWSRequest(method='POST', url=url, headers=headers, data=gen_request_parameters(data))
    credentials =  boto3.Session().get_credentials()
    service = 'kinesisvideo'
    region = 'ap-northeast-1'
    botocore.auth.SigV4Auth(credentials, service, region).add_auth(request)
    res = botocore.httpsession.URLLib3Session().send(request.prepare())
    print(res.headers)

parser = argparse.ArgumentParser()
parser.add_argument('--model', type=int, default=101)
parser.add_argument('--cam_width', type=int, default=1280)
parser.add_argument('--cam_height', type=int, default=720)
parser.add_argument('--scale_factor', type=float, default=0.7125)
args = parser.parse_args()


def main():

    stream_name = <事前に作成したKVSのストリーム名>
    region = 'ap-northeast-1'
    data_endpoint = get_data_endpoint(stream_name, region)
    url = f'{data_endpoint}/putMedia'


    with tf.Session() as sess, concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        model_cfg, model_outputs = posenet.load_model(args.model, sess)
        output_stride = model_cfg['output_stride']


        cap = cv2.VideoCapture(0)
        cap.set(3, args.cam_width)
        cap.set(4, args.cam_height)
        cap_fps = 10

        while True:

            out_buffer = io.BytesIO()
            container = av.open(out_buffer, 'w', 'matroska')
            stream = container.add_stream('h264', rate=cap_fps)
            stream.width = args.cam_width
            stream.height = args.cam_height
            stream.pix_fmt = 'yuv420p'

            i = 0
            start_ts = time.time()
            while i < (cap_fps * 9):
                input_image, display_image, output_scale = posenet.read_cap(
                    cap, scale_factor=args.scale_factor, output_stride=output_stride)

                heatmaps_result, offsets_result, displacement_fwd_result, displacement_bwd_result = sess.run(
                    model_outputs,
                    feed_dict={'image:0': input_image}
                )

                pose_scores, keypoint_scores, keypoint_coords = posenet.decode_multi.decode_multiple_poses(
                    heatmaps_result.squeeze(axis=0),
                    offsets_result.squeeze(axis=0),
                    displacement_fwd_result.squeeze(axis=0),
                    displacement_bwd_result.squeeze(axis=0),
                    output_stride=output_stride,
                    max_pose_detections=10,
                    min_pose_score=0.15)

                keypoint_coords *= output_scale

                skeleton_img = np.zeros(display_image.shape, dtype=np.uint8)
                skeleton_img = posenet.draw_skeleton(
                                skeleton_img, pose_scores, keypoint_scores, keypoint_coords,
                                min_pose_confidence=0.02, min_part_confidence=0.02)

                frame = av.VideoFrame.from_ndarray(skeleton_img, format='rgb24')
                for packet in stream.encode(frame):
                    container.mux(packet)
                i += 1

                cv2.imshow('posenet', skeleton_img)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break


            # Flush stream
            for packet in stream.encode():
                container.mux(packet)
            container.close()

            out_buffer.seek(0)
            data = out_buffer.read()

            executor.submit(put_media, stream_name, url, start_ts, data)


if __name__ == "__main__":
    main()

1つづつコードを解説していきます。まずはgen_request_parametersです。

class gen_request_parameters:
    def __init__(self, data):
        self._data = data
        self._pointer = 0
        self._size = len(self._data)
    def __iter__(self):
        return self
    def __next__(self):
        if self._pointer >= self._size:
            raise StopIteration  # signals "the end"
        left = self._size - self._pointer
        chunksz = 16000
        if left < chunksz:
            chunksz = left
        pointer_start = self._pointer
        self._pointer += chunksz
        return self._data[pointer_start:self._pointer]

こちらは後ほどKinesis Video Streamのエンドポイントに送信する動画のバイナリデータをイテレータオブジェクトにラップするためのクラスです。こちらのコードをほぼそのまま流用させて頂きました。

https://coderoad.ru/59481174/Amazon-AWS-Kinesis-Video-Boto-GetMedia-PutMedia

続いて get_data_endpoint

def get_data_endpoint(stream_name, region):
    kvs_client = boto3.client('kinesisvideo', region_name=region)
    res = kvs_client.get_data_endpoint(StreamName=stream_name, APIName='PUT_MEDIA')
    return res['DataEndpoint']

Kinesis Video Streamのエンドポイントを取得するためのシンプルな関数です

次は put_mediaです。こちらはKinesis Video StreamのエンドポイントにPutMediaするための処理です。

def put_media(stream_name, url, start_ts, data):

    sha256_digest = hashlib.sha256(data).hexdigest()
    headers= {
        'x-amzn-stream-name': stream_name,
        'x-amzn-fragment-timecode-type': 'RELATIVE',
        'x-amzn-producer-start-timestamp': str(int(start_ts)),
        'x-amz-content-sha256': sha256_digest
    }
    request = botocore.awsrequest.AWSRequest(method='POST', url=url, headers=headers, data=gen_request_parameters(data))
    credentials =  boto3.Session().get_credentials()
    service = 'kinesisvideo'
    region = 'ap-northeast-1'
    botocore.auth.SigV4Auth(credentials, service, region).add_auth(request)
    res = botocore.httpsession.URLLib3Session().send(request.prepare())
    print(res.headers)

PutMediaにはSIGV4の署名が必要になるので、SigV4Authクラスのadd_authで署名を作成&付与しています。署名を作成するためのクレデンシャル情報はboto3.Session().get_credentials()を利用すると簡単に取得できます。以下の部分です

    request = botocore.awsrequest.AWSRequest(method='POST', url=url, headers=headers, data=gen_request_parameters(data))
    credentials =  boto3.Session().get_credentials()
    service = 'kinesisvideo'
    region = 'ap-northeast-1'
    botocore.auth.SigV4Auth(credentials, service, region).add_auth(request)

最後にメインの処理です。カメラで撮影した映像を読み込んで棒人間化、Kinesis Video Streamsまで配信します。

    stream_name = <事前に作成したKVSのストリーム名>
    region = 'ap-northeast-1'
    data_endpoint = get_data_endpoint(stream_name, region)
    url = f'{data_endpoint}/putMedia'


    with tf.Session() as sess, concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        model_cfg, model_outputs = posenet.load_model(args.model, sess)
        output_stride = model_cfg['output_stride']


        cap = cv2.VideoCapture(0)
        cap.set(3, args.cam_width)
        cap.set(4, args.cam_height)
        cap_fps = 10

        while True:

            out_buffer = io.BytesIO()
            container = av.open(out_buffer, 'w', 'matroska')
            stream = container.add_stream('h264', rate=cap_fps)
            stream.width = args.cam_width
            stream.height = args.cam_height
            stream.pix_fmt = 'yuv420p'

            i = 0
            start_ts = time.time()
            while i < (cap_fps * 9):
                input_image, display_image, output_scale = posenet.read_cap(
                    cap, scale_factor=args.scale_factor, output_stride=output_stride)

                heatmaps_result, offsets_result, displacement_fwd_result, displacement_bwd_result = sess.run(
                    model_outputs,
                    feed_dict={'image:0': input_image}
                )

                pose_scores, keypoint_scores, keypoint_coords = posenet.decode_multi.decode_multiple_poses(
                    heatmaps_result.squeeze(axis=0),
                    offsets_result.squeeze(axis=0),
                    displacement_fwd_result.squeeze(axis=0),
                    displacement_bwd_result.squeeze(axis=0),
                    output_stride=output_stride,
                    max_pose_detections=10,
                    min_pose_score=0.15)

                keypoint_coords *= output_scale

                skeleton_img = np.zeros(display_image.shape, dtype=np.uint8)
                skeleton_img = posenet.draw_skeleton(
                                skeleton_img, pose_scores, keypoint_scores, keypoint_coords,
                                min_pose_confidence=0.02, min_part_confidence=0.02)

                frame = av.VideoFrame.from_ndarray(skeleton_img, format='rgb24')
                for packet in stream.encode(frame):
                    container.mux(packet)
                i += 1

            # Flush stream
            for packet in stream.encode():
                container.mux(packet)
            container.close()

            out_buffer.seek(0)
            data = out_buffer.read()

            executor.submit(put_media, stream_name, url, start_ts, data)

メイン処理の中のポイントをいくつか解説していきます。

MKVファイルを作成する処理

PyAVを利用してメモリ上にMKVファイルを作成します。まずはav.open()OutputContainerを作成します。第3引数に matroskaを指定することでマルチメディアコンテナフォーマットがMKV(Matroska Video File)形式になります。

out_buffer = io.BytesIO()
container = av.open(out_buffer, 'w', 'matroska')

続いてOutputContainerにストリームを追加します。Kinesis Video Streamsに送信できるようにコーデックにはh264を指定します

stream = container.add_stream('h264', rate=cap_fps)
stream.width = args.cam_width
stream.height = args.cam_height
stream.pix_fmt = 'yuv420p'

以後はこのストリームに順次フレームを追加していきます。棒人間画像はNumPyのndarray形式で取得されているので、av.VideoFrame.from_ndarrayでフレームに変換し、コンテナに追加していきます。

frame = av.VideoFrame.from_ndarray(skeleton_img, format='rgb24')
for packet in stream.encode(frame):
	container.mux(packet)

Kinesis Video Streamsの上限に引っかからない程度にフレームを追加し終わったら、MKVファイルをメモリ上に書き出し&読み出してMKVファイルのバイナリデータをdataという変数に格納します。

for packet in stream.encode():
	container.mux(packet)
container.close()

out_buffer.seek(0)
data = out_buffer.read()

executor.submit(put_media, stream_name, url, start_ts, data)

バイナリデータの準備ができたので、別プロセスでKinesis Video StreamsにPutMediaを実行します。

棒人間画像を生成する処理

続いて棒人間画像を生成する部分です。posenet.read_capでカメラから画像を読み込み、読み込んだ画像に対してTensorFlowのSessionrunし、その結果に対してPoseNet Pythonの decode_multiple_poses を実行することで骨格検出の結果を取得します。

input_image, display_image, output_scale = posenet.read_cap(
	cap, scale_factor=args.scale_factor, output_stride=output_stride)

heatmaps_result, offsets_result, displacement_fwd_result, displacement_bwd_result = sess.run(
	model_outputs,
  feed_dict={'image:0': input_image}
)

pose_scores, keypoint_scores, keypoint_coords = posenet.decode_multi.decode_multiple_poses(
	heatmaps_result.squeeze(axis=0),
  offsets_result.squeeze(axis=0),
  displacement_fwd_result.squeeze(axis=0),
  displacement_bwd_result.squeeze(axis=0),
  output_stride=output_stride,
  max_pose_detections=10,
  min_pose_score=0.15)

最後にnp.zerosで真っ黒な画像を生成し、検出された骨格のラインをposenet.draw_skeletonで書き出して棒人間の画像を生成します。あとはこの画像skeleton_imgを先程解説したMKVファイルを作成する処理に渡してKinesis Video Streamsに送信すればOKです

skeleton_img = np.zeros(display_image.shape, dtype=np.uint8)
skeleton_img = posenet.draw_skeleton(
	skeleton_img, pose_scores, keypoint_scores, keypoint_coords,
	min_pose_confidence=0.02, min_part_confidence=0.02)

実行結果

準備ができたので実行してみましょう

$ python kvs.py

マネコンを監視していると...

棒人間...と言えるレベルではありませんが、棒が表示されました。まあこの辺は各種の閾値の調整だったり被写体の距離やポーズによってもう少し棒人間っぽくなってくれると思います。

Kinesis Video StreamsからダウンロードしたMP4ファイルもちゃんと棒人間?が表示できています

まとめ

バッファしたMKVファイルをPutMediaする度に若干映像に隙間が発生してしまって、ニアリアルタイムなストリーム配信としては微妙なできなのですが、一応は目的が達成できました。Kinesis Video Streamsを業務で利用する場合はプライバシーへの配慮も重要になってくると思います。エッジ側で動画を棒人間化してしまえば個人が識別できない動画としてクラウドに送信できるるので、色々と面倒なことを考えなくて済むようになりそうです。ユースケース次第ではこういった処理も使えるかもしれませんね。

参考