そうだ、Kubeflow Pipelinesでカスタムコンテナ利用と設定ファイルの記述をしよう

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

おはこんハロチャオ~!何者(なにもん)なんじゃ?じょんすみすです。

今日も今日とて、機械学習パイプラインを作っては流し作っては流しを繰り返す日々、 皆さんいかがお過ごしでしょうか。 私は、パイプラインが複雑になっていくにつれなぜか楽しくなっていきます。

さて、そんな日々をすごしておりますが、Kubeflow Pipelines(以下KFP)を使って最も手軽にコンポーネントの処理内容を定義する関数による実装では、処理が複雑化すると大変なことになってきます。

なんせ、単一の関数内で処理を書くわけですから、処理が長くなってしまうとどこで何をやってるか見通しを立てづらくなりますし、よく使う処理の共通化も難しいです。

そんなわけなので、今回はパイプライン処理の規模が大きくなることを想定した使い方を見ていきましょう。

考える要素

今回考えるのは以下の2つの要素になります。

  • 実行するコンポーネント単位でコードを分けてカスタムコンテナを作成する
  • コンポーネントの入出力を設定ファイルに外だしする

コマンドで実行可能な形式で処理を実装したものをコンテナに入れて実行できる状態にして、 その際のパラメータなどは別途設定ファイルで定義できるようにしようという発想です。

カスタムコンテナの作成

コマンドライン引数を受け取って処理を実行するプログラムを実装します。 それをDockerのENTRYPOINTで指定するなどして実行可能なコンテナを用意します。

学習処理を行う train.py があるような場合、以下のような実装になります。

import argparse
import os
import pickle

import pandas as pd
from sklearn.linear_model import ElasticNet

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-path')
    parser.add_argument('--output-path')
    parser.add_argument('--target')
    parser.add_argument('--alpha', default=1.0, type=float)
    parser.add_argument('--l1-ratio', default=0.5, type=float)
    args = parser.parse_args()

    # 学習に利用するデータは入力パス内にtrain.csvという名前で保存されている前提とする
    train_df = pd.read_csv(os.path.join(args.input_path, 'train.csv'))
    target = train_df[args.target]
    data = train_df.drop(columns=args.target)

    model = ElasticNet(alpha=args.alpha, l1_ratio=args.l1_ratio)
    model.fit(data, target)

    os.makedirs(args.output_path, exist_ok=True)
    with open(os.path.join(args.output_path, 'model.pkl'), "wb") as f:
        pickle.dump(model, f)

これに対してのDockerfileを用意します

FROM gcr.io/deeplearning-platform-release/sklearn-cpu:latest

WORKDIR /root
COPY train.py /root/train.py

ENTRYPOINT ["python", "train.py"]

これをビルドしてArtifact Registryなどにアップロードすれば学習用のカスタムコンテナの作成は完了です。 これが、 @component を付けた関数内と同様に実行される処理となります。

ここでは、簡単な処理の実装のみにしていますが、より複雑な処理や複数ファイルに分割しての実装などの際も同様に実行可能なコンテナを用意するのみとなっています。

通常のPythonプログラム同様に実装できるため、複雑な処理を実行したい際により見通しが立てやすくなります。 また、特別なことは行っていないためテストコードの実装なども容易になります。

コンポーネントの入出力を定義する

実装したコードをKFP上で実行するするための仕組みとして、動かし方や入出力に関する設定を行うファイルを用意します。

この設定ファイルの主な役割としては

  • 関数で定義した際の引数に相当するものをコマンドライン引数と紐づける
  • 利用するコンテナと実行時のコマンドを定義する

となります。

このファイルはyaml形式で定義します。 まずは例を見てみましょう。 以下のようなファイルをcomponent.yamlという名前で作成します。

name: vertex_ai_pipelines_train
description: Vertex AIのComponentの設定を定義する
inputs:
  - {name: input_path, type: Dataset, description: '入力ファイル'}
  - {name: alpha, type: Float, description: 'ElasticNetのalphaパラメータ'}
 - {name: l1_ratio, type: Float, description: 'ElasticNetのl1_ratioパラメータ'}
outputs:
  - {name: model, type: Model, description: '学習済みモデル'}
implementation:
  container:
    image: <先ほど作成してArtifact Registryに登録したコンテナ>
    command: [python, train.py]
    args: [
      --input-path, {inputPath: input_path},
      --alpha, {inputValue: alpha},
      --l1-ratio, {inputValue: l1_ratio},
      --output-path, {outputPath: model},
      --target, "target"
    ]

まずはinputs/outputsの設定です。 これは、関数で定義する際におけるInputやOutputを含む引数と同等のものを定義する部分となっています。 上記の場合、以下のような書き方をしてるのと同等です。

@component
def train(
        input_path: Input[Dataset],
        alpha: float,
        l1_ratio: float,
        model: Output[Model]
)

implementation > containerでは利用するコンテナに関する設定をします。

imageでは利用するコンテナを指定してます。 Artifact Registryなどに登録した利用するコンテナをここで指定します。

commandとargsに実行するコマンドとその引数を記述しています。 先ほどのDockerfileにてENTRYPOINTを指定していましたが、ここで指定できるので実はそちらは必須ではありません。

argsでは、input/outputとコマンドライン引数の対応関係も定義しています。 inputValue, inputPath, outputPathといったキーで対応するinputs/outputsのnameを値として指定することで、パイプライン中における入出力と引数として与える項目を結び付けているわけです。

この際、inputPathやoutputPathをキーとして指定した項目は、ファイルパスが渡されます。 これはArtifactに相当するものが保存されたパスを示すためのものになっており、データやモデルなどを保存するために利用します。

一方、inputValueは値をそのまま指定するためのものになっています。 こちらはパイプライン実行時のパラメータなどを渡すためのものとなります。

処理をパイプライン化

処理をKFPのパイプライン化するにはPipelineの関数を実装する必要があります。 その際に、ここまでで実装したものを利用する方法を見ていきます。

from kfp import components
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import Input, Model, Output, Dataset, component

# 前処理をしてデータセットを出力する関数を定義
@component(base_image='gcr.io/deeplearning-platform-release/sklearn-cpu:latest')
def preprocess(
        input_path: str,
        output_dataset: Output[Dataset]
):
    # 前処理の実施
    ...


@dsl.pipeline(name="vertex-ai-pipeline", pipeline_root='gs://path/to/pipeline/root')
def pipeline(
    input_path: str,
    alpha: float,
    l1_ratio: float
) -> None:
    # 前処理はComponentとして定義した関数を使っている
    preprocess_task = preprocess(input_path=input_path)

    # 学習処理は先ほど作成したcomponent.yamlから設定を受け取り入力の引数に関する指定をする
    train_op = components.load_component_from_file('component.yaml')
    train_task = train_op(
        input_path=preprocess_task.outputs['output_dataset'],
        alpha=alpha,
        l1_ratio=l1_ratio
    )

あとはこれをコンパイルしてjsonやyamlファイルを作成すればパイプライン処理として実行可能です。

今回は、関数として定義したものと設定ファイルから取得したものを1つずつ利用するようにしていますが、 複数のコンポーネントを設定がから読み込ませることも可能ですので、 全てをそちらで統一する使い方も可能です。

おわりに

今回は、KFPにおいてカスタムコンテナと設定ファイルを利用したコンポーネントの作成方法を紹介しました。

機械学習の各種処理を実行するためのコードとパイプライン化して実行するためのコードや設定をわけて書くことができるので、大規模化しやすく、取り回しも効きやすくなります。