[Amazon SageMaker] Amazon SageMaker Ground Truth で作成したデータをオブジェクト検出で利用可能なイメージ形式に変換してみました

2020.04.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

Amazon SageMaker の組み込みの物体検出アルゴリズム(object-detection)では、イメージ形式のデータセットが利用可能です。

今回は、Amazon SageMaker Ground Truth(以下、Ground Truth)で作成したデータを変換して、このイメージ形式のデータセットを作成してみました。

2 Ground Truth

最初に、プライベートなプロジェクトを作成して、ごく少数ですが、データセットを作成しました。

作業が完了すると、Output dataset locationに示されるバケットにデータが保存されます。

生成されるデータは、output.manifestというファイルです。

output.manifestは、各画像ごと1行のアノテーション情報となっており、1行を見やすく展開すると以下のようになります。

output.manifest

{
    "source-ref": "s3://sagemaker-working-bucket-001/GroundTruth-Input/AHIRU-1586397259091391.jpg",
    "AHIRU-Project": {
        "annotations": [
            {
                "class_id": 0,
                "width": 249,
                "top": 210,
                "height": 256,
                "left": 209
            }
        ],
        "image_size": [
            {
                "width": 800,
                "depth": 3,
                "height": 600
            }
        ]
    },
    "AHIRU-Project-metadata": {
        "job-name": "labeling-job/ahiru-project",
        "class-map": {
            "0": "AHIRU"
        },
        "human-annotated": "yes",
        "objects": [
            {
                "confidence": 0.09
            }
        ],
        "creation-date": "2020-04-09T02:45:23.185192",
        "type": "groundtruth/object-detection"
    }
}

3 イメージ形式のデータ

一方、SageMakerのオブジェクト検出で使用されるイメージ形式のデータは、画像とアノテーションで構成されています。

下記は、アノテーション用のJSONファイルの一例です。fileは、画像ファイル名ですが、S3のバケットとプレフィックスまで、fit()data_channelsとして渡しますので、そこからの相対パスになります。また .jsonファイルは対応する画像と同じ名前である必要もあります。

class_idは、0から始まるオブジェクトクラスのインデックスです。

{
    "file": "AHIRU-1586397287273306.jpg",
    "image_size": [
        {
            "width": 800,
            "height": 600,
            "depth": 3
        }
    ],
    "annotations": [
        {
            "class_id": 1,
            "top": 218,
            "left": 209,
            "width": 263,
            "height": 226
        }
    ],
    "categories": [
        {
            "class_id": 0,
            "name": "DOG"
        },
        {
            "class_id": 1,
            "name": "AHIRU"
        }
    ]
}

4 学習データと検証データ

教師あり学習のアルゴリズムである、オブジェクト検出では、学習用のデータと、検証用のデータが必要です。

ここでは、以下のプレフィックスで保存します。

  • train (学習用の画像)
  • validation (検証用の画像)
  • train_annotation (学習用のアノテーション)
  • validation_annotation (検証用のアノテーション)

学習用と検証用として、元データを、一定の比率で分割する場合、単純に分割することができません。全てのデータに均等に全てのラベルが含まれているわけでは無いからです。

たとえば、先頭から40件目まで、Aというラベルが指定されており、41〜50件目まで、Bが指定されているとします。そして、これを単純に4:1に分割すると、1〜40と41〜50となるため、学習用データにBのデータが存在しないことになってしまいます。

この問題に対応するため、変換対象となる全てのデータから、含まれるラベルの数をカウントし、サンプル数の少ないラベルから順に、分割するようにしました。

# ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる)
def getLabel(dataList):
    labels = {}
    for data in dataList:
        for annotation in data.annotations:
            label = annotation["label"]
            if(label in labels):
                labels[label] += 1
            else:
                labels[label] = 1
    # ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる)
    labels = sorted(labels.items(), key=lambda x:x[1])
    return labels

labels = getLabel(dataList)

# ラベルの数の少ないものから優先して分割する
for i,label in enumerate(labels):
    # ・・・分割処理

5 コード

変換を行う、すべてコードは、以下のとおりです。以下を定義を設定して利用可能です。

  • inputPath 元データ(画像データとoutput.manifestを置く)
  • outputPath 出力先
  • ratio 学習データと検証データの分割比率
import json
import glob
import os
import shutil

# 定義
inputPath = '/tmp/GroundTruth'
outputPath = '/tmp/SageMakerImageDataSet'
manifest = 'output.manifest'
# 学習用と検証用の分割比率
ratio = 0.8  # 80%対、20%に分割する

# 1件のJデータを表現するクラス
class Data():
    def __init__(self, src):
        # プロジェクト名の取得
        for key in src.keys():
            index = key.rfind("-metadata")
            if(index!=-1):
                projectName = key[0:index]

        # メタデータの取得
        metadata = src[projectName + '-metadata']
        class_map = metadata["class-map"]

        # 画像名の取得
        self.imgFileName = os.path.basename(src["source-ref"])
        self.baseName = self.imgFileName.split('.')[0]
        # 画像サイズの取得
        project = src[projectName]
        image_size = project["image_size"]
        self.img_width = image_size[0]["width"]
        self.img_height = image_size[0]["height"]

        self.annotations = []
        # アノテーションの取得
        for annotation in project["annotations"]:
            class_id = annotation["class_id"]
            top = annotation["top"]
            left = annotation["left"]
            width = annotation["width"]
            height = annotation["height"]

            self.annotations.append({
                "label": class_map[str(class_id)],
                "width": width,
                "top": top,
                "height": height,
                "left": left
            })

    # 指定されたラベルを含むかどうか
    def exsists(self, label):
        for annotation in self.annotations:
            if(annotation["label"] == label):
                return True
        return False

    def store(self, dataPath, annotationPath, inputPath, labels):
        cls_list = []
        for label in labels:
            cls_list.append(label[0])

        jsonData = {}
        jsonData["file"] = self.imgFileName
        jsonData["image_size"] = []
        jsonData["image_size"].append({
            "width": self.img_width,
            "height": self.img_height,
            "depth": 3
        })
        jsonData["annotations"] = []
        for annotation in self.annotations:
            cls_id = cls_list.index(annotation["label"])
            jsonData["annotations"].append({
                "class_id": cls_id,
                "top": annotation["top"],
                "left": annotation["left"],
                "width": annotation["width"],
                "height": annotation["height"]
            })
        jsonData["categories"] = []
        for i, cls_name in enumerate(cls_list):
            jsonData["categories"].append(
                {
                    "class_id": i,
                    "name": cls_name
                }
            )
        # jsonの保存
        with open("{}/{}.json".format(annotationPath, self.baseName), mode='w') as f:
            json.dump(jsonData, f)
        # 画像のコピー
        shutil.copyfile("{}/{}".format(inputPath, self.imgFileName),"{}/{}".format(dataPath, self.imgFileName))

# dataListをラベルを含むものと、含まないものに分割する
def deviedDataList(dataList, label):
    targetList = []
    unTargetList = []
    for data in dataList:
        if(data.exsists(label)):
            targetList.append(data)
        else:
            unTargetList.append(data)
    return (targetList, unTargetList)

# ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる)
def getLabel(dataList):
    labels = {}
    for data in dataList:
        for annotation in data.annotations:
            label = annotation["label"]
            if(label in labels):
                labels[label] += 1
            else:
                labels[label] = 1
    # ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる)
    labels = sorted(labels.items(), key=lambda x:x[1])
    return labels

# 全てのJSONデータを読み込む
def getDataList(inputPath, manifest):
    dataList = []
    with open("{}/{}".format(inputPath, manifest), 'r') as f:
        srcList = f.read().split('\n')
        for src in srcList:
            if(src != ''):
                json_src = json.loads(src)
                dataList.append(Data(json.loads(src)))
    return dataList

def main():

    # 出力先フォルダ生成
    train = "{}/train".format(outputPath)
    validation = "{}/validation".format(outputPath)
    train_annotation = "{}/train_annotation".format(outputPath)
    validation_annotation = "{}/validation_annotation".format(outputPath)
    os.makedirs(outputPath, exist_ok=True)
    os.makedirs(train, exist_ok=True)
    os.makedirs(validation, exist_ok=True)
    os.makedirs(train_annotation, exist_ok=True)
    os.makedirs(validation_annotation, exist_ok=True)

    # 全てのJSONデータを読み込む
    dataList = getDataList(inputPath, manifest)
    log = "全データ: {}件 ".format(len(dataList))

    # ラベルの件数の少ない順に並べ替える(配列のインデックスが、クラスIDとなる)
    labels = getLabel(dataList)
    for i,label in enumerate(labels):
        log += "[{}]{}: {}件 ".format(i, label[0], label[1])
    print(log)

    # 保存済みリスト
    storedList = [] 
    log = ''
    # ラベルの数の少ないものから優先して分割する
    for i,label in enumerate(labels):
        log = ''
        log += "{} => ".format(label[0])
        # dataListをラベルが含まれるものと、含まないものに分割する
        (targetList, unTargetList) = deviedDataList(dataList, label[0])
        # 保存済みリストから、当該ラベルで既に保存済の件数をカウントする
        (include, notInclude) = deviedDataList(storedList, label[0])
        storedCounst = len(include)
        # train用に必要な件数
        count = int(label[1] * ratio) - storedCounst
        log += "{}:".format(count)
        # train側への保存
        for i in range(count):
            data = targetList.pop()
            data.store(train, train_annotation, inputPath, labels)
            storedList.append(data)
        # validation側への保存
        log += "{} ".format(len(targetList))
        for data in targetList:
            data.store(validation, validation_annotation, inputPath, labels)
            storedList.append(data)

        dataList = unTargetList
        log += "残り:{}件".format(len(dataList))
        print(log)

main()

例えば、画像ファイルが50件で、ラベルDOGが10件、ラベルAHIRUが50件アノテーションされているデータをを変換すると、以下のようなログが出力されます。

全データ: 50件 [0]DOG: 10件 [1]AHIRU: 50件 
DOG => 8:2 残り:40件
AHIRU => 30:10 残り:0件

6 オブジェクト検出

非常に限られたサンプル(データ)数ですが、とりあえず、モデルを作成してみました。DOGは、10件という事で、無理ですが、AHIRUの方は、限定的ですが、少し検出出来ている感じがします。

7 最後に

今回は、Ground Truthで作成されたデータをSageMakerで利用するためのデータに変換する作業を行ってみました。

Ground Truthでは、アノテーションの作業を外注できる仕組みとなっています。このデータを中心に、モデル作成(更新)の一連の流れを上手く作っていけたらなと妄想しています。