[Amazon Bedrock] Amazon Titan Multimodal Embeddings G1モデル を使用して、「きのこの山」と「たけのこの里」の分類モデルを作成してみました
1. はじめに
CX事業本部製造ビジネステクノロジー部の平内(SIN)です。
Amazon Bedrockで利用可能なAmazon Titan Multimodal Embeddings G1モデル は、 テキスト、イメージ、または、その組み合わせによるマルチモーダル埋め込みモデルです。
今回は、これを利用して、画像の分類モデルを作成してみました。
2.検証
(1) データ
使用したデータは、下記のブログで作成した「きのこの山」と「たけのこの里」の画像です。回転台に乗せて撮影し、Segment Anything Modelで切り取って背景を白にしたものです。
ファイルは、下記のようにimagesの階層下に配置しています。
% tree ./images ./images ├── KINOKO │ ├── 000000000_w.png │ ├── 000000003_w.png │ ├── 000000006_w.png ・・・略・・・ │ ├── 000001113_w.png │ ├── 000001116_w.png │ └── 000001119_w.png └── TAKENOKO ├── 000000000_w.png ├── 000000003_w.png ├── 000000006_w.png ・・・略・・・ ├── 000001044_w.png ├── 000001047_w.png └── 000001050_w.png
なお、各ファイルは、290 * 224 程度(一定ではない)のpng画像です。
% file 000000000_w.png 000000000_w.png: PNG image data, 290 x 224, 8-bit/color RGB, non-interlaced % ls -la 000000000_w.png -rw-r--r--@ 1 hirauchi.shinichi staff 61725 5 12 01:45 000000000_w.png
※ Amazon Titan Multimodal Embeddings G1モデルで入力可能な画像サイズは、5MBとなっているので、大きな画像を扱う場合は、注意が必要です。
(2) ベクトルデータ生成
下記のコードで、画像をBase64文字列化してベクトルデータを取得しています。
import os from typing import List import boto3 import base64 import json import glob # ソースコードの位置を基準にして、フルパスを取得する def get_full_path(filename: str) -> str: return os.path.join(os.path.dirname(__file__), filename) # ベクトルデータのファイルからの読み込み def read_data(filename: str) -> List[List[float]]: with open(get_full_path(filename)) as f: return [[float(s) for s in line.rstrip("\n").split(",")] for line in f] # ベクトルデータのファイルへの保存 def save_data(array: List[List[float]], filename: str) -> None: with open(get_full_path(filename), "w") as f: f.writelines(",".join(map(str, x)) + "\n" for x in array) # 画像からベクトルデータを取得 def get_embedding(bedrock, image_path: str, dimensions: int) -> List[float]: with open(image_path, "rb") as image: body = image.read() response = bedrock.invoke_model( body=json.dumps( { "inputImage": base64.b64encode(body).decode("utf8"), "embeddingConfig": {"outputEmbeddingLength": dimensions}, } ), modelId="amazon.titan-embed-image-v1", accept="application/json", contentType="application/json", ) response_body = json.loads(response.get("body").read()) return response_body.get("embedding") def main() -> None: bedrock = boto3.client(service_name="bedrock-runtime", region_name="us-east-1") dimensions = 1024 for kind in ["KINOKO", "TAKENOKO"]: image_path = f"images/{kind}" files = glob.glob(f"{get_full_path(image_path)}/*.png") embedding_list = [get_embedding(bedrock, file, dimensions) for file in files] save_data(embedding_list, f"{kind}_1024.csv") if __name__ == "__main__": main()
プログラムを実行すると、「KINOKO_1024.csv」及び「TAKENOKO_1024.csv」が出力されます。
% ls -la *.csv -rw-r--r-- 1 hirauchi.shinichi staff 4768568 5 12 01:54 KINOKO_1024.csv -rw-r--r-- 1 hirauchi.shinichi staff 4469167 5 12 01:57 TAKENOKO_1024.csv
出力ファイルには、1画像分の1行でベクトルの配列が格納されています。各画像は、約350枚ほどあるので、約350行となっています。
% wc -l *_1024.csv 374 KINOKO_1024.csv 351 TAKENOKO_1024.csv 725 total
(3) 分布
このデータで、うまく分類できそうかどうかを確認するため、生成した多次元のベクトルデータを2次元まで削減して、表示してみました。
import os from typing import List import matplotlib.pyplot as plt import numpy as np from sklearn import random_projection # スパースランダム投影で次元削減したデータを表示 def show(embedding_data_list: List[List[List[float]]]) -> None: colors = ["teal", "tomato"] for i, embedding_data in enumerate(embedding_data_list): matrix = np.array(embedding_data, dtype="float") X_Sprojected = random_projection.SparseRandomProjection( n_components=2 ).fit_transform(matrix) plt.scatter(X_Sprojected[:, 0], X_Sprojected[:, 1], c=colors[i], alpha=0.5) plt.show() def main() -> None: embedding_data_files = ["KINOKO_1024.csv", "TAKENOKO_1024.csv"] embedding_data_list = [read_data(file) for file in embedding_data_files] show(embedding_data_list) if __name__ == "__main__": main()
「きのこ」が、赤で、「たけのこ」が、緑です。これを見た限りでは、充分に期待できそうです。
(4) 比較
「コサイン類似性」で検索が可能かどうかを検証するために、各ベクトルデータの間の類似性を出力してみました。
import os from typing import List import numpy as np from sklearn.metrics.pairwise import cosine_similarity # ソースコードの位置を基準にして、フルパスを取得する def get_full_path(filename: str) -> str: return os.path.join(os.path.dirname(__file__), filename) # ベクトルデータのファイルからの読み込み def read_data(filename: str) -> List[List[float]]: with open(get_full_path(filename)) as f: return [[float(s) for s in line.rstrip("\n").split(",")] for line in f] # ベクトルデータ間のコサイン類似性を計算する def compare( test_datas: List[List[float]], target_datas: List[List[float]], threshold: float ) -> None: counter = 0 for test_data in test_datas: # 各データに対するコサイン類似度の計算 similarities = [ cosine_similarity([test_data], [target_data])[0][0] for target_data in target_datas ] # コサイン類似度の平均 avg = np.mean(similarities) if avg > threshold: print(f"{avg:.2f}") counter += 1 else: print(f"↓ {avg:.2f}") print(f"{threshold}以上のデータ: {counter / len(target_datas) * 100:.1f}%") def main() -> None: kinoko_datas = read_data("KINOKO_1024.csv") takenoko_datas = read_data("TAKENOKO_1024.csv") threshold = 0.85 print("\n各「きのこのデータ」を全部の「きのこデータ」と比較する") compare(kinoko_datas, kinoko_datas, threshold) print("\n各「きのこのデータ」を全部の「たけのこデータ」と比較する") compare(kinoko_datas, takenoko_datas, threshold) if __name__ == "__main__": main()
「きのこ画像」同士の場合、0.85 以上のものが、79.14%であったのに対し、「たけのこ画像」との間では、0%でした。この辺の状況をうまく利用すれば、簡単に分類モデルが作成できそうです。
各「きのこのデータ」を全部の「きのこデータ」と比較する 0.90 0.88 0.86 ↓ 0.84 0.87 ・・・略・・・ 0.85 0.85 0.88 0.90 0.85以上のデータ: 79.1% 各「きのこのデータ」を全部の「たけのこデータ」と比較する ↓ 0.83 ↓ 0.74 ↓ 0.76 ↓ 0.82 ・・・略・・・ ↓ 0.82 ↓ 0.78 ↓ 0.79 ↓ 0.82 0.85以上のデータ: 0.0%
作業中、ベクトルは、256次元も試してみたのですが、コサイン類似性で、ちょっと精度が出なかたので、ある程度の次元が必要なのかなと感じました。
3.実装
(1) Pinecone
ベクトルデータベースには、Pineconeを使用しました。
インデックスは、下記の設定で、Pineconeのコンソールから作成しています。
name: my-pinecone-index metric: cosine コサイン類似度 Dimensions: 1024 次元 Capacity mode: SERVERLESS
(2) データ挿入
データベースに入れるデータは、100件のみとしました。もしかすると、もっと少なくても大丈夫かもしれません。 メタデータは、特に設定していませんが、IDを「kinoko_連番」「takenoko_連番」のような形式とし、検索結果が、どちらにヒットしたかが分かるようにしました。
% head -n 100 KINOKO_1024.csv > KINOKO_1024.data % head -n 100 TAKENOKO_1024.csv > TAKENOKO_1024.data
データ挿入のコードは、以下です。予め、PineconeのAPI KEYを環境変数にセットして利用しています。
% export PINECONE_API_KEY=xxxxxxxxx-xxxxxxxx-xxxxxxxx
import os from typing import List from pinecone import Pinecone # ソースコードの位置を基準にして、フルパスを取得する def get_full_path(filename: str) -> str: return os.path.join(os.path.dirname(__file__), filename) # ベクトルデータのファイルからの読み込み def read_data(filename: str) -> List[List[float]]: with open(get_full_path(filename)) as f: return [[float(s) for s in line.rstrip("\n").split(",")] for line in f] def upsert_data(index, data_list, label): for i, data in enumerate(data_list): index.upsert(vectors=[(f"{label}_{i}", data)]) def main() -> None: PINEKCON_API_KEY = os.environ.get("PINECONE_API_KEY") PINECONE_INDEX = "my-pinecone-index" pinecone = Pinecone(api_key=PINEKCON_API_KEY) index = pinecone.Index(PINECONE_INDEX) kinoko_datas = read_data("KINOKO_1024.data") takenoko_datas = read_data("TAKENOKO_1024.data") upsert_data(index, kinoko_datas, "kinoko") upsert_data(index, takenoko_datas, "takenoko") if __name__ == "__main__": main()
データが格納されている状況です。
(3) Query
テスト用に用意した画像は、以下の10枚です。回転台で撮影した動画から、改めて切り出しました。
上記の画像を順に読み込み、ベクトルを生成し、PineconeでQueryをかけています。
import os from typing import List import base64 import json import boto3 import glob from pinecone import Pinecone # ソースコードの位置を基準にして、フルパスを取得する def get_full_path(filename: str) -> str: return os.path.join(os.path.dirname(__file__), filename) # 画像からベクトルデータを取得 def get_embedding(bedrock, image_path: str, dimensions: int) -> List[float]: with open(image_path, "rb") as image: body = image.read() response = bedrock.invoke_model( body=json.dumps( { "inputImage": base64.b64encode(body).decode("utf8"), "embeddingConfig": {"outputEmbeddingLength": dimensions}, } ), modelId="amazon.titan-embed-image-v1", accept="application/json", contentType="application/json", ) response_body = json.loads(response.get("body").read()) return response_body.get("embedding") # ベクトルデータの取得 def query_index(index, vector): result = index.query( vector=vector, top_k=1, include_values=False, ) print(f"id:{result.matches[0].id} score:{result.matches[0].score}") def main() -> None: PINEKCON_API_KEY = os.environ.get("PINECONE_API_KEY") PINECONE_INDEX = "my-pinecone-index" pinecone = Pinecone(api_key=PINEKCON_API_KEY) index = pinecone.Index(PINECONE_INDEX) bedrock = boto3.client(service_name="bedrock-runtime", region_name="us-east-1") dimensions = 1024 # images/TESTに置かれているテスト画像を、順に推論する image_path = f"images/TEST" files = glob.glob(f"{get_full_path(image_path)}/*.png") files.sort() for file in files: print(f"\nfilename:{os.path.basename(file)}") vector = get_embedding(bedrock, file, dimensions) query_index(index, vector) if __name__ == "__main__": main()
結果は、以下の通りでした。正解率は100%で、精度は充分だと思いました。
% python3 query.py filename:kinoko_001.png id:kinoko_15 score:0.775042295 filename:kinoko_002.png id:kinoko_34 score:0.735480487 filename:kinoko_003.png id:kinoko_92 score:0.744099379 filename:kinoko_004.png id:kinoko_83 score:0.745024681 filename:kinoko_005.png id:kinoko_29 score:0.793084085 filename:takenoko_001.png id:takenoko_97 score:0.745306611 filename:takenoko_002.png id:takenoko_83 score:0.735507786 filename:takenoko_003.png id:takenoko_67 score:0.747619569 filename:takenoko_004.png id:takenoko_7 score:0.792165279 filename:takenoko_005.png id:takenoko_62 score:0.790254414
4. 最後に
今回は、Bedrockで利用可能なAmazon Titan Multimodal Embeddings G1モデルを使用して、画像の分類モデルを作成してみました。
思った以上に、簡単に作成できて驚いています。
分類モデルは、コンピュータービジョンの中でも、比較的、少量のデータでも作成可能ですが、今回のような方法だと、要件によっては、もっと効率的に作成できるかも知れないと感じました。
5. 参考リンク
Pincone Docs - Upsert data
Pincone Docs - Query data
PineconeをPythonで使う方法
AWS Marketplace の Pinecone を Amazon Bedrock のナレッジベースとして利用する
生成AIで外観検査をやってみた
tSNEでMNISTを軽やかに鮮やかにマッピングしていく
sklearn.random_projection.SparseRandomProjection