1 はじめに
商品の陳列で、準備した場所に、想定外の商品を戻されたりすると、商品名や価格の表示など、色々と問題があります。 また、Developers.IO CAFEのようなレジレスの仕組みでは、結構重要な問題となってしまいます。しかし、常に正常な状態であることを、人力で確認するのは、結構コストが高いように感じます。
最初に、動作している様子です。 上の大きな画面は、カメラが撮影している動画です。そして、下にある小さな画面が、各商品をそれぞれ認識している画面です。 各商品は、マーカーから算出された相対位置にあるものとして、認識されています。また、商品画面は、手の出入りなど、変化するものは、無視するようになっています。
2 商品の位置検出
参考:[OpenCV] ArUcoマーカーを使用して、売り切れ商品を検出してみました。
正確に計算するには、まず、マーカーの設置位置と商品の関係位置に保証が必要になります。また、カメラの画像との関係でいうと、3次元で計算する必要もありそうです。 正直大変なので、今回は、その辺、全く気にせずに、4箇所の基準位置からの相対的な関係だけで、簡易に算出しています。
# 商品エリアの取得 def getTargetArea(basis, L, T, W, H): x0 = basis[0][0] y0 = basis[0][1] x1 = basis[1][0] y1 = basis[1][1] x2 = basis[2][0] y2 = basis[2][1] x3 = basis[3][0] y3 = basis[3][1] w0 = x1-x0 w1 = x3-x2 h0 = y2-y0 h1 = y3-y1 target = [] x = x0 + w0 * L y = y0 + h0 * T target.append([x,y]) x = x1 - w0 * (1-L-W) y = y1+ h1 * T target.append([x,y]) x = x2 + w1 * L y = y2 - h0 * (1- T-H) target.append([x,y]) x = x3 - w1 * (1- L-W) y = y3 - h1 * (1- T-H) target.append([x,y]) return target T = 0.15 # 上からの相対位置 W = 0.16 # 幅 H = 0.7 # 高さ M = 0.06 # 幅(1商品分) for i in range(5): # 監視エリアの取得 L = 0.06 + (W * i ) # 左の相対位置 target = getTargetArea(basis, L, T, W + M, H)
def getTransformImage(target, frame, width, height): frame_coordinates = np.float32(target) target_coordinates = np.float32([[0, 0],[width, 0],[0, height],[width, height]]) trans_mat = cv2.getPerspectiveTransform(frame_coordinates,target_coordinates) return cv2.warpPerspective(frame, trans_mat, (width, height)) # 監視対象の画像を変形して表示 targetFrame = getTransformImage(target, frame, 260, 350)
3 不要データの排除
4 推論
推論のためのモデルは、Amazon SageMakerの組み込み物体検出アルゴリズムを使用して作成しました。
training_image = get_image_uri(sess.boto_region_name, 'object-detection', repo_version="latest")
def detectSageMaker(bytes, frame, i): response = client.invoke_endpoint(EndpointName=endpoint_name, ContentType='application/x-image', Body=bytes) result = response['Body'].read() result = json.loads(result) if(len(result["prediction"])>0): index = int(result["prediction"][0][0]) c = result["prediction"][0][1] w = round(result["prediction"][0][4], 2) h = round(result["prediction"][0][5], 2) name = categories[index] print("{} {} {}".format(i,index, c)) color = (255, 255, 255) if(name != categories[i]): color = (0, 0, 255) cv2.putText(frame, "{}".format(name), (5,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, thickness=2) cv2.imshow(categories[i], frame)
<br /># 監視対象の画像(targetFrame)をファイルに保存する tmpFile = "/tmp/{}.jpg".format(i) cv2.imwrite(tmpFile, targetFrame) # 保存されたファイルを読み込む with open(tmpFile, 'rb') as f: payload = payload = bytearray(payload) # 推論にかける detectSageMaker(payload, targetFrame, i)
5 コード
全てのコードです。試験のため、Rekognition Custom Labelsで作成したモデルでも動作するようになっています。
import cv2 from boto3.session import Session import boto3 import json import asyncio # Rekognition Custom Labelsを使用するか、SageMakerを使用するかのフラグ isRekognition = False if(isRekognition == False): # SageMaker session = Session(profile_name='developer', region_name='ap-northeast-1') endpoint_name = 'sampleEndPoint' client = session.client('sagemaker-runtime') else: # Rekognition session = Session(profile_name='developer', region_name='us-east-1') client = session.client('rekognition', 'us-east-1') projectVersionArn = 'arn:aws:rekognition:us-east-1:xxxxxxxx:project/SAMPLE/version/2020-03-26T15.28.04/1585204084956'; from PIL import Image import numpy as np aruco = cv2.aruco dictionary = aruco.getPredefinedDictionary(aruco.DICT_4X4_50) # 2値化 def binarization(img, threshold=100): ret, img = cv2.threshold(img, threshold, 255, cv2.THRESH_BINARY) return img # 差分を数値化 def getDiff(img1, img2): # グレースケール変換 img1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) img2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) # 差分取得 mask = cv2.absdiff(img1, img2) # 2値化 mask = binarization(mask) return cv2.countNonZero(mask) # 白の要素数 # インデックスを指定してマーカーの中心座標を取得する def getMarkerMean(ids, corners, index): for i, id in enumerate(ids): # マーカーのインデックス検索 if(id[0] == index): v = np.mean(corners[i][0],axis=0) # マーカーの四隅の座標から中心の座標を取得する return [v[0],v[1]] return None # 基準となるマーカーの取得 def getBasisMarker(ids, corners): # 左上、右上、左下、右下の順にマーカーの「中心座標」を取得 basis = [] basis.append(getMarkerMean(ids, corners, 5)) basis.append(getMarkerMean(ids, corners, 6)) basis.append(getMarkerMean(ids, corners, 7)) basis.append(getMarkerMean(ids, corners, 8)) return basis # 商品エリアの取得 def getTargetArea(basis, L, T, W, H): x0 = basis[0][0] y0 = basis[0][1] x1 = basis[1][0] y1 = basis[1][1] x2 = basis[2][0] y2 = basis[2][1] x3 = basis[3][0] y3 = basis[3][1] w0 = x1-x0 w1 = x3-x2 h0 = y2-y0 h1 = y3-y1 target = [] x = x0 + w0 * L y = y0 + h0 * T target.append([x,y]) x = x1 - w0 * (1-L-W) y = y1+ h1 * T target.append([x,y]) x = x2 + w1 * L y = y2 - h0 * (1- T-H) target.append([x,y]) x = x3 - w1 * (1- L-W) y = y3 - h1 * (1- T-H) target.append([x,y]) return target def getTransformImage(target, frame, width, height): frame_coordinates = np.float32(target) target_coordinates = np.float32([[0, 0],[width, 0],[0, height],[width, height]]) trans_mat = cv2.getPerspectiveTransform(frame_coordinates,target_coordinates) return cv2.warpPerspective(frame, trans_mat, (width, height)) def show(frame): # フレームを表示 resized_frame = cv2.resize(frame,(int(width/2), int(height/2))) cv2.imshow("Flame", resized_frame) cv2.waitKey(1) def detectSageMaker(bytes, frame, i): response = client.invoke_endpoint(EndpointName=endpoint_name, ContentType='application/x-image', Body=bytes) result = response['Body'].read() result = json.loads(result) if(len(result["prediction"])>0): index = int(result["prediction"][0][0]) c = result["prediction"][0][1] w = round(result["prediction"][0][4], 2) h = round(result["prediction"][0][5], 2) name = categories[index] print("{} {} {}".format(i,index, c)) color = (255, 255, 255) if(name != categories[i]): color = (0, 0, 255) cv2.putText(frame, "{}".format(name), (5,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, thickness=2) cv2.imshow(categories[i], frame) def detectRekognition(bytes, frame, i): response = client.detect_custom_labels( ProjectVersionArn=projectVersionArn, Image = { "Bytes": bytes, }, MinConfidence = 0 ) customLabels = response["CustomLabels"] for label in customLabels: name = '' c = 0 box = label["Geometry"]["BoundingBox"] w = box["Width"] h = box["Height"] if(0.5 < h and 0.5 < w): name = label["Name"] c = label["Confidence"] print("[{}] {} {}".format(i, name, c)) break color = (255, 255, 255) if(name != categories[i]): color = (0, 0, 255) cv2.putText(frame, "{} {}".format(name, round(c, 1)), (5,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, thickness=2) cv2.imshow(categories[i], frame) span = 30 # 静止間隔 threshold = 3000 # 変化の敷居値 categories = ['ASPARA','CRATZ','OREO','PRETZEL','PRIME'] # 動画ファイルのキャプチャ width = 2304 height = 1536 cap = cv2.VideoCapture(0) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) cap.set(cv2.CAP_PROP_FPS, 30) # 最初のフレームを背景画像に設定 ret, previous = counter=0 ColorCyan = (255, 255, 0) while(cap.isOpened()): # フレームの取得 ret, frame = # マーカ検出 corners, ids, rejectedImgPoints = aruco.detectMarkers(frame, dictionary) if (ids is None or len(ids)!=4) : show(frame) continue # 基準となる四隅のマーカーを取得 basis = getBasisMarker(ids, corners) # 差分計算 diff = getDiff(previous, frame) if(diff < threshold): counter+=1 else: counter=0 print("diff:{} counter:{}".format(diff,counter)) # 一定以下の変化量が、一定時間続いたら描画する if(span < counter): counter = 0 T = 0.15 W = 0.16 M = 0.06 H = 0.7 for i in range(5): # 監視エリアの取得 L = 0.06 + (W * i ) target = getTargetArea(basis, L, T, W + M, H) # 監視対象の画像を変形して表示 targetFrame = getTransformImage(target, frame, 260, 350) tmpFile = "/tmp/{}.jpg".format(i) cv2.imwrite(tmpFile, targetFrame) with open(tmpFile, 'rb') as f: payload = payload = bytearray(payload) if(isRekognition == False): # SageMaker detectSageMaker(payload, targetFrame, i) else: # Rekognition detectRekognition(payload, targetFrame, i) # 基準線の描画 if(basis[0]!=None and basis[1]!=None and basis[2]!=None and basis[3]!=None ): cv2.line(frame, (basis[0][0], basis[0][1]), (basis[1][0], basis[1][1]), ColorCyan, thickness=1, lineType=cv2.LINE_4) cv2.line(frame, (basis[0][0], basis[0][1]), (basis[2][0], basis[2][1]), ColorCyan, thickness=1, lineType=cv2.LINE_4) cv2.line(frame, (basis[0][0], basis[0][1]), (basis[3][0], basis[3][1]), ColorCyan, thickness=1, lineType=cv2.LINE_4) #マーカ描画 aruco.drawDetectedMarkers(frame, corners, ids, ColorCyan) # 今回のフレームを1つ前として保存する previous = frame # フレームを表示 show(frame) cap.release() cv2.destroyAllWindows()
6 最後に
モデルをSageMakerで作成すると、Rekognition Custom Labelsと比較して、どうしても精度が低くなり、私自身の技術力の低さを痛感しています。しかし、最終的に推論をエッジ側で行えるよう、なんとか、試行錯誤したいと思います。(救済大歓迎)