Face Re-Identificationで顔を追跡してみました

(妄想です)人物追跡と組み合わせて、ショッピングモールの「入り口」「2Fフロア」「3Fフロア」などで、同一人物を検出できるかも知れません。
2020.09.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

機械学習で画像から顔を検出する事が可能ですが、検出された顔が、フレーム間で一致できれば、色々応用がききそうです。

前回、人物検出と人物識別のモデルで人の追跡を試してみましたが、これと組み合わせることで、追跡や識別を強化できるかも知れません。

最初に、試した見た様子です。

動画は、Pixels Videosを利用させて頂きました。

2 しくみ

顔の識別は、2つのモデルを組み合わせることで動作しています。

  • 顔検出モデル
  • 顔識別モデル

顔検出モデルで、顔を検出しその画像を切り出します。 続いて、顔識別モデルで、部分画像の識別情報を生成し、インデックスを付けてデータベース化しています。

各フレームでは、顔画像をデータベースと照合し、類似性から同じ人物であるかどうかを判断してます。

3 モデル

推論のフレームワークは、OpenVINOを利用させて頂きました。

OpenVINOでは、色々な機械学習フレームワークで作成されたモデルを、中間表現(IR)フォーマットに変換して推論に使用していますが、既に、変換されたモデルがインテルによって公開されており、今回は、ここから利用させて頂いています。
open_model_zoo

(1) face-detection-0100

MobileNetV2に基づく顔検出モデルです。


https://docs.openvinotoolkit.org/latest/omz_models_intel_face_detection_0100_description_face_detection_0100.html

入出力は、以下の通りです。

Inputs

Name: input, shape: [1x3x256x256] - An input image in the format [BxCxHxW], where:

  • B - batch size
  • C - number of channels
  • H - image height
  • W - image width

Expected color order: BGR.

Outputs

The net outputs blob with shape: [1, 1, N, 7], where N is the number of detected bounding boxes. Each detection has the format [image_id, label, conf, x_min, y_min, x_max, y_max], where:

  • image_id - ID of the image in the batch
  • label - predicted class ID
  • conf - confidence for the predicted class
  • (x_min, y_min) - coordinates of the top left bounding box corner
  • (x_max, y_max) - coordinates of the bottom right bounding box corner.

(2) face-reidentification-retail-0095

顔の再識別シナリオ用の軽量ネットワークです。
face-reidentification-retail-0095
/opencv/2020/openvinotoolkit/2020.4/open_model_zoo/models_bin/1/face-reidentification-retail-0095/

入出力は、以下の通りです。

Inputs

Name: "data" , shape: [1x3x128x128] - An input image in the format [BxCxHxW], where:

  • B - batch size
  • C - number of channels
  • H - image height
  • W - image width

Expected color order is BGR.

Outputs

The net outputs a blob with the shape [1, 256, 1, 1], containing a row-vector of 256 floating point values. Outputs on different images are comparable in cosine distance.

4 識別情報のデータベース

識別情報にインデックスを付けて、データベース化するところは、この仕組みの中心ですが、前回の人物追跡よりは、やや、簡単になりました。

しかし、適用する分野(動画の種類)に応じて、ロジックの変更が必要なのは変わらないと思います。

(1) 類似度0.3以上を適用

正面から撮影している動画であり、比較的安定して顔の識別情報が取れるため、類似度0.3以上は、全て採用しています。 そして、同じインデックスが複数になった場合は、類似度の低い方を無効としました。

(2) 類似度0.3以下は追加

類似度0.3以下の場合、新規の人物が登場したと判断し、インデックスを追加しています。

(3) データベースの更新

今回のデータベースには、顔検出の時点の信頼度もデータベースとして追加されています。

類似度が、0.9以上で、データベースを更新するのですが、この際、既存のデータの信頼度と比較して、それを超える場合のみ更新としています。 これにより、顔が横を向いていたり、一部隠れていたりした場合の信頼度の低い情報を排除しています。

5 コード

作成したコードです。

FaceDetector及び、FaceReidentificationは、それぞれ、人物検出と個人識別モデルを使用するクラスです。

Tracker内に、データベース化された識別情報が格納されており、getIds()でインデックスの取得及び、データベースの更新が行われています。

index.py

import numpy as np
import time
import random
import cv2
from openvino.inference_engine import IEPlugin  
from model import Model

class FaceDetector(Model):
    def __init__(self, plugin, model_path, threshold, num_requests=2):
        super().__init__(plugin, model_path, num_requests, None)
        _, _, h, w = self.input_size
        self.__input_height = h
        self.__input_width = w
        self.__threshold = threshold

    def __prepare_frame(self, frame):
        # shape: [1x3x256x256] - An input image in the format [BxCxHxW], where:
        initial_h, initial_w = frame.shape[:2]
        scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
        in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
        in_frame = in_frame.transpose((2, 0, 1))
        in_frame = in_frame.reshape(self.input_size)
        return in_frame, scale_h, scale_w

    def infer(self, frame):
        in_frame, _, _ = self.__prepare_frame(frame)
        result = super().infer(in_frame)
        # The net outputs blob with shape: [1, 1, N, 7], 
        facese = []
        height, width = frame.shape[:2]
        for d in result[0][0]:
            if(d[2]>self.__threshold):
                face = [
                        int(d[3] * width),
                        int(d[4] * height),
                        int(d[5] * width),
                        int(d[6] * height),
                        d[2]
                ]
                facese.append(face)
        return facese

class FaceReidentification(Model):
    def __init__(self, plugin, model_path, num_requests=2):
        super().__init__(plugin, model_path, num_requests, None)
        _, _, h, w = self.input_size
        self.__input_height = h
        self.__input_width = w

    def __prepare_frame(self, frame):
        initial_h, initial_w = frame.shape[:2]
        scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
        in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
        in_frame = in_frame.transpose((2, 0, 1))
        in_frame = in_frame.reshape(self.input_size)
        return in_frame, scale_h, scale_w

    def infer(self, frame):
        in_frame, _, _ = self.__prepare_frame(frame)
        result =  super().infer(in_frame)
        # (1, 256, 1, 1) => (256)
        return result[0, :, 0, 0]


class Tracker:
    def __init__(self):
        # 識別情報のDB
        self.identifysDb = None
        # 顔の信頼度のDB
        self.conf = []

    def getIds(self, identifys, persons):
        if(identifys.size==0):
            return []
        if self.identifysDb is None:
            self.identifysDb = identifys
            for person in persons:
                self.conf.append(person[4])

        print("input: {} DB:{}".format(len(identifys), len(self.identifysDb)))
        similaritys = self.__cos_similarity(identifys, self.identifysDb)
        similaritys[np.isnan(similaritys)] = 0
        ids = np.nanargmax(similaritys, axis=1)

        for i, similarity in enumerate(similaritys):
            persionId = ids[i]
            print("persionId:{} {} conf:{}".format(persionId,similarity[persionId],  persons[i][4]))
            # 0.9以上で、顔検出の信頼度が既存のものより高い場合、識別情報を更新する
            if(similarity[persionId] > 0.9 and persons[i][4] > self.conf[persionId]):
                print("? refresh id:{} conf:{}".format(persionId, persons[i][4]))
                self.identifysDb[persionId] = identifys[i]
            # 0.3以下の場合、追加する
            elif(similarity[persionId] < 0.3):
                self.identifysDb = np.vstack((self.identifysDb, identifys[i]))
                self.conf.append(persons[i][4])
                ids[i] = len(self.identifysDb) - 1
                print("append id:{} similarity:{}".format(ids[i], similarity[persionId]))

        print(ids)
        # 重複がある場合は、信頼度の低い方を無効化する(今回、この可能性は低い)
        for i, a in enumerate(ids):
            for e, b in enumerate(ids):
                if(e == i):
                    continue
                if(a == b):
                    if(similarity[a] > similarity[b]):
                        ids[i] = -1
                    else:
                        ids[e] = -1
        print(ids)
        return ids

    # コサイン類似度
    # 参考にさせて頂きました: https://github.com/kodamap/person_reidentification
    def __cos_similarity(self, X, Y):
        m = X.shape[0]
        Y = Y.T
        return np.dot(X, Y) / (
            np.linalg.norm(X.T, axis=0).reshape(m, 1) * np.linalg.norm(Y, axis=0)
        )

# MacOS
device = "CPU"
plugin_dirs = "/opt/intel/openvino/deployment_tools/inference_engine/lib/intel64"
modelPath = "./FP32/"

plugin = IEPlugin(device=device, plugin_dirs = plugin_dirs)

THRESHOLD= 0.5
face_detector = FaceDetector(plugin, modelPath + "face-detection-0100", THRESHOLD)
faceReidentification = FaceReidentification(plugin, modelPath + "face-reidentification-retail-0095")
tracker = Tracker()

#MOVIE = "./video001.mp4"
MOVIE = "./video002.mp4"

SCALE = 0.3
cap = cv2.VideoCapture (MOVIE)

TRACKING_MAX=50
colors = []
for i in range(TRACKING_MAX):
    b = random.randint(0, 25) * 10 
    g = random.randint(0, 25) * 10
    r = random.randint(0, 25) * 10
    colors.append((b,g,r))

while True:

    grabbed, frame = cap.read()
    if not grabbed:# ループ再生
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        continue
    if(frame is None):
        continue

    # Personを検知する
    persons = []
    faces =  face_detector.infer(frame)
    if(len(faces) > 0):
        print("-------------------")
        for face in faces:
            x1 = int(face[0])
            y1 = int(face[1])
            x2 = int(face[2])
            y2 = int(face[3])
            conf = face[4]
            print("{:.1f} ({},{})-({},{})".format(conf, x1, y1, x2, y2))
            persons.append([x1, y1, x2, y2, conf])

    print("====================")
    # 各顔画像から識別情報を取得する
    identifys = np.zeros((len(persons), 256))
    for i, person in enumerate(persons):
        # 各顔画像の取得
        img = frame[person[1] : person[3], person[0]: person[2]]
        h, w = img.shape[:2]
        if(h==0 or w==0):
            continue
        # 類似度の取得
        identifys[i] = faceReidentification.infer(img)

    #インデックスの取得
    ids = tracker.getIds(identifys, persons)

    #枠及びインデックスを画像に追加
    for i, person in enumerate(persons):
        if(ids[i]!=-1):
            color = colors[int(ids[i])]
            frame = cv2.rectangle(frame, (person[0], person[1]), (person[2] ,person[3]), color, int(50 * SCALE))
            frame = cv2.putText(frame, str(ids[i]),  (person[0], person[1]), cv2.FONT_HERSHEY_PLAIN, int(50 * SCALE), color, int(30 * SCALE), cv2.LINE_AA )

    # 画像の縮小
    h, w = frame.shape[:2]
    frame = cv2.resize(frame, ((int(w * SCALE), int(h * SCALE))))
    # 画像の表示
    cv2.imshow('frame', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

こちらは、OpenVINOでコンピュータビジョン関連のモデルを使用する場合の、ベースとなるクラスです。

model.py

#
# 下記のコードを参考にさせて頂きました。
# https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/python_demos/asl_recognition_demo/asl_recognition_demo/common.py
#

from openvino.inference_engine import IENetwork

class Model:
    def __init__(self, plugin, model_path, num_requests, output_shape=None):

        if model_path.endswith((".xml", ".bin")):
            model_path = model_path[:-4]

        model = IENetwork(model_path + ".xml", model_path + ".bin")    
        self.net = plugin.load(network=model)

        assert len(self.net.input_info) == 1, "One input is expected"

        self.input_name = next(iter(self.net.input_info))
        if len(self.net.outputs) > 1:
            if output_shape is not None:
                candidates = []
                for candidate_name in self.net.outputs:
                    candidate_shape = self.exec_net.requests[0].output_blobs[candidate_name].buffer.shape
                    if len(candidate_shape) != len(output_shape):
                        continue

                    matches = [src == trg or trg < 0
                               for src, trg in zip(candidate_shape, output_shape)]
                    if all(matches):
                        candidates.append(candidate_name)

                if len(candidates) != 1:
                    raise Exception("One output is expected")

                self.output_name = candidates[0]
            else:
                raise Exception("One output is expected")
        else:
            self.output_name = next(iter(self.net.outputs))

        self.input_size = self.net.input_info[self.input_name].input_data.shape
        self.output_size = self.net.requests[0].output_blobs[self.output_name].buffer.shape
        self.num_requests = num_requests

    def infer(self, data):
        input_data = {self.input_name: data}
        infer_result = self.net.infer(input_data)
        return infer_result[self.output_name]

6 最後に

今回は、顔の追跡を行うプログラムを試してみましたが、データベースの類似性判断や更新が非常にデリケートなのは、人物の追跡と同じでした。

なお、「人物追跡」と「顔追跡」を組み合わせると、「非常に高精度な人物識別が可能なのでは」と妄想しています。

この識別の仕組みは、データベースとなっている情報が、背景などに大きく影響を受けない場合、カメラが複数であっても、また、別の場所や時間であっても適用可能です。

ショッピングモールの「入り口」「2Fフロア」「3Fフロア」などで、同一人物を検出できるかも知れません。

色々試してみたいです。

7 参考にさせて頂いたページ


Pedestrian Tracking Demo | OpenVINO™ toolkit | Ep. 23 | Intel Software
Pedestrian Tracker C++ Demo
OpenVINO の Person-reidentification(人再認識)モデルを使って人を追跡する
Github kodamap/person_reidentification
CVPR2019でPerson Re-Identificationの話をしている論文全てに1人で目を通す(加筆修正中)