Person Re-Identification 複数カメラで人物の追跡をしてみました

複数カメラで人物の追跡ができると、ちょっとPerson Re-Identificationの威力を感じます。
2020.09.15

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

人物や、顔の画像から特徴を出力し、その類似性で人物の追跡が可能です。

下記は、それぞれ、人物の画像と顔の画像で試してみたのです。

今回は、複数のカメラから撮影された映像で人物の追跡を試してみました。

最初に試してみた様子です。

動画は、EPFLのComputer Vision LaboratoryにあるMulti-camera pedestrians videoから利用させて頂きました。

2 モデル

仕組みとしては、下記の2つのモデルが使用されています。

モデル及び、その適用方法は、Person Re-Identificationで人物を追跡してみましたと同じですので、詳しくは、こちらをご参照下さい。

3 識別情報のデータベース

人物を検出し、その画像を個人識別のデータベースと照合し、類似性から固有のインデックスを取得するわけですが、そのデータベースを3つの動画で共有しています。データベースへの追加や、インデックス検出は、分野(動画の種類)に応じて、ロジックが変わりますが、今回は、以下のような内容になっています。

(1) バウンディングボックの大きさ

一定以上の大きさのバウンディングボックスのものだけを対象としています。遠くに離れて小さく写った人物は、そもそも検出も評価も、精度が低くなるはずなので、一定の大きさ以下は、無視するようになっています。

(2) 人物検出の信頼度

更新は、人物検出の信頼度が0.95以上のものだけを使用していま。最初の段階で人物検出が行われますが、スレッシュホールドレベルとして0.5以上の信頼度のもの表示対象としていますが、識別情報のデータベースの精度を上げるために、データベースの更新は、0.95以上を対象としています。

(3) 人物検出の重なり

更新は、バウンディックスボックスが重複のないものだけ使用しています。信頼度が、0.95以上であっても、他の検出と重複部分のあるデータは、個人の識別情報としては、やや汚染の危険性が有るということで、対象外としました。

(4) 人物検出の信頼度

人物検出の信頼度が高いデータを優先しています。識別情報のデータベースには、人物検出の信頼度も記録し、より高い信頼度のもの更新対象にしています。

(5) 類似度 0.9以上で更新

類似度が0.9以上で、かつ、上記の条件をクリアしたものだけが、データベースに保存されます。

(6) 類似度 0.15以下で新規

類似度が、0.15以下の場合、初めての人物が登場したと判断しデータベースに追加しています。 実は、この閾値が、一番重要だと思います。低く設定しすぎると、新しい人物が検出できなくなり、高くしすぎると、同じ人物の情報が複数生成されてしまい表示が乱れます。

4 コード

作成したコードです。

PersonDetector及び、PersonReidentificationは、それぞれ、人物検出と個人識別モデルを使用するクラスです。

Tracker内に、データベース化された識別情報が格納されており、getIds()でインデックスの取得及び、データベースの更新が行われています。

import numpy as np
import time
import random
import cv2
from openvino.inference_engine import IEPlugin  
import subprocess
import sys
from model import Model

class ParsonDetector(Model):

    def __init__(self, plugin, model_path, threshold, num_requests=2):
        super().__init__(plugin, model_path, num_requests, None)
        _, _, h, w = self.input_size
        self.__input_height = h
        self.__input_width = w
        self.__threshold = threshold

    def __prepare_frame(self, frame):
        initial_h, initial_w = frame.shape[:2]
        scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
        in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
        in_frame = in_frame.transpose((2, 0, 1))
        in_frame = in_frame.reshape(self.input_size)
        return in_frame, scale_h, scale_w

    def infer(self, frame):
        in_frame, _, _ = self.__prepare_frame(frame)
        result = super().infer(in_frame)

        detections = []
        height, width = frame.shape[:2]
        for r in result[0][0]:
            conf = r[2]
            if(conf > self.__threshold):
                x1 = int(r[3] * width)
                y1 = int(r[4] * height)
                x2 = int(r[5] * width)
                y2 = int(r[6] * height)
                detections.append([x1, y1, x2, y2, conf])
        return detections

class ParsonReidentification(Model):

    def __init__(self, plugin, model_path, num_requests=2):
        super().__init__(plugin, model_path, num_requests, None)
        _, _, h, w = self.input_size
        self.__input_height = h
        self.__input_width = w

    def __prepare_frame(self, frame):
        initial_h, initial_w = frame.shape[:2]
        scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width)
        in_frame = cv2.resize(frame, (self.__input_width, self.__input_height))
        in_frame = in_frame.transpose((2, 0, 1))
        in_frame = in_frame.reshape(self.input_size)
        return in_frame, scale_h, scale_w

    def infer(self, frame):
        in_frame, _, _ = self.__prepare_frame(frame)
        result =  super().infer(in_frame)
        return np.delete(result, 1)

class Tracker:
    def __init__(self):
        # 識別情報のDB
        self.identifysDb = None
        # 人物検出の信頼度
        self.conf = []

    def __isOverlap(self, persons, index):
        [x1, y1, x2, y2, conf] = persons[index]
        for i, person in enumerate(persons):
            if(index == i):
                continue
            if(max(person[0], x1) <= min(person[2], x2) and max(person[1], y1) <= min(person[3], y2)):
                return True
        return False

    def getIds(self, identifys, persons):
        if(identifys.size==0):
            return []
        if self.identifysDb is None:
            self.identifysDb = identifys
            for person in persons:
                self.conf.append(person[4])

        print("input: {} DB:{}".format(len(identifys), len(self.identifysDb)))
        similaritys = self.__cos_similarity(identifys, self.identifysDb)
        similaritys[np.isnan(similaritys)] = 0
        ids = np.nanargmax(similaritys, axis=1)

        for i, similarity in enumerate(similaritys):

            # DBの更新と追加は、人物検出の信頼度が0.95以上のものだけ
            if(persons[i][4] < 0.95): 
                continue
            # DBの更新と追加は、バウンディングボックスに重なりがないものだけ
            if(self.__isOverlap(persons, i)):
                continue 

            persionId = ids[i]
            print("persionId:{} {}".format(persionId,similarity[persionId]))

            # 0.9以上
            if(similarity[persionId] > 0.9):
                # DBの更新は、信頼度が既存のものより高い場合だけ
                if(persons[i][4] > self.conf[persionId]):
                    self.identifysDb[persionId] = identifys[i]
            # 0.15以下で新規に登録する
            elif(similarity[persionId] < 0.15):
                print("similarity:{}".format(similarity[persionId]))
                self.identifysDb = np.vstack((self.identifysDb, identifys[i]))
                self.conf.append(persons[i][4])
                ids[i] = len(self.identifysDb) - 1
                print("> append DB size:{}".format(len(self.identifysDb)))

        print(ids)
        # 重複がある場合は、信頼度の低い方を無効化する
        for i, a in enumerate(ids):
            for e, b in enumerate(ids):
                if(e == i):
                    continue
                if(a == b):
                    if(similarity[a] > similarity[b]):
                        ids[i] = -1
                    else:
                        ids[e] = -1
        print(ids)
        return ids

    # コサイン類似度
    # 参考にさせて頂きました: https://github.com/kodamap/person_reidentification
    def __cos_similarity(self, X, Y):
        m = X.shape[0]
        Y = Y.T
        return np.dot(X, Y) / (
            np.linalg.norm(X.T, axis=0).reshape(m, 1) * np.linalg.norm(Y, axis=0)
        )

# MacOS
device = "CPU"
plugin_dirs = "/opt/intel/openvino/deployment_tools/inference_engine/lib/intel64"
modelPath = "./FP32/"
# RespberryPi
p = subprocess.run (('uname', '-a'), stdout = subprocess.PIPE, stderr = subprocess.PIPE)
uname = p.stdout.decode()
if("armv7l GNU/Linux" in uname):
    device = "MYRIAD"
    plugin_dirs = "/opt/intel/openvino/inference_engine/lib/armv7l"
    modelPath = "./FP16/"

plugin = IEPlugin(device=device, plugin_dirs = plugin_dirs)

THRESHOLD= 0.5
person_detector = ParsonDetector(plugin, modelPath + "person-detection-retail-0013", THRESHOLD)
personReidentification = ParsonReidentification(plugin, modelPath + "person-reidentification-retail-0270")
tracker = Tracker()

MOVIES = ["video/campus4-c0.mp4", "video/campus4-c1.mp4", "video/campus4-c2.mp4"]

SCALE = 1.5
caps = []
for i in range(len(MOVIES)):
    caps.append(cv2.VideoCapture (MOVIES[i]))

colors = []
colors.append((255,255,255))
colors.append((80,80,255))
colors.append((255,255,80))
colors.append((255,80,255))
colors.append((80,255,80))
colors.append((128,80,80))
colors.append((128,128,80))
colors.append((128,128,128))

frames = []
for i in range(len(MOVIES)):
    frames.append(None)

while True:

    for i in range(len(MOVIES)):
        grabbed, frames[i] = caps[i].read()
        if not grabbed:
            break
    if not grabbed:# ループ再生
        for cap in caps:
            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        continue

    for frame in frames:
        # Personを検知する
        persons = []
        detections =  person_detector.infer(frame)
        if(len(detections) > 0):
            print("-------------------")
            for detection in detections:
                x1 = int(detection[0])
                y1 = int(detection[1])
                x2 = int(detection[2])
                y2 = int(detection[3])
                conf = detection[4]
                print("{:.1f} ({},{})-({},{})".format(conf, x1, y1, x2, y2))
                h = y2- y1
                if(h<50):
                    print("? HEIGHT:{}".format(h))
                else:
                    print("{:.1f} ({},{})-({},{})".format(conf, x1, y1, x2, y2))
                    persons.append([x1, y1, x2, y2, conf])

        print("====================")
        # 各Personの画像から識別情報を取得する
        identifys = np.zeros((len(persons), 255))
        for i, person in enumerate(persons):
            # 各Personのimage取得
            img = frame[person[1] : person[3], person[0]: person[2]]
            h, w = img.shape[:2]
            if(h==0 or w==0):
                continue
            # identification取得
            identifys[i] = personReidentification.infer(img)

        # Idの取得
        ids = tracker.getIds(identifys, persons)

        # 枠及びIdを画像に追加
        for i, person in enumerate(persons):
            if(ids[i]!=-1):
                color = colors[int(ids[i])]
                frame = cv2.rectangle(frame, (person[0], person[1]), (person[2] ,person[3]), color, 2)
                frame = cv2.putText(frame, str(ids[i]),  (person[0], person[1]), cv2.FONT_HERSHEY_PLAIN, 2, color, 1, cv2.LINE_AA )


    # 画像の縮小
    h, w = frames[0].shape[:2]
    for i, frame in enumerate(frames):
        frames[i] = cv2.resize(frame, ((int(w * SCALE), int(h * SCALE))))

    m_h = cv2.hconcat([frames[0], frames[1], frames[2]])
    cv2.imshow('frame2', m_h)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

こちらは、OpenVINOでコンピュータビジョン関連のモデルを使用する場合の、ベースとなるクラスです。

model.py

#
# 下記のコードを参考にさせて頂きました。
# https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/python_demos/asl_recognition_demo/asl_recognition_demo/common.py
#

from openvino.inference_engine import IENetwork

class Model:
    def __init__(self, plugin, model_path, num_requests, output_shape=None):

        if model_path.endswith((".xml", ".bin")):
            model_path = model_path[:-4]

        model = IENetwork(model_path + ".xml", model_path + ".bin")    
        self.net = plugin.load(network=model)

        assert len(self.net.input_info) == 1, "One input is expected"

        self.input_name = next(iter(self.net.input_info))
        if len(self.net.outputs) > 1:
            if output_shape is not None:
                candidates = []
                for candidate_name in self.net.outputs:
                    candidate_shape = self.exec_net.requests[0].output_blobs[candidate_name].buffer.shape
                    if len(candidate_shape) != len(output_shape):
                        continue

                    matches = [src == trg or trg < 0
                               for src, trg in zip(candidate_shape, output_shape)]
                    if all(matches):
                        candidates.append(candidate_name)

                if len(candidates) != 1:
                    raise Exception("One output is expected")

                self.output_name = candidates[0]
            else:
                raise Exception("One output is expected")
        else:
            self.output_name = next(iter(self.net.outputs))

        self.input_size = self.net.input_info[self.input_name].input_data.shape
        self.output_size = self.net.requests[0].output_blobs[self.output_name].buffer.shape
        self.num_requests = num_requests

    def infer(self, data):
        input_data = {self.input_name: data}
        infer_result = self.net.infer(input_data)
        return infer_result[self.output_name]

5 最後に

今回は、別の角度から複数のカメラで撮影された動画で人物追跡を行うプログラムを試してみました。

複数カメラで追跡ができると、ちょっとPerson Re-Identificationの威力を感じました。

6 参考にさせて頂いたページ


Pedestrian Tracking Demo | OpenVINO™ toolkit | Ep. 23 | Intel Software
Pedestrian Tracker C++ Demo
OpenVINO の Person-reidentification(人再認識)モデルを使って人を追跡する
Github kodamap/person_reidentification
CVPR2019でPerson Re-Identificationの話をしている論文全てに1人で目を通す(加筆修正中)