Snowpark PythonのDataFramesでテーブルに格納したデータを操作してみた
データアナリティクス事業本部 機械学習チームの鈴木です。
Snowflakeのテーブルに格納したデータの操作を、Snowpark PythonのDataFramesで行う方法を試してみたのでまとめました。
この記事の内容
Snowpark PythonでSnowflakeのデータをクエリして処理する主な方法として、以下のガイドではDataFrameの使用が紹介されています。今回はこの操作の中で、既にテーブルに格納されたデータに対して使いそうなものを試してみます。
APIの使い方はPySparkによく似ており、以下のページでもSparkによるパイプラインのSnowparkへの移行が紹介されています。PySparkに馴染みがある方はAPIリファレンスを見つつすぐに使えると思います。
Snowpark PythonのAPIリファレンスは以下になります。ガイドに載っていなかった一部の例はAPIリファレンスを参考に実装しました。実際にスクリプトを書く中でも、以下のAPIリファレンスを見て進めるとスムーズに実装ができると思います。
準備
データ
データはML_DATABASE.PUBLIC.IRIS
というテーブルを準備して、UCI Machine Learning RepositoryのIris Data Setを取り込んでおきました。
このデータセットは、下記リンクにて公開されています。
https://archive.ics.uci.edu/ml/datasets/iris
データは以下のブログでSnowflakeに連携していたものになります。記事の内容はこのテーブルを使ったものを記載しますが、操作自体はこのデータに限ったものではないので、ご自身のお手持ちのテーブルに合わせて読み替えて頂ければと思います。
開発環境
今回はアドホックに処理を進めたかったので、以前公開した以下のブログの方法でローカル開発環境を用意しました。
ローカルのJupyter Notebookから、SNOWPARK_ROLE
というロールを付与したSNOWPARK_OPERATOR
ユーザーにてSnowflakeの操作を行います。このロールおよびユーザーの作成も、記事内で紹介しました。
この記事はローカルのJupyter Notebookで実行する前提で記載しますが、SnowsightからPythonワークシートで実行しても問題ありません。Pythonワークシートの場合は、ハンドラー関数内で記述することになるので適宜読み替えてください。
ユーザーへの権限付与
SNOWPARK_ROLE
に、ML_DATABASE
データベース内で検証に必要な操作ができるように権限を付与しました。
SnowsightでSQLワークシートを開き、以下のSQLを実行しました。
-- SNOWPARK_ROLEロールは既にSNOWPARK_OPERATORユーザーに付与されていることとする。 -- データベースおよびスキーマのUSAGE権の付与 GRANT USAGE ON DATABASE ML_DATABASE TO ROLE SNOWPARK_ROLE; GRANT USAGE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE; -- スキーマ内でのテーブルおよびビューの作成権限の付与 GRANT CREATE TABLE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE; GRANT CREATE VIEW ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE; -- 基本的なSQL操作用の権限の付与 GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA PUBLIC TO ROLE SNOWPARK_ROLE;
遅延評価について
ガイドのDataFrame を評価するアクションの実行に記載があるように、DataFrameは遅延評価されます。
PySpark同様、アクション(collect
など)が実行されるまでは処理がデータフレーム側で記録され、アクションを実行時にDataFrameが評価され、対応するSQLステートメントがサーバーに送信されて検索が実行されます。
例えば以下のようなスクリプトを記載して、実行してみました。一部冗長な記載をしてみて、どのようなSQLに変換されるかも併せて確認してみます。
df_sql = session.sql("SELECT * from IRIS") df_sql = df_sql.filter(col("SEPAL_LENGTH") >= 5.0) df_sql = df_sql.select(col("SEPAL_LENGTH"), col("PETAL_LENGTH"), col("CLASS")) df_sql = df_sql.filter(col("SEPAL_LENGTH") <= 6.0) df_sql = df_sql.select(col("SEPAL_LENGTH"), col("CLASS")) df_sql = df_sql.group_by("CLASS").median("SEPAL_LENGTH") df_res = df_sql.filter(col("CLASS") == 'Iris-setosa') df_res.show()
クエリ履歴から確認したSQLステートメントは以下になり、1つのSQLに変換されたことが分かります。
SELECT * FROM ( SELECT "CLASS", median("SEPAL_LENGTH") AS "MEDIAN(SEPAL_LENGTH)" FROM ( SELECT "SEPAL_LENGTH", "CLASS" FROM (SELECT * from IRIS) WHERE (("SEPAL_LENGTH" >= '5.0' :: FLOAT) AND ("SEPAL_LENGTH" <= '6.0' :: FLOAT))) GROUP BY "CLASS") WHERE ("CLASS" = 'Iris-setosa') LIMIT 10
やってみる
操作例をみていきましょう。
なお、以下はJupyter Notebookにて実行しました。
1. セッションの作成
SNOWPARK_OPERATOR
ユーザーを使ってセッションを作成します。
from snowflake.snowpark import Session # セッションの作成 connection_parameters = { "account": "アカウント識別子", "user": "SNOWPARK_OPERATOR", "password": "設定したパスワード", "warehouse": "使用するウェアハウス" } session = Session.builder.configs(connection_parameters).create()
2. ロール・データベース・スキーマの設定
SQLから実行することもできますが、それ用のAPIもあります。
session.use_role("SNOWPARK_ROLE") session.use_database("ML_DATABASE") session.use_schema("PUBLIC")
3. データフレームでのデータ取得
snowflake.snowpark.Session.sqlにてSQLを実行し、結果をsnowflake.snowpark.DataFrameとして取得します。show
を実行するとクエリが実行されて結果が表示されます。
df_sql = session.sql("SELECT * from IRIS") df_sql.show()
--------------------------------------------------------------------------------- |"SEPAL_LENGTH" |"SEPAL_WIDTH" |"PETAL_LENGTH" |"PETAL_WIDTH" |"CLASS" | --------------------------------------------------------------------------------- |5.1 |3.5 |1.4 |0.2 |Iris-setosa | |4.9 |3.0 |1.4 |0.2 |Iris-setosa | |4.7 |3.2 |1.3 |0.2 |Iris-setosa | |4.6 |3.1 |1.5 |0.2 |Iris-setosa | |5.0 |3.6 |1.4 |0.2 |Iris-setosa | |5.4 |3.9 |1.7 |0.4 |Iris-setosa | |4.6 |3.4 |1.4 |0.3 |Iris-setosa | |5.0 |3.4 |1.5 |0.2 |Iris-setosa | |4.4 |2.9 |1.4 |0.2 |Iris-setosa | |4.9 |3.1 |1.5 |0.1 |Iris-setosa | ---------------------------------------------------------------------------------
count
でデータフレームの行数を確認できます。
df_sql.count()
150
データフレームのschema
プロパティからスキーマが確認できます。
df_sql.schema
StructType([StructField('SEPAL_LENGTH', DoubleType(), nullable=True), StructField('SEPAL_WIDTH', DoubleType(), nullable=True), StructField('PETAL_LENGTH', DoubleType(), nullable=True), StructField('PETAL_WIDTH', DoubleType(), nullable=True), StructField('CLASS', StringType(16777216), nullable=True)])
4. データフレームの列の選択
select
メソッドとColumn
オブジェクトを使い、データフレームを一部のカラムに絞ることができます。
from snowflake.snowpark.functions import col df_select_col = df_sql.select(col("SEPAL_LENGTH"), col("CLASS")) df_select_col.show()
-------------------------------- |"SEPAL_LENGTH" |"CLASS" | -------------------------------- |5.1 |Iris-setosa | |4.9 |Iris-setosa | |4.7 |Iris-setosa | |4.6 |Iris-setosa | |5.0 |Iris-setosa | |5.4 |Iris-setosa | |4.6 |Iris-setosa | |5.0 |Iris-setosa | |4.4 |Iris-setosa | |4.9 |Iris-setosa | --------------------------------
5. データフレームの集約
集約するカラムを指定するgroup_by
とsnowflake.snowpark.functions
の数学的関数などを組み合わせて集約を計算することもできます。
df_median = df_sql.group_by("CLASS").median("SEPAL_LENGTH") df_median.show()
-------------------------------------------- |"CLASS" |"MEDIAN(SEPAL_LENGTH)" | -------------------------------------------- |Iris-virginica |6.5 | |Iris-versicolor |5.9 | |Iris-setosa |5.0 | --------------------------------------------
これはメソッド呼び出しのチェーンの例でもあります。
6. フィルター条件の使用
filter
メソッドとColumn
オブジェクトを使い、データフレームにフィルター条件を指定できます。
df_sql_setosa = df_sql.filter(col("CLASS") == 'Iris-setosa')
行数を確認すると、確かに件数がフィルタできたことが分かります。
df_sql_setosa.count() # 50
show
でデータを確認しても、Iris-setosa
のものだけですね。
df_sql_setosa.show(5)
--------------------------------------------------------------------------------- |"SEPAL_LENGTH" |"SEPAL_WIDTH" |"PETAL_LENGTH" |"PETAL_WIDTH" |"CLASS" | --------------------------------------------------------------------------------- |5.1 |3.5 |1.4 |0.2 |Iris-setosa | |4.9 |3.0 |1.4 |0.2 |Iris-setosa | |4.7 |3.2 |1.3 |0.2 |Iris-setosa | |4.6 |3.1 |1.5 |0.2 |Iris-setosa | |5.0 |3.6 |1.4 |0.2 |Iris-setosa | ---------------------------------------------------------------------------------
7. Pandasデータフレームへの変換
to_pandas
メソッドでSnowparkのデータフレームをPandasのデータフレームに変換することができます。
# 変換 pandas_df = df_sql.to_pandas() # 表示 pandas_df
そのままメモリ上で使うほか、ローカルに保存も可能ですね。
# ローカルへの保存 pandas_df.to_csv("./file_local.csv")
8. テーブルの作成
save_as_table
メソッドでDataFrameのデータをテーブルとしてSnowflakeに永続化することができます。
# テーブルの作成 df_sql.write.mode("overwrite").save_as_table("iris_from_local")
以下のようなテーブルができました。
9. ビューの作成
create_or_replace_view
メソッドでDataFrameに対して行った処理をビューとしてSnowflakeに定義することもできます。
以下のようなスクリプトを実行してみました。
# ビューで実施したい処理 df_sql = session.sql("SELECT * from IRIS") df_sql = df_sql.select(col("SEPAL_LENGTH"), col("SEPAL_WIDTH"), col("CLASS")) df_sql = df_sql.group_by("CLASS").median("SEPAL_LENGTH") # ビューの作成 df_sql.create_or_replace_view(f"ML_DATABASE.PUBLIC.MEDIAN_VIEW")
[Row(status='View MEDIAN_VIEW successfully created.')]
Snowsightからできたビューをみると、SQLの定義が生成されていることが分かります。面白いですね。
10. データフレームの結合
JOINの一例です。結合の種類はhow
で指定できます。
df_join = df_sql.join(df_median, ["CLASS"], how='left') df_join.show()
そのほかのJOINの記述については、DataFrames の結合やAPIリファレンスのsnowflake.snowpark.DataFrame.joinに例が多く記載されているのでご確認頂ければと思います。
最後に
Snowpark PythonのDataFrameの操作を、実際にSnowflakeのデータベースに格納したデータを使って試してみました。
APIはPySparkと類似でとっつきやすいものなので、すぐに使えるようになると思います。
今回のようにローカル環境やSnowflake以外のアプリケーションでSnowparkを使って操作する際に、大きいデータをcollect
したりするとアプリケーションのメモリにデータが乗り切らずアプリケーションが落ちてしまうかもしれないので、Snowflakeで集約をして小さいデータを持ってくるように意識して使うとよいかもしれません。これはSnowparkに限らずPySparkなどの分散処理システムでも同様と思います。遅延評価とこの点は普段の Pythonスクリプトと少し異なる点ですが、大規模なデータに対して処理を実行する際の面白さを感じますね。
参考になりましたら幸いです。