Vertex AI PipelinesからBigQueryMLで機械学習モデルの訓練を行い予測を実行する

Google Cloud Pipeline ComponentsのBigQuery MLコンポーネントを使うことで、Vertex AI PipelinesからBQMLの処理を実行しました。
2023.12.01

データアナリティクス事業本部 機械学習チームの鈴木です。

この記事は、ブログリレー『Google CloudのAI/MLとかなんとか』の2本目の記事になります。

Google Cloud Pipeline Components SDKのうち、BigQuery MLリソースに関連するオペレータを使用して、Vertex AI Pipelinesで実行する用の機械学習パイプラインの定義を作成しましたので、サンプルをご紹介します。

この記事について

Vertex AI Pipelines の BigQuery および BigQuery ML 演算子に関するお知らせ』では、Vertex AI Pipelines向けのBigQueryおよびBQMLコンポーネントが紹介されていました。

このコンポーネントを使用して、Vertex AI PipelinesでBQMLのモデル作成用のデータ前処理・モデル訓練・評価・推論までを行う機械学習パイプラインの例をご紹介します。

BigQueryで一連の処理を行うために、Google Cloudではオーケストレーターとしていくつかの選択肢があると思います。

例えば以下のようなものが考えられます。

  • Cloud Composer
  • Vertex AI Pipelines
  • Workflows
  • BigQueryのスケジュールクエリ
  • Compute Engineに独自のワークフローエンジンを構築

このうち、Vertex AI Pipelinesはポータビリティに優位性があり、Vertex AIのメタデータ管理の恩恵を受けることもできます。

Vertex AI PipelinesとBigQuery MLコンポーネントについて

Vertex AI Pipelinesについて

Vertex AI Pipelinesは、Kubeflow Pipelines SDKまたはTensorFlow Extendedを使用して構築されたパイプラインを実行できる、サーバーレスなMLワークフローのオーケストレーションサービスです。

ブログリレー1本目の記事である、以下の記事でご紹介しました。

BigQuery MLコンポーネントについて

BigQuery MLコンポーネントは、Google Cloud Pipeline Components(GCPC)のコンポーネントの一つです。GCPCは、Google Cloud Vertex AI PipelinesなどKFP準拠のパイプライン実行バックエンドで実行できる、事前定義されたKFPコンポーネントを提供します。GCPCのコンポーネントはKubeflow Pipelines SDKを使用して、パイプラインに組み込むことができます。

BigQuery MLコンポーネントの一覧は以下のガイドにあります。各コンポーネントの詳細が確認できるので、記載された仕様を調べつつ機械学習パイプラインを実装することになります。

パイプラインの実装

1. 前提

今回は、『Vertex AI Pipelines の BigQuery および BigQuery ML 演算子に関するお知らせ』で紹介されていた『google_cloud_pipeline_components_bqml_text.ipynb』を参考に、既にBigQueryに格納されているテーブルデータに対して、BQMLのモデル作成用のデータ前処理・モデル訓練・評価・推論までを行う機械学習パイプラインを作成しました。

コードはこのノートブックの内容を引用・改変したものです。このノートブックのライセンスはApache-2.0 licenseであり、このブログに記載のコードもそれにしたがうことにご留意ください。

2. Python環境の準備

作業用ディレクトリを作成し、pyenvで以下のバージョンを指定して仮想環境を作成しました。

mkdir vertexai_pipeline_bigquery
cd vertexai_pipeline_bigquery

pyenv local 3.10.13

python -m venv vertexai_pipeline_bigquery

仮想環境を有効化し、Kubeflow Pipelines SDKとGCPCをインストールしました。

source vertexai_pipeline_bigquery/bin/activate

pip install kfp==2.4.0
pip install google-cloud-pipeline-components==2.6.0

3. BigQueryの準備

asia-northeast1kubeflow_datasetという名前の空のデータセットを作成しました。

このデータセットに、bigquery-public-data.ml_datasets.penguinsのデータを東京リージョンに移してきたpenguinsテーブルを作成しました。

作成したデータセット

4. 作成したコード

以下のコードを作成しました。

create_pipeline_yaml.py

from kfp import compiler
from kfp import dsl

from bqml_utils import *


def main(): 
    @dsl.pipeline
    def bq_hello_pipeline():
        from google_cloud_pipeline_components.v1.bigquery import (
        BigqueryCreateModelJobOp, BigqueryEvaluateModelJobOp,
        BigqueryPredictModelJobOp, BigqueryQueryJobOp)
        
        # run preprocessing job
        bq_preprocess_op = BigqueryQueryJobOp(
            query=generate_create_bq_preprocess_query(),
            project=get_project(),
            location="asia-northeast1",
        )

        # create the linear regression
        bq_model_op = BigqueryCreateModelJobOp(
            query=generate_create_bq_model_query(),
            project=get_project(),
            location="asia-northeast1",
        ).after(bq_preprocess_op)
    
        # evaluate the linear regression
        bq_evaluate_op = BigqueryEvaluateModelJobOp(
            project=get_project(),
            location="asia-northeast1",
            model=bq_model_op.outputs["model"]
        ).after(bq_model_op)

        # similuate prediction
        BigqueryPredictModelJobOp(
            model=bq_model_op.outputs["model"],
            query_statement=generate_create_bq_prediction_query(),
            job_configuration_query=get_predict_job_config(),
            project=get_project(),
            location="asia-northeast1",
        ).after(bq_evaluate_op)

    compiler.Compiler().compile(bq_hello_pipeline, 'pipeline.yaml')


if __name__ == "__main__":
    main()

google_cloud_pipeline_components.v1.bigqueryより、使用するコンポーネントをインポートしました。

BigqueryPredictModelJobOpは作成した推論結果を保存するテーブルを、job_configuration_queryで指定しました。

また、実行するSQL文などを扱いやすくするため、以下のスクリプトに必要な関数を分けました。長めのスクリプトを2つ並べると説明が見にくくなるのでトグルメニューに隠しておきます。

bqml_utils.py

bqml_utils.py

PROJECT_ID = "自分のGoogleCloudプロジェクトID"
BQ_DATASET = "kubeflow_dataset"
RAW_DATA_TABLE = "penguins"
PREPROCESSED_TABLE = "penguins_processed"
CLASSIFICATION_MODEL_NAME = "penguins_model"
PREDICT_RESULT_TABLE = "penguins_predict_result"


def get_project():
    """ プロジェクトIDを返す
    """
    return PROJECT_ID


def get_predict_job_config():
    """ 推定結果を格納するテーブルの情報をジョブ設定として返す
    """
    job_config = {
        "destinationTable": {
            "projectId": PROJECT_ID,
            "datasetId": BQ_DATASET,
            "tableId": PREDICT_RESULT_TABLE,
        }
    }
    return job_config


def generate_create_bq_preprocess_query():
    """ 前処理のためのSQLを返す
    """
    create_bq_preprocess_query = f"""
        -- create the splitted table
        CREATE OR REPLACE TABLE `{PROJECT_ID}.{BQ_DATASET}.{PREPROCESSED_TABLE}`
        AS (
            WITH data_with_split_flag AS
            (
                SELECT
                    species,
                    island,
                    culmen_length_mm,
                    culmen_depth_mm,
                    flipper_length_mm,
                    body_mass_g,
                    sex,
                    RAND() AS split_flag
                FROM `{PROJECT_ID}.{BQ_DATASET}.{RAW_DATA_TABLE}`
                WHERE
                    body_mass_g IS NOT NULL
            )
            SELECT
                species,
                island,
                culmen_length_mm,
                culmen_depth_mm,
                flipper_length_mm,
                body_mass_g,
                sex,
                CASE WHEN split_flag < 0.8 THEN 'TRAIN' ELSE 'PREDICT' END AS split
            FROM data_with_split_flag
          )
        """
    return create_bq_preprocess_query


def generate_create_bq_model_query():
    """ モデル作成を実行するSQLを返す
    """
    create_bq_model_query = f"""
        CREATE OR REPLACE MODEL `{PROJECT_ID}.{BQ_DATASET}.{CLASSIFICATION_MODEL_NAME}`
          OPTIONS (
              model_type='linear_reg',
              input_label_cols=['body_mass_g']) AS
          SELECT
              species,
              island,
              culmen_length_mm,
              culmen_depth_mm,
              flipper_length_mm,
              body_mass_g,
              sex
          FROM
             `{PROJECT_ID}.{BQ_DATASET}.{PREPROCESSED_TABLE}`
          WHERE split = 'TRAIN';
        """
    return create_bq_model_query


def generate_create_bq_prediction_query():
    """ モデルで推定する対象のデータを表すSQLを返す
    """
    create_bq_prediction_query = f"""
            SELECT
                species,
                island,
                culmen_length_mm,
                culmen_depth_mm,
                flipper_length_mm,
                sex
            FROM `{PROJECT_ID}.{BQ_DATASET}.{PREPROCESSED_TABLE}` 
            WHERE split = 'PREDICT' 
        """
    return create_bq_prediction_query

5. IR YAMLの生成

以下のように、create_pipeline_yaml.pyを実行してIR YAMLを生成しました。

python create_pipeline_yaml.py

意図通りに実行できれば、スクリプトと同じディレクトリにpipeline.yamlができます。

tree -L 1
# .
# ├── bqml_utils.py
# ├── create_pipeline_yaml.py
# ├── pipeline.yaml
# └── vertexai_pipeline_bigquery

パイプラインの実行

1. Vertex AI Pipelinesで実行

pipeline.yamlをVertex AI Pipelinesにアップロードして実行を作成しました。

まず、実行を作成を押して、実行のソースファイルをアップロードとし、pipeline.yamlをアップロードしました。

Cloud Storage のロケーションとして適当な場所を指定して、送信しました。

以下のようにパイプラインの実行が始まりました。

パイプラインの実行開始

前処理のステップのパイプライン実行分析を見ると、以下のように入力パラメータとしてどんなクエリを実行したのかが分かるようになっていました。

パイプライン実行分析

このように、全てのステップが成功しました。

全てのステップが成功

2. BigQueryでの結果確認

データセットを確認すると、以下のようにリソースができていることを確認できました。

作成されたリソース

推論結果も出力されていました。この出力を使って、機械学習機能以降の処理を行うわけですね。

推論結果

最後に

Google Cloud Pipeline Components SDKのうち、BigQuery MLリソースに関連するオペレータを使用して、Vertex AI Pipelinesで実行する用の機械学習パイプラインの定義を作成する例のご紹介でした。

Vertex AI Pipelinesは定期実行もサポートしているので、BQMLの実行はこの機能からするのがよさそうだなと思いました。

コンポーネントでアーティファクトを使用または生成する』に記載のように、Google Cloud PipelineコンポーネントSDKは、コンポーネントの入力および出力として機能する一連のMLメタデータアーティファクトタイプを定義でき、Vertex AIメタデータでリソースの追跡をできる点も大きなポイントですね。

ほかに参考にしたもの