[Amazon SageMaker] 予め撮影した検証用動画による推論と追加学習用のデータ収集について

2021.01.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

機械学習(物体検出)において、一発で完璧なモデルを作成するというのは、なかなかハードルの高い作業です。

しかし、作成したモデルを効率よく評価し、「未検出」若しくは、「誤検知」のデータ(画像)をうまく収集して、追加学習していくことで、精度を向上させることは可能です。

今回は、作成したモデルを、手元の評価用動画で確認し、「未検出」若しくは、「誤検知」があった場合に、そのフレームを、そのままスナップショットとして保存し、追加学習用のデータ(画像)を収集するしくみを試してみました。

最初に作業している様子をご確認下さい。

予め評価用に動画を撮影しておき、作成したモデルで推論して確認しています。この中で、「未検出」若しくは、「誤検知」の場面があった場合は、画面をクリックするだけで、そのフレームが保存されるようになっています。(注:動作確認のため「未検出」「誤検知」に限らず、クリックしてスナップショットを撮っています)

2 コード

フレームワークは、MXNetになっています。SageMakerの物体検出で作成したモデルは、変換が必要です。
[Amazon SageMaker] 組み込みアルゴリズム(物体検出)モデルをMXNetで利用できるようにする環境をSageMaker Studio Notebooksで作ってみました

クラス名と表示色は、モデルに合わせて変更が必要です。

インターバル(INTERVAL)を調整する事で、フレームを間引いて再生します。

クリックして保存される画像は、OUTPUT_PATHに保存されます。

index.py

# -*- coding: utf-8 -*-
"""
動画再生によるモデルの評価と、スナップショット取得
"""

import cv2
import sys, os
from datetime import datetime as dt
import numpy as np
from model import Model

MODEL_PATH = './model/deploy_model_algo_1'

# クラスと表示色(確認用)
CLASSES = ["jagabee","chipstar","butamen","kyo_udon","koara","curry"]
COLORS = [(0,0,175),(175,0,0),(0,175,0),(175,175,0),(0,175,175),(175,175,175)]

OUTPUT_PATH = './output' # 画像を出力するフォルダ
INTERVAL = 24 # フレーム再生間隔

def on_click(event, x, y, flags, frame):
    if event == cv2.EVENT_LBUTTONUP:
        tdatetime = dt.now()
        date_str = tdatetime.strftime('%Y%m%d_%H%M%S')
        filename = "{}/{}.jpg".format(OUTPUT_PATH, date_str)
        cv2.imwrite(filename, frame)
        print("Saved. {}".format(filename))

def main():

    file_path = os.getcwd() + '/sample.mp4'
    cap = cv2.VideoCapture(file_path)
    if not cap.isOpened():
        sys.exit()

    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
    fps = cap.get(cv2.CAP_PROP_FPS)
    print("fps:{} width:{} height:{}".format(fps, width, height))

    model = Model(CLASSES, COLORS, height, width, MODEL_PATH)

    counter = 0
    while True:

        # カメラ画像取得
        _, frame = cap.read()
        if(frame is None):
            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            continue

        if(counter % INTERVAL == 0):
            # 保存用
            save_image = frame.copy()
            (result, disp_frame) = model.inference(frame)
            print("{}".format(result))

        # 画像表示
        cv2.imshow('frame', disp_frame)

        # マウスクリックでスナップ撮影
        cv2.setMouseCallback('frame', on_click, save_image) 
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        counter += 1

    cap.release()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

MXNetによる推論クラスです。inference()で画像を与えると、モデルに合わせたサイズ変換が行われ、結果を描画した画像が返されます。

model.py

# -*- coding: utf-8 -*-
"""
物体検出モデルの推論クラス
"""

import time
import numpy as np
import os
import cv2
import dlr
import time
import numpy as np
import os
import cv2
import mxnet as mx
import sys, os
import numpy as np
from collections import namedtuple

class Model():
    def __init__(self, names, colors, height, width, model_path):

        self.__names = names
        self.__colors = colors
        self.__shape = 512
        self.__h_magnification = self.__shape/height
        self.__w_magnification = self.__shape/width
        print("magnification H:{} W:{}".format(1/self.__h_magnification, 1/self.__w_magnification))

        input_shapes=[('data', (1, 3, self.__shape, self.__shape))]
        self.__Batch = namedtuple('Batch', ['data'])
        sym, arg_params, aux_params = mx.model.load_checkpoint(model_path, 0)
        self.__mod = mx.mod.Module(symbol=sym, label_names=[], context=mx.cpu())
        self.__mod.bind(for_training=False, data_shapes=input_shapes)
        self.__mod.set_params(arg_params, aux_params)

    def inference(self, frame):

        # 入力インターフェースへの画像変換
        img = cv2.resize(frame, dsize=(self.__shape, self.__shape)) # height * width => 512 * 512
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # BGR => RGB
        img = img.transpose((2, 0, 1)) # 512,512,3 => 3,512,512
        img = img[np.newaxis, :] # 3,512,512 => 1,3,512,512
        print("img.shape: {}".format(img.shape))

        # 推論
        start = time.time()

        self.__mod.forward(self.__Batch([mx.nd.array(img)]))
        prob = self.__mod.get_outputs()[0].asnumpy()
        prob = np.squeeze(prob)

        elapsed_time = time.time() - start
        print("{} [Sec]".format(elapsed_time))

        result = []
        for det in prob:
            if(det[0]==-1):
                continue
            index = int(det[0])
            confidence = det[1]
            # 検出座標(縮尺に合わせて座標を変換する)
            x1 = int(det[2] * self.__shape * 1/self.__w_magnification)
            y1 = int(det[3] * self.__shape * 1/self.__h_magnification)
            x2 = int(det[4] * self.__shape * 1/self.__w_magnification)
            y2 = int(det[5] * self.__shape * 1/self.__h_magnification)
            print("[{}] {:.1f} {}, {}, {}, {}".format(self.__names[index], confidence, x1, y1, x2, y2))
            if(confidence > 0.2):
                frame = cv2.rectangle(frame,(x1, y1), (x2, y2), self.__colors[index],2)
                frame = cv2.rectangle(frame,(x1, y1), (x1 + 150,y1-20), self.__colors[index], -1)
                label = "{} {:.2f}".format(self.__names[index], confidence)
                frame = cv2.putText(frame,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
                result.append(self.__names[index])
        return (result, frame)

3 最後に

今回は、予め準備した評価用の動画を使用してモデルの検証を行い、追加学習用の画像を収集する仕組みを試してみました。 これで、ファインチューニングによる効率的なモデルの精度向上を目指したいです。