OpenCVとSageMakerで商品が、間違った場所に置かれた事を検出してみました。

2020.04.01

1 はじめに

CX事業本部の平内(SIN)です。

商品の陳列で、準備した場所に、想定外の商品を戻されたりすると、商品名や価格の表示など、色々と問題があります。 また、Developers.IO CAFEのようなレジレスの仕組みでは、結構重要な問題となってしまいます。しかし、常に正常な状態であることを、人力で確認するのは、結構コストが高いように感じます。

今回は、実験的ですが、この問題を解決する仕組みを検討してみました。

最初に、動作している様子です。 上の大きな画面は、カメラが撮影している動画です。そして、下にある小さな画面が、各商品をそれぞれ認識している画面です。 各商品は、マーカーから算出された相対位置にあるものとして、認識されています。また、商品画面は、手の出入りなど、変化するものは、無視するようになっています。

商品認識の画面は、静止するごとに、機械学習(オブジェクト検出)で推論を行い、写っている商品が想定外のものである場合、商品なを赤色で表示しました。

2 商品の位置検出

推論の精度を上げるため、対象を商品が表示されている位置のみに限定してます。

商品の表示位置は、カメラの微動等によって、画面中の位置が変わるため、OpenCVのArUcoマーカーを基準として使用しています。

参考:[OpenCV] ArUcoマーカーを使用して、売り切れ商品を検出してみました。

下図は、4箇所の基準マーカーを検出し、水色で表示している様子です。

基準となるマーカーは、カメラの角度によって、長方形に表示されるとは限りません。

左右が平行である保証もありません。

正確に計算するには、まず、マーカーの設置位置と商品の関係位置に保証が必要になります。また、カメラの画像との関係でいうと、3次元で計算する必要もありそうです。 正直大変なので、今回は、その辺、全く気にせずに、4箇所の基準位置からの相対的な関係だけで、簡易に算出しています。

# 商品エリアの取得
def getTargetArea(basis, L, T, W, H):
x0 = basis[0][0]
y0 = basis[0][1]
x1 = basis[1][0]
y1 = basis[1][1]
x2 = basis[2][0]
y2 = basis[2][1]
x3 = basis[3][0]
y3 = basis[3][1]
w0 = x1-x0
w1 = x3-x2
h0 = y2-y0
h1 = y3-y1

target = []
x = x0 + w0 * L
y = y0 + h0 * T
target.append([x,y])
x = x1 - w0 * (1-L-W)
y = y1+ h1 * T
target.append([x,y])
x = x2 + w1 * L
y = y2 - h0 * (1- T-H)
target.append([x,y])
x = x3 - w1 * (1- L-W)
y = y3 - h1 * (1- T-H)
target.append([x,y])
return target

T = 0.15 # 上からの相対位置
W = 0.16 # 幅
H = 0.7 # 高さ
M = 0.06 # 幅(1商品分)
for i in range(5):
# 監視エリアの取得
L = 0.06 + (W * i ) # 左の相対位置
target = getTargetArea(basis, L, T, W + M, H)

そして、取得したエリアを矩形に変改させています。

def getTransformImage(target, frame, width, height):
frame_coordinates = np.float32(target)
target_coordinates = np.float32([[0, 0],[width, 0],[0, height],[width, height]])
trans_mat = cv2.getPerspectiveTransform(frame_coordinates,target_coordinates)
return cv2.warpPerspective(frame, trans_mat, (width, height))

# 監視対象の画像を変形して表示
targetFrame = getTransformImage(target, frame, 260, 350)

各商品エリアは、別ウインドウで表示しています。

3 不要データの排除

商品とカメラとの間に、人や、手が映り込むことは十分に考えられます。そのような商品以外の画像を含めて推論することも、精度を下げる原因になりそうなので、ここでは、動かない画像のみを対象にしています。

OpenCVによる差分検出という手法で、商品画像は、一定期間静止した画像のみを扱っています。

詳細は、下記をご参照ください。

参考:[OpenCV] 一定期間動かないものだけを撮影する

4 推論

推論のためのモデルは、Amazon SageMakerの組み込み物体検出アルゴリズムを使用して作成しました。

training_image = get_image_uri(sess.boto_region_name, 'object-detection', repo_version="latest")

作成されたモデルを使用しているコードは、以下のとおりです。

def detectSageMaker(bytes, frame, i):
response = client.invoke_endpoint(EndpointName=endpoint_name, ContentType='application/x-image', Body=bytes)
result = response['Body'].read()
result = json.loads(result)
if(len(result["prediction"])>0):
index = int(result["prediction"][0][0])
c = result["prediction"][0][1]
w = round(result["prediction"][0][4], 2)
h = round(result["prediction"][0][5], 2)
name = categories[index]

print("{} {} {}".format(i,index, c))

color = (255, 255, 255)
if(name != categories[i]):
color = (0, 0, 255)

cv2.putText(frame, "{}".format(name), (5,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, thickness=2)
cv2.imshow(categories[i], frame)

ちょっと不細工なのですが、OpenCVでの画像(Matクラス)からinvoke_endpoint()のBodyへ受け渡す方法が分からなかったので、一旦、ファイルに落としてしまっています。

<br /># 監視対象の画像(targetFrame)をファイルに保存する
tmpFile = "/tmp/{}.jpg".format(i)
cv2.imwrite(tmpFile, targetFrame)

# 保存されたファイルを読み込む
with open(tmpFile, 'rb') as f:
payload = f.read()
payload = bytearray(payload)

# 推論にかける
detectSageMaker(payload, targetFrame, i)

5 コード

全てのコードです。試験のため、Rekognition Custom Labelsで作成したモデルでも動作するようになっています。

import cv2
from boto3.session import Session
import boto3
import json
import asyncio

# Rekognition Custom Labelsを使用するか、SageMakerを使用するかのフラグ
isRekognition = False

if(isRekognition == False):
# SageMaker
session = Session(profile_name='developer', region_name='ap-northeast-1')
endpoint_name = 'sampleEndPoint'
client = session.client('sagemaker-runtime')
else:
# Rekognition
session = Session(profile_name='developer', region_name='us-east-1')
client = session.client('rekognition', 'us-east-1')
projectVersionArn = 'arn:aws:rekognition:us-east-1:xxxxxxxx:project/SAMPLE/version/2020-03-26T15.28.04/1585204084956';

from PIL import Image
import numpy as np
aruco = cv2.aruco
dictionary = aruco.getPredefinedDictionary(aruco.DICT_4X4_50)

# 2値化
def binarization(img, threshold=100):
ret, img = cv2.threshold(img, threshold, 255, cv2.THRESH_BINARY)
return img

# 差分を数値化
def getDiff(img1, img2):
# グレースケール変換
img1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
img2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
# 差分取得
mask = cv2.absdiff(img1, img2)
# 2値化
mask = binarization(mask)
return cv2.countNonZero(mask) # 白の要素数

# インデックスを指定してマーカーの中心座標を取得する
def getMarkerMean(ids, corners, index):
for i, id in enumerate(ids):
# マーカーのインデックス検索
if(id[0] == index):
v = np.mean(corners[i][0],axis=0) # マーカーの四隅の座標から中心の座標を取得する
return [v[0],v[1]]
return None

# 基準となるマーカーの取得
def getBasisMarker(ids, corners):
# 左上、右上、左下、右下の順にマーカーの「中心座標」を取得
basis = []
basis.append(getMarkerMean(ids, corners, 5))
basis.append(getMarkerMean(ids, corners, 6))
basis.append(getMarkerMean(ids, corners, 7))
basis.append(getMarkerMean(ids, corners, 8))
return basis

# 商品エリアの取得
def getTargetArea(basis, L, T, W, H):
x0 = basis[0][0]
y0 = basis[0][1]
x1 = basis[1][0]
y1 = basis[1][1]
x2 = basis[2][0]
y2 = basis[2][1]
x3 = basis[3][0]
y3 = basis[3][1]
w0 = x1-x0
w1 = x3-x2
h0 = y2-y0
h1 = y3-y1

target = []
x = x0 + w0 * L
y = y0 + h0 * T
target.append([x,y])
x = x1 - w0 * (1-L-W)
y = y1+ h1 * T
target.append([x,y])
x = x2 + w1 * L
y = y2 - h0 * (1- T-H)
target.append([x,y])
x = x3 - w1 * (1- L-W)
y = y3 - h1 * (1- T-H)
target.append([x,y])
return target

def getTransformImage(target, frame, width, height):
frame_coordinates = np.float32(target)
target_coordinates = np.float32([[0, 0],[width, 0],[0, height],[width, height]])
trans_mat = cv2.getPerspectiveTransform(frame_coordinates,target_coordinates)
return cv2.warpPerspective(frame, trans_mat, (width, height))

def show(frame):
# フレームを表示
resized_frame = cv2.resize(frame,(int(width/2), int(height/2)))
cv2.imshow("Flame", resized_frame)
cv2.waitKey(1)

def detectSageMaker(bytes, frame, i):
response = client.invoke_endpoint(EndpointName=endpoint_name, ContentType='application/x-image', Body=bytes)
result = response['Body'].read()
result = json.loads(result)
if(len(result["prediction"])>0):
index = int(result["prediction"][0][0])
c = result["prediction"][0][1]
w = round(result["prediction"][0][4], 2)
h = round(result["prediction"][0][5], 2)
name = categories[index]

print("{} {} {}".format(i,index, c))

color = (255, 255, 255)
if(name != categories[i]):
color = (0, 0, 255)

cv2.putText(frame, "{}".format(name), (5,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, thickness=2)
cv2.imshow(categories[i], frame)

def detectRekognition(bytes, frame, i):
response = client.detect_custom_labels(
ProjectVersionArn=projectVersionArn,
Image = {
"Bytes": bytes,
},
MinConfidence = 0
)
customLabels = response["CustomLabels"]
for label in customLabels:
name = ''
c = 0
box = label["Geometry"]["BoundingBox"]
w = box["Width"]
h = box["Height"]
if(0.5 < h and 0.5 < w):
name = label["Name"]
c = label["Confidence"]
print("[{}] {} {}".format(i, name, c))
break

color = (255, 255, 255)
if(name != categories[i]):
color = (0, 0, 255)

cv2.putText(frame, "{} {}".format(name, round(c, 1)), (5,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, thickness=2)
cv2.imshow(categories[i], frame)

span = 30 # 静止間隔
threshold = 3000 # 変化の敷居値

categories = ['ASPARA','CRATZ','OREO','PRETZEL','PRIME']

# 動画ファイルのキャプチャ
width = 2304
height = 1536
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FPS, 30)

# 最初のフレームを背景画像に設定
ret, previous = cap.read()

counter=0
ColorCyan = (255, 255, 0)

while(cap.isOpened()):
# フレームの取得
ret, frame = cap.read()

# マーカ検出
corners, ids, rejectedImgPoints = aruco.detectMarkers(frame, dictionary)

if (ids is None or len(ids)!=4) :
show(frame)
continue

# 基準となる四隅のマーカーを取得
basis = getBasisMarker(ids, corners)

# 差分計算
diff = getDiff(previous, frame)

if(diff < threshold):
counter+=1
else:
counter=0

print("diff:{} counter:{}".format(diff,counter))

# 一定以下の変化量が、一定時間続いたら描画する
if(span < counter):
counter = 0

T = 0.15
W = 0.16
M = 0.06
H = 0.7
for i in range(5):
# 監視エリアの取得
L = 0.06 + (W * i )
target = getTargetArea(basis, L, T, W + M, H)

# 監視対象の画像を変形して表示
targetFrame = getTransformImage(target, frame, 260, 350)

tmpFile = "/tmp/{}.jpg".format(i)
cv2.imwrite(tmpFile, targetFrame)

with open(tmpFile, 'rb') as f:
payload = f.read()
payload = bytearray(payload)

if(isRekognition == False):
# SageMaker
detectSageMaker(payload, targetFrame, i)
else:
# Rekognition
detectRekognition(payload, targetFrame, i)

# 基準線の描画
if(basis[0]!=None and basis[1]!=None and basis[2]!=None and basis[3]!=None ):
cv2.line(frame, (basis[0][0], basis[0][1]), (basis[1][0], basis[1][1]), ColorCyan, thickness=1, lineType=cv2.LINE_4)
cv2.line(frame, (basis[0][0], basis[0][1]), (basis[2][0], basis[2][1]), ColorCyan, thickness=1, lineType=cv2.LINE_4)
cv2.line(frame, (basis[0][0], basis[0][1]), (basis[3][0], basis[3][1]), ColorCyan, thickness=1, lineType=cv2.LINE_4)
#マーカ描画
aruco.drawDetectedMarkers(frame, corners, ids, ColorCyan)

# 今回のフレームを1つ前として保存する
previous = frame

# フレームを表示
show(frame)

cap.release()
cv2.destroyAllWindows()

6 最後に

今回は、商品が間違った位置に戻されたことを検出する仕組みを検討してみました。実際に展開するとなると、まだまだ問題は多いと思いますが、色々試してみたいと考えています。

モデルをSageMakerで作成すると、Rekognition Custom Labelsと比較して、どうしても精度が低くなり、私自身の技術力の低さを痛感しています。しかし、最終的に推論をエッジ側で行えるよう、なんとか、試行錯誤したいと思います。(救済大歓迎)