データアナリティクス事業本部 機械学習チームの鈴木です。
BigQueryでは、Vertex AIと連携して格納したデータを生成AIで処理することが可能です。
例えばテーブルに格納済みのテキストをもとに埋め込みベクトルや別のテキストを生成することができます。
特に埋め込みベクトルがあれば興味があるテキストに類似したテキストをBigQuery内で検索し、類似レコードの特徴から関心のあるテキストを分析することもできます。また、RAGに使用することもできます。
今回はBigQueryとVertex AIを使って、テーブルに格納済みのテキストに対して生成AIによる分析の結果を付与しておき、埋め込みベクトル間のコサイン距離の観点で興味のあるテキストに類似したテキストを検索するための仕組みを作成したのでご紹介します。
実現したいこと
以下のような構成で、テーブルに格納されているテキストデータについて埋め込みベクトルと感情分析(感情分析用のプロンプトを入力した場合のテキスト生成結果という意味で)を付与した新しいテーブルを作成します。
結果テーブルはベクトル用の列があるため、ML.DISTANCE関数のような距離計算用の関数を使うことで類似テキストを簡単に調べることができます。
登場する技術要素としては特に以下があります。
- Vertex AI
- PaLM2 API
- Vertex AI Pipeline
- BigQuery
- 外部データソースへの接続
- Vertex AIリモートモデル
- ML関数
この仕組みにより、定期的に分析用のテーブルを新しいデータで更新しておき、関心があるテキストと類似する情報を簡単に取得できます。
具体例としては過去のアンケートデータなどです。この例については最後にユースケース例として見てみます。
なぜVertex AI Pipeline?
Vertex AIパイプラインを使うことで、BigQueryテーブルおよびBigQueryから参照できるCloud Storage上のデータなどを対象に、定期実行やGoogle Cloud Pipeline Components・Vertex AIメタデータなどVertex AIの機能の恩恵を受けることができます。
Google Cloud上でこのジョブのオーケストレーションを行うためには様々な選択肢があります。私が理解している中では、以下の選択肢があります。
- Vertex AI Pipeline
- BigQueryのスケジュールクエリ
- BigQueryのDataform
- Workflows
- Cloud Composer
この中でもVertex AI Pipelineは、Kubeflow Pipelines SDK(またはTensorFlow Extended)を使用して構築されたパイプラインを実行できる、サーバーレスなMLワークフローのオーケストレーションサービスです。Kubeflow Pipelinesを実行するためパイプラインのポータビリティが高く、Google Cloud Pipeline Componentsにより、ほかのGoogle Cloudサービスとの連携も可能です。
一方でKubeflow Pipelinesに関する基礎的な知見は必要です。こちらについては以下のブログにまとめました。
BigQueryのテキストデータをエンべディングする
現状の方法
この記事ではテキストからの埋め込みベクトル生成を想定して記載します。
Google Cloudの製品では、Vertex AIからPaLM2 APIを使用して埋め込みベクトルを生成できます。
エンべディング用途では記事執筆時点では以下の2モデルが候補としてあり、
textembedding-gecko
textembedding-gecko-multilingual
プロンプトが日本語の場合は、ドキュメントにサポートが明記されているtextembedding-gecko-multilingual
がよいでしょう。
後ほど見るように、BigQueryでCREATE MODEL
コマンドを実行する際に、ENDPOINT
で指定することでtextembedding-gecko-multilingual
をML.GENERATE_TEXT_EMBEDDINGで使用可能です。(ただしML.GENERATE_TEXT_EMBEDDING
は記事執筆時点でプレビュー中です。)
なお、マルチモーダルな埋め込みもGoogle Cloudではサポートされています。
ドキュメントによるとRESTによる問い合わせの例が紹介されているため、例えばBigQueryにGCSにアップロードした画像のURIを登録しておき、Cloud Functionsなどのリモート関数で問い合わせをするとよいかもしれません。
埋め込みベクトルの類似度検索
ベクトル間の類似度はML.DISTANCE関数で検索することが可能です。
単体で利用する場合、全てのレコードに対して類似度計算を行い、大きい順にソートするという使い方になります。
加えて、2024年1月にベクトルインデックスの機能もプレビュー提供が開始されています。
同じくプレビューのSearch関数にて検索効率を向上させることができるようです。
Vertex AIパイプラインからのBigQueryへのSQL実行
Google Cloud Pipeline Componentsの利用
Google Cloud Pipeline Components(GCPC)のコンポーネントのうち、BigQuery MLコンポーネントを使うことで、簡単にVertex AIパイプラインからBigQueryに対してSQLを実行することができます。
詳しい実行例は以下の記事で取り上げました。
スケジュール実行
Vertex AIパイプラインはスケジュール実行が可能です。cron式での指定になります。
以下はコンソールからのスケジュール設定部分です。
Vertex AIメタデータとの連携
今回はおまけ的なところにはなりますが、Google Cloud Pipeline Componentsを使うことで、作成されたテーブルの情報をVertex AIメタデータに残すことができます。
ユースケースとしては、過去に実行した履歴の検索があります。
フィルタを上手く使うことでテーブル名でメタデータを絞り、作成されたテーブルがどのパイプライン実行からできたのか……と遡ることができます。パイプラインの実行に関するデータはかなり細かく残るため、障害発生時などかなり重宝しそうです。
仕組みを構築する
それでは冒頭でご紹介した仕組みの構築方法をご紹介します。
なお、全てを記載すると長くなりすぎるので、以下ではポイントをご紹介します。
コードについては以下のGitHubレポジトリに格納しますのでご確認ください。
環境面での補足
Pythonライブラリのバージョン
今回のサンプルでは、kubeflowパイプラインの定義のため、以下のライブラリを使用しました。
kfp
:2.4.0
google-cloud-pipeline-components
:2.6.0
プレビュー中の機能の利用について
サンプルの中ではPaLM2のML.GENERATE_TEXT_EMBEDDING関数のみ、記事執筆時点でプレビュー中の機能を使用していることにご留意ください。
この関数によりリクエストを処理しているtextembedding-gecko-multilingual
モデルは一般提供中です。
一般提供中の機能のみで実現したい場合、Cloud Functionsなどでリモート関数を挟むとできると思いますが、ここでは紹介したい要素がさらに増えてややこしくなるため、ここではML.GENERATE_TEXT_EMBEDDING
関数を使います。
Vertex AIパイプラインの定義
以下のようにkubeflowパイプラインの定義を作成しました。
create_pipeline_yaml.py
from kfp import compiler
from kfp import dsl
from bqml_utils import *
def main():
@dsl.pipeline
def bq_llm_pipeline():
from google_cloud_pipeline_components.v1.bigquery import (
BigqueryCreateModelJobOp, BigqueryEvaluateModelJobOp,
BigqueryPredictModelJobOp, BigqueryQueryJobOp)
# エンべディング生成用のSQLをBigQueryで実行する
bq_embedding_op = BigqueryQueryJobOp(
query=generate_create_embedding_query(),
project=get_project(),
location="asia-northeast1",
)
# 感情分析用のSQLをBigQueryで実行する
bq_sentiment_analysis_op = BigqueryQueryJobOp(
query=generate_sentiment_analysis_query(),
project=get_project(),
location="asia-northeast1",
).after(bq_embedding_op)
# これまでの結果をまとめて結果テーブルを作成する
bq_create_result_table_op = BigqueryQueryJobOp(
query=generate_create_joined_table_query(),
project=get_project(),
location="asia-northeast1",
).after(bq_sentiment_analysis_op)
compiler.Compiler().compile(bq_llm_pipeline, 'pipeline.yaml')
if __name__ == "__main__":
main()
ローカルで以下のようにIR YAMLを作成し、Vertex AIパイプラインより実行しました。
python create_pipeline_yaml.py
# pipeline.yamlが作成される。
パイプラインからは別スクリプトに切り出した関数より、BigQueryで実行するSQLを読み込みます。
特にLLM利用に関するSQLのポイントは以降に紹介します。パイプラインでどのように使っているかの実装の詳細はGitHubレポジトリをご確認ください。
Vertex AIのリモート接続作成
以下のブログ同様に作業をしました。
以下の2点を行いました。
- 外部データソースへの接続から、「Vertex AIリモートモデル、リモート関数、Big Lake(Cloudリソース)」を選び、東京リージョンを指定して作成する。
- 接続のサービスアカウントに、Vertex AIユーザーのロールを割り当てる。
感情分析
まず、text-bison向けにモデルを作成しました。
CREATE OR REPLACE MODEL llm_sample.generate_text_model
REMOTE WITH CONNECTION `asia-northeast1.llm_connection`
OPTIONS (remote_service_type = 'CLOUD_AI_LARGE_LANGUAGE_MODEL_V1');
パイプラインからは、以下のように感情分析を実行するようにしました。
SELECT
-- 空白発生時の除去のために入れておく。
TRIM(ml_generate_text_llm_result) AS sentiment,
CAST(JSON_EXTRACT_SCALAR(ml_generate_text_rai_result, '$.blocked') AS BOOL) AS is_safety_filter_blocked,
* EXCEPT (ml_generate_text_llm_result,
ml_generate_text_rai_result)
FROM
ML.GENERATE_TEXT( MODEL llm_sample.generate_text_model,
(
SELECT
-- プロンプトは以下のブログ記事を参考にしました。
-- https://blog.g-gen.co.jp/entry/using-palm2-with-bigquery-ml
CONCAT( 'Classify the sentiment of this review as Positive or Negative? Review: ', content, ' Sentiment:' ) AS prompt,
*
FROM
`llm_sample.user_comment_analysis`
-- 動作例なので5レコードに限定して適用する。
LIMIT
5),
STRUCT( 0.2 AS temperature,
100 AS max_output_tokens,
TRUE AS flatten_json_output ))
エンべディング
こちらもまずモデルの作成からです。特にENDPOINT
でモデル名を指定している点がポイントです。
CREATE OR REPLACE MODEL llm_sample.text_embedding_model
REMOTE WITH CONNECTION `asia-northeast1.llm_connection`
OPTIONS (
remote_service_type = 'CLOUD_AI_LARGE_LANGUAGE_MODEL_V1',
ENDPOINT = 'textembedding-gecko-multilingual@latest'
);
パイプラインからは、以下のようなSQLを実行するようにしました。
SELECT
user_id,
content,
review_timestamp,
text_embedding AS embedding,
review_id
FROM ML.GENERATE_TEXT_EMBEDDING(
MODEL llm_sample.text_embedding_model,
TABLE llm_sample.user_review,
STRUCT(TRUE AS flatten_json_output)
);
ベクトル検索
クエリの例
以下のように検索を実行できます。
WITH search_review AS(
SELECT
text_embedding
FROM ML.GENERATE_TEXT_EMBEDDING(
MODEL `llm_sample.text_embedding_model`,
(SELECT '客室が綺麗だった。' AS content),
STRUCT(TRUE AS flatten_json_output)
)
)
SELECT
ML.DISTANCE(search_review.text_embedding, result_table.embedding, 'COSINE') AS distance,
result_table.content,
result_table.sentiment,
result_table.review_timestamp
FROM
search_review,
llm_sample.result_table
ORDER BY 1 ASC
結果としては以下のようになります。客室に関するテキストがdistance
の小さいレコードとして上位に来ていることが分かります。(データについては後述します。)
今回のSQlではML.GENERATE_TEXT_EMBEDDING
に直接テキストを渡すことになります。SQLによるアドホックな分析がしたい場合は良いですが、そうでない場合はCloud Functionsなど別のコンピュートサービスをAPI経由で実行するアプリにし、アプリ内で用意しておいたSQLのテンプレートに検索したいテキストを埋め込んでBigQueryで実行するようなつくりにするとよいかもしれません。
ユースケース
ホテルのレビュー分析
ここではホテルサイトなどの利用者が自由に書き込めるレビューコメントの分析を考えてみます。
データとしては以下のようなものです。レビューの内容はVertex AIからtext-bison@002
で生成した架空のものになります。
データはBigQueryのテーブルに格納しておきます。
このデータをVertex AIパイプラインでパイプラインを実行しPaLM2にて埋め込みベクトルと感情分析の結果を生成しておきました。結果は以下のテーブルのようになります。
ここで、以下のような課題があったとします。
- スタッフ対応に関するアンケート結果が、 2022年に悪い時期があった。
- 2023年に入って改善施策を実施したが、効果を確認したい。
これは例えば「スタッフの対応」というテキストに類似するレビューの感情分析結果の推移を確認するとよさそうです。
今回は分析用のテーブルを作成し、スプレッドシートを使ってグラフを作成し、結果を確認します。
まず以下のように「スタッフの対応」というテキストとテーブルに格納済みのテキストの距離を計算しておきます。繰り返し実行したくないため、CTASで別テーブルに保存しておきます。
-- 分析結果の解析のため、CTASしておく。
CREATE OR REPLACE TABLE llm_sample.sample_analysis
AS
WITH search_review AS(
SELECT
text_embedding
FROM ML.GENERATE_TEXT_EMBEDDING(
MODEL `llm_sample.text_embedding_model`,
(SELECT 'スタッフの対応' AS content),
STRUCT(TRUE AS flatten_json_output)
)
)
SELECT
ML.DISTANCE(search_review.text_embedding, result_table.embedding, 'COSINE') AS distance,
result_table.content,
result_table.sentiment,
result_table.review_timestamp
FROM
search_review,
llm_sample.result_table
ORDER BY 1 ASC
次にスプレッドシートにて感情分析の結果の推移を確認できるよう、結果を取得します。
ここではCTASしておいたテーブルのうち距離が小さいレコードを10件取得し、グラフ化しやすいようにピボットしました。
WITH extract_top10_data AS (
SELECT *
FROM llm_sample.sample_analysis
LIMIT 10
),
review_sentiment_timestamp AS (
SELECT
count(*) AS cnt,
review_timestamp,
sentiment
FROM
extract_top10_data
GROUP BY sentiment, review_timestamp
)
SELECT *
FROM review_sentiment_timestamp
PIVOT(SUM(cnt) FOR sentiment IN ('Positive', 'Negative'))
以下のように集計結果をシートを使って調べる
でスプレッドシートにて開きます。
棒グラフを作成すると、確かに2023年以降はPositiveなレコードが増えていることが分かります。これにより施策は改善効果を発揮していることが示唆されますね。
※補足
先の可視化結果はtop 10をとっているので、実は下位の方に関係がないデータが入っていますが、今回は確認したいものに対して影響が小さいので無視します。レコード量が増えたときに、十分に小さい意味のある値Nでtop Nを取れば解消すると思います。
最後に
今回はVertex AIとBigQueryで、BigQueryのテーブルに格納済みのテキストに対してLLMの処理を実行し、生成したベクトルからベクトル検索をする例をご紹介しました。
Vertex AIパイプラインを使うことで、BigQueryおよびBigQueryから参照できるデータを対象にしつつも、Google Cloud Pipeline ComponentsやVertex AIメタデータなどVertex AIの機能の恩恵もできるだけ受けることができます。
参考になりましたら幸いです。