[Amazon SageMaker] イメージ分類を使用して、商品棚を点検してみました(間違った位置に置かれた商品を検出する)

2020.07.07

1 はじめに

CX事業本部の平内(SIN)です。

本日は、弊社クラスメソッドの創立記念日です。 創立記念日は、社員全員が、ブログを書く日と言うことで・・・今回は、イメージ分類を使用した「商品棚の点検」を紹介させて下さい。

最初に動作している様子です。

商品棚には、各段毎にマーカーが設置されており、マーカーからの相対位置で切り取った商品の画像を推論にかけています。 推論結果が、商品DBと一致した時に、正常表示となるようになっています。

2 メイン処理

メインとなるコードは、以下のとおりで、主な処理は、以下の各クラスになっています。

  • Maker マーカー関連処理
  • Video カメラ画像処理
  • Model 推論用モデル
  • Stock 商品のステータス管理

取得したカメラ画像(①)から、マーカーを基準に商品の画像を取り出し(②)、ステータスが異常状態の場合(③)、推論にかけて(④)結果を格納(⑤)しています。

index.py

# Webカメラで商品棚を確認するプログラム
import cv2
import numpy as np
import makerClass
import videoClass
import modelClass
import stockClass

# Webカメラ
DEVICE_ID = 0 
WIDTH = 1920
HEIGHT = 1080
FPS = 24

def main():

    video = videoClass.Video(DEVICE_ID, WIDTH, HEIGHT, FPS)
    maker = makerClass.Maker()
    model = modelClass.Model()
    stock = stockClass.Stock()

    while True:

        # カメラ画像取得(①)
        frame = video.read()
        if(frame is None):
            continue
        # 商品画像取得(②)
        cells = maker.getCells(frame)
        if(len(cells)>0):
            for (row, col, img) in cells:
                # ステータス表示が「正常」でない場合(③)
                if(stock.getStatus(col, row) == False):
                    # 推論にかける(④)
                    (id, name, probability) = model.inference(img)
                    # 結果を格納(⑤)
                    stock.setStatus(col, row, id)
        # カメラ画像表示
        if(video.show('frame', frame, 0.3)==False):
            break

    del video

if __name__ == '__main__':
    main()

3 モデル

モデルをラップして推論するクラスです。

SageMakerの画像分類(組み込みアルゴリズム)で作成したモデルは、./modelに置かれています。 メイン処理から送られた画像は、モデルの入力インターフェース(data=[1,3,224,224])に合わせた後、推論に回されます。

modelClass.py

# Webカメラで商品棚を確認するプログラム
# モデル関連クラス

import cv2
import numpy as np
import mxnet as mx
from collections import namedtuple

class Model():
    def __init__(self): 
        MODEL_PATH='./model/image-classification'
        SHAPE = 224
        input_shapes=[('data', (1, 3, SHAPE, SHAPE))]
        self.__Batch = namedtuple('Batch', ['data'])
        sym, arg_params, aux_params = mx.model.load_checkpoint(MODEL_PATH, 0)
        self.__mod = mx.mod.Module(symbol=sym, context=mx.cpu())
        self.__mod.bind(for_training=False, data_shapes=input_shapes)
        self.__mod.set_params(arg_params, aux_params)

        self.__CLASSES = ["PORIPPY(GREEN)","OREO","CUNTRY_MAM","PORIPPY(RED)","KAKINOTANE(WASABI)","TUUNO_TOUMOROKOSHI","CHEDDER_CHEESE","PEANUTS","STONE","PRETZEL(YELLOW)","UMIAJISEN","KAKINOTANE(NORMAL)","COLORFUL","FURUGURA(BROWN)","NOIR","BANANA(BLOWN)","CHEESE_ARARE","ORENO_OYATU","PRIME","CRATZ(RED)","CRATZ(GREEN)","AGEICHIBAN","PORIPPY(YELLOW)","KOTUBUKKO","ASPARAGUS","NORI_PI","ICHIGO","UMESHISO","TUUNO_EDAMAME","KAKINOTANE(UME)","PRETZEL(BLACK)","KARASHIMENTAIKO","CRATZ(ORANGE)","CHOCO_MERIZE","POTATO(RED)","BANANA(BLUE)","DENROKU","PUZZLE","FURUGURA(RED)","PRETZEL(GREEN)","POTATO(BLUE)"]

    def inference(self, img):
        # 入力インターフェースへの画像変換
        SHAPE = 224
        img = cv2.resize(img, (SHAPE, SHAPE)) # xxx * xxx -> 224*224
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # BGR -> RGB
        img = img.transpose((2, 0, 1)) # 224,224,3 -> 3,224,224
        img = img[np.newaxis, :] # 3,224,224 -> 1,3,224,224

        # 推論
        self.__mod.forward(self.__Batch([mx.nd.array(img)]))
        prob = self.__mod.get_outputs()[0].asnumpy()

        prob = np.squeeze(prob)
        probabilitys = {}
        for i,result in enumerate(prob):
            probabilitys[self.__CLASSES[i]] = result
        probabilitys = sorted(probabilitys.items(), key = lambda x:x[1], reverse=True)
        (name, probability) = probabilitys[0]
        return (self.__CLASSES.index(name), name, probability)

4 マーカー

マーカー関連のコードです。

画像に含まれる全てのマーカーをテーブル(self.__rows)と比較し、段の4隅が検出できている場合、そのエリアの画像を切り取っています。 切り取った画像は、1300*400で長方形に変形され、更にお菓子の位置を相対的に取得しています。

# Webカメラで商品棚を確認するプログラム
# マーカー関連クラス
import cv2
import numpy as np

class Maker():
    def __init__(self):
        self.__aruco = cv2.aruco
        self.__dictionary = self.__aruco.getPredefinedDictionary(self.__aruco.DICT_4X4_50)
        self.__drawColor = (255, 255, 0)
        self.__rows = [
            [1,2,3,4],
            [3,4,5,6],
            [5,6,7,8],
            [9,17,11,12],
            [11,12,13,14],
            [13,14,15,16],
        ]

    # 画像からマーカーを検出する
    def __detect(self, frame, draw = False):
        corners, ids, rejectedImgPoints = self.__aruco.detectMarkers(frame, self.__dictionary) 
        if(draw):
            self.__aruco.drawDetectedMarkers(frame, corners, ids, self.__drawColor) 
        return (corners, ids)

    # インデックスを指定してマーカーの中心座標を取得する
    def __getMarkerMean(self, ids, corners, index):
        for i, id in enumerate(ids):
            # マーカーのインデックス検索
            if(id[0] == index):
                v = np.mean(corners[i][0],axis=0) # マーカーの四隅の座標から中心の座標を取得する
                return [v[0],v[1]]
        return None

    # インデックスで指定した部分の座標を取得する
    def __getBox(self, corners, ids, row):
        p1 =  self.__getMarkerMean(ids, corners, row[0])
        p2 =  self.__getMarkerMean(ids, corners, row[1])
        p3 =  self.__getMarkerMean(ids, corners, row[2])
        p4 =  self.__getMarkerMean(ids, corners, row[3])
        return [p1, p2, p3, p4]

    # マーカー間の画像を変形する
    def __getTransformImage(self, box, frame,  width, height):
        frame_coordinates = np.float32(box)
        target_coordinates   = np.float32([[0, 0],[width, 0],[0, height],[width, height]])
        trans_mat = cv2.getPerspectiveTransform(frame_coordinates,target_coordinates)
        return cv2.warpPerspective(frame, trans_mat, (width, height))

    # 取得したid情報から有効なROWブロックを返す
    def __getRows(self, ids):
        rows = []
        for index,row in enumerate(self.__rows):
            if(np.count_nonzero(ids == row[0])==1 and
               np.count_nonzero(ids == row[1])==1 and
               np.count_nonzero(ids == row[2])==1 and
               np.count_nonzero(ids == row[3])==1):
                rows.append((index, row))
        return rows

    # お菓子(5個)の画像を取得する
    def __getSweetsItems(self, img):
        height, width, channels = img.shape[:3]
        m = int(width * 0.05)
        width = width * 0.85
        images = []
        w = int(width/5)
        for i in range(5):
            images.append(img[0: height, w*i+m: w*i+w+m])
        return images     

    def getCells(self, frame):
        (corners, ids) = self.__detect(frame, True)
        rows = self.__getRows(ids)
        cells = []
        for (i, row) in rows:
            # インデックスで指定した部分の座標を取得する
            box = self.__getBox(corners, ids, row)
            # エリアを変形して矩形を得る
            img = self.__getTransformImage(box, frame,  1300, 400)
            # 5分割してお菓子の画像を取得する
            images = self.__getSweetsItems(img)
            for col in range(5):
                cells.append((i, col, images[col]))   
            # 処理したエリアを表示する 
            pts = np.array(box, np.int32)
            cv2.polylines(frame, [pts], True, (255, 255, 0), thickness=2)
        return cells

5 商品のステータス

推論の結果は、Stockクラスに送られることで、現在の「正常」「異常」が管理されます。 推論の負荷を下げるため、一旦「正常」を検出できた場合は、再び推論に回すことが無いようになっています。

このクラスで保持するステータスは、html/status.csv に保存され、ブラウザで動作しているJavaScriptが一覧に反映します。

stockClass.py

# Webカメラで商品棚を確認するプログラム
# 商品棚関連クラス
import numpy as np

class Stock():
    def __init__(self):

        self.__table = [[32,19,20,12,40],[14,1,18,2,23],[6,16,5,24,17],[3,4,7,36,21],[10,29,11,35,15],[26,38,30,9,31]]
        self.__status = [[0,0,0,0,0],[0,0,0,0,0],[0,0,0,0,0],[0,0,0,0,0],[0,0,0,0,0],[0,0,0,0,0]]
        self.__fileName = 'html/status.csv'
        self.__saveStatus()       


    def getStatus(self, col, row):
        if(self.__status[row][col]==1):
            return True
        return False

    def setStatus(self, col, row, id):
        if(self.__table[row][col]==id):
            self.__status[row][col]=1
            self.__saveStatus()

    def __flatten(self):
        pass

    def __saveStatus(self):       
        with open(self.__fileName, 'w') as f:
            str = ''
            for l in sum(self.__status, []):
                str += "{},".format(l)
            f.writelines(str)

    def count(self):
        counter = 0
        for row in range(6):
            for col in range(5):
                if(self.__status[row][col] == 0):
                    counter += 1
        return counter

6 最後に

今回は、機械学習(イメージ分類)を利用した、商品棚の点検をプロトタイプ作成してみました。

この仕組は、正確に商品を推論できることが大きな要件となるため、比較的高い精度のモデルが必要になります。 また、適宜のモデル更新や、環境や条件によっては、期待した動作が出来ないことがある事も認識しておく必要があると思います。