クイックスタートでSnowpark ML Modelingを学んだのでポイントをご紹介

クイックスタートを実施してみて、Snowpark MLの利用イメージを掴むために特にポイントとなりそうだなと思った点をご紹介します。
2023.11.14

データアナリティクス事業本部 機械学習チームの鈴木です。

現在パブリックプレビュー版のSnowpark MLですが、既にクイックスタートも公開されております。

今回はクイックスタートを実施してみて、Snowpark MLの、特にSnowpark ML Modelingの利用イメージを掴むためにポイントとなりそうだなと思った点をご紹介します。

この記事の内容

以下の『Intro to Machine Learning with Snowpark ML for Python』はSnowpark MLを使ったデータの前処理〜モデルの作成・デプロイまでが学べるクイックスタートです。Snowpark ML ModelingはSnowpark MLの主要な機能の一つで、クイックスタートでは主にこの機能を使った開発が紹介されています。

今回はこのクイックスタートの内容を実施してみて、Snowpark ML Modelingの利用イメージを掴むために特にポイントとなりそうだなと思った点をご紹介します。

この記事では以下のセクションに限って言及します。

  • 4.Set up the data in Snowflake
  • 5.ML Feature Transformations
  • 6.ML Model Training and Deployment(モデルの訓練・推論部分)

セクション3までで必要な内容は、ローカル開発環境構築の記事で紹介していて、その内容を踏まえて頂ければセクション4以降を進められるかなと思います。

クイックスタートの内容の全体についてはぜひクイックスタート自体を実施頂くとして、この記事ではポイントと思った箇所や、試してみた上で記事執筆時点ではこう修正すると良かったという点を見ていければと思います。

なお、記事執筆時点ではSnowpark ML Modelingはパブリックプレビューですのでご注意ください。

前提

ローカルのJupyter環境からコードを実行しました。

ローカル開発環境は以下のブログで用意したものとなります。

以降の内容では、『Intro to Machine Learning with Snowpark ML for Python』クイックスタートのコードを引用します。このコードはApache-2.0 licenseです。一部改変したものについては該当箇所で明記します。

引用元のノートブックとライセンス表記については、以下のレポジトリをご確認ください。

1. 『Set up the data in Snowflake』のポイント

Snowflakeへのセッションを作成する

Session.builder.configsを使って接続を作成できますが、今回は以下のようにJupyter Notebook上で辞書を作成し、それを読み込む形としました。

# Snowpark for Python
from snowflake.snowpark import Session
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.types import StructType, StructField, DoubleType, IntegerType, StringType, FloatType
import snowflake.snowpark.functions as F

# data science libs
import numpy as np

# misc
import json

connection_parameters = {
    "account": "アカウント識別子",
    "user": "SNOWPARK_OPERATOR",
    "password": "パスワード",
    "role": "SNOWPARK_ROLE",
    "warehouse": "ML_HOL_WH",
    "database": "ML_HOL_DB",
    "schema": "ML_HOL_SCHEMA",
} 

session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION

# Current Environment Details
print('\nConnection Established with the following parameters:')
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Snowflake version           : {}'.format(snowflake_environment[0][1]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

# Connection Established with the following parameters:
# User                        : SNOWPARK_OPERATOR
# Role                        : "SNOWPARK_ROLE"
# Database                    : "ML_HOL_DB"
# Schema                      : "ML_HOL_SCHEMA"
# Warehouse                   : "ML_HOL_WH"
# Snowflake version           : 7.40.0
# Snowpark for Python version : 1.5.1

アカウント識別子は『アカウント識別子』のガイドの形式1のものを使いました。orgname-account_nameという形式になります。

orgnameおよびaccount_nameは、『組織の使用を始めるにあたり』のガイドの『組織およびそのアカウントの名前の表示』の部分に記載がありました。Snowsightから管理者 > アカウントと開くと確認できました。

元々のサンプルコードではconnection_parameters = json.load(open('connection.json'))となっていますが、これは同等の内容がconnection.jsonファイルに記載されているのでそれを読み込むという理解で良さそうです。

ステージのファイルからテーブルを作成する

続いて、@DIAMONDS_ASSETSステージのCSVファイルを読み込みますが、今回はuser_schemaを自分で定義して、以下のハイライトのようにスキーマ指定して読み込みました。

user_schema = StructType([
    StructField("carat", FloatType()), 
    StructField("cut", StringType()), 
    StructField("color", StringType()), 
    StructField("clarity", StringType()), 
    StructField("depth", FloatType()), 
    StructField("table", FloatType()), 
    StructField("price", IntegerType()), 
    StructField("x", FloatType()), 
    StructField("y", FloatType()), 
    StructField("z", FloatType())])

# Show the file before loading
session.sql("LS @DIAMONDS_ASSETS;").show()

# Create a Snowpark DataFrame that is configured to load data from the CSV file
# We can now infer schema from CSV files.
diamonds_df = session.read.options({"field_delimiter": ",",
                                    "field_optionally_enclosed_by": '"',
                                    "infer_schema": True,
                                    "skip_header": 1}).schema(user_schema).csv("@DIAMONDS_ASSETS")

diamonds_df.show()

# Look at descriptive stats on the DataFrame
diamonds_df.describe().show()

以降の処理は同じですが、上記の修正に伴って元のカラム名はTABLEになっていたので、TABLE_PCTにカラム名を変える際にはハイライト箇所のように修正しています。

# Force headers to uppercase
for colname in diamonds_df.columns:
    if colname == "TABLE":
       new_colname = "TABLE_PCT"
    else:
        new_colname = str.upper(colname)
    diamonds_df = diamonds_df.with_column_renamed(colname, new_colname)

diamonds_df.show()

最後のコードではデータフレームのwriteメソッドでdiamondsテーブルをセッションで指定しているスキーマに作成しました。

diamonds_df.write.mode('overwrite').save_as_table('diamonds')

このデータフレームからテーブルを作成する処理は、Snowpark Pythonのデータフレームの機能で、以下のブログの『8. テーブルの作成』で紹介したものと同じです。

2. 『ML Feature Transformations』のポイント

前処理用関数を利用する

ここでは先に作成したdiamondテーブルからデータを読み出し、Snowpark ML Modelingの前処理用関数を使って前処理ができることを確認しました。

ポイントは以下のコードでOrdinalEncoderMinMaxScalerをPipelineでまとめて、scikit-learnに類似した記述で前処理を定義できる点です。

# Categorize all the features for processing
CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
CATEGORICAL_COLUMNS_OE = ["CUT_OE", "COLOR_OE", "CLARITY_OE"] # To name the ordinal encoded columns
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]

categories = {
    "CUT": np.array(["IDEAL", "PREMIUM", "VERY_GOOD", "GOOD", "FAIR"]),
    "CLARITY": np.array(["IF", "VVS1", "VVS2", "VS1", "VS2", "SI1", "SI2", "I1", "I2", "I3"]),
    "COLOR": np.array(['D', 'E', 'F', 'G', 'H', 'I', 'J']),
}

# Build the pipeline
preprocessing_pipeline = Pipeline(
    steps=[
            (
                "OE",
                snowml.OrdinalEncoder(
                    input_cols=CATEGORICAL_COLUMNS,
                    output_cols=CATEGORICAL_COLUMNS_OE,
                    categories=categories,
                )
            ),
            (
                "MMS",
                snowml.MinMaxScaler(
                    clip=True,
                    input_cols=NUMERICAL_COLUMNS,
                    output_cols=NUMERICAL_COLUMNS,
                )
            )
    ]
)

一方で、完全にscikit-learnと同じわけではなく、OrdinalEncoderMinMaxScalerは引数が異なることに注意が必要です。

また、snowflake.ml.modeling.pipeline.Pipelineのインスタンスをsklearn.pipeline.Pipelineのインスタンスに変換することは、少なくとも記事執筆時点ではできません。

具体的にはSnowpark MLのAPI Referenceのsklearn.pipeline.Pipelineクラスのページにはto_sklearnのようなメソッドはありません。(snowflake.ml.modeling.ensemble.RandomForestClassifierクラスのようなモデルに関するクラスには実装されています。)

パイプラインインスタンスの永続化

作成したSnowpark MLのクラスのインスタンスはjoblibで書き出して永続化する例が紹介されていました。

PIPELINE_FILE = 'preprocessing_pipeline.joblib'
joblib.dump(preprocessing_pipeline, PIPELINE_FILE) # We are just pickling it locally first

ローカルに置いておいてjoblib.loadで読み出すことができますし、ステージにアップロードしておいても良いですね。

# You can also save the pickled object into the stage we created earlier for deployment
session.file.put(PIPELINE_FILE, "@ML_HOL_ASSETS", overwrite=True)

3. 『ML Model Training and Deployment』のポイント

訓練・テストへデータ分割する

これはSnowpark Pythonの機能ですが、DataFrameのrandom_splitメソッドでデータフレームを分割できるのがとても便利です。

# Split the data into train and test sets
diamonds_train_df, diamonds_test_df = diamonds_df.random_split(weights=[0.9, 0.1], seed=0)

# Run the train and test sets through the Pipeline object we defined earlier
train_df = preprocessing_pipeline.fit(diamonds_train_df).transform(diamonds_train_df)
test_df = preprocessing_pipeline.transform(diamonds_test_df)

訓練と推論を実行する

分割した後のデータセットを使って、snowflake.ml.modeling.xgboost.XGBRegressorで回帰モデルを構築することができます。訓練はfitメソッドで行い、predictメソッドで推論ができることが紹介されていました。

# Define the XGBRegressor
regressor = XGBRegressor(
    input_cols=CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS
)

# Train
regressor.fit(train_df)

# Predict
result = regressor.predict(test_df)

ちなみにSnowpark MLのスコープをイメージしやすくという意味で、クイックスタートのノートブックでも可視化はSeabornのようないわゆるPythonの可視化ツールを使っていました。

実測値と予測値

モデル選択をする

グリッドサーチをすることも可能です。これは大変便利ですね。

grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid={
        "n_estimators":[100, 200, 300, 400, 500],
        "learning_rate":[0.1, 0.2, 0.3, 0.4, 0.5],
    },
    n_jobs = -1,
    scoring="neg_mean_absolute_percentage_error",
    input_cols=CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS
)

# Train
grid_search.fit(train_df)

to_sklearnメソッドでsklearn.model_selection.GridSearchCVに変換することも可能です。

# Let's save our optimal model first and its metadata
optimal_model = grid_search.to_sklearn().best_estimator_
optimal_n_estimators = grid_search.to_sklearn().best_estimator_.n_estimators
optimal_learning_rate = grid_search.to_sklearn().best_estimator_.learning_rate

モデルのデプロイについての補足

『6.ML Model Training and Deployment』の最後にはモデルのデプロイについて、Snowpark Model Registryのサンプルの紹介がありました。この機能は記事執筆時点でまだパブリックプレビューではない認識なので、ここでは触れません。

記事執筆時点でどのような選択肢がありそうなのかは別途まとめて記事にしたいと思います。

最後に

Snowpark MLのクイックスタートである『Intro to Machine Learning with Snowpark ML for Python』を試してみたので、ポイントとなる点をご紹介しました。

とても簡単にSnowpark MLの使い方を学ぶことができる資料でしたので、ご興味があればぜひ試して頂ければと思います。