データアナリティクス事業本部 機械学習チームの鈴木です。
Snowpark PythonのクライアントからUDFを使ってみて、Snowflake上で実行される処理や作成したUDFの使い方について確認してみました。
記事の内容
Snowpark Pythonからユーザー定義関数(UDFs)を作成して、UDFの作成パターンや使い方について確認しました。
ガイドとしては、以下のページを参考にしました。
Snowpark PythonではPythonコードを使ってSnowflakeのUDFを作成します。Pythonコードで記述したロジックとSnowflakeの計算資源を使って、Snowflakeに格納したデータを変換することができます。
用途としては、普通にUDFやDataFrameの操作に使えることに加え、以下のクイックスタートのように、別環境で訓練した機械学習モデル(ここでは特にCPUを使うもの)を実行することも可能です。
先に紹介したガイドでも、XGBoostを使用するための例が記載されていました。
SnowparkとUDFを使いこなすことでより様々な処理をSnowflake上で実現できるため、この記事ではまずSnowparkでUDFを作るとSnowflake上でどのような処理が実行され、できたUDFをどのようにしてSnowparkから利用できるのか確認してみました。
準備
検証に必要な準備について記載します。
データベース
SnowsightからML_DATABASE
という名前のデータベースを作成しておきました。
データベースの作成は、例えば以下のチュートリアルをご確認ください。
ロール・ユーザー
SnowsightのSQLワークシートから以下のようなSQLでSNOWPARK_ROLE
というロールを付与したSNOWPARK_OPERATOR
というユーザーを作成しておきました。
-- ROLEの作成
CREATE ROLE SNOWPARK_ROLE;
-- ウェアハウスへのUSAGEアクセス権の付与
GRANT USAGE ON WAREHOUSE 使用するウェアハウス TO ROLE SNOWPARK_ROLE;
-- データベースおよびスキーマのUSAGE権の付与
GRANT USAGE ON DATABASE ML_DATABASE TO ROLE SNOWPARK_ROLE;
GRANT USAGE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE;
-- スキーマ内でのテーブルの作成権限の付与
GRANT CREATE TABLE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE;
-- スキーマ内でのUDFの作成権限の付与
GRANT CREATE FUNCTION ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE;
-- 基本的なSQL操作用の権限の付与
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA PUBLIC TO ROLE SNOWPARK_ROLE;
-- ユーザーの作成
CREATE USER SNOWPARK_OPERATOR
PASSWORD = 'パスワード'
DEFAULT_ROLE = 'SNOWPARK_ROLE'
MUST_CHANGE_PASSWORD = FALSE;
-- ユーザーへのロールの付与
GRANT ROLE SNOWPARK_ROLE TO USER SNOWPARK_OPERATOR;
特に今回はUDFを作成したいので、スキーマ内でのUDFの作成権限の付与を忘れずにしておきました。
クライアント開発環境
以下のブログの通り、ローカルにJupyter Notebook環境を用意しました。
Jupyter Notebookを起動して新しいノートブックを作成したら、以下のようにセッションを作成し、使用するデータベースなどの指定をしました。
from snowflake.snowpark import Session
# セッションの作成
connection_parameters = {
"account": "アカウント識別子",
"user": "SNOWPARK_OPERATOR",
"password": "設定したパスワード",
"warehouse": "使用するウェアハウス"
}
session = Session.builder.configs(connection_parameters).create()
# データベース・スキーマ・ロールの指定
session.use_database("ML_DATABASE")
session.use_schema("PUBLIC")
session.sql("USE ROLE SNOWPARK_ROLE").collect()
## [Row(status='Statement executed successfully.')]
利用するAPIについて
snowflake.snowpark.functions.udf
を使ってUDFを作成します。このAPIの詳細は以下のドキュメントに記載があります。
is_permanent
引数がFalse
の場合、一時的なUDFが作成されます。作成したSnowpark PythonクライアントとSnowflakeとの間のセッションが破棄されると、一時的なUDFも破棄されます。デフォルトはFalse
です。
以降でregister_from_fileも利用しますが、is_permanent
引数については同じになります。
一時的ではないUDFを作成したい場合は、is_permanent
をTrue
とします。このとき、stage_location
の指定が必須になります。
一時的なUDFを作成する
まず、一時的なUDFを作成する場合について、試した内容を記載します。
無名関数を使った匿名UDF
まずはPython無名関数を使って匿名UDFを作成しました。udf
内でUDFの名前について指定をしていません。
無名関数をsnowflake.snowpark.functions.udf
に渡して匿名UDFを作成し、変数に割り当てました。
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf
add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
匿名UDFは、DataFrameに対して適用することができました。
df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), add_one("b")).collect()
[Row("ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("A")=2, "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("B")=3),
Row("ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("A")=4, "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("B")=5)]
Snowsightのクエリ履歴から、どのようなSQLに変換されたか確認してみました。
まず、以下のように一時的なPython UDFが作成されました。(pickle化した関数
は置き換えたものです。)
CREATE TEMPORARY FUNCTION
でcompute
がハンドラーとして設定されており、compute
関数がpickle化した関数を読み込む形で定義されています。
CREATE
TEMPORARY FUNCTION "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB(arg1 INT)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('cloudpickle==2.0.0')
HANDLER='compute'
AS $$
import pickle
func = pickle.loads(bytes.fromhex('pickle化した関数'))
# The following comment contains the source code generated by snowpark-python for explanatory purposes.
# func = lambda x: x+1
from threading import RLock
lock = RLock()
class InvokedFlag:
def __init__(self):
self.invoked = False
def lock_function_once(f, flag):
def wrapper(*args, **kwargs):
if not flag.invoked:
with lock:
if not flag.invoked:
result = f(*args, **kwargs)
flag.invoked = True
return result
return f(*args, **kwargs)
return f(*args, **kwargs)
return wrapper
invoked = InvokedFlag()
def compute(arg1):
return lock_function_once(func, invoked)(arg1)
$$
そのあと、そのUDFを使ってデータの変換をしていました。
SELECT "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("A"), "ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_FUNCTION_YGAWVG4PSB("B") FROM ( SELECT "_1" AS "A", "_2" AS "B" FROM ( SELECT $1 AS "_1", $2 AS "_2" FROM VALUES (1 :: INT, 2 :: INT), (3 :: INT, 4 :: INT)))
Python上では匿名UDFを割り当てた変数でUDFの管理をしますが、Snowflake上では適当な名前を割り当てた一時的なUDFを作成しているようでした。
なお、pickle化した関数
は以下で質問されているような形になります。
Python UDFsの作成については以下のページに説明がありました。クエリ履歴の結果と見比べると同じ構文であることが分かります。Snowpark PythonからのUDF作成は、PythonスクリプトからPython UDFを作成するための仕組みと考えてよさそうです。
無名関数を使った名前付きUDF
まず無名関数を使った方法です。匿名UDFの作成と同じくsnowflake.snowpark.functions.udf
を使いますが、このときname
引数でUDF名を指定します。
add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
クエリ履歴で確認できたSQLは以下です。(pickle化した関数
は置き換えたものです。)
匿名UDFのときとは異なり、CREATE OR REPLACE TEMPORARY FUNCTION
でUDF名が指定されていました。
CREATE OR REPLACE
TEMPORARY FUNCTION my_udf(arg1 INT)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('cloudpickle==2.0.0')
HANDLER='compute'
AS $$
import pickle
func = pickle.loads(bytes.fromhex('pickle化した関数'))
# The following comment contains the source code generated by snowpark-python for explanatory purposes.
# func = lambda x: x+1
from threading import RLock
lock = RLock()
class InvokedFlag:
def __init__(self):
self.invoked = False
def lock_function_once(f, flag):
def wrapper(*args, **kwargs):
if not flag.invoked:
with lock:
if not flag.invoked:
result = f(*args, **kwargs)
flag.invoked = True
return result
return f(*args, **kwargs)
return f(*args, **kwargs)
return wrapper
invoked = InvokedFlag()
def compute(arg1):
return lock_function_once(func, invoked)(arg1)
$$
DataFrameへの操作とする場合は、関数を割り当てた変数を使います。
df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), add_one("b")).collect()
[Row(MY_UDF("A")=2, MY_UDF("B")=3), Row(MY_UDF("A")=4, MY_UDF("B")=5)]
SQLで利用する場合にはUDF名を指定できます。どちらかというと後で紹介する永続化したUDFでの用途になるかもしれません。
df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
# 一度テーブルにする
df.write.mode("overwrite").save_as_table("tmp_df")
# UDFを使う
session.sql("select my_udf(a), my_udf(b) from tmp_df").collect()
[Row(MY_UDF(A)=2, MY_UDF(B)=3), Row(MY_UDF(A)=4, MY_UDF(B)=5)]
udfデコレーターを使った名前付きUDF
udf
デコレーターを使って、Python関数の宣言と併せたUDF作成も可能です。
@udf(name="my_udf", replace=True)
def add_one(x: int) -> int:
return x + 1
クエリ履歴で確認できた関数は以下です。(pickle化した関数
は置き換えたものです。)
CREATE OR REPLACE
TEMPORARY FUNCTION my_udf(arg1 BIGINT)
RETURNS BIGINT
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('cloudpickle==2.0.0')
HANDLER='compute'
AS $$
import pickle
func = pickle.loads(bytes.fromhex('pickle化した関数'))
# The following comment contains the source code generated by snowpark-python for explanatory purposes.
# @udf(name="my_udf", replace=True)
# def add_one(x: int) -> int:
# return x + 1
#
# func = add_one
from threading import RLock
lock = RLock()
class InvokedFlag:
def __init__(self):
self.invoked = False
def lock_function_once(f, flag):
def wrapper(*args, **kwargs):
if not flag.invoked:
with lock:
if not flag.invoked:
result = f(*args, **kwargs)
flag.invoked = True
return result
return f(*args, **kwargs)
return f(*args, **kwargs)
return wrapper
invoked = InvokedFlag()
def compute(arg1):
return lock_function_once(func, invoked)(arg1)
$$
使い方は無名関数の場合と同じです。DataFrameに使うときの例だけ試してみました。
df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), add_one("b")).collect()
[Row(MY_UDF("A")=2, MY_UDF("B")=3), Row(MY_UDF("A")=4, MY_UDF("B")=5)]
ローカルのファイルを使った名前付きUDF
ローカルにあるファイルから作成する場合です。
以下のPythonスクリプトを作成しておきました。
./test_udf_file.py
def mod5(x: int) -> int:
return x % 5
Jupyter Notebookから確認できる場所においておきました。
!ls test_udf_file.py
register_from_file
で一時的なUDFを作成しました。
# mod5() in that file has type hints
mod5_udf = session.udf.register_from_file(
file_path="tests/resources/test_udf_dir/test_udf_file.py",
func_name="mod5",
)
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
# [Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
クエリ履歴では以下のクエリが実行されていました。
CREATE
TEMPORARY FUNCTION my_udf(arg1 BIGINT)
RETURNS BIGINT
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@"ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_STAGE_KLPD8Y9SQH/my_udf/test_udf_file.py')
PACKAGES=('cloudpickle==2.0.0')
HANDLER='test_udf_file.mod5'
IMPORTS
ではステージを指定していました。
少し前のクエリを確認すると、test_udf_file.py
のステージへのアップロードも含めてSnowpark Python側で実行してくれていました。
PUT 'file://./test_udf_file.py' '@"ML_DATABASE"."PUBLIC".SNOWPARK_TEMP_STAGE_KLPD8Y9SQH/my_udf' PARALLEL = 4 AUTO_COMPRESS = FALSE SOURCE_COMPRESSION = AUTO_DETECT OVERWRITE = TRUE
ステージのファイルを使った名前付きUDF
あらかじめ内部ステージにアップロードしたPythonスクリプトを使って一時的なUDFを作ってみました。
まず、snowflake.snowpark.FileOperation.putでステージにtest_udf_file.py
をアップロードしておきます。今回はユーザーステージの@~/mystage
にアップロードしました。
# Upload a file to a stage.
put_result = session.file.put("./test_udf_file.py", "@~/mystage")
put_result[0].status
# 'UPLOADED'
ユーザーステージにファイルがアップできました。mystage/test_udf_file.py.gz
とgzipになっているところは気になります。
session.sql("LIST @~;").collect()
# [Row(name='mystage/test_udf_file.py.gz', size=80, md5='b0404dafa727a9567fe84cbf14eaa190', last_modified='Thu, 3 Aug 2023 13:59:24 GMT')]
register_from_file
より一時UDFを作成して実行してみると、問題ありませんでした。
# suppose you have uploaded test_udf_file.py to stage location @mystage.
mod5_udf = session.udf.register_from_file(
file_path="@~/mystage/test_udf_file.py",
func_name="mod5",
return_type=IntegerType(),
input_types=[IntegerType()],
name="my_udf"
)
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
クエリ履歴では以下のクエリが実行されていました。IMPORTS
では@~/mystage/test_udf_file.py
を指定していますが、gzipになっていても大丈夫なようでした。
CREATE
TEMPORARY FUNCTION my_udf(arg1 INT)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@~/mystage/test_udf_file.py')
PACKAGES=('cloudpickle==2.0.0')
HANDLER='test_udf_file.mod5'
永続的なUDFを作成する
udfデコレーターを使った名前付きUDF
is_permanent
をTrue
としました。このとき、stage_location
の指定が必須なため、今回はユーザーステージの@~/mystage
を指定しました。
@udf(name="my_udf", replace=True, is_permanent=True, stage_location="@~/mystage")
def add_one(x: int) -> int:
return x + 1
クエリ履歴で確認できた関数は以下です。(pickle化した関数
は置き換えたものです。)
一時的なUDFとは異なり、CREATE OR REPLACE FUNCTION
が実行されました。
CREATE OR REPLACE
FUNCTION my_udf(arg1 BIGINT)
RETURNS BIGINT
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('cloudpickle==2.0.0')
HANDLER='compute'
AS $$
import pickle
func = pickle.loads(bytes.fromhex('pickle化した関数'))
# The following comment contains the source code generated by snowpark-python for explanatory purposes.
# @udf(name="my_udf", replace=True, is_permanent=True, stage_location="@~/mystage")
# def add_one(x: int) -> int:
# return x + 1
#
# func = add_one
from threading import RLock
lock = RLock()
class InvokedFlag:
def __init__(self):
self.invoked = False
def lock_function_once(f, flag):
def wrapper(*args, **kwargs):
if not flag.invoked:
with lock:
if not flag.invoked:
result = f(*args, **kwargs)
flag.invoked = True
return result
return f(*args, **kwargs)
return f(*args, **kwargs)
return wrapper
invoked = InvokedFlag()
def compute(arg1):
return lock_function_once(func, invoked)(arg1)
$$
使ってみます。
df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), add_one("b")).collect()
[Row(MY_UDF("A")=2, MY_UDF("B")=3), Row(MY_UDF("A")=4, MY_UDF("B")=5)]
永続化しているので、削除は以下のように行えました。
session.sql("DROP FUNCTION my_udf(number);").collect()
そのほかの方法
『一時的なUDF』で確認した以下の作成方法についても、永続化する場合はis_permanent
をTrue
として、stage_location
の指定します。
- ローカルのPythonスクリプトから作成する方法
- ステージにあるファイルから作成する場合
作成されたUDFの確認
永続化したUDFはSnowsightからも確認が可能でした。権限があるロールに切り替え、データベースの関数より確認できます。
SQLからも確認可能です。SQLを実行するユーザーが権限のあるUDFがリストされます。
SHOW USER FUNCTIONS;
後片付け
特に一時的なUDFはSnowpark Pythonクライアントとのセッションが残っている間は存在するので、作業が終わったらセッションをクローズしておきました。
# セッションのクローズ
session.close()
補足
UDFのML用途について
機械学習モデルのホストのためUDFを使う例についても言及しましたが、Snowparkではネイティブの機械学習APIが発表されています。記事執筆時点ではプレビューです。この機能が一般提供開始すれば、Snowflakeでの機械学習機能の実現方法も大きく変わると思われます。
APIについてはガイドが公開されていますので、併せてご確認ください。
最後に
Snowpark PythonでUDFを作成する際のコード例と、Snowflakeに送信されたSQLを確認しました。
クライアント環境からはPythonで操作をしていますが、実際は対応するSQLがSnowflake側で実行されているので、自分で試してみたクエリの履歴をみつつ理解を深めると良さそうでした。
また、Snowflakeのステージに関する知識も必要でした。今回はユーザーステージにPythonスクリプトを配置してUDFとして読み込む例を試してみましたが、実際の運用では誰でも操作できるように名前付きステージを作って配置したり、依存関係のあるモジュールやファイル出力した機械学習モデルを配置して読み込むような応用をすることもあるかもしれません。これについては別途記事にできればと思います。
そのほかの参考文献
- ローカルファイルに対する内部ステージの選択 | Snowflake Documentation
- Snowpark用に最適化されたウェアハウス | Snowflake Documentation
- Snowflakeセッションおよびセッションポリシー | Snowflake Documentation
- Granting Privileges for User-Defined Functions | Snowflake Documentation
- CREATE FUNCTION | Snowflake Documentation
- SHOW USER FUNCTIONS | Snowflake Documentation
- Snowflakeの内部ステージについて調べてみた | DevelopersIO