
Vertex AI PipelinesからBigQueryMLで機械学習モデルの訓練を行い予測を実行する
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部 機械学習チームの鈴木です。
この記事は、ブログリレー『Google CloudのAI/MLとかなんとか』の2本目の記事になります。
Google Cloud Pipeline Components SDKのうち、BigQuery MLリソースに関連するオペレータを使用して、Vertex AI Pipelinesで実行する用の機械学習パイプラインの定義を作成しましたので、サンプルをご紹介します。
この記事について
『Vertex AI Pipelines の BigQuery および BigQuery ML 演算子に関するお知らせ』では、Vertex AI Pipelines向けのBigQueryおよびBQMLコンポーネントが紹介されていました。
このコンポーネントを使用して、Vertex AI PipelinesでBQMLのモデル作成用のデータ前処理・モデル訓練・評価・推論までを行う機械学習パイプラインの例をご紹介します。
BigQueryで一連の処理を行うために、Google Cloudではオーケストレーターとしていくつかの選択肢があると思います。
例えば以下のようなものが考えられます。
- Cloud Composer
- Vertex AI Pipelines
- Workflows
- BigQueryのスケジュールクエリ
- Compute Engineに独自のワークフローエンジンを構築
このうち、Vertex AI Pipelinesはポータビリティに優位性があり、Vertex AIのメタデータ管理の恩恵を受けることもできます。
Vertex AI PipelinesとBigQuery MLコンポーネントについて
Vertex AI Pipelinesについて
Vertex AI Pipelinesは、Kubeflow Pipelines SDKまたはTensorFlow Extendedを使用して構築されたパイプラインを実行できる、サーバーレスなMLワークフローのオーケストレーションサービスです。
ブログリレー1本目の記事である、以下の記事でご紹介しました。
BigQuery MLコンポーネントについて
BigQuery MLコンポーネントは、Google Cloud Pipeline Components(GCPC)のコンポーネントの一つです。GCPCは、Google Cloud Vertex AI PipelinesなどKFP準拠のパイプライン実行バックエンドで実行できる、事前定義されたKFPコンポーネントを提供します。GCPCのコンポーネントはKubeflow Pipelines SDKを使用して、パイプラインに組み込むことができます。
BigQuery MLコンポーネントの一覧は以下のガイドにあります。各コンポーネントの詳細が確認できるので、記載された仕様を調べつつ機械学習パイプラインを実装することになります。
パイプラインの実装
1. 前提
今回は、『Vertex AI Pipelines の BigQuery および BigQuery ML 演算子に関するお知らせ』で紹介されていた『google_cloud_pipeline_components_bqml_text.ipynb』を参考に、既にBigQueryに格納されているテーブルデータに対して、BQMLのモデル作成用のデータ前処理・モデル訓練・評価・推論までを行う機械学習パイプラインを作成しました。
コードはこのノートブックの内容を引用・改変したものです。このノートブックのライセンスはApache-2.0 licenseであり、このブログに記載のコードもそれにしたがうことにご留意ください。
2. Python環境の準備
作業用ディレクトリを作成し、pyenvで以下のバージョンを指定して仮想環境を作成しました。
mkdir vertexai_pipeline_bigquery cd vertexai_pipeline_bigquery pyenv local 3.10.13 python -m venv vertexai_pipeline_bigquery
仮想環境を有効化し、Kubeflow Pipelines SDKとGCPCをインストールしました。
source vertexai_pipeline_bigquery/bin/activate pip install kfp==2.4.0 pip install google-cloud-pipeline-components==2.6.0
3. BigQueryの準備
asia-northeast1にkubeflow_datasetという名前の空のデータセットを作成しました。
このデータセットに、bigquery-public-data.ml_datasets.penguinsのデータを東京リージョンに移してきたpenguinsテーブルを作成しました。

4. 作成したコード
以下のコードを作成しました。
from kfp import compiler
from kfp import dsl
from bqml_utils import *
def main(): 
    @dsl.pipeline
    def bq_hello_pipeline():
        from google_cloud_pipeline_components.v1.bigquery import (
        BigqueryCreateModelJobOp, BigqueryEvaluateModelJobOp,
        BigqueryPredictModelJobOp, BigqueryQueryJobOp)
        
        # run preprocessing job
        bq_preprocess_op = BigqueryQueryJobOp(
            query=generate_create_bq_preprocess_query(),
            project=get_project(),
            location="asia-northeast1",
        )
        # create the linear regression
        bq_model_op = BigqueryCreateModelJobOp(
            query=generate_create_bq_model_query(),
            project=get_project(),
            location="asia-northeast1",
        ).after(bq_preprocess_op)
    
        # evaluate the linear regression
        bq_evaluate_op = BigqueryEvaluateModelJobOp(
            project=get_project(),
            location="asia-northeast1",
            model=bq_model_op.outputs["model"]
        ).after(bq_model_op)
        # similuate prediction
        BigqueryPredictModelJobOp(
            model=bq_model_op.outputs["model"],
            query_statement=generate_create_bq_prediction_query(),
            job_configuration_query=get_predict_job_config(),
            project=get_project(),
            location="asia-northeast1",
        ).after(bq_evaluate_op)
    compiler.Compiler().compile(bq_hello_pipeline, 'pipeline.yaml')
if __name__ == "__main__":
    main()
google_cloud_pipeline_components.v1.bigqueryより、使用するコンポーネントをインポートしました。
BigqueryPredictModelJobOpは作成した推論結果を保存するテーブルを、job_configuration_queryで指定しました。
また、実行するSQL文などを扱いやすくするため、以下のスクリプトに必要な関数を分けました。長めのスクリプトを2つ並べると説明が見にくくなるのでトグルメニューに隠しておきます。
bqml_utils.py
PROJECT_ID = "自分のGoogleCloudプロジェクトID"
BQ_DATASET = "kubeflow_dataset"
RAW_DATA_TABLE = "penguins"
PREPROCESSED_TABLE = "penguins_processed"
CLASSIFICATION_MODEL_NAME = "penguins_model"
PREDICT_RESULT_TABLE = "penguins_predict_result"
def get_project():
    """ プロジェクトIDを返す
    """
    return PROJECT_ID
def get_predict_job_config():
    """ 推定結果を格納するテーブルの情報をジョブ設定として返す
    """
    job_config = {
        "destinationTable": {
            "projectId": PROJECT_ID,
            "datasetId": BQ_DATASET,
            "tableId": PREDICT_RESULT_TABLE,
        }
    }
    return job_config
def generate_create_bq_preprocess_query():
    """ 前処理のためのSQLを返す
    """
    create_bq_preprocess_query = f"""
        -- create the splitted table
        CREATE OR REPLACE TABLE `{PROJECT_ID}.{BQ_DATASET}.{PREPROCESSED_TABLE}`
        AS (
            WITH data_with_split_flag AS
            (
                SELECT
                    species,
                    island,
                    culmen_length_mm,
                    culmen_depth_mm,
                    flipper_length_mm,
                    body_mass_g,
                    sex,
                    RAND() AS split_flag
                FROM `{PROJECT_ID}.{BQ_DATASET}.{RAW_DATA_TABLE}`
                WHERE
                    body_mass_g IS NOT NULL
            )
            SELECT
                species,
                island,
                culmen_length_mm,
                culmen_depth_mm,
                flipper_length_mm,
                body_mass_g,
                sex,
                CASE WHEN split_flag < 0.8 THEN 'TRAIN' ELSE 'PREDICT' END AS split
            FROM data_with_split_flag
          )
        """
    return create_bq_preprocess_query
def generate_create_bq_model_query():
    """ モデル作成を実行するSQLを返す
    """
    create_bq_model_query = f"""
        CREATE OR REPLACE MODEL `{PROJECT_ID}.{BQ_DATASET}.{CLASSIFICATION_MODEL_NAME}`
          OPTIONS (
              model_type='linear_reg',
              input_label_cols=['body_mass_g']) AS
          SELECT
              species,
              island,
              culmen_length_mm,
              culmen_depth_mm,
              flipper_length_mm,
              body_mass_g,
              sex
          FROM
             `{PROJECT_ID}.{BQ_DATASET}.{PREPROCESSED_TABLE}`
          WHERE split = 'TRAIN';
        """
    return create_bq_model_query
def generate_create_bq_prediction_query():
    """ モデルで推定する対象のデータを表すSQLを返す
    """
    create_bq_prediction_query = f"""
            SELECT
                species,
                island,
                culmen_length_mm,
                culmen_depth_mm,
                flipper_length_mm,
                sex
            FROM `{PROJECT_ID}.{BQ_DATASET}.{PREPROCESSED_TABLE}` 
            WHERE split = 'PREDICT' 
        """
    return create_bq_prediction_query
5. IR YAMLの生成
以下のように、create_pipeline_yaml.pyを実行してIR YAMLを生成しました。
python create_pipeline_yaml.py
意図通りに実行できれば、スクリプトと同じディレクトリにpipeline.yamlができます。
tree -L 1 # . # ├── bqml_utils.py # ├── create_pipeline_yaml.py # ├── pipeline.yaml # └── vertexai_pipeline_bigquery
パイプラインの実行
1. Vertex AI Pipelinesで実行
pipeline.yamlをVertex AI Pipelinesにアップロードして実行を作成しました。
まず、実行を作成を押して、実行のソースをファイルをアップロードとし、pipeline.yamlをアップロードしました。
Cloud Storage のロケーションとして適当な場所を指定して、送信しました。
以下のようにパイプラインの実行が始まりました。

前処理のステップのパイプライン実行分析を見ると、以下のように入力パラメータとしてどんなクエリを実行したのかが分かるようになっていました。

このように、全てのステップが成功しました。

2. BigQueryでの結果確認
データセットを確認すると、以下のようにリソースができていることを確認できました。

推論結果も出力されていました。この出力を使って、機械学習機能以降の処理を行うわけですね。

最後に
Google Cloud Pipeline Components SDKのうち、BigQuery MLリソースに関連するオペレータを使用して、Vertex AI Pipelinesで実行する用の機械学習パイプラインの定義を作成する例のご紹介でした。
Vertex AI Pipelinesは定期実行もサポートしているので、BQMLの実行はこの機能からするのがよさそうだなと思いました。
『コンポーネントでアーティファクトを使用または生成する』に記載のように、Google Cloud PipelineコンポーネントSDKは、コンポーネントの入力および出力として機能する一連のMLメタデータアーティファクトタイプを定義でき、Vertex AIメタデータでリソースの追跡をできる点も大きなポイントですね。












