UDFによるステージのファイル読み込みの仕組みから、機械学習モデルをUDFにデプロイする際のポイントを学ぶ

学習済みの機械学習モデルをUDFで実行するためには、UDFによるステージのファイル読み込みの仕組みを理解することが重要でした。
2024.01.22

データアナリティクス事業本部 機械学習チームの鈴木です。

Snowflakeでは、Snowpark MLで作成した機械学習モデルをUDFとしてデプロイすることができます。

モデルのUDFへのデプロイの仕組みは、現在プレビュー段階のModel Registryでより便利に使えるようになると思いますが、UDFの仕組みを使ったより基礎的な実装をどのようにできるか確認し、Snowflakeでの機械学習モデル実行の仕組みについて理解を深めたいと思います。

この記事について

機械学習モデルのUDFへのデプロイについて、公式のクイックスタートで紹介されている『3_snowpark_ml_model_training_inference.ipynb』の『Appendix - Execute batch inference using a Vectorized UDF』にサンプルがあります。

実装を読み解くのにUDFによるステージのファイル読み込みの仕組みを理解する必要があるため、その実装のポイントを確認したのでご紹介します。

このノートブックは以下のSnowpark MLのクイックスタートにて参照されているものになります。

※ クイックスタートの更新によりリンクが切れている場合は、GETTING STARTED WITH SNOWFLAKEから検索してください。

機械学習モデルの実行のためには、UDFによるステージのファイル読み込みの仕組みを理解することが重要になります。SnowflakeのUDFについてある程度の理解が必要になるため、実装例を見て仕組みを知りたいと思った方がどの資料を確認すると理解が深まりそうか、簡単な説明と一緒にご紹介します。

なお、以下では、このノートブックの実装を引用しますが、ノートブックのライセンスはApache-2.0 licenseであり、引用するコードもそのライセンスにしたがいます。

実装例の確認

まずはクイックスタートで紹介されているUDF作成処理のサンプルについて確認します。

ノートブックの記載より、永続化したscikit-learnのモデルを保存したmodel.joblibを使い、以下のようにUDFの作成および推論の実行ができることが分かります。

# 2024/1/22に『3_snowpark_ml_model_training_inference.ipynb』より引用しました。
# https://github.com/Snowflake-Labs/sfguide-intro-to-machine-learning-with-snowpark-ml-for-python/blob/main/3_snowpark_ml_model_training_inference.ipynb

# Let's save our optimal model first
MODEL_FILE = 'model.joblib'
joblib.dump(optimal_model, MODEL_FILE) # we are just pickling it locally first

# You can also save the pickled object into the stage we created earlier
session.file.put(MODEL_FILE, "@ML_HOL_ASSETS", overwrite=True)

# Get all relevant column names to pass into the UDF call
feature_cols = test_df[CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS].columns


# Cache the model load to optimize inference
@cachetools.cached(cache={})
def load_model(filename):
    import joblib
    import sys
    import os

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

# Register the UDF via decorator
@udf(name='batch_predict_diamond', 
     session=session, 
     replace=True, 
     is_permanent=True, 
     stage_location='@ML_HOL_ASSETS',
     input_types=[F.FloatType()]*len(feature_cols),
     return_type=F.FloatType(),
     imports=['@ML_HOL_ASSETS/model.joblib.gz'],
     packages=['pandas','joblib','cachetools','xgboost'])
def batch_predict_diamond(test_df: pd.DataFrame) -> pd.Series:
    # Need to name the columns because column names aren't passed in to this function
    test_df.columns = ["CUT_OE", "COLOR_OE", "CLARITY_OE", 'CARAT', 'DEPTH', 'TABLE_PCT', 'X', 'Y', 'Z']
    model = load_model('model.joblib.gz')
    return model.predict(test_df) # This is using the XGBoost library's model.predict(), not Snowpark ML's

test_df_w_preds = test_df.with_column('PREDICTED_PRICE', batch_predict_diamond(*feature_cols))
test_df_w_preds.show()

ファイル読み取りの観点でポイントになりそうな点を確認します。

ファイル読み取りのポイント

UDFを使用したファイルの読み取り

サンプルでは、load_model関数でステージに保存したモデルを読み取っています。

この仕組みは『Creating User-Defined Functions (UDFs) for DataFrames in Python』の『Reading Statically-Specified Files』に記載がありました。

snowflake.snowpark.functions.udfimportsでステージのモデルを指定しているため、ファイルがUDFとともにサーバーにアップロードされるようになっています。

ファイルがUDFの作成中に一度だけ読み取られるよう、@cachetools.cached(cache={})デコレーターをload_model関数につけています。

UDFのホームディテクトリの位置の取得

model.joblib.gzload_model関数で読み込んでいますが、このときにモデルがある場所を以下のように取得していました。

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

Python UDF Handler Examples』の『Reading a File』に記載がありますが、UDFのホームディレクトリの位置をsys._xoptionsメソッドとsnowflake_import_directoryシステムオプションを使用して取得できます。

最後に

学習済みの機械学習モデルをUDFで実行するためには、UDFによるステージのファイル読み込みの仕組みを理解することが重要であることが確認できました。

キャッシュの設定などは公式ガイドのサンプルで指定があるため、忘れずに実装しておきたいですね。

参考になりましたら幸いです。