Person Re-Identificationで人物を追跡してみました

2020.09.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

機械学習で画像から人物を検出する事が可能ですが、検出された人物が、フレーム間で一致できれば、追跡やカウントも可能なります。

OpenVINO™ toolkitの紹介ビデオでは、ショッピングモールで、人物の追跡を行うデモが公開されています。
Pedestrian Tracking Demo | OpenVINO™ toolkit | Ep. 23 | Intel Software

今回は、上記(C++)を参考にさせて頂いて、Pythonで人物の追跡をやってみました。

最初に動作確認している様子です。まだまだですが・・・

動画は、Pixels Videosを利用させて頂きました。

2 しくみ

人物の追跡は、2つのモデルを組み合わせることで動作しています。

  • 人物検出モデル
  • 個人識別モデル

人物検出モデルで、人を検出しその部分を切り出します。 続いて、個人識別モデルで、部分画像の識別情報を生成し、インデックスを付けてデータベース化しています。

各フレームでは、人物画像をデータベースと照合し、類似性から同じ人物であるかどうかを判断してます。

3 モデル

推論のフレームワークは、OpenVINOを利用させて頂きました。

OpenVINOでは、色々な機械学習フレームワークで作成されたモデルを、中間表現(IR)フォーマットに変換して推論に使用していますが、既に、変換されたモデルがインテルによって公開されており、今回は、ここから利用させて頂いています。
open_model_zoo

(1) person-detection-retail-0013

Caffeで作成された、小売シナリオの歩行者検知器モデルです。
person-detection-retail-0013

入出力は、以下の通りです。

Input

[B✕C✕H✕W][1x3x320x544] Color: BGR

  • B - batch size
  • C - number of channels
  • H - image height
  • W - image width

Outputs

[1, 1, N, 7]

Nは検出された境界ボックスの数

[image_id, label, conf, x_min, y_min, x_max, y_max]

  • image_id - バッチ内の画像のID
  • label - クラスID
  • conf - 信頼度
  • (x_min, y_min) - 境界ボックス
  • (x_max, y_max) - 境界ボックス

(2) person-reidentification-retail-0265

PyTorchで作成された一般的なシナリオの個人識別モデルです。
person-reidentification-retail-0265

入出力は、以下の通りです。

Input

[B✕C✕H✕W][1x3x256x128] Color: BGR

  • B - batch size
  • C - number of channels
  • H - image height
  • W - image width

Outputs

[1、256] コサイン類似度で他の識別子と比較できます

4 識別情報のデータベース

実は、識別情報にインデックスを付けて、データベース化するところが、結構複雑です。そして、適用する分野(動画の種類)に応じて、ロジックの変更もきっと必要になると思います。

(1) 高い類似度

高い類似度でインデックスが得られた場合は、そのインデックスをそのまま利用すればい良いので、特に考慮は必要ありません。

(2) 低い類似度

低い類似度でインデックスが得られた場合は、たまたま、画像の類似性が低いだけなのか、それとも、別の人物なのかの判断が必要になります。

類似性が低いというだけで単純にデータベースに追加してしまうと、同じ人物のインデックスが複数となってしまい、その後の判定に悪影響があります。

ここでは、データベースに位置情報を追加し、最後に検出された位置を記録しています。類似度が低かった場合、その距離で、同一人物か、別人なのかを判定しています。

別人と判定された場合は、データベースにインデックスが追加されます。

(3) データベース更新

データベースの識別情報の更新にも注意が必要です。

フレームによっては、他の人物と重なっていたりして、その特徴をうまく表現できていない場合もあります。

そこで、他の人物と重なりがない場合のみ、その識別情報を更新することにしています。

5 コード

作成したコードです。

PersonDetector及び、PersonReidentificationは、それぞれ、人物検出と個人識別モデルを使用するクラスです。

Tracker内に、データベース化された識別情報が格納されており、getIds()でインデックスの取得及び、データベースの更新が行われています。

index.py

import numpy as np
import time
import random
import cv2
from openvino.inference_engine import IECore
from model import Model

class PersonDetector(Model):

    def __init__(self, model_path, device, ie_core, threshold, num_requests):
        super().__init__(model_path, device, ie_core, num_requests, None)
        _, _, h, w = self.input_size
        self.__input_height = h
        self.__input_width = w
        self.__threshold = threshold

    def __prepare_frame(self, frame):
        initial_h, initial_w = frame.shape[:2]
        scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
        in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
        in_frame = in_frame.transpose((2, 0, 1))
        in_frame = in_frame.reshape(self.input_size)
        return in_frame, scale_h, scale_w

    def infer(self, frame):
        in_frame, _, _ = self.__prepare_frame(frame)
        result = super().infer(in_frame)

        detections = []
        height, width = frame.shape[:2]
        for r in result[0][0]:
            conf = r[2]
            if(conf > self.__threshold):
                x1 = int(r[3] * width)
                y1 = int(r[4] * height)
                x2 = int(r[5] * width)
                y2 = int(r[6] * height)
                detections.append([x1, y1, x2, y2, conf])
        return detections

class PersonReidentification(Model):

    def __init__(self, model_path, device, ie_core, threshold, num_requests):
        super().__init__(model_path, device, ie_core, num_requests, None)
        _, _, h, w = self.input_size
        self.__input_height = h
        self.__input_width = w
        self.__threshold = threshold

    def __prepare_frame(self, frame):
        initial_h, initial_w = frame.shape[:2]
        scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
        in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
        in_frame = in_frame.transpose((2, 0, 1))
        in_frame = in_frame.reshape(self.input_size)
        return in_frame, scale_h, scale_w

    def infer(self, frame):
        in_frame, _, _ = self.__prepare_frame(frame)
        result =  super().infer(in_frame)
        return np.delete(result, 1)

class Tracker:
    def __init__(self):
        # 識別情報のDB
        self.identifysDb = None
        # 中心位置のDB
        self.center = []
    
    def __getCenter(self, person):
        x = person[0] - person[2]
        y = person[1] - person[3]
        return (x,y)

    def __getDistance(self, person, index):
        (x1, y1) = self.center[index]
        (x2, y2) = self.__getCenter(person)
        a = np.array([x1, y1])
        b = np.array([x2, y2])
        u = b - a
        return np.linalg.norm(u)

    def __isOverlap(self, persons, index):
        [x1, y1, x2, y2] = persons[index]
        for i, person in enumerate(persons):
            if(index == i):
                continue
            if(max(person[0], x1) <= min(person[2], x2) and max(person[1], y1) <= min(person[3], y2)):
                return True
        return False

    def getIds(self, identifys, persons):
        if(identifys.size==0):
            return []
        if self.identifysDb is None:
            self.identifysDb = identifys
            for person in persons:
                self.center.append(self.__getCenter(person))
        
        print("input: {} DB:{}".format(len(identifys), len(self.identifysDb)))
        similaritys = self.__cos_similarity(identifys, self.identifysDb)
        similaritys[np.isnan(similaritys)] = 0
        ids = np.nanargmax(similaritys, axis=1)

        for i, similarity in enumerate(similaritys):
            persionId = ids[i]
            d = self.__getDistance(persons[i], persionId)
            print("persionId:{} {} distance:{}".format(persionId,similarity[persionId], d))
            # 0.95以上で、重なりの無い場合、識別情報を更新する
            if(similarity[persionId] > 0.95):
                if(self.__isOverlap(persons, i) == False):
                    self.identifysDb[persionId] = identifys[i]
            # 0.5以下で、距離が離れている場合、新規に登録する
            elif(similarity[persionId] < 0.5):
                if(d > 500):
                    print("distance:{} similarity:{}".format(d, similarity[persionId]))
                    self.identifysDb = np.vstack((self.identifysDb, identifys[i]))
                    self.center.append(self.__getCenter(persons[i]))
                    ids[i] = len(self.identifysDb) - 1
                    print("> append DB size:{}".format(len(self.identifysDb)))

        print(ids)
        # 重複がある場合は、信頼度の低い方を無効化する
        for i, a in enumerate(ids):
            for e, b in enumerate(ids):
                if(e == i):
                    continue
                if(a == b):
                    if(similarity[a] > similarity[b]):
                        ids[i] = -1
                    else:
                        ids[e] = -1
        print(ids)
        return ids

    # コサイン類似度
    # 参考にさせて頂きました: https://github.com/kodamap/person_reidentification
    def __cos_similarity(self, X, Y):
        m = X.shape[0]
        Y = Y.T
        return np.dot(X, Y) / (
            np.linalg.norm(X.T, axis=0).reshape(m, 1) * np.linalg.norm(Y, axis=0)
        )


device = "CPU"
cpu_extension = None
ie_core = IECore()
if device == "CPU" and cpu_extension:
    ie_core.add_extension(cpu_extension, "CPU")

THRESHOLD= 0.8
person_detector = PersonDetector("./person-detection-retail-0013", device, ie_core, THRESHOLD, num_requests=2)

personReidentification = PersonReidentification("./person-reidentification-retail-0079", device, ie_core, THRESHOLD, num_requests=2)
tracker = Tracker()


#MOVIE = "./video001.mp4"
MOVIE = "./video002.mp4"
SCALE = 0.3

cap = cv2.VideoCapture (MOVIE)

TRACKING_MAX=50
colors = []
for i in range(TRACKING_MAX):
    b = random.randint(0, 255)
    g = random.randint(0, 255)
    r = random.randint(0, 255) 
    colors.append((b,g,r))

while True:
        
    grabbed, frame = cap.read()
    if not grabbed:# ループ再生
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        continue
    if(frame is None):
        continue

    # Personを検知する
    persons = []
    detections =  person_detector.infer(frame)
    if(len(detections) > 0):
        print("-------------------")
        for detection in detections:
            x1 = int(detection[0])
            y1 = int(detection[1])
            x2 = int(detection[2])
            y2 = int(detection[3])
            conf = detection[4]
            print("{:.1f} ({},{})-({},{})".format(conf, x1, y1, x2, y2))
            persons.append([x1,y1,x2,y2])

    print("====================")
    # 各Personの画像から識別情報を取得する
    identifys = np.zeros((len(persons), 255))
    for i, person in enumerate(persons):
        # 各Personのimage取得
        img = frame[person[1] : person[3], person[0]: person[2]]
        h, w = img.shape[:2]
        if(h==0 or w==0):
            continue
        # identification取得
        identifys[i] = personReidentification.infer(img)

    # Idの取得
    ids = tracker.getIds(identifys, persons)
    
    # 枠及びIdを画像に追加
    for i, person in enumerate(persons):
        if(ids[i]!=-1):
            color = colors[int(ids[i])]
            frame = cv2.rectangle(frame, (person[0], person[1]), (person[2] ,person[3]), color, int(50 * SCALE))
            frame = cv2.putText(frame, str(ids[i]),  (person[0], person[1]), cv2.FONT_HERSHEY_PLAIN, int(50 * SCALE), color, int(30 * SCALE), cv2.LINE_AA )
    
    # 画像の縮小
    h, w = frame.shape[:2]
    frame = cv2.resize(frame, ((int(w * SCALE), int(h * SCALE))))
    # 画像の表示
    cv2.imshow('frame', frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

こちらは、OpenVINOでコンピュータビジョン関連のモデルを使用する場合の、ベースとなるクラスです。

model.py

#
# 下記のコードを参考にさせて頂きました。
# https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/python_demos/asl_recognition_demo/asl_recognition_demo/common.py
#

class Model:
    def __init__(self, model_path, device, ie_core, num_requests, output_shape=None):
        if model_path.endswith((".xml", ".bin")):
            model_path = model_path[:-4]
        self.net = ie_core.read_network(model_path + ".xml", model_path + ".bin")
        assert len(self.net.input_info) == 1, "One input is expected"

        supported_layers = ie_core.query_network(self.net, device)
        not_supported_layers = [l for l in self.net.layers.keys() if l not in supported_layers]
        if len(not_supported_layers) > 0:
            raise RuntimeError("Following layers are not supported by the {} plugin:\n {}"
                               .format(device, ', '.join(not_supported_layers)))

        self.exec_net = ie_core.load_network(network=self.net,
                                             device_name=device,
                                             num_requests=num_requests)

        self.input_name = next(iter(self.net.input_info))
        if len(self.net.outputs) > 1:
            if output_shape is not None:
                candidates = []
                for candidate_name in self.net.outputs:
                    candidate_shape = self.exec_net.requests[0].output_blobs[candidate_name].buffer.shape
                    if len(candidate_shape) != len(output_shape):
                        continue

                    matches = [src == trg or trg < 0
                               for src, trg in zip(candidate_shape, output_shape)]
                    if all(matches):
                        candidates.append(candidate_name)

                if len(candidates) != 1:
                    raise Exception("One output is expected")

                self.output_name = candidates[0]
            else:
                raise Exception("One output is expected")
        else:
            self.output_name = next(iter(self.net.outputs))

        self.input_size = self.net.input_info[self.input_name].input_data.shape
        self.output_size = self.exec_net.requests[0].output_blobs[self.output_name].buffer.shape
        self.num_requests = num_requests

    def infer(self, data):
        input_data = {self.input_name: data}
        infer_result = self.exec_net.infer(input_data)
        return infer_result[self.output_name]

6 最後に

今回は、動画から人物追跡を行うプログラムを試してみました。

データベースの類似性判断や更新は、非常にデリケートであり、汎用的なものを作るのは、ちょっと難しいと感じています。

可能な限り、人の入れ替わり避けたいのか、あるいは、未検出を避けたいのかなど、用途によって、そのパラメータやロジックも変わってくると思います。

また、前段階の人物検出も、その信頼度のレベルをどれぐらいにするかは、適用場面によって調整が必要です。

ショッピングモールなどで、多数の人が小さく写っている場合は、人物検出の信頼度を少し下げないと、未検出が多数発生する事も予想されます。

色々試してみましたが、奥が深いです。

7 参考にさせて頂いたページ


Pedestrian Tracking Demo | OpenVINO™ toolkit | Ep. 23 | Intel Software
Pedestrian Tracker C++ Demo
OpenVINO の Person-reidentification(人再認識)モデルを使って人を追跡する
Github kodamap/person_reidentification
CVPR2019でPerson Re-Identificationの話をしている論文全てに1人で目を通す(加筆修正中)