Snowpark MLのスクリプトをストアドプロシージャにしてタスクで実行してみる
データアナリティクス事業本部 機械学習チームの鈴木です。
現在パブリックプレビュー版のSnowpark MLですが、先日クイックスタートを参考に使い方のポイントを記事にまとめました。
クイックスタートを進めてみて、「Snowpark MLで実装した前処理やモデルのデプロイってどうすればいいんだろう?」という気持ちが出てきたので、まずはストアドプロシージャにしてタスクで実行してみる例を作って試してみました。
Snowpark MLはどんどん機能追加がされていくと思いますので、あくまでも現時点での悩みにすぎないかもしれませんが、途中経過としてもまとめておければと思い記事にしました。
この記事の内容
冒頭の記事の続きの位置付けで、Pythonワークシートからその記事で試したSnowpark ML ModelingのAPIを利用するスクリプトをストアドプロシージャにし、タスクとして呼び出すところまでを確認します。
手順としては以下の記事を参考にします。
Snowpark ML Modelingは、現時点ではsnowflake.ml.modeling.pipeline.Pipeline
などの前処理用パイプラインインスタンスはscikit-learnの対応するインスタンスに変換できないため、UDFとしてデプロイできるのは先の変換が可能なモデル部分だけでした。
ストアドプロシージャにすることで、前処理部分はストアドプロシージャとして実行し、モデルについてもUDFにしているならそれを呼び出すのでよし、そうでないならまとめてストアドプロシージャ内で学習または推論もしくは両方を実行してしまえるので、Snowflake内で完結させたい場合は有力な選択肢になるだろうと考えています。
※ 2024/03/04追記:v1.1.1以降のAPIリファレンスではPipelineおよび各種変換にto_sklearn()
が実装されたことが明記されました。現在パブリックプレビューのModel Registryと合わせてかなりモデルのデプロイ・運用がしやすくなると思います。
前提
続きの記事として執筆しているので、以前の記事で記載した内容や追加で必要な操作は前提として記載します。
権限の追加
今回、『ローカル開発環境を準備 - 5. Snowflake環境の準備(ロール・ユーザー)』で作成したSNOWPARK_ROLE
を使って作業しようと思います。
このロールにはタスクを操作する権限がないため、今回は以下のように追加しました。
-- タスクを操作する権限の付与 GRANT EXECUTE TASK, EXECUTE MANAGED TASK ON ACCOUNT TO ROLE SNOWPARK_ROLE;
権限付与にあたって、以下のガイドを参考にしました。
なにかSnowpark MLを使うだけなら十分な検証用のロールがあって、タスク周りの作業をするのに追加で権限を付与したんだな、くらいのご認識で大丈夫です。
既存リソースについて
『クイックスタートでSnowpark ML Modelingを学んだのでポイントをご紹介 | DevelopersIO』で作成した、各種リソースを使います。
特に学習用のデータは、ML_HOL_DB / ML_HOL_SCHEMA / DIAMONDS
テーブルを使います。なにか検証用のデータが入ったテーブルがあるんだな、くらいのご認識で大丈夫です。
ストアドプロシージャが実行するPythonスクリプトについて
スクリプトは、『Intro to Machine Learning with Snowpark ML for Python』クイックスタートのコードを引用・改変します。このコードはApache-2.0 licenseです。
実行引用元のノートブックとライセンス表記については、以下のレポジトリをご確認ください。
改変内容としては、ストアドプロシージャで実行できるよう、ライブラリの記載の整理や処理の並び替えをしました。
検証の流れ
以下に試してみたデプロイ方法について順番に記載します。
1. PythonワークシートからのPythonスクリプトの作成
Snowsightから、Pythonワークシートを開き、以下のように設定をしました。
- ロール:
SNOWPARK_ROLE
(自分で作成したもの) - ウェアハウス:
ML_HOL_WH
(クイックスタートで作成していたもの) - データベース・スキーマ:
ML_HOL_DB.ML_HOL_SCHEMA
(クイックスタートで作成していたもの) - Handler:
main
(デフォルトのまま) - Return type:
Table()
(デフォルトのまま) - Packages: Anaconda Packagesで以下を選択
snowflake-ml-python
snowflake-snowpark-python
特にpackagesについては以下のように設定できました。
Pythonワークシートに以下のコードを入力しました。
# The Snowpark package is required for Python Worksheets. # You can add more packages by selecting them using the Packages control and then importing them. import numpy as np from snowflake.ml.modeling.pipeline import Pipeline import snowflake.ml.modeling.preprocessing as snowml from snowflake.ml.modeling.xgboost import XGBRegressor import snowflake.snowpark as snowpark from snowflake.snowpark.functions import col def main(session: snowpark.Session): DEMO_TABLE = 'diamonds' input_tbl = f"{session.get_current_database()}.{session.get_current_schema()}.{DEMO_TABLE}" diamonds_df = session.table(input_tbl) # Split the data into train and test sets diamonds_train_df, diamonds_test_df = diamonds_df.random_split(weights=[0.9, 0.1], seed=0) # Categorize all the features for processing CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"] CATEGORICAL_COLUMNS_OE = ["CUT_OE", "COLOR_OE", "CLARITY_OE"] # To name the ordinal encoded columns NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"] LABEL_COLUMNS = ['PRICE'] OUTPUT_COLUMNS = ['PREDICTED_PRICE'] categories = { "CUT": np.array(["IDEAL", "PREMIUM", "VERY_GOOD", "GOOD", "FAIR"]), "CLARITY": np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]), "COLOR": np.array(['D', 'E', 'F', 'G', 'H', 'I', 'J']), } # Build the pipeline preprocessing_pipeline = Pipeline( steps=[ ( "OE", snowml.OrdinalEncoder( input_cols=CATEGORICAL_COLUMNS, output_cols=CATEGORICAL_COLUMNS_OE, categories=categories, ) ), ( "MMS", snowml.MinMaxScaler( clip=True, input_cols=NUMERICAL_COLUMNS, output_cols=NUMERICAL_COLUMNS, ) ) ] ) # Run the train and test sets through the Pipeline object we defined earlier train_df = preprocessing_pipeline.fit(diamonds_train_df).transform(diamonds_train_df) test_df = preprocessing_pipeline.transform(diamonds_test_df) # Define the XGBRegressor regressor = XGBRegressor( input_cols=CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS, label_cols=LABEL_COLUMNS, output_cols=OUTPUT_COLUMNS ) # Train regressor.fit(train_df) # Predict result = regressor.predict(test_df) # Save Results AS Table result.write.mode("overwrite").save_as_table("latest_predict_result") # Return value will appear in the Results tab. return result
一度実行してみて、成功することを確認しました。
データベースを確認して、latest_predict_result
テーブルができていることも確認できました。このテーブルはタスク実行時に作成されますが、今回は成功が分かりやすいように一度消して、タスクを実行した際にできているか再度確認することとします。
2. ストアドプロシージャの作成
スクリプト記入済みのPythonワークシートから、画面右上のDeploy
を押して、ストアドプロシージャを作成しました。
New Python Procedure
のポップアップにプロシージャ名とHandlerを入れて、Open in Worksheets
を押しました。
SQLワークシートが開くので、EXECUTE AS CALLER
を以下のように追記し、SQLを実行してストアドプロシージャを作成しました。(デフォルトのEXECUTE AS OWNER
だとタスク実行時に失敗したため、『Why a stored procedure runs without error in web UI but fails as task / stored procedure.』を参考に天下り的にEXECUTE AS CALLER
としました。)
以下のようにストアドプロシージャが作成できました。
3. タスクの作成
SQLワークシートを開き、以下のSQLを実行してタスクを作成しました。
create task python_stored_test_task schedule = 'using cron 0 8 1 * * UTC' -- 毎月1日8:00(UTC)で実行と仮定 warehouse = 'ML_HOL_WH' as call SNOWPARK_ML_SAMPLE_PROCEDURE(); alter task python_stored_test_task resume; -- タスクの実行にはresumeが必要 show tasks;
以下のようにタスクが作成されました。
続いてSQLワークシートから以下のSQL文を実行してタスクを実行しました。
execute task python_stored_test_task;
Run History
から確認すると、タスクが成功していることが分かりました。
※ 何回か試行錯誤したので履歴がいくつか出ていますが、赤枠箇所が成功したものになります。
スキーマを確認すると、latest_predict_result
テーブルができていることが確認できました。
最後に
今回はSnowpark ML Modelingを使ったPythonスクリプトをストアドプロシージャにしてタスクで実行する例をご紹介しました。
この方法であればSnowflake外部にスケジュール実行用のコンピュートを配置しなくてもSnowflakeだけで完結してSnowpark ML Modelingを使った処理実行が定期的に行えそうです。
この記事の内容は、学習と推論を毎回行うものでしたが、学習済みのリソースはステージにおいておくなどして、学習と推論を別のストアドプロシージャにしておいてもいいのかなと思っています。
このあたりの使い分けについてはまた別の機会に検証していければと思います。