[Amazon SageMaker] 手に持った商品を検出してみました

2021.03.28

1 はじめに

CX事業本部の平内(SIN)です。

最近、クロマキーで背景を透過させて、背景と合成することで、大量のデータセットを作成できることを確認しました。

上記では、最終的にアノテーション(バウンディングボックス)となるように、対象物の外接矩形を切り取ってファイル化する作業がありました。

コインであれば、概ね「表」と「裏」だけなので、何回か繰り返せば、それなりにデータセットとして有効だったのですが、「手に持った商品」となると、その見え方は、かなりのバリエーションとなり、それを網羅するように切り出すことは、かなりの作業となってしまいます。

そこで、今回は、この対象物を切り出す(最終的には、アノテーションとなる)作業を、プログラム化して、動画からデータセットを作成する要領を確認してみました。

最終的に作ったモデルで、推論している様子です。様々な角度で、商品検出できていることが分かると思います。

2 商品画像

背景が透過処理された、商品画像を作成した要領は、以下の通りです。

最初に、背景を処理しやすいように、極力1色にして、回転台の上に商品を置いて、スマフォで撮影しています。

撮影した動画を、Filmora 9でクロマキー処理して、背景を黒に変えています。

背景が黒くなった動画から、商品部分を切り取って、背景を透過にして保存しています。この時点で、商品ごとの画像は、200枚となっています。

ここまでの処理を紹介している動画です。

最後に、画像を切り取って、透過処理するコードは、以下です。

# -*- coding: utf-8 -*-
import shutil
import glob
import os
import cv2
import numpy as np

max = 200 # 1個の動画から生成する画像数
input_path = "./dataset/mp4"
output_path = "./dataset/output_png"

# 矩形検出
def detect_rectangle(img):
    # グレースケール
    gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # 背景の多少のノイズは削除する
    _, gray_img  = cv2.threshold(gray_img, 50, 255, cv2.THRESH_BINARY)      
    # 輪郭検出
    contours, _ = cv2.findContours(gray_img, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

    rect = None
    for contour in contours:
        # ある程度の面積が有るものだけを対象にする
        area = cv2.contourArea(contour, False);
        if area < 1000:
            continue
        # 輪郭を直線近似する
        epsilon = 0.1 * cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour,  epsilon, True)
        # 最大サイズの矩形を取得する
        x, y, w, h = cv2.boundingRect(contour)
        if(rect != None):
            if(w * h < rect[2] * rect[3]):
                continue
        rect = [x, y, w, h]
    return rect

# 透過イメージ保存
def create_transparent_image(img):
        # RGBを分離
        ch_b, ch_g, ch_r = cv2.split(img[:,:,:3])

        # アルファチャンネル生成
        h, w, _ = img.shape
        ch_a = np.zeros((h, w) ,dtype = 'uint8')
        ch_a += 255
        # 各チャンネルを結合
        rgba_img = cv2.merge((ch_b, ch_g, ch_r, ch_a))
        # マスク
        color_lower = np.array([0, 0, 0, 255]) 
        # color_upper = np.array([80, 80, 80, 255]) 
        color_upper = np.array([40, 40, 40, 255]) 
        mask = cv2.inRange(rgba_img, color_lower, color_upper)
        return cv2.bitwise_not(rgba_img, rgba_img, mask=mask)

def save_image(class_name, img):
    path = "{}/{}".format(output_path, class_name)
    if os.path.exists(path) == False:
        os.makedirs(path)
    for i in range(1000):
        filename = "{}/{}.png".format(path, i)
        if os.path.exists(filename) == False:
            cv2.imwrite(filename, img)
            print(filename)
            return

def main():

    os.makedirs(output_path, exist_ok=True)

    if(os.path.exists(output_path)==False):
        os.makedirs(output_path)

    moves = glob.glob("{}/*.mp4".format(input_path))
    for move in moves:
        basename = os.path.basename(move) 
        class_name = basename.split('_')[0]


        cap = cv2.VideoCapture(move)
        width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        print("width:{} height:{} frames:{}".format(width, height, frame_count))
        interval = int(frame_count / max)

        counter = 0
        while True:
            counter += 1    
            # カメラ画像取得
            _, frame = cap.read()
            if(frame is None):
                break
            if(counter%interval != 0):
                continue
            # 縮小
            frame = cv2.resize(frame, (int(width/2), int(height/2)))

            # 矩形検出
            rect = detect_rectangle(frame)
            if(rect != None):
                x, y, w, h = rect
                # 切り取り
                save_img = frame[y: y+h, x: x+w]
                # 透過保存
                img = create_transparent_image(save_img)
                save_image(class_name, img)
                # 表示
                frame = cv2.rectangle(frame, (x, y), (x+w, y+h),(0,255,0),2)

            # 画像表示
            cv2.imshow('frame', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

3 変形と合成

背景が透過となった画像を背景画像に合成して、データセットの画像を生成しますが、この時、商品画像は、サイズ変更や、回転などが行われています。

生成されたデータセット用の画像は、以下のような感じになります。(バウンディングボックスの描画は、確認用です、実際のデータセットには表示されていません)

変形と合成を行っているコードです。

"""
変換と合成によりGround Truth形式のデータセットを作成する
"""
import json
import glob
import random
import os
import shutil
import math
import numpy as np
import cv2
from PIL import Image

MAX = 6000 # 生成する画像数

CLASS_NAME=["ChipStar","Curry","Pringles","SeaFood","Butamen"]
COLORS = [(0,0,175),(175,0,0),(0,175,0),(175,0,175),(0,175,175)]

BACKGROUND_IMAGE_PATH = "./dataset/background_images"
TARGET_IMAGE_PATH = "./dataset/output_png"
OUTPUT_PATH = "./dataset/output_ground_truth"

S3Bucket = "s3://ground_truth_dataset"
manifestFile = "output.manifest"


BASE_WIDTH = 200 # 商品の基本サイズは、背景画像とのバランスより、横幅を200を基準とする
BACK_WIDTH = 640
BACK_HEIGHT = 480

# 背景画像取得クラス
class Background:
    def __init__(self, backPath):
        self.__backPath = backPath

    def get(self):
        imagePath = random.choice(glob.glob(self.__backPath + '/*.jpg'))
        return cv2.imread(imagePath, cv2.IMREAD_UNCHANGED) 

# 検出対象取得クラス (base_widthで指定された横幅を基準にリサイズされる)
class Target:
    def __init__(self, target_path, base_width, class_name):
        self.__target_path = target_path
        self.__base_width = base_width
        self.__class_name = class_name

    def get(self, class_id):
        # 商品画像
        class_name = self.__class_name[class_id]
        image_path = random.choice(glob.glob(self.__target_path + '/' + class_name + '/*.png'))
        target_image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED) 

        # 基準(横)サイズに基づきリサイズ
        h, w, _ = target_image.shape
        aspect = h/w
        target_image = cv2.resize(target_image, (int(self.__base_width * aspect), self.__base_width))

        # ランダムに回転させて取り出す
        mode = random.randint(0, 3)
        if(mode == 0):
            target_image = cv2.rotate(target_image, cv2.ROTATE_90_CLOCKWISE)
        elif(mode == 1):
            target_image = cv2.rotate(target_image, cv2.ROTATE_90_COUNTERCLOCKWISE)
        elif(mode == 2):
            target_image = cv2.rotate(target_image, cv2.ROTATE_180)
        return target_image

# 変換クラス
class Transformer():
    def __init__(self, width, height):
        self.__width = width
        self.__height = height
        self.__min_scale = 0.3
        self.__max_scale = 1

    def warp(self, target_image):
        # サイズ変更
        target_image = self.__resize(target_image)

        # ローテーション
        mode = random.randint(0, 3)
        if(mode == 0):
            target_image = self.__rote(target_image, random.uniform(0, 30))
        elif(mode == 1):
            target_image = self.__rote(target_image, random.uniform(320, 360))

        # 配置位置決定
        h, w, _ = target_image.shape
        left =  random.randint(0, self.__width - w)
        top = random.randint(0, self.__height - h)
        rect = ((left, top), (left + w, top + h))

        # 背景面との合成
        new_image = self.__synthesize(target_image, left, top)
        return (new_image, rect)

    def __resize(self, img):
        scale = random.uniform(self.__min_scale, self.__max_scale)
        w, h, _ = img.shape
        return cv2.resize(img, (int(w * scale), int(h * scale)))

    def __rote(self, target_image, angle):
        h, w, _ = target_image.shape
        rate = h/w
        scale = 1
        if( rate < 0.9 or 1.1 < rate):
            scale = 0.9
        elif( rate < 0.8 or 1.2 < rate):
            scale = 0.6
        center = (int(w/2), int(h/2))
        trans = cv2.getRotationMatrix2D(center, angle , scale)
        return cv2.warpAffine(target_image, trans, (w,h))

    def __synthesize(self, target_image, left, top):
        background_image = np.zeros((self.__height, self.__width, 4), np.uint8)
        back_pil = Image.fromarray(background_image)
        front_pil = Image.fromarray(target_image)
        back_pil.paste(front_pil, (left, top), front_pil)
        return np.array(back_pil)

class Effecter():

    # Gauss
    def gauss(self, img, level):
        return cv2.blur(img, (level * 2 + 1, level * 2 + 1))

    # Noise
    def noise(self, img):
        img = img.astype('float64')
        img[:,:,0] = self.__single_channel_noise(img[:,:,0])
        img[:,:,1] = self.__single_channel_noise(img[:,:,1])
        img[:,:,2] = self.__single_channel_noise(img[:,:,2])
        return img.astype('uint8')

    def __single_channel_noise(self, single):
        diff = 255 - single.max()
        noise = np.random.normal(0, random.randint(1, 100), single.shape)
        noise = (noise - noise.min())/(noise.max()-noise.min())
        noise= diff*noise
        noise= noise.astype(np.uint8)
        dst = single + noise
        return dst

# バウンディングボックス描画
def box(frame, rect, class_id):
    ((x1,y1),(x2,y2)) = rect
    label = "{}".format(CLASS_NAME[class_id])
    img = cv2.rectangle(frame,(x1, y1), (x2, y2), COLORS[class_id],2)
    img = cv2.rectangle(img,(x1, y1), (x1 + 150,y1-20), COLORS[class_id], -1)
    cv2.putText(img,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
    return img

# 背景と商品の合成
def marge_image(background_image, front_image):
    back_pil = Image.fromarray(background_image)
    front_pil = Image.fromarray(front_image)
    back_pil.paste(front_pil, (0, 0), front_pil)
    return np.array(back_pil)

# Manifest生成クラス
class Manifest:
    def __init__(self, class_name):
        self.__lines = ''
        self.__class_map={}
        for i in range(len(class_name)):
            self.__class_map[str(i)] = class_name[i]

    def appned(self, fileName, data, height, width):

        date = "0000-00-00T00:00:00.000000"
        line = {
            "source-ref": "{}/{}".format(S3Bucket, fileName),
            "boxlabel": {
                "image_size": [
                    {
                        "width": width,
                        "height": height,
                        "depth": 3
                    }
                ],
                "annotations": []
            },
            "boxlabel-metadata": {
                "job-name": "xxxxxxx",
                "class-map": self.__class_map,
                "human-annotated": "yes",
                "objects": {
                    "confidence": 1
                },
                "creation-date": date,
                "type": "groundtruth/object-detection"
            }
        }
        for i in range(data.max()):
            (_, rect, class_id) = data.get(i)
            ((x1,y1),(x2,y2)) = rect
            line["boxlabel"]["annotations"].append({
                "class_id": class_id,
                "width": x2 - x1,
                "top": y1,
                "height": y2 - y1,
                "left": x1
            })
        self.__lines += json.dumps(line) + '\n'

    def get(self):
        return self.__lines

# 1画像分のデータを保持するクラス
class Data:
    def __init__(self, rate):
        self.__rects = []
        self.__images = []
        self.__class_ids = []
        self.__rate = rate

    def get_class_ids(self):
        return self.__class_ids

    def max(self):
        return len(self.__rects)

    def get(self, i):
        return (self.__images[i], self.__rects[i], self.__class_ids[i])

    # 追加(重複率が指定値以上の場合は失敗する)
    def append(self, target_image, rect, class_id):
        conflict = False
        for i in range(len(self.__rects)):
            iou = self.__multiplicity(self.__rects[i], rect)
            if(iou > self.__rate):
                conflict = True
                break
        if(conflict == False):  
            self.__rects.append(rect)
            self.__images.append(target_image)
            self.__class_ids.append(class_id)
            return True
        return False

    # 重複率
    def __multiplicity(self, a, b):
        (ax_mn, ay_mn) = a[0]
        (ax_mx, ay_mx) = a[1]
        (bx_mn, by_mn) = b[0]
        (bx_mx, by_mx) = b[1]
        a_area = (ax_mx - ax_mn + 1) * (ay_mx - ay_mn + 1)
        b_area = (bx_mx - bx_mn + 1) * (by_mx - by_mn + 1)
        abx_mn = max(ax_mn, bx_mn)
        aby_mn = max(ay_mn, by_mn)
        abx_mx = min(ax_mx, bx_mx)
        aby_mx = min(ay_mx, by_mx)
        w = max(0, abx_mx - abx_mn + 1)
        h = max(0, aby_mx - aby_mn + 1)
        intersect = w*h
        return intersect / (a_area + b_area - intersect)

# 各クラスのデータ数が同一になるようにカウントする
class Counter():
    def __init__(self, max):
        self.__counter = np.zeros(max)

    def get(self):
        n = np.argmin(self.__counter)
        return int(n)

    def inc(self, index):
        self.__counter[index]+= 1

    def print(self):
        print(self.__counter)

def main():

    # 出力先の初期化
    if os.path.exists(OUTPUT_PATH):
        shutil.rmtree(OUTPUT_PATH)
    os.mkdir(OUTPUT_PATH)

    target = Target(TARGET_IMAGE_PATH, BASE_WIDTH, CLASS_NAME)
    background = Background(BACKGROUND_IMAGE_PATH)

    transformer = Transformer(BACK_WIDTH, BACK_HEIGHT)
    manifest = Manifest(CLASS_NAME)
    counter = Counter(len(CLASS_NAME))
    effecter = Effecter()

    no = 0

    while(True):
        # 背景画像の取得
        background_image = background.get()

        # 商品データ
        data = Data(0.1)
        for _ in range(20):
            # 現時点で作成数の少ないクラスIDを取得
            class_id = counter.get()
            # 商品画像の取得
            target_image = target.get(class_id)
            # 変換
            (transform_image, rect) = transformer.warp(target_image)
            frame = marge_image(background_image, transform_image)
            # 商品の追加(重複した場合は、失敗する)
            ret = data.append(transform_image, rect, class_id)
            if(ret):
                counter.inc(class_id)

        print("max:{}".format(data.max()))
        frame = background_image
        for index in range(data.max()):
            (target_image, _, _) = data.get(index)
            # 合成
            frame = marge_image(frame, target_image)

        # アルファチャンネル削除
        frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)

        # エフェクト
        frame = effecter.gauss(frame, random.randint(0, 2))
        frame = effecter.noise(frame)

        # 画像名
        fileName = "{:05d}.png".format(no)
        no+=1

        # 画像保存            
        cv2.imwrite("{}/{}".format(OUTPUT_PATH, fileName), frame)
        # manifest追加
        manifest.appned(fileName, data, frame.shape[0], frame.shape[1])

        for i in range(data.max()):
            (_, rect, class_id) = data.get(i)
            # バウンディングボックス描画(確認用)
            frame = box(frame, rect, class_id)

        counter.print()
        print("no:{}".format(no))
        if(MAX <= no):
            break

        # 表示(確認用)
        cv2.imshow("frame", frame)
        cv2.waitKey(1)

    # manifest 保存
    with open('{}/{}'.format(OUTPUT_PATH, manifestFile), 'w') as f:
        f.write(manifest.get())

main()

4 モデル

モデルは、SageMakerのビルトインアルゴリズム(物体検出)で作成されています。

5 最後に

精度の高いモデルを作成するには、多数のデータセットが必要ですが、 クロマキーでのデータセット作成は、場面によっては強力だと思います。

今回生成された6000枚の画像には、各商品が約9050件づつ写っているので、45,250(9050 * 5)個のアノテーションが付与されていることになります。 商品の外接となる矩形をプログラム的に生成することで、比較的正確なアノテーション作業となりますし、また、データの偏りが生じないようする調整も、ある意味、容易かもしれません。