Vertex AIではじめるKubeflow Pipelines

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

おはこんハロチャオ~!何者(なにもん)なんじゃ?じょんすみすです。

世の中は大機械学習パイプライン化時代を迎えています。 いや、特に迎えていないですがMLOpsの文脈の中で機械学習に関わる一連の処理をどのようにパイプライン化するかは一つの重要なテーマとなります。

そこで今回はVertex AIでマネージドな環境を利用できるKubeflow Pipeline(以下KFPと表記)の話をしていきましょう。

なお、本記事は

  • Vertex AI Pipelines上で実行のみについて言及
  • Vertex AI Workbenchでkfp 1.8.9をインストールして実行

となっております。 それ以外の実行環境やバージョンでの確認は行っておりませんのであらかじめご了承ください。

登場人物

以下の3つの人物に登場していただきます。

  • Component
  • Parameter, Artifact
  • Pipeline

KFP全体での登場人物は他にもいるのですが、このあたりの関係性をまずは理解していくと他の要素への理解もスムーズになるでしょう。 これらの関係性は以下のようなイメージになります。

KFPはKubernetes環境上で処理を実行するものであるため、 学習などの実際に行う処理はDockerコンテナとして利用可能な状態にします。 Vertex AI Pipelinesでは実行環境はマネージドで提供されているため、処理を動かすコンテナのみを用意すればいいことになります。 また、パイプラインという性質上DAGで表現される処理を順次実行していることになりますが、異なる処理は異なるコンテナとして実行されるためそれらの間を取り持つ入出力に関する定義が必要となります。

コンテナを起動して処理を実行するComponent、Componentの入出力を定義するParameterとArtifact、それらのつながりから処理の流れを定義するPipelineという関係になります。

Component

Componentは処理を実行する個々のコンテナとなります。 これは、実行する処理の内容を記述するためのものとなります。

最もシンプルな書き方は @component を付けた関数を実装することです。

from kfp.v2.dsl import component

@component
def hello():
    print('hello')

標準出力に「hello」と出るだけの簡単な関数です。 このように @component アノテーションをつけることでKFPのコンポーネントとしてできる状態になります。 利用するコンテナを指定せずにコンポーネントを作成した場合は、Vertex AIではPython3.7環境が利用されます。(※ 2023年2月現在の状況となります)

任意のライブラリや設定を行ったカスタムコンテナを利用したい場合は、base_imageで指定します。

from kfp.v2.dsl import component

# Google Cloudが提供するScikit-learnイメージを利用
# https://cloud.google.com/deep-learning-containers/docs/choosing-container
@component(base_image='gcr.io/deeplearning-platform-release/sklearn-cpu:latest')
def lenear_regression():
    # 処理で利用するライブラリは関数内でインポートする
    from sklearn.datasets import fetch_california_housing
    from sklearn.linear_model import LinearRegression

    housing = fetch_california_housing()

    model = LinearRegression()
    model.fit(housing['data'], housing['target'])

また、Componentは実行される処理に関する記述が全て関数内に記述されている必要があるため、上記のようにimport文も含めています。

Parameter, Artifact

KFPでは上述のようにComponentを使って実行する処理を記述します。

実際の処理では実行時に学習対象となるデータやパラメータなどを渡したい場合があります。 また、学習済みモデルもどこかに保存しておかないと推論時に利用できません。

それらのやり取りの方法として、Componentの引数として渡して、入出力を定義します。 その際にどのような情報を受け渡しするかを指定するのですが、これに使うのがParameterとArtifactになります。

Parameterは主に実行時に渡す情報がメインとなり、str, int, floatなど扱います。 それに対して、Artifactは各処理の出力を扱うようなケースで利用され、Dataset, Model, Metricsなどいった複雑な形式の情報を扱うためのものとなっています。

先ほどの処理に対して、学習データがあるパスを文字列で渡し、モデルを出力するようにする場合は以下のようになります。

from kfp.v2.dsl import Model, Output, component

@component(base_image='gcr.io/deeplearning-platform-release/sklearn-cpu:latest')
def lenear_regression(
        input_path: str, # 文字列のパラメータとして学習データのパスを指定
        output_model: Output[Model] # 出力はモデルアーティファクト
):
    import os
    import pickle

    import pandas as pd
    from sklearn.datasets import fetch_california_housing
    from sklearn.linear_model import LinearRegression

    # CSV形式で学習データのパスを指定
    # target列が目的変数、それ以外は説明変数となっていることを仮定
    input_df = pd.read_csv(input_path)
    target = input_df['target']
    data = input_df.drop(columns='target')

    model = LinearRegression()
    model.fit(data, target)

    # 出力からパスの情報を取得してモデルを保存する
    os.makedirs(output_model.path, exist_ok=True)
    with open(os.path.join(output_model.path, 'model.pkl'), "wb") as f:
        pickle.dump(model, f)

また、パイプライン化するにあたって前段の処理の出力を後段の処理で使いたいような場合には以下のように Input を使ってArtifactを受け取ること可能です。

from kfp.v2.dsl import Input, Model, Output, Dataset, Metrics, ClassificationMetrics, component

# 前処理をしてデータセットを出力する関数を定義
@component(base_image='gcr.io/deeplearning-platform-release/sklearn-cpu:latest')
def preprocess(
        input_path: str,
        output_dataset: Output[Dataset] # 出力アーティファクトはデータセット
):
    # 前処理の実施
    df = ...
    ...

    # 処理結果を出力する
    os.makedirs(output_dataset.path, exist_ok=True)
    df.to_csv(os.path.join(output_dataset.path, 'preprocessed_data.csv'))


@component(base_image='gcr.io/deeplearning-platform-release/sklearn-cpu:latest')
def train(
        input_dataset: Input[Dataset], # データセットとして入力を受け取る
        output_model: Output[Model] # 出力はモデルアーティファクト
):
    # データセットのパスからデータを取得
    input_df = pd.read_csv(os.path.join(input_dataset.path, 'preprocessed_data.csv'))
    ...

Pipeline

ここまででKFP上で実行する処理の記述ができました。 最後にこれらを繋ぎ合わせて機械学習パイプラインを作る仕組みを見ていきます。 そのために利用するのがこのPipelineです。

各Componentは引数で入出力を定義しています。 それらに対して実行時に指定する値としたり、どのComponentの出力をどのComponentの入力に渡すのかを定義したりすることで、 機械学習パイプライン全体のフローを定義するわけです。

Pipelineも関数として定義ます。

from kfp.v2 import compiler, dsl

# デコレータの引数でPipelineの名称とpieplie_rootを指定する
# pipeline_rootはOutputのpathなど明示的に指定しないものの保存先のベースパスを指定する
@dsl.pipeline(name="vertex-ai-pipeline", pipeline_root='gs://path/to/pipeline/root')
def pipeline(
    input_path: str, # パイプラインの関数の引数が実行時に指定するパラメータになる
) -> None:
    # Componentとして定義した関数の入力に相当する引数を指定
    preprocess_task = preprocess(input_path=input_path)

    # 前段の処理の出力を引数として渡すことで次の処理の入力として渡す
    # このやりとりをもってパイプライン実行時の依存関係が自動的に定義される
    train_task = train(input_dataset=preprocess_task.outputs['output_dataset'])

# パイプラインのコンパイル
# 関数で定義したものと出力ファイルを指定することでパイプラインの定義ファイルが作成される
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="vertex_ai_pipeline.json"
)

この例では、preprocessの入力はパイプライン実行時のパラメータとして渡して、trainの入力はpreprocessの出力を指定しています。 これによって、パイプラインの順番とデータの受け渡しが定義されます。

また、出力のパスに関してはこの処理の中で定義していません。 それらは、pipeline_rootで指定した場所に自動的に保存されます。

最後にこの関数を指定してパイプラインをコンパイルすることで一連の流れが定義されたjsonやyamlファイルを出力します。 このファイルと実行時のパラメータを指定することでパイプライン処理が実行できる状態となります。

機械学習パイプラインの実行

出力されたファイルを指定してパイプラインを実行します。 今回は、コンソール上で実行してみたいと思います。

「Vertex AI > パイプライン > 実行の作成」からファイルを指定します。

ファイルを選択すると以降の項目が自動で入力されるので、必要に応じて変更してください。

次の「ランタイムの構成」ではpipeline_rootの値と実行時のパラメータが表示されています。

pipeline_rootはそのままで問題なければ変更の必要はありません。 パラメータに必要な値を入れてパイプラインを実行します。

実行すると、以下のようにコンソール上から状況が確認できます。 入出力の設定で処理の順番が設定されているが可視化されており、現在の実行状況が確認できます。

また、処理が完了した部分に関しては出力をVertex AIのアーティファクトとしてメタデータから確認することも可能です。

おわりに

今回は、Vertex AI上でKubeflow Pieplinesを実行する方法として最も簡単なものを紹介しました。

個別のComponentでより複雑な処理を行う必要がある場合や、複雑な機械学習パイプラインを組む場合などは他にも様々な実装をする必要がありますが、まずはやってみる際の参考にしていただければと思います。