Snowpark PythonクライアントからのUDF作成方法と実行されたSQLについて確認してみた

Snowpark Pythonからユーザー定義関数(UDFs)を作成することで、Pythonで記述したロジックをSnowflake上で実行することが可能です。Snowflakeで実行されるSQLを確認すると理解を深まります。
2023.08.04

データアナリティクス事業本部 機械学習チームの鈴木です。

Snowpark PythonのクライアントからUDFを使ってみて、Snowflake上で実行される処理や作成したUDFの使い方について確認してみました。

記事の内容

Snowpark Pythonからユーザー定義関数(UDFs)を作成して、UDFの作成パターンや使い方について確認しました。

ガイドとしては、以下のページを参考にしました。

Snowpark PythonではPythonコードを使ってSnowflakeのUDFを作成します。Pythonコードで記述したロジックとSnowflakeの計算資源を使って、Snowflakeに格納したデータを変換することができます。

用途としては、普通にUDFやDataFrameの操作に使えることに加え、以下のクイックスタートのように、別環境で訓練した機械学習モデル(ここでは特にCPUを使うもの)を実行することも可能です。

先に紹介したガイドでも、XGBoostを使用するための例が記載されていました。

SnowparkとUDFを使いこなすことでより様々な処理をSnowflake上で実現できるため、この記事ではまずSnowparkでUDFを作るとSnowflake上でどのような処理が実行され、できたUDFをどのようにしてSnowparkから利用できるのか確認してみました。

準備

検証に必要な準備について記載します。

データベース

SnowsightからML_DATABASEという名前のデータベースを作成しておきました。

データベースの作成は、例えば以下のチュートリアルをご確認ください。

ロール・ユーザー

SnowsightのSQLワークシートから以下のようなSQLでSNOWPARK_ROLEというロールを付与したSNOWPARK_OPERATORというユーザーを作成しておきました。

-- ROLEの作成
CREATE ROLE SNOWPARK_ROLE;

-- ウェアハウスへのUSAGEアクセス権の付与
GRANT USAGE ON WAREHOUSE 使用するウェアハウス TO ROLE SNOWPARK_ROLE;

-- データベースおよびスキーマのUSAGE権の付与
GRANT USAGE ON DATABASE ML_DATABASE TO ROLE SNOWPARK_ROLE;
GRANT USAGE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE;

-- スキーマ内でのテーブルの作成権限の付与
GRANT CREATE TABLE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE;

-- スキーマ内でのUDFの作成権限の付与
GRANT CREATE FUNCTION ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE;

-- 基本的なSQL操作用の権限の付与
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA PUBLIC TO ROLE SNOWPARK_ROLE;

-- ユーザーの作成
CREATE USER SNOWPARK_OPERATOR
    PASSWORD = 'パスワード'
    DEFAULT_ROLE = 'SNOWPARK_ROLE'
    MUST_CHANGE_PASSWORD = FALSE;

-- ユーザーへのロールの付与
GRANT ROLE SNOWPARK_ROLE TO USER SNOWPARK_OPERATOR;

特に今回はUDFを作成したいので、スキーマ内でのUDFの作成権限の付与を忘れずにしておきました。

クライアント開発環境

以下のブログの通り、ローカルにJupyter Notebook環境を用意しました。

Jupyter Notebookを起動して新しいノートブックを作成したら、以下のようにセッションを作成し、使用するデータベースなどの指定をしました。

from snowflake.snowpark import Session

# セッションの作成
connection_parameters = {
   "account": "アカウント識別子",
   "user": "SNOWPARK_OPERATOR",
   "password": "設定したパスワード",
    "warehouse": "使用するウェアハウス"
}

session = Session.builder.configs(connection_parameters).create()

# データベース・スキーマ・ロールの指定
session.use_database("ML_DATABASE")
session.use_schema("PUBLIC")
session.sql("USE ROLE SNOWPARK_ROLE").collect()
## [Row(status='Statement executed successfully.')]

利用するAPIについて

snowflake.snowpark.functions.udfを使ってUDFを作成します。このAPIの詳細は以下のドキュメントに記載があります。

is_permanent引数がFalseの場合、一時的なUDFが作成されます。作成したSnowpark PythonクライアントとSnowflakeとの間のセッションが破棄されると、一時的なUDFも破棄されます。デフォルトはFalseです。

以降でregister_from_fileも利用しますが、is_permanent引数については同じになります。

一時的ではないUDFを作成したい場合は、is_permanentTrueとします。このとき、stage_locationの指定が必須になります。

一時的なUDFを作成する

まず、一時的なUDFを作成する場合について、試した内容を記載します。

無名関数を使った匿名UDF

まずはPython無名関数を使って匿名UDFを作成しました。udf内でUDFの名前について指定をしていません。

無名関数をsnowflake.snowpark.functions.udfに渡して匿名UDFを作成し、変数に割り当てました。

from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf

add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])

匿名UDFは、DataFrameに対して適用することができました。

df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), add_one("b")).collect()
[Row("ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("A")=2, "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("B")=3),
 Row("ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("A")=4, "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("B")=5)]

Snowsightのクエリ履歴から、どのようなSQLに変換されたか確認してみました。

まず、以下のように一時的なPython UDFが作成されました。(pickle化した関数は置き換えたものです。)

CREATE TEMPORARY FUNCTIONcomputeがハンドラーとして設定されており、compute関数がpickle化した関数を読み込む形で定義されています。

CREATE
TEMPORARY  FUNCTION  "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB(arg1 INT)
RETURNS INT
LANGUAGE PYTHON 
RUNTIME_VERSION=3.8

PACKAGES=('cloudpickle==2.0.0')
HANDLER='compute'

AS $$
import pickle

func = pickle.loads(bytes.fromhex('pickle化した関数'))
# The following comment contains the source code generated by snowpark-python for explanatory purposes.
# func = lambda x: x+1


from threading import RLock

lock = RLock()

class InvokedFlag:
    def __init__(self):
        self.invoked = False

def lock_function_once(f, flag):
    def wrapper(*args, **kwargs):
        if not flag.invoked:
            with lock:
                if not flag.invoked:
                    result = f(*args, **kwargs)
                    flag.invoked = True
                    return result
                return f(*args, **kwargs)
        return f(*args, **kwargs)
    return wrapper


invoked = InvokedFlag()

def compute(arg1):
    return lock_function_once(func, invoked)(arg1)
$$

そのあと、そのUDFを使ってデータの変換をしていました。

SELECT "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("A"), "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("B") FROM ( SELECT "_1" AS "A", "_2" AS "B" FROM ( SELECT $1 AS "_1", $2 AS "_2" FROM  VALUES (1 :: INT, 2 :: INT), (3 :: INT, 4 :: INT)))

Python上では匿名UDFを割り当てた変数でUDFの管理をしますが、Snowflake上では適当な名前を割り当てた一時的なUDFを作成しているようでした。

なお、pickle化した関数は以下で質問されているような形になります。

Python UDFsの作成については以下のページに説明がありました。クエリ履歴の結果と見比べると同じ構文であることが分かります。Snowpark PythonからのUDF作成は、PythonスクリプトからPython UDFを作成するための仕組みと考えてよさそうです。

無名関数を使った名前付きUDF

まず無名関数を使った方法です。匿名UDFの作成と同じくsnowflake.snowpark.functions.udfを使いますが、このときname引数でUDF名を指定します。

add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)

クエリ履歴で確認できたSQLは以下です。(pickle化した関数は置き換えたものです。)

匿名UDFのときとは異なり、CREATE OR REPLACE TEMPORARY FUNCTIONでUDF名が指定されていました。

CREATE OR REPLACE 
TEMPORARY  FUNCTION  my_udf(arg1 INT)
RETURNS INT
LANGUAGE PYTHON 
RUNTIME_VERSION=3.8

PACKAGES=('cloudpickle==2.0.0')
HANDLER='compute'

AS $$
import pickle

func = pickle.loads(bytes.fromhex('pickle化した関数'))
# The following comment contains the source code generated by snowpark-python for explanatory purposes.
# func = lambda x: x+1


from threading import RLock

lock = RLock()

class InvokedFlag:
    def __init__(self):
        self.invoked = False

def lock_function_once(f, flag):
    def wrapper(*args, **kwargs):
        if not flag.invoked:
            with lock:
                if not flag.invoked:
                    result = f(*args, **kwargs)
                    flag.invoked = True
                    return result
                return f(*args, **kwargs)
        return f(*args, **kwargs)
    return wrapper


invoked = InvokedFlag()

def compute(arg1):
    return lock_function_once(func, invoked)(arg1)
$$

DataFrameへの操作とする場合は、関数を割り当てた変数を使います。

df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), add_one("b")).collect()
[Row(MY_UDF("A")=2, MY_UDF("B")=3), Row(MY_UDF("A")=4, MY_UDF("B")=5)]

SQLで利用する場合にはUDF名を指定できます。どちらかというと後で紹介する永続化したUDFでの用途になるかもしれません。

df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")

# 一度テーブルにする
df.write.mode("overwrite").save_as_table("tmp_df")

# UDFを使う
session.sql("select my_udf(a), my_udf(b) from tmp_df").collect()
[Row(MY_UDF(A)=2, MY_UDF(B)=3), Row(MY_UDF(A)=4, MY_UDF(B)=5)]

udfデコレーターを使った名前付きUDF

udfデコレーターを使って、Python関数の宣言と併せたUDF作成も可能です。

@udf(name="my_udf", replace=True)
def add_one(x: int) -> int:
    return x + 1

クエリ履歴で確認できた関数は以下です。(pickle化した関数は置き換えたものです。)

CREATE OR REPLACE 
TEMPORARY  FUNCTION  my_udf(arg1 BIGINT)
RETURNS BIGINT
LANGUAGE PYTHON 
RUNTIME_VERSION=3.8

PACKAGES=('cloudpickle==2.0.0')
HANDLER='compute'

AS $$
import pickle

func = pickle.loads(bytes.fromhex('pickle化した関数'))
# The following comment contains the source code generated by snowpark-python for explanatory purposes.
# @udf(name="my_udf", replace=True)
# def add_one(x: int) -> int:
#     return x + 1
#
# func = add_one


from threading import RLock

lock = RLock()

class InvokedFlag:
    def __init__(self):
        self.invoked = False

def lock_function_once(f, flag):
    def wrapper(*args, **kwargs):
        if not flag.invoked:
            with lock:
                if not flag.invoked:
                    result = f(*args, **kwargs)
                    flag.invoked = True
                    return result
                return f(*args, **kwargs)
        return f(*args, **kwargs)
    return wrapper


invoked = InvokedFlag()

def compute(arg1):
    return lock_function_once(func, invoked)(arg1)
$$

使い方は無名関数の場合と同じです。DataFrameに使うときの例だけ試してみました。

df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), add_one("b")).collect()
[Row(MY_UDF("A")=2, MY_UDF("B")=3), Row(MY_UDF("A")=4, MY_UDF("B")=5)]

ローカルのファイルを使った名前付きUDF

ローカルにあるファイルから作成する場合です。

以下のPythonスクリプトを作成しておきました。

./test_udf_file.py

def mod5(x: int) -> int:
    return x % 5

Jupyter Notebookから確認できる場所においておきました。

!ls test_udf_file.py

register_from_fileで一時的なUDFを作成しました。

# mod5() in that file has type hints
mod5_udf = session.udf.register_from_file(
    file_path="tests/resources/test_udf_dir/test_udf_file.py",
    func_name="mod5",
)  
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()  
# [Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

クエリ履歴では以下のクエリが実行されていました。

CREATE
TEMPORARY  FUNCTION  my_udf(arg1 BIGINT)
RETURNS BIGINT
LANGUAGE PYTHON 
RUNTIME_VERSION=3.8
IMPORTS=('@"ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_STAGE_KLPD8Y9SQH/my_udf/test_udf_file.py')
PACKAGES=('cloudpickle==2.0.0')
HANDLER='test_udf_file.mod5'

IMPORTSではステージを指定していました。

少し前のクエリを確認すると、test_udf_file.pyのステージへのアップロードも含めてSnowpark Python側で実行してくれていました。

PUT 'file://./test_udf_file.py' '@"ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_STAGE_KLPD8Y9SQH/my_udf' PARALLEL = 4 AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = AUTO_DETECT OVERWRITE = TRUE

ステージのファイルを使った名前付きUDF

あらかじめ内部ステージにアップロードしたPythonスクリプトを使って一時的なUDFを作ってみました。

まず、snowflake.snowpark.FileOperation.putでステージにtest_udf_file.pyをアップロードしておきます。今回はユーザーステージの@~/mystageにアップロードしました。

# Upload a file to a stage.
put_result = session.file.put("./test_udf_file.py", "@~/mystage")
put_result[0].status
# 'UPLOADED'

ユーザーステージにファイルがアップできました。mystage/test_udf_file.py.gzとgzipになっているところは気になります。

session.sql("LIST @~;").collect()
# [Row(name='mystage/test_udf_file.py.gz', size=80, md5='b0404dafa727a9567fe84cbf14eaa190', last_modified='Thu, 3 Aug 2023 13:59:24 GMT')]

register_from_fileより一時UDFを作成して実行してみると、問題ありませんでした。

# suppose you have uploaded test_udf_file.py to stage location @mystage.
mod5_udf = session.udf.register_from_file(
    file_path="@~/mystage/test_udf_file.py",
    func_name="mod5",
    return_type=IntegerType(),
    input_types=[IntegerType()],
    name="my_udf"
)  
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]

クエリ履歴では以下のクエリが実行されていました。IMPORTSでは@~/mystage/test_udf_file.pyを指定していますが、gzipになっていても大丈夫なようでした。

CREATE
TEMPORARY  FUNCTION  my_udf(arg1 INT)
RETURNS INT
LANGUAGE PYTHON 
RUNTIME_VERSION=3.8
IMPORTS=('@~/mystage/test_udf_file.py')
PACKAGES=('cloudpickle==2.0.0')
HANDLER='test_udf_file.mod5'

永続的なUDFを作成する

udfデコレーターを使った名前付きUDF

is_permanentTrueとしました。このとき、stage_locationの指定が必須なため、今回はユーザーステージの@~/mystageを指定しました。

@udf(name="my_udf", replace=True, is_permanent=True, stage_location="@~/mystage")
def add_one(x: int) -> int:
    return x + 1

クエリ履歴で確認できた関数は以下です。(pickle化した関数は置き換えたものです。)

一時的なUDFとは異なり、CREATE OR REPLACE FUNCTIONが実行されました。

CREATE OR REPLACE 
  FUNCTION  my_udf(arg1 BIGINT)
RETURNS BIGINT
LANGUAGE PYTHON 
RUNTIME_VERSION=3.8

PACKAGES=('cloudpickle==2.0.0')
HANDLER='compute'

AS $$
import pickle

func = pickle.loads(bytes.fromhex('pickle化した関数'))
# The following comment contains the source code generated by snowpark-python for explanatory purposes.
# @udf(name="my_udf", replace=True, is_permanent=True, stage_location="@~/mystage")
# def add_one(x: int) -> int:
#     return x + 1
#
# func = add_one


from threading import RLock

lock = RLock()

class InvokedFlag:
    def __init__(self):
        self.invoked = False

def lock_function_once(f, flag):
    def wrapper(*args, **kwargs):
        if not flag.invoked:
            with lock:
                if not flag.invoked:
                    result = f(*args, **kwargs)
                    flag.invoked = True
                    return result
                return f(*args, **kwargs)
        return f(*args, **kwargs)
    return wrapper


invoked = InvokedFlag()

def compute(arg1):
    return lock_function_once(func, invoked)(arg1)
$$

使ってみます。

df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), add_one("b")).collect()
[Row(MY_UDF("A")=2, MY_UDF("B")=3), Row(MY_UDF("A")=4, MY_UDF("B")=5)]

永続化しているので、削除は以下のように行えました。

session.sql("DROP FUNCTION my_udf(number);").collect()

そのほかの方法

一時的なUDF』で確認した以下の作成方法についても、永続化する場合はis_permanentTrueとして、stage_locationの指定します。

  • ローカルのPythonスクリプトから作成する方法
  • ステージにあるファイルから作成する場合

作成されたUDFの確認

永続化したUDFはSnowsightからも確認が可能でした。権限があるロールに切り替え、データベースの関数より確認できます。

永続化したUDF

SQLからも確認可能です。SQLを実行するユーザーが権限のあるUDFがリストされます。

SHOW USER FUNCTIONS;

後片付け

特に一時的なUDFはSnowpark Pythonクライアントとのセッションが残っている間は存在するので、作業が終わったらセッションをクローズしておきました。

# セッションのクローズ
session.close()

補足

UDFのML用途について

機械学習モデルのホストのためUDFを使う例についても言及しましたが、Snowparkではネイティブの機械学習APIが発表されています。記事執筆時点ではプレビューです。この機能が一般提供開始すれば、Snowflakeでの機械学習機能の実現方法も大きく変わると思われます。

APIについてはガイドが公開されていますので、併せてご確認ください。

最後に

Snowpark PythonでUDFを作成する際のコード例と、Snowflakeに送信されたSQLを確認しました。

クライアント環境からはPythonで操作をしていますが、実際は対応するSQLがSnowflake側で実行されているので、自分で試してみたクエリの履歴をみつつ理解を深めると良さそうでした。

また、Snowflakeのステージに関する知識も必要でした。今回はユーザーステージにPythonスクリプトを配置してUDFとして読み込む例を試してみましたが、実際の運用では誰でも操作できるように名前付きステージを作って配置したり、依存関係のあるモジュールやファイル出力した機械学習モデルを配置して読み込むような応用をすることもあるかもしれません。これについては別途記事にできればと思います。

そのほかの参考文献