Vertex AIのカスタムコンテナを作ろう

こんにちは小澤です。

Vertex AIではAutoMLの仕組みによって多くの機械学習タスクを簡単に実現できます。 AutoMLで実現可能なタスクについては利用者が機械学習の内部処理のことをほぼ意識することなく実現できるのが魅力ですが、 対応していない範囲のタスクを実行したい場合や、特定のアルゴリズムを使いたい場合など、 詳細な設定が必要だったり、あらかじめ用意されているもの以外のタスクを実行したいこともあります。

そんな時にはカスタムコンテナを利用することで任意の学習や推論処理を実装しそれをVertex AI上で実行できます。

今回は、その実装方法を簡単な例で見ていきたいと思います。

全体の流れ

カスタムコンテナを利用する場合の全体的な流れは以下のようになります。

  1. 学習処理を実装したコンテナの作成
  2. 推論処理を実装したコンテナの作成
  3. コンテナをArtifact Registryに登録
  4. それらを使った処理を実行する

これらの流れを順に見ていきましょう。

学習処理を実装したコンテナの作成

学習コンテナではモデリングの処理を実装したものを呼び出すコンテナを用意して、 それをVertex AI上で実行する仕組みとなります。

そのため、基本的には通常と同じように実装できますが、 Vertex AIの機能を利用できるようにするための仕組みが提供されています。 今回は、その中の一つである、データセットに登録されたデータの利用をしています。

以下はPythonを使って線形回帰を行っています。 コマンドから実行可能な形式でプログラムであればPython以外でも任意のもので問題ありません。

import argparse
import os
from io import BytesIO

import joblib
import pandas as pd
from google.cloud import storage
from sklearn.linear_model import LinearRegression


def get_data(gs_path: str) -> pd.DataFrame:
    '''
    Cloud Storageにあるデータを取得する
    Vertex AIのデータセットの仕様として

    - データセットをtraining, validation, testの3種類に分割
    - それぞれがデータサイズに応じて複数ファイルに分割される

    となる。
    
    カスタムコンテナでデータのパスを受け取る際には
        gs://<bucket>/<prefix>/training-*
        gs://<bucket>/<prefix>/validaion-*
        gs://<bucket>/<prefix>/test-*
    のように分割された全てのファイルを含むワイルドカードで指定された値となっている
    '''
    # バケット名とプレフィックスに分割
    bucket_name = gs_path.split('/')[2]
    files = '/'.join(gs_path.split('/')[3:])

    gs_client = storage.Client()
    bucket_obj = gs_client.bucket(bucket_name)

    # プレフィックスのワイルドカード手前までをフィルタ条件としてふぃある一覧を取得
    blob_objects = bucket_obj.list_blobs(prefix=files[:-1])

    # 全てのファイルのデータを含むDataFrameを作成
    data = pd.concat([pd.read_csv(BytesIO(blob.download_as_bytes())) for blob in blob_objects])

    return data


def train(train_data: pd.DataFrame, target: str, model_path: str):
    '''
    scikit-learnのLinearRegressionを使った線形回帰
    '''
    train_X = train_data.drop(target, axis=1)
    train_y = train_data[target]

    linear = LinearRegression()
    linear.fit(train_X, train_y)

    os.makedirs('model', exist_ok=True)
    joblib.dump(linear, os.path.join(model_path, 'linear_model.pkl'))


def output_model(model_path: str, output_path: str) -> None:
    '''
    学習済みモデルをCloud Storageに保存する
    '''
    bucket_name = output_path.split('/')[2]
    prefix = '/'.join(output_path.split('/')[3:])

    gs_client = storage.Client()
    bucket_obj = gs_client.bucket(bucket_name)

    # 今回は学習済みモデルは単一ファイルとして出力されているが
    # 複数ファイルあるようなケースにも対応するためディレクトリ内すべてのファイルを保存している
    for file in os.listdir(model_path):
        output_path = os.path.join(prefix, file)
        blob = bucket_obj.blob(output_path)
        blob.upload_from_filename(os.path.join(model_path, file))


if __name__ == '__main__':
    # コンテナ実行時にコマンド引数を渡すことも可能
    # 目的変数を指定できるようにしている
    parser = argparse.ArgumentParser()
    parser.add_argument('--target')
    args = parser.parse_args()

    # training, validation, testのCloud Storage上でのパスは環境変数として渡される
    # - AIP_TRAINING_DATA_URI
    # - AIP_VALIDATION_DATA_URI
    # - AIP_TEST_DATA_URI
    data = get_data(os.environ['AIP_TRAINING_DATA_URI'])
    train(data, args.target, 'model')

    # 環境変数AIP_MODEL_DIRで渡されるCloud Storageのパスに保存されたファイルがモデルレジストリのモデルと
    # 関連付けられるため最終的な出力パスとして指定
    output_model('model', os.environ['AIP_MODEL_DIR'])

各種環境変数は、Vertex AIの仕組みとしてコンテナ実行時に指定したものを基準として渡されるものとなります。

また、実行時に引数を渡すことも可能です。 今回は、目的変数のカラム名を渡していますが、ハイパーパラメータなどの指定にもそのまま利用できます。

推論処理を実装したコンテナの作成

続いて、推論処理を実現するコンテナを実装します。 こちらは、学習と同様にコンテナ起動時に実行する処理をするプログラムを渡しますが、 Vertex AI上で動かすための決まり事として、HTTP(S)リクエストで推論する処理を実装する必要があります。

import json
import os

import flask
import joblib
import pandas as pd
from google.cloud import storage

# 死活監視のためのものと推論のためのものが、
# それぞれどのようなパスでアクセスできればよいかの値が環境変数に入っている
AIP_HEALTH_ROUTE = os.environ['AIP_HEALTH_ROUTE']
AIP_PREDICT_ROUTE = os.environ['AIP_PREDICT_ROUTE']

# モデルファイルが保存されているCloud Storageのパス
# gs://<bucket>/<prefix> の形式で保持している
#
# なお、ここでのパスはモデルレジストリに登録されたモデルのパスではなく
# モデルファイルをコピーした別なパスになっているので注意
bucket_name = os.environ['AIP_STORAGE_URI'].split('/')[2]
model_dir = '/'.join(os.environ['AIP_STORAGE_URI'].split('/')[3:])
client = storage.Client()

os.makedirs('model', exist_ok=True)

bucket_obj = client.bucket(bucket_name)
blob_objects = bucket_obj.list_blobs()
for blob in blob_objects:
    blob.download_to_filename(
        os.path.join('model', blob.name.split('/')[-1])
    )
model = joblib.load('model/linear_model.pkl')
app = flask.Flask(__name__)


@app.route(AIP_HEALTH_ROUTE, methods=['GET'])
def ping():
    status = 200
    return flask.Response(response='OK', status=status, mimetype='application/json')


@app.route(AIP_PREDICT_ROUTE, methods=['POST'])
def transformation():
    data = pd.DataFrame(
        json.loads(
            flask.request.data.decode("utf-8")
        )['instances']
    )
    # Vertex AIの仕様として、「predictions」をキーとして推論結果を入れたものをJSONで返す
    result = {'predictions' : model.predict(data).tolist()}
    return flask.jsonify(result)

# エンドポイントとして受け付けるポート番号を環境変数から取得
app.run(host='0.0.0.0', port=int(os.environ.get('AIP_HTTP_PORT', '8080')))

Flaskを使ってJSON形式で推論結果を返す処理を実装しています。

  • 動作に必要な値をVertex AIで設定された環境変数からの取得
  • モデルをCloud Storageから取得
  • レスポンスはキーが「predictions」となるものを返す

というのがVertex AI固有の処理となっています。

また、今回は内容を簡単にするため、Flaskをそのまま実行していますが、 実運用の際にはGunicornを使うなどWebアプリケーションとして必要な構成を検討してください。

コンテナをArtifact Registryに登録

これらの処理を含むコンテナを作成して、Artifact Registryにコンテナを登録します。 学習用と推論用でコンテナを分けても問題ありませんし、両方を含む単一のコンテを作成しても問題ありません。 今回は1つのコンテナに両方含めたDockerfileを用意しています。

FROM python:3.8-slim-bullseye

RUN apt update

WORKDIR /root

COPY train.py /root/train.py
COPY serve.py /root/serve.py

RUN pip install -U pip && \
    pip install pandas scikit-learn google-cloud-storage flask

EXPOSE 8080

それぞれのスクリプトと動作に必要なライブラリをインストールしするだけのコンテナとなります。

コンテナを実行する際にはそれぞれのスクリプトを実行対象とする必要がありますが、 ENTRYPOINTはここでは指定してません。 後ほど実行時にコマンドを指定したいと思います。

このコンテナをbuild → pushして利用可能な状態にすればカスタムコンテナ利用の準備は完了です。

カスタムコンテナを使った処理を実行する

ここまでで作成したカスタムコンテナ使って、学習処理とエンドポイントの作成を行ってみましょう。 コンソール上から実行することも可能ですが、ここではPythonを使って一連の処理を実行しています。

import os

from google.cloud import aiplatform

# 認証情報が必要な場合は環境変数に設定する
# os.environ['GOOGLE_APPLICATION_CREDENTIALS']='/path/to/service-account.json'

PROJECT_ID = '<project id>'
REGION = '<region>'
BUCKET_URI = '<各種処理の出力先となるCloud Storageのバケット>'

TRAIN_CONTAINER_URI = '<学習に利用するコンテナ>'
MODEL_CONTAINER_URI = '<推論に利用するコンテナ>'

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

# データセットの作成
data_uri = "gs://cloud-samples-data/ai-platform-unified/datasets/tabular/california-housing-tabular-regression.csv"
dataset = aiplatform.TabularDataset.create(
    display_name = "California Housing Dataset",
    gcs_source = data_uri
)
# 既にあるものを利用する場合はデータセットのIDをdataset_name引数で指定してインスタンスを作成する
# dataset = aiplatform.TabularDataset(dataset_name='<dataset id>')

# 学習ジョブの実行
# ジョブはカスタムコンテナを利用するためのインスタンスを作成する
# command引数で実行時のコマンドを指定する
# 推論時のコンテナ情報、実行コマンドはモデルレジストリとして紐づくためこのタイミングで指定している
job = aiplatform.CustomContainerTrainingJob(
    display_name="linear-regression",
    container_uri=TRAIN_CONTAINER_URI,
    command=["python", "train.py", "--target", "median_house_value"],
    model_serving_container_command=["python", "serve.py"],
    model_serving_container_image_uri=MODEL_CONTAINER_URI
)
model = job.run(
    dataset=dataset,
    model_display_name="linear-regression",
    machine_type="n1-standard-4"
)

# エンドポイントの作成
endpoint = aiplatform.Endpoint.create(display_name="linear-regression")

# 学習済みモデルをエンドポイントにデプロイする
model.deploy(endpoint=endpoint, machine_type="n1-standard-4")

この一連の処理でカスタムコンテナを使った学習とエンドポイントのデプロイが完了します。 curlコマンドを使って、エンドポイントへのリクエストを投げて推論を行ってみます。

ENDPOINT_ID="<作成されたエンドポイントのID>"
PROJECT_ID="<project id>"
REGION="<エンドポイントを作成したリージョン>"

curl \
-X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://${REGION}-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/endpoints/${ENDPOINT_ID}:predict \
-d '
{
  "instances": [
    {
        "longitude": -122.24,
        "latitude": 37.85,
        "housing_median_age": 52,
        "total_rooms": 1467,
        "total_bedrooms": 190,
        "population": 496,
        "households": 177,
        "median_income": 7.2574
    }
  ]
}
'

以下のような結果が返ってくれば成功です。

{
  "predictions": [
    46941304.041999057
  ],
  "deployedModelId": ...,
  "model": ...,
  "modelDisplayName": "linear-regression",
  "modelVersionId": "1"
}

最後に、作成した各種リソースが不要な場合はクリーンナップしておきます。

endpoint.undeploy_all()
endpoint.delete()
model.delete()
job.delete()
dataset.delete()

おわりに

今回はVertex AIでのカスタムコンテナを使った学習・推論の方法を紹介しました。 任意のアルゴリズムや自作モデルなどを使いた場合には参考にしてください。