Vertex AI PipelinesとKubeflow Pipelinesはじめの一歩

Vertex AI Pipelinesを全く使ったことがない人向けに、Kubeflow Pipelinesのはじめ方からまとめてみました。
2023.11.27

データアナリティクス事業本部 機械学習チームの鈴木です。

この記事は、ブログリレー『Google CloudのAI/MLとかなんとか』の1本目の記事になります。

そろそろVertex AIもマスターしないとなと思い、Vertex AI Pipelinesをキャッチアップしています。同僚のじょんすみすさんが以前に以下の『Vertex AIではじめるKubeflow Pipelines』を公開してくれていました。

私はBigQueryなどのデータ分析系のサービスの経験が厚く、Vertex AI PipelinesおよびKubeflow Pipelinesを触るのが初めてだったため、Kubeflow Pipelinesとはなんぞやから始める必要がありました。

Kubeflowは公式ドキュメントや上記記事をはじめとした資料が充実しているのですぐに理解し始めていますが、この記事ではどのあたりの資料を見ると上記ブログ記事が理解できたか、まとめていこうと思います。

Kubeflow Pipelinesについて

Kubeflow PipelinesとSDKについて

Kubeflow PipelinesはKubeflowのコンポーネントのうち、Dockerコンテナを使用して複数ステップのMLワークフローを構築・デプロイ・管理するためのプラットフォームです。

Kubeflow Pipelinesはv1とv2があり、それぞれにパイプラインを定義するために必要なDSL(Domain-specific language)をPythonで記述するためのSDKがあります。この記事ではv2向けのSDKを使った実装について説明します。

Kubeflow Pipelines v2については以下のページをルートととしてガイドをみていくと分かりやすいと思います。

Kubeflow Pipelinesはv1のSDKにもPipelines SDK (v2)があり、記事執筆時点でもVertex AI Pipelinesの一部サンプルにはこちらでの実装例が記載されていることがありましたが、いくつかのガイドを見る感じだとKubeflow Pipelines v2のSDKで進めても特に問題はなさそうという印象でした。

パイプラインの構築 - 使用するパイプライン SDK について』ではKubeflow Pipelines SDKの要件が確認でき、『Vertex AI Pipelines の BigQuery および BigQuery ML 演算子に関するお知らせ』に記載のサンプルはKubeflow Pipelines v2のSDKの構文で記述されていました。

Kubeflowについて

Kubeflowは、Kubernetes上での機械学習ワークフローのデプロイメントをシンプル・ポータブル・スケーラブルにすることに特化したツールキットです。Google社が社内でTensorFlowを実行するために使っていたツールをオープンソース化したものが始まりです。Kubernetes環境を用意することにより、ローカル・オンプレサーバー・クラウドなどポータブルに実行することが可能です。

Kubeflow自体については、以下の公式ガイドをご確認ください。

ガイドにも掲載されていますが、YoutubeではKubeflowについて簡単に解説している動画プレイリストまで用意されています。 2020年公開のもののため、現在の細かな仕様とは異なる点もあるかもしれませんが、雰囲気を掴むのには良さそうです。

Intro to Kubeflow PipelinesとしてKubeflow Pipelinesの紹介動画もあります。

Kubeflowのコンポーネントの中で、Kubeflow Pipelinesがどのような立ち位置のものであるかは、以下のアーキテクチャに関するページが参考になりました。

Vertex AI Pipelinesとは

Kubeflow Pipelines SDKまたはTensorFlow Extendedを使用して構築されたパイプラインを実行できる、サーバーレスなMLワークフローのオーケストレーションサービスです。

記事執筆時点ではKubeflow Pipelines SDK v1.8.9以降またはTensorFlow Extended v0.30.0以降を使用して構築されたパイプラインの実行をサポートしています。(今後のサポートバージョンについては『パイプラインの構築 - 使用するパイプライン SDK について』をご確認下さい。)

今回、パイプラインを定義するYAMLファイルをローカルで生成してVertex AI Pipelinesで実行します。このようにKubernetesを構築しなくてもKubeflow Pipelinesを実行できるという強力なサービスです。

先にも挙げた『Vertex AI Pipelines の BigQuery および BigQuery ML 演算子に関するお知らせ』のように、Vertex AI Pipelinesからほかのサービスを利用することも可能です。この場合、Kubeflow Pipelines SDKとは別にGoogle Cloud パイプライン コンポーネントもインストールします。

パイプライン定義ファイルの作成

早速、以下の『Hello World Pipeline』でどのようにパイプライン定義ファイルを生成するか見ていきます。

1. 作業環境の作成

まず作業用のディレクトリを作成しました。

mkdir hello_world_pipeline
cd hello_world_pipeline

次に、Kubeflow pipelines serviceのレポジトリより、記事執筆時点でPython3.10をサポートしていることが分かったので、pyenvで以下のPythonを使用するよう設定しました。

pyenv local 3.10.13

python --version
# Python 3.10.13

pip -V
# pip 23.0.1 from ローカルのパス/pip (python 3.10)

続いてvenvで作業用の仮想環境を作成し、Kubeflow Pipelines SDK(kfp)をインストールしました。Kubeflow Pipelines v2向けのSDKをインストールします。

python -m venv hello_world_pipeline
source hello_world_pipeline/bin/activate

pip install kfp==2.4.0

2. パイプラインの定義とコンパイルのためのPythonスクリプトの作成

以下のようにhellow_world_pipeline.pyを作成しました。なお、このスクリプトは上述の『Hello World Pipeline』のスクリプトを引用、一部並び替えしました。

hellow_world_pipeline.py

from kfp import compiler
from kfp import dsl

@dsl.component
def say_hello(name: str) -> str:
    hello_text = f'Hello, {name}!'
    print(hello_text)
    return hello_text

@dsl.pipeline
def hello_pipeline(recipient: str) -> str:
    hello_task = say_hello(name=recipient)
    return hello_task.output

compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')

3. パイプラインのコンパイル

記述したパイプラインをコンパイルします。以下のように実行しました。

python hellow_world_pipeline.py

実行するとスクリプトと同じディレクトリに、以下のpipeline.yamlが生成されました。

パイプラインのコンパイルについては、『Compile a Pipeline | Kubeflow』のページを見るとよく理解できました。生成されたYAMLファイルはIR YAMLと呼ばれます。通常、さらにKubeflow PipelinesサービスがこのYAMLファイルを処理し、実行用の Kubernetesリソースのセットに変換します。

このYAMLファイルは人間が読む用ではなく、また人間が直接記載するものでもないようです。ただし中身を見るとなんとなく何がしたいのかは読み取れるようになっています。今回生成されたIR YAMLの中身は以下のようになっていました。

pipeline.yaml

# PIPELINE DEFINITION
# Name: hello-pipeline
# Inputs:
#    recipient: str
# Outputs:
#    Output: str
components:
  comp-say-hello:
    executorLabel: exec-say-hello
    inputDefinitions:
      parameters:
        name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
deploymentSpec:
  executors:
    exec-say-hello:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - say_hello
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef say_hello(name: str) -> str:\n    hello_text = f'Hello, {name}!'\n\
          \    print(hello_text)\n    return hello_text\n\n"
        image: python:3.7
pipelineInfo:
  name: hello-pipeline
root:
  dag:
    outputs:
      parameters:
        Output:
          valueFromParameter:
            outputParameterKey: Output
            producerSubtask: say-hello
    tasks:
      say-hello:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-say-hello
        inputs:
          parameters:
            name:
              componentInputParameter: recipient
        taskInfo:
          name: say-hello
  inputDefinitions:
    parameters:
      recipient:
        parameterType: STRING
  outputDefinitions:
    parameters:
      Output:
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.4.0

Vertex AI Pipelinesでの実行

続いて生成したIR YAMLをVertex AI Pipelinesにアップロードして実行することで、どんな感じでVertex AI Pipelinesを使えばいいか簡単にですが見てみましょう。

1. サンプルのワークフローの実行

まずVertex AIの画面からパイプラインを開きました。

パイプラインを開く

パイプラインの画面から実行を作成を押しました。

実行を作成

①実行の詳細で実行のソースとしてファイルをアップロードを選び、ローカルで作成したIR YAMLを指定してアップロードしました。

実行の詳細

②ランタイムの構成で出力ディレクトリとしてCloud Storageバケットへのパスを指定しました。また、今回はパイプラインの入力としてrecipientという文字列を指定しているため、パイプラインパタメータとして適当な名前を入力しました。

ランタイムの構成

送信を押しました。

2. 実行結果の確認

送信を押すと作成したパイプラインの実行画面に移るので、実行される様子を確認しました。

最初は以下のように実行待ちのような表示になりました。

実行待ち

そのまましばらくすると、実行中の表示になりました。

実行中

最終的に実行完了になるので、表示されているノードを確認すると、出力パラメータで期待した結果が出力されていることが確認できました。

実行完了

最後に

Kubeflow PipelinesおよびKubeflowの概要について確認し、最も基本的なパイプラインをVertex AI Pipelinesで実行するところまでを確認しました。

Kubeflow PipelinesのSDKの使い方をなんとなく理解するところまでは少し時間がかかりますが、使い方がある程度分かってしまえば、どこかの環境でPython DSLからIR YAMLに変換し、Vertex AI Pipelinesにアップロードすれば実行できるのでとても良いですね。

まずはやってみる際の参考にしていただければと思います。

参考になるもの