[Amazon SageMaker] クロマキー合成とアフィン変換で生成したデータセットで硬貨を検出してみました

2021.02.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

精度の高い物体検出モデルを作成するためには、データセットが重要でが、物体検出モデルでは、画像だけでなく、アノテーション作業も必要となるため、大量に質の高いデータを用意するのは、結構課題のある話だと思います。

下記では、商品画像の背景を透過処理して、アフィン変換しながら背景と合成することでデータセット作成しました。

上では、背景透過処理は、手動で行いましたが、これも大変だということで、今回は、背景をグリーンバックで撮影し、クロマキー処理で、背景透過も自動化してみました。

最初に、今回の作業で、作成されたモデルで推論している様子です。クラスは、1円、5円、10円、50円、100円の5つです。

用意した画像は、各硬貨ごと表10枚、裏10枚の100枚で、そこから生成したデータセットは、画像が3,000枚、アノテーションが約69,000個となってます。

2 元画像から切り出し

元画像は、グリーンの布上に硬貨を並べて撮影しています。

撮影した画像をプレビューで開いて、1個づつ選択して、Comand+C Command+Nで切り出していきます。この切り出しが、そのままアノテーション情報になるので、比較的に丁寧に行ったほうが良いと思います。

切り出したファイルは、Command+S でそれぞれのファルダに連番で保存しました。

3 クロマキー処理

上段が、元画像で、下がクロマキーで背景を透過したものです。

コードは、以下のようになっています。

元画像をアルファチャンネル有りで読み込み、HSV色空有間に変換して、背景の色相だけカットしています。

ちなみに色相においてグリーンは、60°〜150°ぐらいですが、OpenCVでは、180度で指定するため、約1/2で指定しています。

※ 50〜79は、今回の画像で、綺麗に背景が抜ける色相ということで、調整した結果です。

"""
クロマキー合成のための元画像の作成
背景が緑色の硬貨画像から、背景を透過した画像に変換する
"""
import os
import cv2
import glob
import numpy as np

dirs = [
	"1_back","1_front",
	"5_back","5_front",
	"10_back","10_front",
	"50_back","50_front",
	"100_back","100_front",
]

input = "./data/coins_org"
output = "./data/coins_transparent"

for dir in dirs:
	in_dir = "{}/{}".format(input, dir)
	out_dir = "{}/{}".format(output, dir)

	os.makedirs(out_dir, exist_ok=True)

	files = glob.glob("{}/*.png".format(in_dir))
	for file in files:
		print(file)
		# アルファチャンネル有り画像読込み
		img = cv2.imread(file, -1)
		# HSVに変換
		hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
		# クロマキーによるマスク生成
		mask = cv2.inRange(hsv, (50, 0, 0), (79, 255, 255))
		# マクス削除
		img = cv2.bitwise_not(img, img, mask=mask)
		# 画像保存
		basename = os.path.basename(file)
		cv2.imwrite("{}/{}".format(out_dir, basename), img)

4 アフィン変換と合成

背景を透過した画像をアフィン変換して、背景画像と合成しているようすです。

データ生成中には、アノテーション結果(バウンディングボックス)も確認のために表示していますが、データ画像には有りません。

以下が、処理しているコードです。生成されるデータセットは、Ground Truth形式となっています。

"""
アフィン変換とクロマキー合成によりGround Truth形式のデータセットを作成する

アフィン変換は、下記を参考にさせて頂きました。
https://github.com/aws-samples/smart-cooler/blob/master/_ml_model_package/synthetic-dataset.ipynb
"""

import json
import glob
import random
import os
import shutil
import math
import numpy as np
import cv2
from PIL import Image

# MAX = 3000 # 生成する画像数
MAX = 3000 # 生成する画像数

CLASS_NAME=["1en","5en","10en","50en","100en"]
COLORS = [(0,0,175),(175,0,0),(0,175,0),(175,175,0),(0,175,175)]
SIZE=[90, 90, 90 ,90, 90]

BACK_PATH = "./data/backgrounds"
PRODUCT_PATH = "./data/products"
OUTPUT_PATH = "./data/outputGroundTruth"
S3Bucket = "s3://ground_truth_dataset"
manifestFile = "output.manifest"

def euler_to_mat(yaw, pitch, roll):
	c, s = math.cos(yaw), math.sin(yaw)
	M = np.matrix([[  c, 0.,  s], [ 0., 1., 0.], [ -s, 0.,  c]])

	c, s = math.cos(pitch), math.sin(pitch)
	M = np.matrix([[ 1., 0., 0.], [ 0.,  c, -s], [ 0.,  s,  c]]) * M

	c, s = math.cos(roll), math.sin(roll)
	M = np.matrix([[  c, -s, 0.], [  s,  c, 0.], [ 0., 0., 1.]]) * M
	return M

def make_affine_transform(from_shape, to_shape, 
						min_scale, max_scale,
						scale_variation=1.0,
						rotation_variation=1.0,
						translation_variation=1.0):

	from_size = np.array([[from_shape[1], from_shape[0]]]).T
	to_size = np.array([[to_shape[1], to_shape[0]]]).T

	scale = random.uniform((min_scale + max_scale) * 0.5 -
							(max_scale - min_scale) * 0.5 * scale_variation,
							(min_scale + max_scale) * 0.5 +
							(max_scale - min_scale) * 0.5 * scale_variation)
	roll = random.uniform(-1.0, 1.0) * rotation_variation
	pitch = random.uniform(-0.15, 0.15) * rotation_variation
	yaw = random.uniform(-0.15, 0.15) * rotation_variation

	M = euler_to_mat(yaw, pitch, roll)[:2, :2]
	h = from_shape[0]
	w = from_shape[1]
	corners = np.matrix([[-w, +w, -w, +w],
							[-h, -h, +h, +h]]) * 0.5
	skewed_size = np.array(np.max(M * corners, axis=1) -
								np.min(M * corners, axis=1))

	scale *= np.min(to_size / skewed_size)

	trans = (np.random.random((2,1)) - 0.5) * translation_variation
	trans = ((2.0 * trans) ** 5.0) / 2.0
	trans = (to_size - skewed_size * scale) * trans

	center_to = to_size / 2.
	center_from = from_size / 2.

	M = euler_to_mat(yaw, pitch, roll)[:2, :2]
	M *= scale
	M = np.hstack([M, trans + center_to - M * center_from])
	return M

# アフィン変換
def transform(backImage, productImage, productSize):
	M = make_affine_transform(
						from_shape=productImage.shape,
						to_shape=backImage.shape,
						min_scale=0.8,
						max_scale=0.5,
						# rotation_variation=3.5,
						rotation_variation=2.5,
						scale_variation=0.5,
						# scale_variation=1.0,
						# translation_variation=0.98)
						translation_variation=1.0) # 1.0を超えられない
	object_topleft = tuple(M.dot(np.array((0, 0) + (1, ))).tolist()[0])
	object_topright = tuple(M.dot(np.array((productSize, 0) + (1,))).tolist()[0])
	object_bottomleft = tuple(M.dot(np.array((0,productSize) + (1,))).tolist()[0])
	object_bottomright = tuple(M.dot(np.array((productSize, productSize) + (1,))).tolist()[0])

	object_tups = (object_topleft, object_topright, object_bottomleft, object_bottomright)
	object_xmin = (min(object_tups, key=lambda item:item[0])[0])
	object_xmax = (max(object_tups, key=lambda item:item[0])[0]) 
	object_ymin = (min(object_tups, key=lambda item:item[1])[1])
	object_ymax = (max(object_tups, key=lambda item:item[1])[1])
	rect = ((int(object_xmin),int(object_ymin)),(int(object_xmax),int(object_ymax)))    

	productImage =  cv2.warpAffine(productImage, M, (backImage.shape[1], backImage.shape[0]))
	return productImage, rect


# 背景と商品の合成
def margeImage(backImg, productImg):
	# PIL形式で重ねる
	back_pil = Image.fromarray(backImg)
	product_pil = Image.fromarray(productImg)
	back_pil.paste(product_pil, (0, 0), product_pil)
	return np.array(back_pil)

# エフェクト(Gauss)
def addGauss(img, level):
	return cv2.blur(img, (level * 2 + 1, level * 2 + 1))

# エフェクト(Noise)
def addNoiseSingleChannel(single):
	diff = 255 - single.max()
	noise = np.random.normal(0, random.randint(1, 100), single.shape)
	noise = (noise - noise.min())/(noise.max()-noise.min())
	noise= diff*noise
	noise= noise.astype(np.uint8)
	dst = single + noise
	return dst

# エフェクト(Noise)
def addNoise(img):
	img = img.astype('float64')
	img[:,:,0] = addNoiseSingleChannel(img[:,:,0])
	img[:,:,1] = addNoiseSingleChannel(img[:,:,1])
	img[:,:,2] = addNoiseSingleChannel(img[:,:,2])
	return img.astype('uint8')

# バウンディングボックス描画
def box(frame, rect, class_id):
	((x1,y1),(x2,y2)) = rect
	label = "{}".format(CLASS_NAME[class_id])
	img = cv2.rectangle(frame,(x1, y1), (x2, y2), COLORS[class_id],2)
	img = cv2.rectangle(img,(x1, y1), (x1 + 150,y1-20), COLORS[class_id], -1)
	cv2.putText(img,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
	return img

# Manifest生成クラス
class Manifest:
	def __init__(self, class_name):
		self.__lines = ''
		self.__class_map={}
		for i in range(len(class_name)):
			self.__class_map[str(i)] = class_name[i]

	def appned(self, fileName, data, height, width):

		date = "0000-00-00T00:00:00.000000"
		line = {
			"source-ref": "{}/{}".format(S3Bucket, fileName),
			"boxlabel": {
				"image_size": [
					{
						"width": width,
						"height": height,
						"depth": 3
					}
				],
				"annotations": []
			},
			"boxlabel-metadata": {
				"job-name": "xxxxxxx",
				"class-map": self.__class_map,
				"human-annotated": "yes",
				"objects": {
					"confidence": 1
				},
				"creation-date": date,
				"type": "groundtruth/object-detection"
			}
		}
		for i in range(data.max()):
			(_, rect, class_id) = data.get(i)
			((x1,y1),(x2,y2)) = rect
			line["boxlabel"]["annotations"].append({
				"class_id": class_id,
				"width": x2 - x1,
				"top": y1,
				"height": y2 - y1,
				"left": x1
			})
		self.__lines += json.dumps(line) + '\n'

	def get(self):
		return self.__lines

# 背景画像生成クラス
class Backgrounds:
	def __init__(self, backPath):
		self.__backPath = backPath

	def get(self):
		imagePath = random.choice(glob.glob(self.__backPath + '/*.png'))
		return cv2.imread(imagePath, cv2.IMREAD_UNCHANGED) 

# 商品画像生成クラス
class Products:
	def __init__(self, productPath, class_name, size):
		self.__productPath = productPath
		self.__class_name = class_name
		self.__size = size

	def get(self, class_id):
		# 商品画像
		class_name = self.__class_name[class_id]
		image_path = random.choice(glob.glob(self.__productPath + '/' + class_name + '/*.png'))
		product_image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED) 

		# 商品画像のサイズ
		size = self.__size[class_id]
		return (self.__resize(product_image, size), size, class_id)

	# 商品画像のサイズ調整
	def __resize(self, img, size):
		org_h, org_w = img.shape[:2]
		imageArray = np.zeros((org_h, org_w, 4), np.uint8)
		img = cv2.resize(img, (size, size))
		imageArray[0:size, 0:size] = img
		return imageArray

# 1画像分のデータを保持するクラス
class Data:
	def __init__(self, rate):
		self.__rects = []
		self.__images = []
		self.__class_ids = []
		self.__rate = rate

	def get_class_ids(self):
		return self.__class_ids

	def max(self):
		return len(self.__rects)

	def get(self, i):
		return (self.__images[i], self.__rects[i], self.__class_ids[i])

	# 追加(重複率が指定値以上の場合は失敗する)
	def append(self, productImage, rect, class_id):
		conflict = False
		for i in range(len(self.__rects)):
			iou = self.__multiplicity(self.__rects[i], rect)
			if(iou > self.__rate):
				conflict = True
				break
		if(conflict == False):  
			self.__rects.append(rect)
			self.__images.append(productImage)
			self.__class_ids.append(class_id)
			return True
		return False

	# 重複率
	def __multiplicity(self, a, b):
		(ax_mn, ay_mn) = a[0]
		(ax_mx, ay_mx) = a[1]
		(bx_mn, by_mn) = b[0]
		(bx_mx, by_mx) = b[1]
		a_area = (ax_mx - ax_mn + 1) * (ay_mx - ay_mn + 1)
		b_area = (bx_mx - bx_mn + 1) * (by_mx - by_mn + 1)
		abx_mn = max(ax_mn, bx_mn)
		aby_mn = max(ay_mn, by_mn)
		abx_mx = min(ax_mx, bx_mx)
		aby_mx = min(ay_mx, by_mx)
		w = max(0, abx_mx - abx_mn + 1)
		h = max(0, aby_mx - aby_mn + 1)
		intersect = w*h
		return intersect / (a_area + b_area - intersect)

# 各クラスのデータ数が同一になるようにカウントする
class Counter():
	def __init__(self, max):
		self.__counter = np.zeros(max)

	def get(self):
		n = np.argmin(self.__counter)
		self.__counter[n] += 1
		return int(n)

	def print(self):
		print(self.__counter)

def main():

	# 出力先の初期化
	if os.path.exists(OUTPUT_PATH):
		shutil.rmtree(OUTPUT_PATH)
	os.mkdir(OUTPUT_PATH)

	backgrounds = Backgrounds(BACK_PATH)
	products = Products(PRODUCT_PATH, CLASS_NAME, SIZE)
	manifest = Manifest(CLASS_NAME)

	counter = Counter(len(CLASS_NAME))
	no = 0

	while(True):
		# 背景画像の取得
		backImage = backgrounds.get()

		# 商品データ
		data = Data(0.15)
		for _ in range(250):
			class_id = counter.get()
			# class_id = random.randint(0, len(CLASS_NAME)-1)
			# 商品画像の取得
			product_image, product_size, class_id  = products.get(class_id)
			# アフィン変換
			product_image, rect =  transform(backImage, product_image, product_size)
			# 商品の追加(重複した場合は、失敗する)
			ret = data.append(product_image, rect, class_id)
			if(ret):
				print("ret:{} rect:{}".format(ret, rect))
		print("max:{}".format(data.max()))
		frame = backImage
		for index in range(data.max()):
			(product_image, _, _) = data.get(index)
			# 合成
			frame = margeImage(frame, product_image)

		# アルファチャンネル削除
		frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)

		# エフェクト(gauss)
		frame = addGauss(frame, random.randint(0, 2))

		# エフェクト(Noise)
		frame = addNoise(frame)

		# 画像名
		fileName = "{:04d}.png".format(no)
		no+=1

		# 画像保存            
		cv2.imwrite("{}/{}".format(OUTPUT_PATH, fileName), frame)
		# manifest追加
		manifest.appned(fileName, data, frame.shape[0], frame.shape[1])

		for i in range(data.max()):
			(_, rect, class_id) = data.get(i)
			# バウンディングボックス描画(確認用)
			frame = box(frame, rect, class_id)

		counter.print()
		if(MAX <= no):
			break

		# 表示(確認用)
		cv2.imshow("frame", frame)
		cv2.waitKey(1)

	# manifest 保存
	with open('{}/{}'.format(OUTPUT_PATH, manifestFile), 'w') as f:
		f.write(manifest.get())

main()

5 RecordIO形式への変換

Ground Truth形式のデータをRecordIO形式に変換しています。生成時に各クラスの出力分布が均等になるように調整されているので、ここでは、単純に80%を訓練用、残りを検証用にしています。

"""
GroundTruth形式のデータセットからRecordIO形式のデータセットを作成する
"""
import json
import os
import subprocess

# 定義
inputPath = './data/outputGroundTruth'
outputPath = './data/outputRecordIO'

inputPath = '/private/tmp/coin/data/outputGroundTruth'
outputPath = '/private/tmp/coin/data/outputRecordIO'

manifest = 'output.manifest'

CLASS_NAME=["1en","5en","10en","50en","100en"]

# 分割の比率
ratio = 0.8 # 8:2に分割する

# 1件のデータを表現するクラス
class Data():
    def __init__(self, src):
        # プロジェクト名の取得
        for key in src.keys():
            index = key.rfind("-metadata")
            if(index!=-1):
                projectName = key[0:index]

        # メタデータの取得
        metadata = src[projectName + '-metadata']
        class_map = metadata["class-map"]

        # 画像名の取得
        self.__imgFileName = os.path.basename(src["source-ref"])

        # 画像サイズの取得
        project = src[projectName]
        image_size = project["image_size"]
        self.__img_width = image_size[0]["width"]
        self.__img_height = image_size[0]["height"]

        self.__annotations = []
        # アノテーションの取得
        for annotation in project["annotations"]:
            class_id = annotation["class_id"]
            top = annotation["top"]
            left = annotation["left"]
            width = annotation["width"]
            height = annotation["height"]

            self.__annotations.append({
                "label": class_map[str(class_id)],
                "width": width,
                "top": top,
                "height": height,
                "left": left
            })

    @property
    def annotations(self):
        return self.__annotations

    # 指定されたラベルを含むかどうか
    def exsists(self, label):
        for annotation in self.__annotations:
            if(annotation["label"] == label):
                return True
        return False

    # .lstを生成して追加する
    def appendLst(self, lst, cls_list):

        index = len(lst.split('\n'))
        headerSize = 4 # hederSize,dataSize,imageWidth,imageHeight
        dataSize = 5
        str = "{}\t{}\t{}\t{}\t{}".format(index, headerSize, dataSize, self.__img_width, self.__img_height)

        for annotation in self.__annotations:
            cls_id = cls_list.index(annotation["label"])
            left = annotation["left"]
            right = left + annotation["width"]
            top = annotation["top"]
            bottom = top + annotation["height"]

            left = round(left / self.__img_width, 3)
            right = round(right / self.__img_width, 3)
            top = round(top / self.__img_height, 3)
            bottom = round(bottom / self.__img_height, 3)

            str += "\t{}\t{}\t{}\t{}\t{}".format(cls_id, left, top, right, bottom)          
        fileName = self.__imgFileName
        str += "\t{}".format(fileName)
        lst += str + "\n"
        return lst

# 全てのJSONデータを読み込む
def getDataList(inputPath, manifest):
    dataList = []
    with open("{}/{}".format(inputPath, manifest), 'r') as f:
        srcList = f.read().split('\n')
        for src in srcList:
            if(src != ''):
                json_src = json.loads(src)
                dataList.append(Data(json.loads(src)))
    return dataList

def main():

    # 出力先フォルダ生成
    os.makedirs(outputPath, exist_ok=True)

    # 全てのJSONデータを読み込む
    dataList = getDataList(inputPath, manifest)
    total_len = len(dataList)
    train_len = int(total_len * ratio)
    print("全データ: {}件 train: {} validation: {}".format(total_len, train_len, total_len-train_len))

    # .lst形式
    train = ''
    validation = ''

    for i in range(train_len):
        data = dataList.pop()
        train = data.appendLst(train, CLASS_NAME)

    for data in dataList:
        validation = data.appendLst(validation, CLASS_NAME)

    # .lstファイルの生成
    trainLst = "{}/train.lst".format(outputPath)
    validationLst = "{}/validation.lst".format(outputPath)
    with open(trainLst, mode='w') as f:
        f.write(train)
    with open(validationLst, mode='w') as f:
        f.write(validation)

    # im2rec.pyによるRecordIOファイル生成
    # python im2rec.py --pack-label <your_lst_file_name> <your_image_folder>
    im2rec = "{}/im2rec.py".format(os.getcwd())

    cmd = ["python3", im2rec, "--pack-label", "validation.lst", inputPath]
    result = subprocess.run(cmd, cwd=outputPath)
    print(result)

    cmd = ["python3", im2rec, "--pack-label", "train.lst", inputPath]
    result = subprocess.run(cmd, cwd=outputPath)
    print(result)

main()

6 モデル

ml.p2.16xlargeで40分ぐらいかかってます。

参考のために、設定したパラメータです。

early_stoppingでepoch 38で止まりました。

epoch	mAP	smooth_l1	cross_entropy
-----------------------------------------
0	0.155	0.532	1.624
5	0.899	0.169	0.518
10	0.948	0.129	0.399
15	0.953	0.111	0.355
20	0.96	0.101	0.329
25	0.961	0.095	0.314
30	0.962	0.088	0.298
31	0.963	0.088	0.297
32	0.962	0.088	0.293
33	0.963	0.087	0.292
34	0.963	0.085	0.289
35	0.962	0.084	0.287
36	0.963	0.083	0.285
37	0.963	0.083	0.284
38	0.963	0.081	0.279

最後に、推論しているコードです。

from boto3.session import Session
import json
import cv2

profile = 'developer'
endPoint = 'sampleEndPoint'
categories = ["1en","5en","10en","50en","100en"]
tanka = [1,5,10,50,100]


deviceId = 1 # Webカメラ
height = 600
width = 800
linewidth = 2
colors = [(0,0,175),(175,0,0),(0,175,0),(175,175,0),(0,175,175)]

class SageMaker():
	def __init__(self, profile, endPoint):
		self.__endPoint = endPoint
		session = Session(profile_name = profile)
		self.__client = session.client('sagemaker-runtime')

	def invoke(self, image):
		data = self.__client.invoke_endpoint(
			EndpointName = self.__endPoint,
			Body = image,
			ContentType='image/jpeg'
		)
		results = data['Body'].read()
		return json.loads(results)

sageMake = SageMaker(profile, endPoint)

cap = cv2.VideoCapture(deviceId)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

fps = cap.get(cv2.CAP_PROP_FPS)
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
print("FPS:{} WIDTH:{} HEIGHT:{}".format(fps, width, height))

while True:
	
	# カメラ画像取得
	ret, frame = cap.read()
	if(frame is None):
		continue
	
	_, jpg = cv2.imencode('.jpg', frame)
	detections = sageMake.invoke(jpg.tostring())
	
	total = 0
	for detection in detections["prediction"]:
		clsId = int(detection[0])
		confidence = detection[1]
		x1 = int(detection[2] * width)
		y1 = int(detection[3] * height)
		x2 = int(detection[4] * width)
		y2 = int(detection[5] * height)
		label = "{} {:.2f}".format(categories[clsId], confidence)
		if(confidence > 0.6):
			frame = cv2.rectangle(frame,(x1, y1), (x2, y2), colors[clsId],linewidth)
			frame = cv2.rectangle(frame,(x1, y1), (x1 + 100,y1-20), colors[clsId], -1)
			cv2.putText(frame,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
			total  += tanka[clsId]

	cv2.putText(frame,"TOTAL: {:,}.-".format(total),(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
	
	cv2.imshow('frame', frame)
	cv2.waitKey(1)

cap.release()
cv2.destroyAllWindows()

7 最後に

今回は、簡単に効果が確認できるように、表裏の2面しかない硬貨を対象に、クロマキー処理による背景透過を試してみました。

今後、もっと立体的な物体を回転台などに載せて撮影し、クロマキー処理で自動で背景透過出来ないかと考えています。

8 参考にさせて頂いたリンク


OpenCV – マスク画像を利用した画像処理について
RPi + Python + OpenCV その3「クロマキー」
色空間の変換