Person Re-Identificationで人物を追跡してみました
1 はじめに
CX事業本部の平内(SIN)です。
機械学習で画像から人物を検出する事が可能ですが、検出された人物が、フレーム間で一致できれば、追跡やカウントも可能なります。
OpenVINO™ toolkitの紹介ビデオでは、ショッピングモールで、人物の追跡を行うデモが公開されています。
Pedestrian Tracking Demo | OpenVINO™ toolkit | Ep. 23 | Intel Software
今回は、上記(C++)を参考にさせて頂いて、Pythonで人物の追跡をやってみました。
最初に動作確認している様子です。まだまだですが・・・
動画は、Pixels Videosを利用させて頂きました。
2 しくみ
人物の追跡は、2つのモデルを組み合わせることで動作しています。
- 人物検出モデル
- 個人識別モデル
人物検出モデルで、人を検出しその部分を切り出します。 続いて、個人識別モデルで、部分画像の識別情報を生成し、インデックスを付けてデータベース化しています。
各フレームでは、人物画像をデータベースと照合し、類似性から同じ人物であるかどうかを判断してます。
3 モデル
推論のフレームワークは、OpenVINOを利用させて頂きました。
OpenVINOでは、色々な機械学習フレームワークで作成されたモデルを、中間表現(IR)フォーマットに変換して推論に使用していますが、既に、変換されたモデルがインテルによって公開されており、今回は、ここから利用させて頂いています。
open_model_zoo
(1) person-detection-retail-0013
Caffeで作成された、小売シナリオの歩行者検知器モデルです。
person-detection-retail-0013
入出力は、以下の通りです。
Input
[B✕C✕H✕W][1x3x320x544] Color: BGR
- B - batch size
- C - number of channels
- H - image height
- W - image width
Outputs
[1, 1, N, 7]
Nは検出された境界ボックスの数
[image_id, label, conf, x_min, y_min, x_max, y_max]
- image_id - バッチ内の画像のID
- label - クラスID
- conf - 信頼度
- (x_min, y_min) - 境界ボックス
- (x_max, y_max) - 境界ボックス
(2) person-reidentification-retail-0265
PyTorchで作成された一般的なシナリオの個人識別モデルです。
person-reidentification-retail-0265
入出力は、以下の通りです。
Input
[B✕C✕H✕W][1x3x256x128] Color: BGR
- B - batch size
- C - number of channels
- H - image height
- W - image width
Outputs
[1、256] コサイン類似度で他の識別子と比較できます
4 識別情報のデータベース
実は、識別情報にインデックスを付けて、データベース化するところが、結構複雑です。そして、適用する分野(動画の種類)に応じて、ロジックの変更もきっと必要になると思います。
(1) 高い類似度
高い類似度でインデックスが得られた場合は、そのインデックスをそのまま利用すればい良いので、特に考慮は必要ありません。
(2) 低い類似度
低い類似度でインデックスが得られた場合は、たまたま、画像の類似性が低いだけなのか、それとも、別の人物なのかの判断が必要になります。
類似性が低いというだけで単純にデータベースに追加してしまうと、同じ人物のインデックスが複数となってしまい、その後の判定に悪影響があります。
ここでは、データベースに位置情報を追加し、最後に検出された位置を記録しています。類似度が低かった場合、その距離で、同一人物か、別人なのかを判定しています。
別人と判定された場合は、データベースにインデックスが追加されます。
(3) データベース更新
データベースの識別情報の更新にも注意が必要です。
フレームによっては、他の人物と重なっていたりして、その特徴をうまく表現できていない場合もあります。
そこで、他の人物と重なりがない場合のみ、その識別情報を更新することにしています。
5 コード
作成したコードです。
PersonDetector及び、PersonReidentificationは、それぞれ、人物検出と個人識別モデルを使用するクラスです。
Tracker内に、データベース化された識別情報が格納されており、getIds()でインデックスの取得及び、データベースの更新が行われています。
index.py
import numpy as np import time import random import cv2 from openvino.inference_engine import IECore from model import Model class PersonDetector(Model): def __init__(self, model_path, device, ie_core, threshold, num_requests): super().__init__(model_path, device, ie_core, num_requests, None) _, _, h, w = self.input_size self.__input_height = h self.__input_width = w self.__threshold = threshold def __prepare_frame(self, frame): initial_h, initial_w = frame.shape[:2] scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width) in_frame = cv2.resize(frame, (self.__input_width, self.__input_height)) in_frame = in_frame.transpose((2, 0, 1)) in_frame = in_frame.reshape(self.input_size) return in_frame, scale_h, scale_w def infer(self, frame): in_frame, _, _ = self.__prepare_frame(frame) result = super().infer(in_frame) detections = [] height, width = frame.shape[:2] for r in result[0][0]: conf = r[2] if(conf > self.__threshold): x1 = int(r[3] * width) y1 = int(r[4] * height) x2 = int(r[5] * width) y2 = int(r[6] * height) detections.append([x1, y1, x2, y2, conf]) return detections class PersonReidentification(Model): def __init__(self, model_path, device, ie_core, threshold, num_requests): super().__init__(model_path, device, ie_core, num_requests, None) _, _, h, w = self.input_size self.__input_height = h self.__input_width = w self.__threshold = threshold def __prepare_frame(self, frame): initial_h, initial_w = frame.shape[:2] scale_h, scale_w = initial_h / float(self.__input_height), initial_w / float(self.__input_width) in_frame = cv2.resize(frame, (self.__input_width, self.__input_height)) in_frame = in_frame.transpose((2, 0, 1)) in_frame = in_frame.reshape(self.input_size) return in_frame, scale_h, scale_w def infer(self, frame): in_frame, _, _ = self.__prepare_frame(frame) result = super().infer(in_frame) return np.delete(result, 1) class Tracker: def __init__(self): # 識別情報のDB self.identifysDb = None # 中心位置のDB self.center = [] def __getCenter(self, person): x = person[0] - person[2] y = person[1] - person[3] return (x,y) def __getDistance(self, person, index): (x1, y1) = self.center[index] (x2, y2) = self.__getCenter(person) a = np.array([x1, y1]) b = np.array([x2, y2]) u = b - a return np.linalg.norm(u) def __isOverlap(self, persons, index): [x1, y1, x2, y2] = persons[index] for i, person in enumerate(persons): if(index == i): continue if(max(person[0], x1) <= min(person[2], x2) and max(person[1], y1) <= min(person[3], y2)): return True return False def getIds(self, identifys, persons): if(identifys.size==0): return [] if self.identifysDb is None: self.identifysDb = identifys for person in persons: self.center.append(self.__getCenter(person)) print("input: {} DB:{}".format(len(identifys), len(self.identifysDb))) similaritys = self.__cos_similarity(identifys, self.identifysDb) similaritys[np.isnan(similaritys)] = 0 ids = np.nanargmax(similaritys, axis=1) for i, similarity in enumerate(similaritys): persionId = ids[i] d = self.__getDistance(persons[i], persionId) print("persionId:{} {} distance:{}".format(persionId,similarity[persionId], d)) # 0.95以上で、重なりの無い場合、識別情報を更新する if(similarity[persionId] > 0.95): if(self.__isOverlap(persons, i) == False): self.identifysDb[persionId] = identifys[i] # 0.5以下で、距離が離れている場合、新規に登録する elif(similarity[persionId] < 0.5): if(d > 500): print("distance:{} similarity:{}".format(d, similarity[persionId])) self.identifysDb = np.vstack((self.identifysDb, identifys[i])) self.center.append(self.__getCenter(persons[i])) ids[i] = len(self.identifysDb) - 1 print("> append DB size:{}".format(len(self.identifysDb))) print(ids) # 重複がある場合は、信頼度の低い方を無効化する for i, a in enumerate(ids): for e, b in enumerate(ids): if(e == i): continue if(a == b): if(similarity[a] > similarity[b]): ids[i] = -1 else: ids[e] = -1 print(ids) return ids # コサイン類似度 # 参考にさせて頂きました: https://github.com/kodamap/person_reidentification def __cos_similarity(self, X, Y): m = X.shape[0] Y = Y.T return np.dot(X, Y) / ( np.linalg.norm(X.T, axis=0).reshape(m, 1) * np.linalg.norm(Y, axis=0) ) device = "CPU" cpu_extension = None ie_core = IECore() if device == "CPU" and cpu_extension: ie_core.add_extension(cpu_extension, "CPU") THRESHOLD= 0.8 person_detector = PersonDetector("./person-detection-retail-0013", device, ie_core, THRESHOLD, num_requests=2) personReidentification = PersonReidentification("./person-reidentification-retail-0079", device, ie_core, THRESHOLD, num_requests=2) tracker = Tracker() #MOVIE = "./video001.mp4" MOVIE = "./video002.mp4" SCALE = 0.3 cap = cv2.VideoCapture (MOVIE) TRACKING_MAX=50 colors = [] for i in range(TRACKING_MAX): b = random.randint(0, 255) g = random.randint(0, 255) r = random.randint(0, 255) colors.append((b,g,r)) while True: grabbed, frame = cap.read() if not grabbed:# ループ再生 cap.set(cv2.CAP_PROP_POS_FRAMES, 0) continue if(frame is None): continue # Personを検知する persons = [] detections = person_detector.infer(frame) if(len(detections) > 0): print("-------------------") for detection in detections: x1 = int(detection[0]) y1 = int(detection[1]) x2 = int(detection[2]) y2 = int(detection[3]) conf = detection[4] print("{:.1f} ({},{})-({},{})".format(conf, x1, y1, x2, y2)) persons.append([x1,y1,x2,y2]) print("====================") # 各Personの画像から識別情報を取得する identifys = np.zeros((len(persons), 255)) for i, person in enumerate(persons): # 各Personのimage取得 img = frame[person[1] : person[3], person[0]: person[2]] h, w = img.shape[:2] if(h==0 or w==0): continue # identification取得 identifys[i] = personReidentification.infer(img) # Idの取得 ids = tracker.getIds(identifys, persons) # 枠及びIdを画像に追加 for i, person in enumerate(persons): if(ids[i]!=-1): color = colors[int(ids[i])] frame = cv2.rectangle(frame, (person[0], person[1]), (person[2] ,person[3]), color, int(50 * SCALE)) frame = cv2.putText(frame, str(ids[i]), (person[0], person[1]), cv2.FONT_HERSHEY_PLAIN, int(50 * SCALE), color, int(30 * SCALE), cv2.LINE_AA ) # 画像の縮小 h, w = frame.shape[:2] frame = cv2.resize(frame, ((int(w * SCALE), int(h * SCALE)))) # 画像の表示 cv2.imshow('frame', frame) if cv2.waitKey(1) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows()
こちらは、OpenVINOでコンピュータビジョン関連のモデルを使用する場合の、ベースとなるクラスです。
model.py
# # 下記のコードを参考にさせて頂きました。 # https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/python_demos/asl_recognition_demo/asl_recognition_demo/common.py # class Model: def __init__(self, model_path, device, ie_core, num_requests, output_shape=None): if model_path.endswith((".xml", ".bin")): model_path = model_path[:-4] self.net = ie_core.read_network(model_path + ".xml", model_path + ".bin") assert len(self.net.input_info) == 1, "One input is expected" supported_layers = ie_core.query_network(self.net, device) not_supported_layers = [l for l in self.net.layers.keys() if l not in supported_layers] if len(not_supported_layers) > 0: raise RuntimeError("Following layers are not supported by the {} plugin:\n {}" .format(device, ', '.join(not_supported_layers))) self.exec_net = ie_core.load_network(network=self.net, device_name=device, num_requests=num_requests) self.input_name = next(iter(self.net.input_info)) if len(self.net.outputs) > 1: if output_shape is not None: candidates = [] for candidate_name in self.net.outputs: candidate_shape = self.exec_net.requests[0].output_blobs[candidate_name].buffer.shape if len(candidate_shape) != len(output_shape): continue matches = [src == trg or trg < 0 for src, trg in zip(candidate_shape, output_shape)] if all(matches): candidates.append(candidate_name) if len(candidates) != 1: raise Exception("One output is expected") self.output_name = candidates[0] else: raise Exception("One output is expected") else: self.output_name = next(iter(self.net.outputs)) self.input_size = self.net.input_info[self.input_name].input_data.shape self.output_size = self.exec_net.requests[0].output_blobs[self.output_name].buffer.shape self.num_requests = num_requests def infer(self, data): input_data = {self.input_name: data} infer_result = self.exec_net.infer(input_data) return infer_result[self.output_name]
6 最後に
今回は、動画から人物追跡を行うプログラムを試してみました。
データベースの類似性判断や更新は、非常にデリケートであり、汎用的なものを作るのは、ちょっと難しいと感じています。
可能な限り、人の入れ替わり避けたいのか、あるいは、未検出を避けたいのかなど、用途によって、そのパラメータやロジックも変わってくると思います。
また、前段階の人物検出も、その信頼度のレベルをどれぐらいにするかは、適用場面によって調整が必要です。
ショッピングモールなどで、多数の人が小さく写っている場合は、人物検出の信頼度を少し下げないと、未検出が多数発生する事も予想されます。
色々試してみましたが、奥が深いです。
7 参考にさせて頂いたページ
Pedestrian Tracking Demo | OpenVINO™ toolkit | Ep. 23 | Intel Software
Pedestrian Tracker C++ Demo
OpenVINO の Person-reidentification(人再認識)モデルを使って人を追跡する
Github kodamap/person_reidentification
CVPR2019でPerson Re-Identificationの話をしている論文全てに1人で目を通す(加筆修正中)