Snowpark MLのスクリプトをストアドプロシージャにしてタスクで実行してみる

Snowpark MLのスクリプトをStored Procedureとタスクの機能を使って定期実行するようにしてみました。
2023.11.24

データアナリティクス事業本部 機械学習チームの鈴木です。

現在パブリックプレビュー版のSnowpark MLですが、先日クイックスタートを参考に使い方のポイントを記事にまとめました。

クイックスタートを進めてみて、「Snowpark MLで実装した前処理やモデルのデプロイってどうすればいいんだろう?」という気持ちが出てきたので、まずはストアドプロシージャにしてタスクで実行してみる例を作って試してみました。

Snowpark MLはどんどん機能追加がされていくと思いますので、あくまでも現時点での悩みにすぎないかもしれませんが、途中経過としてもまとめておければと思い記事にしました。

この記事の内容

冒頭の記事の続きの位置付けで、Pythonワークシートからその記事で試したSnowpark ML ModelingのAPIを利用するスクリプトをストアドプロシージャにし、タスクとして呼び出すところまでを確認します。

手順としては以下の記事を参考にします。

Snowpark ML Modelingは、現時点ではsnowflake.ml.modeling.pipeline.Pipelineなどの前処理用パイプラインインスタンスはscikit-learnの対応するインスタンスに変換できないため、UDFとしてデプロイできるのは先の変換が可能なモデル部分だけでした。

ストアドプロシージャにすることで、前処理部分はストアドプロシージャとして実行し、モデルについてもUDFにしているならそれを呼び出すのでよし、そうでないならまとめてストアドプロシージャ内で学習または推論もしくは両方を実行してしまえるので、Snowflake内で完結させたい場合は有力な選択肢になるだろうと考えています。

前提

続きの記事として執筆しているので、以前の記事で記載した内容や追加で必要な操作は前提として記載します。

権限の追加

今回、『ローカル開発環境を準備 - 5. Snowflake環境の準備(ロール・ユーザー)』で作成したSNOWPARK_ROLEを使って作業しようと思います。

このロールにはタスクを操作する権限がないため、今回は以下のように追加しました。

-- タスクを操作する権限の付与
GRANT EXECUTE TASK, EXECUTE MANAGED TASK ON ACCOUNT TO ROLE SNOWPARK_ROLE;

権限付与にあたって、以下のガイドを参考にしました。

なにかSnowpark MLを使うだけなら十分な検証用のロールがあって、タスク周りの作業をするのに追加で権限を付与したんだな、くらいのご認識で大丈夫です。

既存リソースについて

クイックスタートでSnowpark ML Modelingを学んだのでポイントをご紹介 | DevelopersIO』で作成した、各種リソースを使います。

特に学習用のデータは、ML_HOL_DB / ML_HOL_SCHEMA / DIAMONDSテーブルを使います。なにか検証用のデータが入ったテーブルがあるんだな、くらいのご認識で大丈夫です。

ストアドプロシージャが実行するPythonスクリプトについて

スクリプトは、『Intro to Machine Learning with Snowpark ML for Python』クイックスタートのコードを引用・改変します。このコードはApache-2.0 licenseです。

実行引用元のノートブックとライセンス表記については、以下のレポジトリをご確認ください。

改変内容としては、ストアドプロシージャで実行できるよう、ライブラリの記載の整理や処理の並び替えをしました。

検証の流れ

以下に試してみたデプロイ方法について順番に記載します。

1. PythonワークシートからのPythonスクリプトの作成

Snowsightから、Pythonワークシートを開き、以下のように設定をしました。

  • ロール: SNOWPARK_ROLE(自分で作成したもの)
  • ウェアハウス: ML_HOL_WH(クイックスタートで作成していたもの)
  • データベース・スキーマ: ML_HOL_DB.ML_HOL_SCHEMA(クイックスタートで作成していたもの)
  • Handler: main(デフォルトのまま)
  • Return type: Table()(デフォルトのまま)
  • Packages: Anaconda Packagesで以下を選択
    • snowflake-ml-python
    • snowflake-snowpark-python

特にpackagesについては以下のように設定できました。

packages

Pythonワークシートに以下のコードを入力しました。

# The Snowpark package is required for Python Worksheets. 
# You can add more packages by selecting them using the Packages control and then importing them.

import numpy as np
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.ml.modeling.preprocessing as snowml
from snowflake.ml.modeling.xgboost import XGBRegressor
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col


def main(session: snowpark.Session): 

    DEMO_TABLE = 'diamonds' 
    input_tbl = f"{session.get_current_database()}.{session.get_current_schema()}.{DEMO_TABLE}"
    diamonds_df = session.table(input_tbl)
    
    # Split the data into train and test sets
    diamonds_train_df, diamonds_test_df = diamonds_df.random_split(weights=[0.9, 0.1], seed=0)

    # Categorize all the features for processing
    CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
    CATEGORICAL_COLUMNS_OE = ["CUT_OE", "COLOR_OE", "CLARITY_OE"] # To name the ordinal encoded columns
    NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]

    LABEL_COLUMNS = ['PRICE']
    OUTPUT_COLUMNS = ['PREDICTED_PRICE']

    categories = {
        "CUT": np.array(["IDEAL", "PREMIUM", "VERY_GOOD", "GOOD", "FAIR"]),
        "CLARITY": np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]),
        "COLOR": np.array(['D', 'E', 'F', 'G', 'H', 'I', 'J']),
    }

    # Build the pipeline
    preprocessing_pipeline = Pipeline(
        steps=[
                (
                    "OE",
                    snowml.OrdinalEncoder(
                        input_cols=CATEGORICAL_COLUMNS,
                        output_cols=CATEGORICAL_COLUMNS_OE,
                        categories=categories,
                    )
                ),
                (
                    "MMS",
                    snowml.MinMaxScaler(
                        clip=True,
                        input_cols=NUMERICAL_COLUMNS,
                        output_cols=NUMERICAL_COLUMNS,
                    )
                )
        ]
    )

    # Run the train and test sets through the Pipeline object we defined earlier
    train_df = preprocessing_pipeline.fit(diamonds_train_df).transform(diamonds_train_df)
    test_df = preprocessing_pipeline.transform(diamonds_test_df)

    # Define the XGBRegressor
    regressor = XGBRegressor(
        input_cols=CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS,
        label_cols=LABEL_COLUMNS,
        output_cols=OUTPUT_COLUMNS
    )

    # Train
    regressor.fit(train_df)

    # Predict
    result = regressor.predict(test_df)

    # Save Results AS Table
    result.write.mode("overwrite").save_as_table("latest_predict_result")

    # Return value will appear in the Results tab.
    return result

一度実行してみて、成功することを確認しました。

Pythonスクリプトの動作確認

データベースを確認して、latest_predict_resultテーブルができていることも確認できました。このテーブルはタスク実行時に作成されますが、今回は成功が分かりやすいように一度消して、タスクを実行した際にできているか再度確認することとします。

テーブルが作成されたこと

2. ストアドプロシージャの作成

スクリプト記入済みのPythonワークシートから、画面右上のDeployを押して、ストアドプロシージャを作成しました。

Stored Procedureへのデプロイ

New Python Procedureのポップアップにプロシージャ名とHandlerを入れて、Open in Worksheetsを押しました。

ポップアップ

SQLワークシートが開くので、EXECUTE AS CALLERを以下のように追記し、SQLを実行してストアドプロシージャを作成しました。(デフォルトのEXECUTE AS OWNERだとタスク実行時に失敗したため、『Why a stored procedure runs without error in web UI but fails as task / stored procedure.』を参考に天下り的にEXECUTE AS CALLERとしました。)

EXECUTE AS CALLERを追記

以下のようにストアドプロシージャが作成できました。

Stored Procedureが作成できた

3. タスクの作成

 SQLワークシートを開き、以下のSQLを実行してタスクを作成しました。

create task python_stored_test_task
    schedule = 'using cron 0 8 1 * * UTC' -- 毎月1日8:00(UTC)で実行と仮定
    warehouse = 'ML_HOL_WH'
    as
    call SNOWPARK_ML_SAMPLE_PROCEDURE();

alter task python_stored_test_task resume; -- タスクの実行にはresumeが必要
show tasks;

以下のようにタスクが作成されました。

タスクが作成された

続いてSQLワークシートから以下のSQL文を実行してタスクを実行しました。

execute task python_stored_test_task;

Run Historyから確認すると、タスクが成功していることが分かりました。

タスクを実行

※ 何回か試行錯誤したので履歴がいくつか出ていますが、赤枠箇所が成功したものになります。

スキーマを確認すると、latest_predict_resultテーブルができていることが確認できました。

テーブルができていることを確認

最後に

今回はSnowpark ML Modelingを使ったPythonスクリプトをストアドプロシージャにしてタスクで実行する例をご紹介しました。

この方法であればSnowflake外部にスケジュール実行用のコンピュートを配置しなくてもSnowflakeだけで完結してSnowpark ML Modelingを使った処理実行が定期的に行えそうです。

この記事の内容は、学習と推論を毎回行うものでしたが、学習済みのリソースはステージにおいておくなどして、学習と推論を別のストアドプロシージャにしておいてもいいのかなと思っています。

このあたりの使い分けについてはまた別の機会に検証していければと思います。

ほかに参考にした資料