[Amazon SageMaker] 手に持った商品を検出してみました
1 はじめに
CX事業本部の平内(SIN)です。
最近、クロマキーで背景を透過させて、背景と合成することで、大量のデータセットを作成できることを確認しました。
上記では、最終的にアノテーション(バウンディングボックス)となるように、対象物の外接矩形を切り取ってファイル化する作業がありました。
コインであれば、概ね「表」と「裏」だけなので、何回か繰り返せば、それなりにデータセットとして有効だったのですが、「手に持った商品」となると、その見え方は、かなりのバリエーションとなり、それを網羅するように切り出すことは、かなりの作業となってしまいます。
そこで、今回は、この対象物を切り出す(最終的には、アノテーションとなる)作業を、プログラム化して、動画からデータセットを作成する要領を確認してみました。
最終的に作ったモデルで、推論している様子です。様々な角度で、商品検出できていることが分かると思います。
2 商品画像
背景が透過処理された、商品画像を作成した要領は、以下の通りです。
最初に、背景を処理しやすいように、極力1色にして、回転台の上に商品を置いて、スマフォで撮影しています。
撮影した動画を、Filmora 9でクロマキー処理して、背景を黒に変えています。
背景が黒くなった動画から、商品部分を切り取って、背景を透過にして保存しています。この時点で、商品ごとの画像は、200枚となっています。
ここまでの処理を紹介している動画です。
最後に、画像を切り取って、透過処理するコードは、以下です。
# -*- coding: utf-8 -*- import shutil import glob import os import cv2 import numpy as np max = 200 # 1個の動画から生成する画像数 input_path = "./dataset/mp4" output_path = "./dataset/output_png" # 矩形検出 def detect_rectangle(img): # グレースケール gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) # 背景の多少のノイズは削除する _, gray_img = cv2.threshold(gray_img, 50, 255, cv2.THRESH_BINARY) # 輪郭検出 contours, _ = cv2.findContours(gray_img, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) rect = None for contour in contours: # ある程度の面積が有るものだけを対象にする area = cv2.contourArea(contour, False); if area < 1000: continue # 輪郭を直線近似する epsilon = 0.1 * cv2.arcLength(contour, True) approx = cv2.approxPolyDP(contour, epsilon, True) # 最大サイズの矩形を取得する x, y, w, h = cv2.boundingRect(contour) if(rect != None): if(w * h < rect[2] * rect[3]): continue rect = [x, y, w, h] return rect # 透過イメージ保存 def create_transparent_image(img): # RGBを分離 ch_b, ch_g, ch_r = cv2.split(img[:,:,:3]) # アルファチャンネル生成 h, w, _ = img.shape ch_a = np.zeros((h, w) ,dtype = 'uint8') ch_a += 255 # 各チャンネルを結合 rgba_img = cv2.merge((ch_b, ch_g, ch_r, ch_a)) # マスク color_lower = np.array([0, 0, 0, 255]) # color_upper = np.array([80, 80, 80, 255]) color_upper = np.array([40, 40, 40, 255]) mask = cv2.inRange(rgba_img, color_lower, color_upper) return cv2.bitwise_not(rgba_img, rgba_img, mask=mask) def save_image(class_name, img): path = "{}/{}".format(output_path, class_name) if os.path.exists(path) == False: os.makedirs(path) for i in range(1000): filename = "{}/{}.png".format(path, i) if os.path.exists(filename) == False: cv2.imwrite(filename, img) print(filename) return def main(): os.makedirs(output_path, exist_ok=True) if(os.path.exists(output_path)==False): os.makedirs(output_path) moves = glob.glob("{}/*.mp4".format(input_path)) for move in moves: basename = os.path.basename(move) class_name = basename.split('_')[0] cap = cv2.VideoCapture(move) width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) print("width:{} height:{} frames:{}".format(width, height, frame_count)) interval = int(frame_count / max) counter = 0 while True: counter += 1 # カメラ画像取得 _, frame = cap.read() if(frame is None): break if(counter%interval != 0): continue # 縮小 frame = cv2.resize(frame, (int(width/2), int(height/2))) # 矩形検出 rect = detect_rectangle(frame) if(rect != None): x, y, w, h = rect # 切り取り save_img = frame[y: y+h, x: x+w] # 透過保存 img = create_transparent_image(save_img) save_image(class_name, img) # 表示 frame = cv2.rectangle(frame, (x, y), (x+w, y+h),(0,255,0),2) # 画像表示 cv2.imshow('frame', frame) if cv2.waitKey(1) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows() if __name__ == '__main__': main()
3 変形と合成
背景が透過となった画像を背景画像に合成して、データセットの画像を生成しますが、この時、商品画像は、サイズ変更や、回転などが行われています。
生成されたデータセット用の画像は、以下のような感じになります。(バウンディングボックスの描画は、確認用です、実際のデータセットには表示されていません)
変形と合成を行っているコードです。
""" 変換と合成によりGround Truth形式のデータセットを作成する """ import json import glob import random import os import shutil import math import numpy as np import cv2 from PIL import Image MAX = 6000 # 生成する画像数 CLASS_NAME=["ChipStar","Curry","Pringles","SeaFood","Butamen"] COLORS = [(0,0,175),(175,0,0),(0,175,0),(175,0,175),(0,175,175)] BACKGROUND_IMAGE_PATH = "./dataset/background_images" TARGET_IMAGE_PATH = "./dataset/output_png" OUTPUT_PATH = "./dataset/output_ground_truth" S3Bucket = "s3://ground_truth_dataset" manifestFile = "output.manifest" BASE_WIDTH = 200 # 商品の基本サイズは、背景画像とのバランスより、横幅を200を基準とする BACK_WIDTH = 640 BACK_HEIGHT = 480 # 背景画像取得クラス class Background: def __init__(self, backPath): self.__backPath = backPath def get(self): imagePath = random.choice(glob.glob(self.__backPath + '/*.jpg')) return cv2.imread(imagePath, cv2.IMREAD_UNCHANGED) # 検出対象取得クラス (base_widthで指定された横幅を基準にリサイズされる) class Target: def __init__(self, target_path, base_width, class_name): self.__target_path = target_path self.__base_width = base_width self.__class_name = class_name def get(self, class_id): # 商品画像 class_name = self.__class_name[class_id] image_path = random.choice(glob.glob(self.__target_path + '/' + class_name + '/*.png')) target_image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED) # 基準(横)サイズに基づきリサイズ h, w, _ = target_image.shape aspect = h/w target_image = cv2.resize(target_image, (int(self.__base_width * aspect), self.__base_width)) # ランダムに回転させて取り出す mode = random.randint(0, 3) if(mode == 0): target_image = cv2.rotate(target_image, cv2.ROTATE_90_CLOCKWISE) elif(mode == 1): target_image = cv2.rotate(target_image, cv2.ROTATE_90_COUNTERCLOCKWISE) elif(mode == 2): target_image = cv2.rotate(target_image, cv2.ROTATE_180) return target_image # 変換クラス class Transformer(): def __init__(self, width, height): self.__width = width self.__height = height self.__min_scale = 0.3 self.__max_scale = 1 def warp(self, target_image): # サイズ変更 target_image = self.__resize(target_image) # ローテーション mode = random.randint(0, 3) if(mode == 0): target_image = self.__rote(target_image, random.uniform(0, 30)) elif(mode == 1): target_image = self.__rote(target_image, random.uniform(320, 360)) # 配置位置決定 h, w, _ = target_image.shape left = random.randint(0, self.__width - w) top = random.randint(0, self.__height - h) rect = ((left, top), (left + w, top + h)) # 背景面との合成 new_image = self.__synthesize(target_image, left, top) return (new_image, rect) def __resize(self, img): scale = random.uniform(self.__min_scale, self.__max_scale) w, h, _ = img.shape return cv2.resize(img, (int(w * scale), int(h * scale))) def __rote(self, target_image, angle): h, w, _ = target_image.shape rate = h/w scale = 1 if( rate < 0.9 or 1.1 < rate): scale = 0.9 elif( rate < 0.8 or 1.2 < rate): scale = 0.6 center = (int(w/2), int(h/2)) trans = cv2.getRotationMatrix2D(center, angle , scale) return cv2.warpAffine(target_image, trans, (w,h)) def __synthesize(self, target_image, left, top): background_image = np.zeros((self.__height, self.__width, 4), np.uint8) back_pil = Image.fromarray(background_image) front_pil = Image.fromarray(target_image) back_pil.paste(front_pil, (left, top), front_pil) return np.array(back_pil) class Effecter(): # Gauss def gauss(self, img, level): return cv2.blur(img, (level * 2 + 1, level * 2 + 1)) # Noise def noise(self, img): img = img.astype('float64') img[:,:,0] = self.__single_channel_noise(img[:,:,0]) img[:,:,1] = self.__single_channel_noise(img[:,:,1]) img[:,:,2] = self.__single_channel_noise(img[:,:,2]) return img.astype('uint8') def __single_channel_noise(self, single): diff = 255 - single.max() noise = np.random.normal(0, random.randint(1, 100), single.shape) noise = (noise - noise.min())/(noise.max()-noise.min()) noise= diff*noise noise= noise.astype(np.uint8) dst = single + noise return dst # バウンディングボックス描画 def box(frame, rect, class_id): ((x1,y1),(x2,y2)) = rect label = "{}".format(CLASS_NAME[class_id]) img = cv2.rectangle(frame,(x1, y1), (x2, y2), COLORS[class_id],2) img = cv2.rectangle(img,(x1, y1), (x1 + 150,y1-20), COLORS[class_id], -1) cv2.putText(img,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA) return img # 背景と商品の合成 def marge_image(background_image, front_image): back_pil = Image.fromarray(background_image) front_pil = Image.fromarray(front_image) back_pil.paste(front_pil, (0, 0), front_pil) return np.array(back_pil) # Manifest生成クラス class Manifest: def __init__(self, class_name): self.__lines = '' self.__class_map={} for i in range(len(class_name)): self.__class_map[str(i)] = class_name[i] def appned(self, fileName, data, height, width): date = "0000-00-00T00:00:00.000000" line = { "source-ref": "{}/{}".format(S3Bucket, fileName), "boxlabel": { "image_size": [ { "width": width, "height": height, "depth": 3 } ], "annotations": [] }, "boxlabel-metadata": { "job-name": "xxxxxxx", "class-map": self.__class_map, "human-annotated": "yes", "objects": { "confidence": 1 }, "creation-date": date, "type": "groundtruth/object-detection" } } for i in range(data.max()): (_, rect, class_id) = data.get(i) ((x1,y1),(x2,y2)) = rect line["boxlabel"]["annotations"].append({ "class_id": class_id, "width": x2 - x1, "top": y1, "height": y2 - y1, "left": x1 }) self.__lines += json.dumps(line) + '\n' def get(self): return self.__lines # 1画像分のデータを保持するクラス class Data: def __init__(self, rate): self.__rects = [] self.__images = [] self.__class_ids = [] self.__rate = rate def get_class_ids(self): return self.__class_ids def max(self): return len(self.__rects) def get(self, i): return (self.__images[i], self.__rects[i], self.__class_ids[i]) # 追加(重複率が指定値以上の場合は失敗する) def append(self, target_image, rect, class_id): conflict = False for i in range(len(self.__rects)): iou = self.__multiplicity(self.__rects[i], rect) if(iou > self.__rate): conflict = True break if(conflict == False): self.__rects.append(rect) self.__images.append(target_image) self.__class_ids.append(class_id) return True return False # 重複率 def __multiplicity(self, a, b): (ax_mn, ay_mn) = a[0] (ax_mx, ay_mx) = a[1] (bx_mn, by_mn) = b[0] (bx_mx, by_mx) = b[1] a_area = (ax_mx - ax_mn + 1) * (ay_mx - ay_mn + 1) b_area = (bx_mx - bx_mn + 1) * (by_mx - by_mn + 1) abx_mn = max(ax_mn, bx_mn) aby_mn = max(ay_mn, by_mn) abx_mx = min(ax_mx, bx_mx) aby_mx = min(ay_mx, by_mx) w = max(0, abx_mx - abx_mn + 1) h = max(0, aby_mx - aby_mn + 1) intersect = w*h return intersect / (a_area + b_area - intersect) # 各クラスのデータ数が同一になるようにカウントする class Counter(): def __init__(self, max): self.__counter = np.zeros(max) def get(self): n = np.argmin(self.__counter) return int(n) def inc(self, index): self.__counter[index]+= 1 def print(self): print(self.__counter) def main(): # 出力先の初期化 if os.path.exists(OUTPUT_PATH): shutil.rmtree(OUTPUT_PATH) os.mkdir(OUTPUT_PATH) target = Target(TARGET_IMAGE_PATH, BASE_WIDTH, CLASS_NAME) background = Background(BACKGROUND_IMAGE_PATH) transformer = Transformer(BACK_WIDTH, BACK_HEIGHT) manifest = Manifest(CLASS_NAME) counter = Counter(len(CLASS_NAME)) effecter = Effecter() no = 0 while(True): # 背景画像の取得 background_image = background.get() # 商品データ data = Data(0.1) for _ in range(20): # 現時点で作成数の少ないクラスIDを取得 class_id = counter.get() # 商品画像の取得 target_image = target.get(class_id) # 変換 (transform_image, rect) = transformer.warp(target_image) frame = marge_image(background_image, transform_image) # 商品の追加(重複した場合は、失敗する) ret = data.append(transform_image, rect, class_id) if(ret): counter.inc(class_id) print("max:{}".format(data.max())) frame = background_image for index in range(data.max()): (target_image, _, _) = data.get(index) # 合成 frame = marge_image(frame, target_image) # アルファチャンネル削除 frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR) # エフェクト frame = effecter.gauss(frame, random.randint(0, 2)) frame = effecter.noise(frame) # 画像名 fileName = "{:05d}.png".format(no) no+=1 # 画像保存 cv2.imwrite("{}/{}".format(OUTPUT_PATH, fileName), frame) # manifest追加 manifest.appned(fileName, data, frame.shape[0], frame.shape[1]) for i in range(data.max()): (_, rect, class_id) = data.get(i) # バウンディングボックス描画(確認用) frame = box(frame, rect, class_id) counter.print() print("no:{}".format(no)) if(MAX <= no): break # 表示(確認用) cv2.imshow("frame", frame) cv2.waitKey(1) # manifest 保存 with open('{}/{}'.format(OUTPUT_PATH, manifestFile), 'w') as f: f.write(manifest.get()) main()
4 モデル
モデルは、SageMakerのビルトインアルゴリズム(物体検出)で作成されています。
5 最後に
精度の高いモデルを作成するには、多数のデータセットが必要ですが、 クロマキーでのデータセット作成は、場面によっては強力だと思います。
今回生成された6000枚の画像には、各商品が約9050件づつ写っているので、45,250(9050 * 5)個のアノテーションが付与されていることになります。 商品の外接となる矩形をプログラム的に生成することで、比較的正確なアノテーション作業となりますし、また、データの偏りが生じないようする調整も、ある意味、容易かもしれません。