この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX事業本部の平内(SIN)です。
最近、クロマキーで背景を透過させて、背景と合成することで、大量のデータセットを作成できることを確認しました。
上記では、最終的にアノテーション(バウンディングボックス)となるように、対象物の外接矩形を切り取ってファイル化する作業がありました。
コインであれば、概ね「表」と「裏」だけなので、何回か繰り返せば、それなりにデータセットとして有効だったのですが、「手に持った商品」となると、その見え方は、かなりのバリエーションとなり、それを網羅するように切り出すことは、かなりの作業となってしまいます。
そこで、今回は、この対象物を切り出す(最終的には、アノテーションとなる)作業を、プログラム化して、動画からデータセットを作成する要領を確認してみました。
最終的に作ったモデルで、推論している様子です。様々な角度で、商品検出できていることが分かると思います。
2 商品画像
背景が透過処理された、商品画像を作成した要領は、以下の通りです。
最初に、背景を処理しやすいように、極力1色にして、回転台の上に商品を置いて、スマフォで撮影しています。
撮影した動画を、Filmora 9でクロマキー処理して、背景を黒に変えています。
背景が黒くなった動画から、商品部分を切り取って、背景を透過にして保存しています。この時点で、商品ごとの画像は、200枚となっています。
ここまでの処理を紹介している動画です。
最後に、画像を切り取って、透過処理するコードは、以下です。
# -*- coding: utf-8 -*-
import shutil
import glob
import os
import cv2
import numpy as np
max = 200 # 1個の動画から生成する画像数
input_path = "./dataset/mp4"
output_path = "./dataset/output_png"
# 矩形検出
def detect_rectangle(img):
# グレースケール
gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# 背景の多少のノイズは削除する
_, gray_img = cv2.threshold(gray_img, 50, 255, cv2.THRESH_BINARY)
# 輪郭検出
contours, _ = cv2.findContours(gray_img, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
rect = None
for contour in contours:
# ある程度の面積が有るものだけを対象にする
area = cv2.contourArea(contour, False);
if area < 1000:
continue
# 輪郭を直線近似する
epsilon = 0.1 * cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, epsilon, True)
# 最大サイズの矩形を取得する
x, y, w, h = cv2.boundingRect(contour)
if(rect != None):
if(w * h < rect[2] * rect[3]):
continue
rect = [x, y, w, h]
return rect
# 透過イメージ保存
def create_transparent_image(img):
# RGBを分離
ch_b, ch_g, ch_r = cv2.split(img[:,:,:3])
# アルファチャンネル生成
h, w, _ = img.shape
ch_a = np.zeros((h, w) ,dtype = 'uint8')
ch_a += 255
# 各チャンネルを結合
rgba_img = cv2.merge((ch_b, ch_g, ch_r, ch_a))
# マスク
color_lower = np.array([0, 0, 0, 255])
# color_upper = np.array([80, 80, 80, 255])
color_upper = np.array([40, 40, 40, 255])
mask = cv2.inRange(rgba_img, color_lower, color_upper)
return cv2.bitwise_not(rgba_img, rgba_img, mask=mask)
def save_image(class_name, img):
path = "{}/{}".format(output_path, class_name)
if os.path.exists(path) == False:
os.makedirs(path)
for i in range(1000):
filename = "{}/{}.png".format(path, i)
if os.path.exists(filename) == False:
cv2.imwrite(filename, img)
print(filename)
return
def main():
os.makedirs(output_path, exist_ok=True)
if(os.path.exists(output_path)==False):
os.makedirs(output_path)
moves = glob.glob("{}/*.mp4".format(input_path))
for move in moves:
basename = os.path.basename(move)
class_name = basename.split('_')[0]
cap = cv2.VideoCapture(move)
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
print("width:{} height:{} frames:{}".format(width, height, frame_count))
interval = int(frame_count / max)
counter = 0
while True:
counter += 1
# カメラ画像取得
_, frame = cap.read()
if(frame is None):
break
if(counter%interval != 0):
continue
# 縮小
frame = cv2.resize(frame, (int(width/2), int(height/2)))
# 矩形検出
rect = detect_rectangle(frame)
if(rect != None):
x, y, w, h = rect
# 切り取り
save_img = frame[y: y+h, x: x+w]
# 透過保存
img = create_transparent_image(save_img)
save_image(class_name, img)
# 表示
frame = cv2.rectangle(frame, (x, y), (x+w, y+h),(0,255,0),2)
# 画像表示
cv2.imshow('frame', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
if __name__ == '__main__':
main()
3 変形と合成
背景が透過となった画像を背景画像に合成して、データセットの画像を生成しますが、この時、商品画像は、サイズ変更や、回転などが行われています。
生成されたデータセット用の画像は、以下のような感じになります。(バウンディングボックスの描画は、確認用です、実際のデータセットには表示されていません)
変形と合成を行っているコードです。
"""
変換と合成によりGround Truth形式のデータセットを作成する
"""
import json
import glob
import random
import os
import shutil
import math
import numpy as np
import cv2
from PIL import Image
MAX = 6000 # 生成する画像数
CLASS_NAME=["ChipStar","Curry","Pringles","SeaFood","Butamen"]
COLORS = [(0,0,175),(175,0,0),(0,175,0),(175,0,175),(0,175,175)]
BACKGROUND_IMAGE_PATH = "./dataset/background_images"
TARGET_IMAGE_PATH = "./dataset/output_png"
OUTPUT_PATH = "./dataset/output_ground_truth"
S3Bucket = "s3://ground_truth_dataset"
manifestFile = "output.manifest"
BASE_WIDTH = 200 # 商品の基本サイズは、背景画像とのバランスより、横幅を200を基準とする
BACK_WIDTH = 640
BACK_HEIGHT = 480
# 背景画像取得クラス
class Background:
def __init__(self, backPath):
self.__backPath = backPath
def get(self):
imagePath = random.choice(glob.glob(self.__backPath + '/*.jpg'))
return cv2.imread(imagePath, cv2.IMREAD_UNCHANGED)
# 検出対象取得クラス (base_widthで指定された横幅を基準にリサイズされる)
class Target:
def __init__(self, target_path, base_width, class_name):
self.__target_path = target_path
self.__base_width = base_width
self.__class_name = class_name
def get(self, class_id):
# 商品画像
class_name = self.__class_name[class_id]
image_path = random.choice(glob.glob(self.__target_path + '/' + class_name + '/*.png'))
target_image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
# 基準(横)サイズに基づきリサイズ
h, w, _ = target_image.shape
aspect = h/w
target_image = cv2.resize(target_image, (int(self.__base_width * aspect), self.__base_width))
# ランダムに回転させて取り出す
mode = random.randint(0, 3)
if(mode == 0):
target_image = cv2.rotate(target_image, cv2.ROTATE_90_CLOCKWISE)
elif(mode == 1):
target_image = cv2.rotate(target_image, cv2.ROTATE_90_COUNTERCLOCKWISE)
elif(mode == 2):
target_image = cv2.rotate(target_image, cv2.ROTATE_180)
return target_image
# 変換クラス
class Transformer():
def __init__(self, width, height):
self.__width = width
self.__height = height
self.__min_scale = 0.3
self.__max_scale = 1
def warp(self, target_image):
# サイズ変更
target_image = self.__resize(target_image)
# ローテーション
mode = random.randint(0, 3)
if(mode == 0):
target_image = self.__rote(target_image, random.uniform(0, 30))
elif(mode == 1):
target_image = self.__rote(target_image, random.uniform(320, 360))
# 配置位置決定
h, w, _ = target_image.shape
left = random.randint(0, self.__width - w)
top = random.randint(0, self.__height - h)
rect = ((left, top), (left + w, top + h))
# 背景面との合成
new_image = self.__synthesize(target_image, left, top)
return (new_image, rect)
def __resize(self, img):
scale = random.uniform(self.__min_scale, self.__max_scale)
w, h, _ = img.shape
return cv2.resize(img, (int(w * scale), int(h * scale)))
def __rote(self, target_image, angle):
h, w, _ = target_image.shape
rate = h/w
scale = 1
if( rate < 0.9 or 1.1 < rate):
scale = 0.9
elif( rate < 0.8 or 1.2 < rate):
scale = 0.6
center = (int(w/2), int(h/2))
trans = cv2.getRotationMatrix2D(center, angle , scale)
return cv2.warpAffine(target_image, trans, (w,h))
def __synthesize(self, target_image, left, top):
background_image = np.zeros((self.__height, self.__width, 4), np.uint8)
back_pil = Image.fromarray(background_image)
front_pil = Image.fromarray(target_image)
back_pil.paste(front_pil, (left, top), front_pil)
return np.array(back_pil)
class Effecter():
# Gauss
def gauss(self, img, level):
return cv2.blur(img, (level * 2 + 1, level * 2 + 1))
# Noise
def noise(self, img):
img = img.astype('float64')
img[:,:,0] = self.__single_channel_noise(img[:,:,0])
img[:,:,1] = self.__single_channel_noise(img[:,:,1])
img[:,:,2] = self.__single_channel_noise(img[:,:,2])
return img.astype('uint8')
def __single_channel_noise(self, single):
diff = 255 - single.max()
noise = np.random.normal(0, random.randint(1, 100), single.shape)
noise = (noise - noise.min())/(noise.max()-noise.min())
noise= diff*noise
noise= noise.astype(np.uint8)
dst = single + noise
return dst
# バウンディングボックス描画
def box(frame, rect, class_id):
((x1,y1),(x2,y2)) = rect
label = "{}".format(CLASS_NAME[class_id])
img = cv2.rectangle(frame,(x1, y1), (x2, y2), COLORS[class_id],2)
img = cv2.rectangle(img,(x1, y1), (x1 + 150,y1-20), COLORS[class_id], -1)
cv2.putText(img,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
return img
# 背景と商品の合成
def marge_image(background_image, front_image):
back_pil = Image.fromarray(background_image)
front_pil = Image.fromarray(front_image)
back_pil.paste(front_pil, (0, 0), front_pil)
return np.array(back_pil)
# Manifest生成クラス
class Manifest:
def __init__(self, class_name):
self.__lines = ''
self.__class_map={}
for i in range(len(class_name)):
self.__class_map[str(i)] = class_name[i]
def appned(self, fileName, data, height, width):
date = "0000-00-00T00:00:00.000000"
line = {
"source-ref": "{}/{}".format(S3Bucket, fileName),
"boxlabel": {
"image_size": [
{
"width": width,
"height": height,
"depth": 3
}
],
"annotations": []
},
"boxlabel-metadata": {
"job-name": "xxxxxxx",
"class-map": self.__class_map,
"human-annotated": "yes",
"objects": {
"confidence": 1
},
"creation-date": date,
"type": "groundtruth/object-detection"
}
}
for i in range(data.max()):
(_, rect, class_id) = data.get(i)
((x1,y1),(x2,y2)) = rect
line["boxlabel"]["annotations"].append({
"class_id": class_id,
"width": x2 - x1,
"top": y1,
"height": y2 - y1,
"left": x1
})
self.__lines += json.dumps(line) + '\n'
def get(self):
return self.__lines
# 1画像分のデータを保持するクラス
class Data:
def __init__(self, rate):
self.__rects = []
self.__images = []
self.__class_ids = []
self.__rate = rate
def get_class_ids(self):
return self.__class_ids
def max(self):
return len(self.__rects)
def get(self, i):
return (self.__images[i], self.__rects[i], self.__class_ids[i])
# 追加(重複率が指定値以上の場合は失敗する)
def append(self, target_image, rect, class_id):
conflict = False
for i in range(len(self.__rects)):
iou = self.__multiplicity(self.__rects[i], rect)
if(iou > self.__rate):
conflict = True
break
if(conflict == False):
self.__rects.append(rect)
self.__images.append(target_image)
self.__class_ids.append(class_id)
return True
return False
# 重複率
def __multiplicity(self, a, b):
(ax_mn, ay_mn) = a[0]
(ax_mx, ay_mx) = a[1]
(bx_mn, by_mn) = b[0]
(bx_mx, by_mx) = b[1]
a_area = (ax_mx - ax_mn + 1) * (ay_mx - ay_mn + 1)
b_area = (bx_mx - bx_mn + 1) * (by_mx - by_mn + 1)
abx_mn = max(ax_mn, bx_mn)
aby_mn = max(ay_mn, by_mn)
abx_mx = min(ax_mx, bx_mx)
aby_mx = min(ay_mx, by_mx)
w = max(0, abx_mx - abx_mn + 1)
h = max(0, aby_mx - aby_mn + 1)
intersect = w*h
return intersect / (a_area + b_area - intersect)
# 各クラスのデータ数が同一になるようにカウントする
class Counter():
def __init__(self, max):
self.__counter = np.zeros(max)
def get(self):
n = np.argmin(self.__counter)
return int(n)
def inc(self, index):
self.__counter[index]+= 1
def print(self):
print(self.__counter)
def main():
# 出力先の初期化
if os.path.exists(OUTPUT_PATH):
shutil.rmtree(OUTPUT_PATH)
os.mkdir(OUTPUT_PATH)
target = Target(TARGET_IMAGE_PATH, BASE_WIDTH, CLASS_NAME)
background = Background(BACKGROUND_IMAGE_PATH)
transformer = Transformer(BACK_WIDTH, BACK_HEIGHT)
manifest = Manifest(CLASS_NAME)
counter = Counter(len(CLASS_NAME))
effecter = Effecter()
no = 0
while(True):
# 背景画像の取得
background_image = background.get()
# 商品データ
data = Data(0.1)
for _ in range(20):
# 現時点で作成数の少ないクラスIDを取得
class_id = counter.get()
# 商品画像の取得
target_image = target.get(class_id)
# 変換
(transform_image, rect) = transformer.warp(target_image)
frame = marge_image(background_image, transform_image)
# 商品の追加(重複した場合は、失敗する)
ret = data.append(transform_image, rect, class_id)
if(ret):
counter.inc(class_id)
print("max:{}".format(data.max()))
frame = background_image
for index in range(data.max()):
(target_image, _, _) = data.get(index)
# 合成
frame = marge_image(frame, target_image)
# アルファチャンネル削除
frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
# エフェクト
frame = effecter.gauss(frame, random.randint(0, 2))
frame = effecter.noise(frame)
# 画像名
fileName = "{:05d}.png".format(no)
no+=1
# 画像保存
cv2.imwrite("{}/{}".format(OUTPUT_PATH, fileName), frame)
# manifest追加
manifest.appned(fileName, data, frame.shape[0], frame.shape[1])
for i in range(data.max()):
(_, rect, class_id) = data.get(i)
# バウンディングボックス描画(確認用)
frame = box(frame, rect, class_id)
counter.print()
print("no:{}".format(no))
if(MAX <= no):
break
# 表示(確認用)
cv2.imshow("frame", frame)
cv2.waitKey(1)
# manifest 保存
with open('{}/{}'.format(OUTPUT_PATH, manifestFile), 'w') as f:
f.write(manifest.get())
main()
4 モデル
モデルは、SageMakerのビルトインアルゴリズム(物体検出)で作成されています。
5 最後に
精度の高いモデルを作成するには、多数のデータセットが必要ですが、 クロマキーでのデータセット作成は、場面によっては強力だと思います。
今回生成された6000枚の画像には、各商品が約9050件づつ写っているので、45,250(9050 * 5)個のアノテーションが付与されていることになります。 商品の外接となる矩形をプログラム的に生成することで、比較的正確なアノテーション作業となりますし、また、データの偏りが生じないようする調整も、ある意味、容易かもしれません。