「ちょい見せ」で、お菓子の種類を推論してみました

2020.09.22

1 はじめに

CX事業本部の平内(SIN)です。

機械学習の推論には、ある程度の時間がかかるため、動画で高速なFPSを出すのは、比較的難しいと思います。

今回は、対象の商品を一瞬だけカメラに見せて「ちょい見せ」、推論する仕組みを検討してみました。

使用したのは、RaspberryPi 4とRaspberryPiカメラモジュール、そしてIntel NeuralStick2(NCS2)です。

最初に、動作している様子です。

2 モデル

モデルは、Amazon SageMakerの組み込みアルゴリズム(画像分類)で作成したものです。

OpenVINOツールキットで使用可能なように、Model OptimizerでIR(中間表現フォーマット)に変換しました。

3 90fpsで撮影

RaspberryPiのカメラモジュールは、VGA(640x480)で90fpsの撮影が可能です。
参考:Raspberry Pi カメラモジュール V2

「ちょい見せ」で、可能な限り鮮明な画像を取得できるように、このカメラを使用して、撮影に専念するスレッドを作成しました。

このスレッドでは、前フレームとの差分を常に計算し、変化がある間だけ画像を保存しています。 変化が始まると、保存が開始され、変化が終わった時、保存された画像のうち時間的に真ん中に位置する画像を取り出し「推論」を行っています。

4 コード

作成したコードは、以下のとおりです。

主要なクラスは以下のようになっています。

  • Video Webカメラを処理するクラス
  • Images 差分が生じている間に撮影画像を蓄積するクラス、中央画像を取り出すと、初期化されます
  • Syohin40 商品識別モデルをラップするクラス、内部で入力形式への変換を行います
  • Manager 状態遷移を管理するクラス、IDLE(待機) => SVAE(画像保存) => START(推論開始) => INFER(推論中)と遷移します
from numpy import asarray
from dlr import DLRModel
from PIL import Image
import os
import time
import numpy as np
import cv2
from dispFps import DispFps
import threading
from model import Model
from openvino.inference_engine import IECore 

CLASSES = ["ポリッピー(GREEN)","OREO","カントリーマム","ポリッピー(RED)","柿の種(わさび)"
          ,"通のとうもろこし","CHEDDER_CHEESE","ピーナッツ","ストーンチョコ","PRETZEL(YELLOW)"
          ,"海味鮮","柿の種","カラフルチョコ","フルグラ(BROWN)","NOIR"
          ,"BANANA(BLOWN)","チーズあられ","俺のおやつ","PRIME","CRATZ(RED)"
          ,"CRATZ(GREEN)","揚一番","ポリッピー(YELLOW)","こつぶっこ","アスパラガス"
          ,"海苔ピーパック","いちご","梅しそチーズあられ","通のえだ豆","柿の種(梅しそ)"
          ,"PRETZEL(BLACK)","辛子明太子","CRATZ(ORANGE)","チョコメリゼ","フライドポテト(じゃがバター味)"
          ,"BANANA(BLUE)","でん六豆","パズル","フルグラ(RED)","PRETZEL(GREEN)"
          ,"フライドポテト(しお味)",]

# Webカメラを処理するクラス
class Video():
    def __init__(self):
        WIDTH = 640
        HEIGHT = 480
        FPS = 90
        self.__cap = cv2.VideoCapture(0)
        self.__cap.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
        self.__cap.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
        self.__cap.set(cv2.CAP_PROP_FPS, FPS)

    def __del__(self):
        self.__cap.release()
        cv2.destroyAllWindows()

    def read(self):
        return self.__cap.read()

# 差分が生じている間に撮影画像を蓄積するクラス
class Images():
    def __init__(self, dir):
        self.__dir = dir
        os.makedirs(self.__dir, exist_ok=True)
        self.__saveCounter = 0

    def __fileName(self, index):
        return "{}/{}.jpg".format(self.__dir, index)

    def save(self, frame):
        cv2.imwrite(self.__fileName(self.__saveCounter), frame)
        print("saved.")
        self.__saveCounter += 1

    def get(self):
        # 撮影した画像の中心部分を使用する
        center = int(self.__saveCounter/2)
        image = cv2.imread(self.__fileName(center))
        # 取得した時点で蓄積画像は削除する
        for i in range(self.__saveCounter):
            os.remove(self.__fileName(i))
        self.__saveCounter = 0
        return image


# 商品識別モデル        
class Syohin40(Model):
    def __init__(self, model):

        ie = IECore()
        device = "MYRIAD"
        super().__init__(ie, device, model)

        _, _, h, w = self.input_size
        self.__input_height = h
        self.__input_width = w

    def __prepare_frame(self, frame):
        initial_h, initial_w = frame.shape[:2]
        scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
        in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
        in_frame = in_frame.transpose((2, 0, 1))
        in_frame = in_frame.reshape(self.input_size)
        return in_frame, scale_h, scale_w

    def infer(self, frame):
        in_frame, _, _ = self.__prepare_frame(frame)
        result = super().infer(in_frame)
        return result.squeeze()

# 状態遷移を管理するクラス
class Manager():
    def __init__(self, previous):
        self.__previous = previous
        self.__status = "IDLE" # IDLE => SVAE => START => INFER
        self.__threshold = 1000 # 変化の敷居値
        self.__span = 3  # 静止間隔
        self.__counter = 0

    def check(self, frame):
        diff = self.__getDiff(self.__previous, frame)
        self.__previous = frame

        if(self.__threshold < diff):
            self.__counter = 0
        else:
            self.__counter += 1

        if(self.__status == "IDLE"):
            if(self.__threshold < diff):
                self.__status = "SAVE"
        elif(self.__status == "SAVE"):
            print("diff:{}".format(diff))
            if(self.__span < self.__counter):
                self.__status = "START"       

    @property
    def Status(self):
        return self.__status

    @Status.setter
    def Status(self, status):
        self.__status = status

    # 差分を数値化
    def __getDiff(self, img1, img2):
        # グレースケール変換
        img1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
        img2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
        # 差分取得
        mask = cv2.absdiff(img1, img2)
        # 2値化
        _, mask = cv2.threshold(mask, 50, 255, cv2.THRESH_BINARY)
        diff = cv2.countNonZero(mask) # 白の要素数
        mask = cv2.resize(mask, (320, 240))
        cv2.imshow('mask', mask)
        return diff 

class Main():
    def __init__(self):

        self.__images = Images("./tmpDir")
        self.__syohin40 = Syohin40("image-classification-0012")
        self.__video = Video()
        _, previous = self.__video.read()
        self.__manager = Manager(previous)

    def __infer(self):

        # 撮影した画像の中央画像を取得する
        image = self.__images.get()

        # 推論
        start = time.time()
        out = self.__syohin40.infer(image)
        processing_time = time.time() - start
        print("processing_time {} sec".format(processing_time))

        # 結果表示
        prob = np.max(out)
        index = np.argmax(out)
        print("--------------------------------")
        print("Class: %s, probability: %f" % (CLASSES[index], prob))
        print("--------------------------------")

        self.__manager.Status = "IDLE"

    def start(self):
        while(True):
            _, frame = self.__video.read()

            self.__manager.check(frame)

            if(self.__manager.Status == "SAVE"):
                self.__images.save(frame)

            elif(self.__manager.Status == "START"):
                self.__manager.Status = "INFER"
                infer = threading.Thread(target=self.__infer)
                infer.start()

            cv2.imshow('frame', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        del self.__video

main = Main()
main.start()

model.py

class Model:
    def __init__(self, ie, device, model):

        modelPath = "./FP32/{}".format(model)
        if(device=="MYRIAD"):
            modelPath = "./FP16/{}".format(model)

        net = ie.read_network(modelPath + ".xml", modelPath + ".bin")
        self.exec_net = ie.load_network(network=net, device_name=device, num_requests=2)

        self.input_name = next(iter(net.input_info))
        self.output_name = next(iter(net.outputs))

        self.input_size = net.input_info[self.input_name].input_data.shape
        self.output_size = net.outputs[self.output_name].shape


    def infer(self, data):
        input_data = {self.input_name: data}
        infer_result = self.exec_net.infer(input_data)
        return infer_result[self.output_name]

5 最後に

今回は、可能な限り高速で撮影し、「ちょい見せ」での推論が、どれぐらい可能かを試してみました。

もう少し、推論処理に時間をかけられるなら、1枚の画像でなく、複数の画像で推論し、その合計値で判定すると、もっと精度が上がるかも知れません。