[Amazon SageMaker] 画像合成によるデータセット作成時における背景の扱いについて

2021.01.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

下記では、冷蔵庫内を上からのアングルで見た映像で、商品を検出しています。

今回は、ここで作成したモデルについての紹介です。

最初に動画をご確認下さい。前段は、データセットを作成しているようす、そして、後段は、冷蔵庫内のカメラで商品を検出している場面です。

2 単一商品のデータセット

実は、当初、データ作成も容易なので、1枚の商品画像を背景と合成することでデータセットを作成しました。

しかし、このデータセットで作成したモデルは、商品の間隔が空いている時は、問題なく検出できるのですが

商品の間隔が狭くなると、途端に検出できなくなる問題がありました。

3 複数商品のデータセット

先のデータセットでは、常に商品の周りに背景が写っており、これを一緒に学習してしまっています。実際の場面で、商品の間隔が詰まった時、商品の周りの背景は確保されなくなるため、これが検出できない原因だと考えられます。

そこで、複数の商品を背景に合成してデータセットを作成するようにしました。 ランダムに商品画像を取り出し、完全に重ならないように配置して1つのデータとしています。

4 コード

1画像ずつ作成する段階で、データを保持するクラス(Data)に商品を追加する際、既に貼られている画像との重複度合いを確認し、一定以上の重複がある場合は、これを排除するようになっています。

また、データセットの画像に含まれる商品の分布がある程度平均化するように、Counterクラスでインデックスを調整しています。

"""
下記を参考にさせて頂きました。
https://github.com/aws-samples/smart-cooler/blob/master/_ml_model_package/synthetic-dataset.ipynb
"""

import json
import glob
import random
import os
import shutil
import math
import numpy as np
import cv2
from PIL import Image

MAX = 3000 # 生成する画像数

CLASS_NAME=["jagabee","chipstar","butamen","kyo_udon","koara","curry"]
COLORS = [(0,0,175),(175,0,0),(0,175,0),(175,175,0),(0,175,175),(175,175,175)]
SIZE=[150, 150, 150 ,185, 150, 185]

BACK_PATH = "./backgrounds"
PRODUCT_PATH = "./products"
OUTPUT_PATH = "./output"
S3Bucket = "s3://ground_truth_dataset"
manifestFile = "output.manifest"

def euler_to_mat(yaw, pitch, roll):
    c, s = math.cos(yaw), math.sin(yaw)
    M = np.matrix([[  c, 0.,  s], [ 0., 1., 0.], [ -s, 0.,  c]])

    c, s = math.cos(pitch), math.sin(pitch)
    M = np.matrix([[ 1., 0., 0.], [ 0.,  c, -s], [ 0.,  s,  c]]) * M

    c, s = math.cos(roll), math.sin(roll)
    M = np.matrix([[  c, -s, 0.], [  s,  c, 0.], [ 0., 0., 1.]]) * M
    return M

def make_affine_transform(from_shape, to_shape, 
                        min_scale, max_scale,
                        scale_variation=1.0,
                        rotation_variation=1.0,
                        translation_variation=1.0):

    from_size = np.array([[from_shape[1], from_shape[0]]]).T
    to_size = np.array([[to_shape[1], to_shape[0]]]).T

    scale = random.uniform((min_scale + max_scale) * 0.5 -
                            (max_scale - min_scale) * 0.5 * scale_variation,
                            (min_scale + max_scale) * 0.5 +
                            (max_scale - min_scale) * 0.5 * scale_variation)
    roll = random.uniform(-1.0, 1.0) * rotation_variation
    pitch = random.uniform(-0.15, 0.15) * rotation_variation
    yaw = random.uniform(-0.15, 0.15) * rotation_variation

    M = euler_to_mat(yaw, pitch, roll)[:2, :2]
    h = from_shape[0]
    w = from_shape[1]
    corners = np.matrix([[-w, +w, -w, +w],
                            [-h, -h, +h, +h]]) * 0.5
    skewed_size = np.array(np.max(M * corners, axis=1) -
                                np.min(M * corners, axis=1))

    scale *= np.min(to_size / skewed_size)

    trans = (np.random.random((2,1)) - 0.5) * translation_variation
    trans = ((2.0 * trans) ** 5.0) / 2.0
    trans = (to_size - skewed_size * scale) * trans

    center_to = to_size / 2.
    center_from = from_size / 2.

    M = euler_to_mat(yaw, pitch, roll)[:2, :2]
    M *= scale
    M = np.hstack([M, trans + center_to - M * center_from])
    return M

# アフィン変換
def transform(backImage, productImage, productSize):
    M = make_affine_transform(
                        from_shape=productImage.shape,
                        to_shape=backImage.shape,
                        min_scale=1.0,
                        max_scale=1.0,
                        rotation_variation=3.5,
                        # scale_variation=2.0,
                        scale_variation=1.0,
                        translation_variation=0.98)
    object_topleft = tuple(M.dot(np.array((0, 0) + (1, ))).tolist()[0])
    object_topright = tuple(M.dot(np.array((productSize, 0) + (1,))).tolist()[0])
    object_bottomleft = tuple(M.dot(np.array((0,productSize) + (1,))).tolist()[0])
    object_bottomright = tuple(M.dot(np.array((productSize, productSize) + (1,))).tolist()[0])

    object_tups = (object_topleft, object_topright, object_bottomleft, object_bottomright)
    object_xmin = (min(object_tups, key=lambda item:item[0])[0])
    object_xmax = (max(object_tups, key=lambda item:item[0])[0]) 
    object_ymin = (min(object_tups, key=lambda item:item[1])[1])
    object_ymax = (max(object_tups, key=lambda item:item[1])[1])
    rect = ((int(object_xmin),int(object_ymin)),(int(object_xmax),int(object_ymax)))    

    productImage =  cv2.warpAffine(productImage, M, (backImage.shape[1], backImage.shape[0]))
    return productImage, rect


# 背景と商品の合成
def margeImage(backImg, productImg):
    # PIL形式で重ねる
    back_pil = Image.fromarray(backImg)
    product_pil = Image.fromarray(productImg)
    back_pil.paste(product_pil, (0, 0), product_pil)
    return np.array(back_pil)

# エフェクト(Gauss)
def addGauss(img, level):
    return cv2.blur(img, (level * 2 + 1, level * 2 + 1))

# エフェクト(Noise)
def addNoiseSingleChannel(single):
    diff = 255 - single.max()
    noise = np.random.normal(0, random.randint(1, 100), single.shape)
    noise = (noise - noise.min())/(noise.max()-noise.min())
    noise= diff*noise
    noise= noise.astype(np.uint8)
    dst = single + noise
    return dst

# エフェクト(Noise)
def addNoise(img):
    img = img.astype('float64')
    img[:,:,0] = addNoiseSingleChannel(img[:,:,0])
    img[:,:,1] = addNoiseSingleChannel(img[:,:,1])
    img[:,:,2] = addNoiseSingleChannel(img[:,:,2])
    return img.astype('uint8')

# バウンディングボックス描画
def box(frame, rect, class_id):
    ((x1,y1),(x2,y2)) = rect
    label = "{}".format(CLASS_NAME[class_id])
    img = cv2.rectangle(frame,(x1, y1), (x2, y2), COLORS[class_id],2)
    img = cv2.rectangle(img,(x1, y1), (x1 + 150,y1-20), COLORS[class_id], -1)
    cv2.putText(img,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
    return img

# Manifest生成クラス
class Manifest:
    def __init__(self, class_name):
        self.__lines = ''
        self.__class_map={}
        for i in range(len(class_name)):
            self.__class_map[str(i)] = class_name[i]

    def appned(self, fileName, data, height, width):

        date = "0000-00-00T00:00:00.000000"
        line = {
            "source-ref": "{}/{}".format(S3Bucket, fileName),
            "boxlabel": {
                "image_size": [
                    {
                        "width": width,
                        "height": height,
                        "depth": 3
                    }
                ],
                "annotations": []
            },
            "boxlabel-metadata": {
                "job-name": "xxxxxxx",
                "class-map": self.__class_map,
                "human-annotated": "yes",
                "objects": {
                    "confidence": 1
                },
                "creation-date": date,
                "type": "groundtruth/object-detection"
            }
        }
        for i in range(data.max()):
            (_, rect, class_id) = data.get(i)
            ((x1,y1),(x2,y2)) = rect
            line["boxlabel"]["annotations"].append({
                "class_id": class_id,
                "width": x2 - x1,
                "top": y1,
                "height": y2 - y1,
                "left": x1
            })
        self.__lines += json.dumps(line) + '\n'

    def get(self):
        return self.__lines

# 背景画像生成クラス
class Backgrounds:
    def __init__(self, backPath):
        self.__backPath = backPath

    def get(self):
        imagePath = random.choice(glob.glob(self.__backPath + '/*.png'))
        return cv2.imread(imagePath, cv2.IMREAD_UNCHANGED) 

# 商品画像生成クラス
class Products:
    def __init__(self, productPath, class_name, size):
        self.__productPath = productPath
        self.__class_name = class_name
        self.__size = size

    def get(self, class_id):
        # 商品画像
        class_name = self.__class_name[class_id]
        image_path = random.choice(glob.glob(self.__productPath + '/' + class_name + '/*.png'))
        product_image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED) 

        # 商品画像のサイズ
        size = self.__size[class_id]
        return (self.__resize(product_image, size), size, class_id)

    # 商品画像のサイズ調整
    def __resize(self, img, size):
        org_h, org_w = img.shape[:2]
        imageArray = np.zeros((org_h, org_w, 4), np.uint8)
        img = cv2.resize(img, (size, size))
        imageArray[0:size, 0:size] = img
        return imageArray

# 1画像分のデータを保持するクラス
class Data:
    def __init__(self, rate):
        self.__rects = []
        self.__images = []
        self.__class_ids = []
        self.__rate = rate

    def get_class_ids(self):
        return self.__class_ids

    def max(self):
        return len(self.__rects)

    def get(self, i):
        return (self.__images[i], self.__rects[i], self.__class_ids[i])

    # 追加(重複率が指定値以上の場合は失敗する)
    def append(self, productImage, rect, class_id):
        conflict = False
        for i in range(len(self.__rects)):
            iou = self.__multiplicity(self.__rects[i], rect)
            if(iou > self.__rate):
                conflict = True
                break
        if(conflict == False):  
            self.__rects.append(rect)
            self.__images.append(productImage)
            self.__class_ids.append(class_id)
            return True
        return False

    # 重複率
    def __multiplicity(self, a, b):
        (ax_mn, ay_mn) = a[0]
        (ax_mx, ay_mx) = a[1]
        (bx_mn, by_mn) = b[0]
        (bx_mx, by_mx) = b[1]
        a_area = (ax_mx - ax_mn + 1) * (ay_mx - ay_mn + 1)
        b_area = (bx_mx - bx_mn + 1) * (by_mx - by_mn + 1)
        abx_mn = max(ax_mn, bx_mn)
        aby_mn = max(ay_mn, by_mn)
        abx_mx = min(ax_mx, bx_mx)
        aby_mx = min(ay_mx, by_mx)
        w = max(0, abx_mx - abx_mn + 1)
        h = max(0, aby_mx - aby_mn + 1)
        intersect = w*h
        return intersect / (a_area + b_area - intersect)

# 各クラスのデータ数が同一になるようにカウントする
class Counter():
    def __init__(self, max):
        self.__counter = np.zeros(max)

    def get(self):
        n = np.argmin(self.__counter)
        self.__counter[n] += 1
        return int(n)

    def print(self):
        print(self.__counter)

def main():

    # 出力先の初期化
    if os.path.exists(OUTPUT_PATH):
        shutil.rmtree(OUTPUT_PATH)
    os.mkdir(OUTPUT_PATH)

    backgrounds = Backgrounds(BACK_PATH)
    products = Products(PRODUCT_PATH, CLASS_NAME, SIZE)
    manifest = Manifest(CLASS_NAME)

    counter = Counter(len(CLASS_NAME))
    no = 0

    while(True):
        # 背景画像の取得
        backImage = backgrounds.get()

        # 商品データ
        data = Data(0.15)
        for _ in range(20):
            class_id = counter.get()
            # class_id = random.randint(0, len(CLASS_NAME)-1)
            # 商品画像の取得
            product_image, product_size, class_id  = products.get(class_id)
            # アフィン変換
            product_image, rect =  transform(backImage, product_image, product_size)
            # 商品の追加(重複した場合は、失敗する)
            data.append(product_image, rect, class_id)

        frame = backImage
        for index in range(data.max()):
            (product_image, _, _) = data.get(index)
            # 合成
            frame = margeImage(frame, product_image)

        # アルファチャンネル削除
        frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)

        # エフェクト(gauss)
        frame = addGauss(frame, random.randint(0, 2))

        # エフェクト(Noise)
        frame = addNoise(frame)

        # 画像名
        fileName = "{:04d}.png".format(no)
        no+=1

        # 画像保存            
        cv2.imwrite("{}/{}".format(OUTPUT_PATH, fileName), frame)
        # manifest追加
        manifest.appned(fileName, data, frame.shape[0], frame.shape[1])

        for i in range(data.max()):
            (_, rect, class_id) = data.get(i)
            # バウンディングボックス描画(確認用)
            frame = box(frame, rect, class_id)

        counter.print()
        if(MAX < no):
            break

        # 表示(確認用)
        cv2.imshow("frame", frame)
        cv2.waitKey(1)

    # manifest 保存
    with open('{}/{}'.format(OUTPUT_PATH, manifestFile), 'w') as f:
        f.write(manifest.get())

main()

5 RecordIO形式

上記で作成したAmazon SageMaker Ground Truth形式のデータをim2rec.pyを利用してRecordIO形式に変換しているコードです。

curl -O https://raw.githubusercontent.com/apache/incubator-mxnet/master/tools/im2rec.py

データセットに含まれる、商品の分布が予め平均化されているため、単純に学習用と検証用に分割しています。

import json
import os
import subprocess

# 定義
inputPath = '/tmp/input'
outputPath = '/tmp/output'
manifest = 'output.manifest'

CLASS_NAME=["jagabee","chipstar","butamen","kyo_udon","koara","curry"]

# 分割の比率
ratio = 0.8 # 8:2に分割する

# 1件のデータを表現するクラス
class Data():
    def __init__(self, src):
        # プロジェクト名の取得
        for key in src.keys():
            index = key.rfind("-metadata")
            if(index!=-1):
                projectName = key[0:index]

        # メタデータの取得
        metadata = src[projectName + '-metadata']
        class_map = metadata["class-map"]

        # 画像名の取得
        self.__imgFileName = os.path.basename(src["source-ref"])

        # 画像サイズの取得
        project = src[projectName]
        image_size = project["image_size"]
        self.__img_width = image_size[0]["width"]
        self.__img_height = image_size[0]["height"]

        self.__annotations = []
        # アノテーションの取得
        for annotation in project["annotations"]:
            class_id = annotation["class_id"]
            top = annotation["top"]
            left = annotation["left"]
            width = annotation["width"]
            height = annotation["height"]

            self.__annotations.append({
                "label": class_map[str(class_id)],
                "width": width,
                "top": top,
                "height": height,
                "left": left
            })

    @property
    def annotations(self):
        return self.__annotations

    # 指定されたラベルを含むかどうか
    def exsists(self, label):
        for annotation in self.__annotations:
            if(annotation["label"] == label):
                return True
        return False

    # .lstを生成して追加する
    def appendLst(self, lst, cls_list):

        index = len(lst.split('\n'))
        headerSize = 4 # hederSize,dataSize,imageWidth,imageHeight
        dataSize = 5
        str = "{}\t{}\t{}\t{}\t{}".format(index, headerSize, dataSize, self.__img_width, self.__img_height)

        for annotation in self.__annotations:
            cls_id = cls_list.index(annotation["label"])
            left = annotation["left"]
            right = left + annotation["width"]
            top = annotation["top"]
            bottom = top + annotation["height"]

            left = round(left / self.__img_width, 3)
            right = round(right / self.__img_width, 3)
            top = round(top / self.__img_height, 3)
            bottom = round(bottom / self.__img_height, 3)

            str += "\t{}\t{}\t{}\t{}\t{}".format(cls_id, left, top, right, bottom)          
        fileName = self.__imgFileName
        str += "\t{}".format(fileName)
        lst += str + "\n"
        return lst

# 全てのJSONデータを読み込む
def getDataList(inputPath, manifest):
    dataList = []
    with open("{}/{}".format(inputPath, manifest), 'r') as f:
        srcList = f.read().split('\n')
        for src in srcList:
            if(src != ''):
                json_src = json.loads(src)
                dataList.append(Data(json.loads(src)))
    return dataList

def main():

    # 出力先フォルダ生成
    os.makedirs(outputPath, exist_ok=True)

    # 全てのJSONデータを読み込む
    dataList = getDataList(inputPath, manifest)
    total_len = len(dataList)
    train_len = int(total_len * ratio)
    print("全データ: {}件 train: {} validation: {}".format(total_len, train_len, total_len-train_len))

    # .lst形式
    train = ''
    validation = ''

    for i in range(train_len):
        data = dataList.pop()
        train = data.appendLst(train, CLASS_NAME)

    for data in dataList:
        validation = data.appendLst(validation, CLASS_NAME)

    # .lstファイルの生成
    trainLst = "{}/train.lst".format(outputPath)
    validationLst = "{}/validation.lst".format(outputPath)
    with open(trainLst, mode='w') as f:
        f.write(train)
    with open(validationLst, mode='w') as f:
        f.write(validation)

    # im2rec.pyによるRecordIOファイル生成
    # python im2rec.py --pack-label <your_lst_file_name> <your_image_folder>
    im2rec = "{}/im2rec.py".format(os.getcwd())

    cmd = ["python3", im2rec, "--pack-label", "validation.lst", inputPath]
    result = subprocess.run(cmd, cwd=outputPath)
    print(result)

    cmd = ["python3", im2rec, "--pack-label", "train.lst", inputPath]
    result = subprocess.run(cmd, cwd=outputPath)
    print(result)

main()

6 学習

学習は、SageMakerの組み込みアルゴリズム(物体検出)を使用していますが、参考のため、使用したパラメータを残しておきます。

epochs = 10
num_classes = 6
num_training_samples = 2400
od_model.set_hyperparameters(base_network='resnet-50',
                             use_pretrained_model=1,
                             num_classes=num_classes,
                             mini_batch_size=32,
                             epochs=epochs,
                             learning_rate=0.001,
                             lr_scheduler_step='3,6',
                             lr_scheduler_factor=0.1,
                             optimizer='sgd',
                             momentum=0.9,
                             weight_decay=0.0005,
                             overlap_threshold=0.5,
                             nms_threshold=0.45,
                             image_shape=512,
                             label_width=350,
                             num_training_samples=num_training_samples)

7 最後に

今回は、合成によるデータセット作成での背景の扱いについての紹介でした。 合成も、実際にモデルを使用する場面で、どのような状況になるかを考えて行うことが重要だと思います。