Snowpark PythonのDataFramesでテーブルに格納したデータを操作してみた

Snowflakeのテーブルに格納したデータに対して、DataFrameを使っての操作方法を確認しました。
2023.07.31

データアナリティクス事業本部 機械学習チームの鈴木です。

Snowflakeのテーブルに格納したデータの操作を、Snowpark PythonのDataFramesで行う方法を試してみたのでまとめました。

この記事の内容

Snowpark PythonでSnowflakeのデータをクエリして処理する主な方法として、以下のガイドではDataFrameの使用が紹介されています。今回はこの操作の中で、既にテーブルに格納されたデータに対して使いそうなものを試してみます。

APIの使い方はPySparkによく似ており、以下のページでもSparkによるパイプラインのSnowparkへの移行が紹介されています。PySparkに馴染みがある方はAPIリファレンスを見つつすぐに使えると思います。

Snowpark PythonのAPIリファレンスは以下になります。ガイドに載っていなかった一部の例はAPIリファレンスを参考に実装しました。実際にスクリプトを書く中でも、以下のAPIリファレンスを見て進めるとスムーズに実装ができると思います。

準備

データ

データはML_DATABASE.PUBLIC.IRISというテーブルを準備して、UCI Machine Learning RepositoryのIris Data Setを取り込んでおきました。

このデータセットは、下記リンクにて公開されています。

https://archive.ics.uci.edu/ml/datasets/iris

データは以下のブログでSnowflakeに連携していたものになります。記事の内容はこのテーブルを使ったものを記載しますが、操作自体はこのデータに限ったものではないので、ご自身のお手持ちのテーブルに合わせて読み替えて頂ければと思います。

開発環境

今回はアドホックに処理を進めたかったので、以前公開した以下のブログの方法でローカル開発環境を用意しました。

ローカルのJupyter Notebookから、SNOWPARK_ROLEというロールを付与したSNOWPARK_OPERATORユーザーにてSnowflakeの操作を行います。このロールおよびユーザーの作成も、記事内で紹介しました。

この記事はローカルのJupyter Notebookで実行する前提で記載しますが、SnowsightからPythonワークシートで実行しても問題ありません。Pythonワークシートの場合は、ハンドラー関数内で記述することになるので適宜読み替えてください。

ユーザーへの権限付与

SNOWPARK_ROLEに、ML_DATABASEデータベース内で検証に必要な操作ができるように権限を付与しました。

SnowsightでSQLワークシートを開き、以下のSQLを実行しました。

-- SNOWPARK_ROLEロールは既にSNOWPARK_OPERATORユーザーに付与されていることとする。

-- データベースおよびスキーマのUSAGE権の付与
GRANT USAGE ON DATABASE ML_DATABASE TO ROLE SNOWPARK_ROLE;
GRANT USAGE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE;

-- スキーマ内でのテーブルおよびビューの作成権限の付与
GRANT CREATE TABLE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE;
GRANT CREATE VIEW ON SCHEMA ML_DATABASE.PUBLIC TO ROLE SNOWPARK_ROLE;

-- 基本的なSQL操作用の権限の付与
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA PUBLIC TO ROLE SNOWPARK_ROLE;

遅延評価について

ガイドのDataFrame を評価するアクションの実行に記載があるように、DataFrameは遅延評価されます。

PySpark同様、アクション(collectなど)が実行されるまでは処理がデータフレーム側で記録され、アクションを実行時にDataFrameが評価され、対応するSQLステートメントがサーバーに送信されて検索が実行されます。

例えば以下のようなスクリプトを記載して、実行してみました。一部冗長な記載をしてみて、どのようなSQLに変換されるかも併せて確認してみます。

df_sql = session.sql("SELECT * from IRIS")
df_sql = df_sql.filter(col("SEPAL_LENGTH") >= 5.0)
df_sql = df_sql.select(col("SEPAL_LENGTH"), col("PETAL_LENGTH"), col("CLASS"))
df_sql = df_sql.filter(col("SEPAL_LENGTH") <= 6.0)
df_sql = df_sql.select(col("SEPAL_LENGTH"), col("CLASS"))
df_sql = df_sql.group_by("CLASS").median("SEPAL_LENGTH")
df_res = df_sql.filter(col("CLASS") == 'Iris-setosa')
df_res.show()

クエリ履歴から確認したSQLステートメントは以下になり、1つのSQLに変換されたことが分かります。

実行されたSQL

SELECT  *  FROM ( SELECT "CLASS", median("SEPAL_LENGTH") AS "MEDIAN(SEPAL_LENGTH)" FROM ( SELECT "SEPAL_LENGTH", "CLASS" FROM (SELECT * from IRIS) WHERE (("SEPAL_LENGTH" >= '5.0' :: FLOAT) AND ("SEPAL_LENGTH" <= '6.0' :: FLOAT))) GROUP BY "CLASS") WHERE ("CLASS" = 'Iris-setosa') LIMIT 10

やってみる

操作例をみていきましょう。

なお、以下はJupyter Notebookにて実行しました。

1. セッションの作成

SNOWPARK_OPERATORユーザーを使ってセッションを作成します。

from snowflake.snowpark import Session

# セッションの作成
connection_parameters = {
   "account": "アカウント識別子",
   "user": "SNOWPARK_OPERATOR",
   "password": "設定したパスワード",
    "warehouse": "使用するウェアハウス"
}

session = Session.builder.configs(connection_parameters).create()

2. ロール・データベース・スキーマの設定

SQLから実行することもできますが、それ用のAPIもあります。

session.use_role("SNOWPARK_ROLE")
session.use_database("ML_DATABASE")
session.use_schema("PUBLIC")

3. データフレームでのデータ取得

snowflake.snowpark.Session.sqlにてSQLを実行し、結果をsnowflake.snowpark.DataFrameとして取得します。showを実行するとクエリが実行されて結果が表示されます。

df_sql = session.sql("SELECT * from IRIS")

df_sql.show()
---------------------------------------------------------------------------------
|"SEPAL_LENGTH"  |"SEPAL_WIDTH"  |"PETAL_LENGTH"  |"PETAL_WIDTH"  |"CLASS"      |
---------------------------------------------------------------------------------
|5.1             |3.5            |1.4             |0.2            |Iris-setosa  |
|4.9             |3.0            |1.4             |0.2            |Iris-setosa  |
|4.7             |3.2            |1.3             |0.2            |Iris-setosa  |
|4.6             |3.1            |1.5             |0.2            |Iris-setosa  |
|5.0             |3.6            |1.4             |0.2            |Iris-setosa  |
|5.4             |3.9            |1.7             |0.4            |Iris-setosa  |
|4.6             |3.4            |1.4             |0.3            |Iris-setosa  |
|5.0             |3.4            |1.5             |0.2            |Iris-setosa  |
|4.4             |2.9            |1.4             |0.2            |Iris-setosa  |
|4.9             |3.1            |1.5             |0.1            |Iris-setosa  |
---------------------------------------------------------------------------------

countでデータフレームの行数を確認できます。

df_sql.count()
150

データフレームのschemaプロパティからスキーマが確認できます。

df_sql.schema
StructType([StructField('SEPAL_LENGTH', DoubleType(), nullable=True), StructField('SEPAL_WIDTH', DoubleType(), nullable=True), StructField('PETAL_LENGTH', DoubleType(), nullable=True), StructField('PETAL_WIDTH', DoubleType(), nullable=True), StructField('CLASS', StringType(16777216), nullable=True)])

4. データフレームの列の選択

selectメソッドとColumnオブジェクトを使い、データフレームを一部のカラムに絞ることができます。

from snowflake.snowpark.functions import col

df_select_col = df_sql.select(col("SEPAL_LENGTH"), col("CLASS"))
df_select_col.show()
--------------------------------
|"SEPAL_LENGTH"  |"CLASS"      |
--------------------------------
|5.1             |Iris-setosa  |
|4.9             |Iris-setosa  |
|4.7             |Iris-setosa  |
|4.6             |Iris-setosa  |
|5.0             |Iris-setosa  |
|5.4             |Iris-setosa  |
|4.6             |Iris-setosa  |
|5.0             |Iris-setosa  |
|4.4             |Iris-setosa  |
|4.9             |Iris-setosa  |
--------------------------------

5. データフレームの集約

集約するカラムを指定するgroup_bysnowflake.snowpark.functionsの数学的関数などを組み合わせて集約を計算することもできます。

df_median = df_sql.group_by("CLASS").median("SEPAL_LENGTH")
df_median.show()
--------------------------------------------
|"CLASS"          |"MEDIAN(SEPAL_LENGTH)"  |
--------------------------------------------
|Iris-virginica   |6.5                     |
|Iris-versicolor  |5.9                     |
|Iris-setosa      |5.0                     |
--------------------------------------------

これはメソッド呼び出しのチェーンの例でもあります。

6. フィルター条件の使用

filterメソッドとColumnオブジェクトを使い、データフレームにフィルター条件を指定できます。

df_sql_setosa = df_sql.filter(col("CLASS") == 'Iris-setosa')

行数を確認すると、確かに件数がフィルタできたことが分かります。

df_sql_setosa.count()
# 50

showでデータを確認しても、Iris-setosaのものだけですね。

df_sql_setosa.show(5)
---------------------------------------------------------------------------------
|"SEPAL_LENGTH"  |"SEPAL_WIDTH"  |"PETAL_LENGTH"  |"PETAL_WIDTH"  |"CLASS"      |
---------------------------------------------------------------------------------
|5.1             |3.5            |1.4             |0.2            |Iris-setosa  |
|4.9             |3.0            |1.4             |0.2            |Iris-setosa  |
|4.7             |3.2            |1.3             |0.2            |Iris-setosa  |
|4.6             |3.1            |1.5             |0.2            |Iris-setosa  |
|5.0             |3.6            |1.4             |0.2            |Iris-setosa  |
---------------------------------------------------------------------------------

7. Pandasデータフレームへの変換

to_pandasメソッドでSnowparkのデータフレームをPandasのデータフレームに変換することができます。

# 変換
pandas_df = df_sql.to_pandas()

# 表示
pandas_df

Pandasの結果

そのままメモリ上で使うほか、ローカルに保存も可能ですね。

# ローカルへの保存
pandas_df.to_csv("./file_local.csv")

8. テーブルの作成

save_as_tableメソッドでDataFrameのデータをテーブルとしてSnowflakeに永続化することができます。

# テーブルの作成
df_sql.write.mode("overwrite").save_as_table("iris_from_local")

以下のようなテーブルができました。

できたテーブルの確認

9. ビューの作成

create_or_replace_viewメソッドでDataFrameに対して行った処理をビューとしてSnowflakeに定義することもできます。

以下のようなスクリプトを実行してみました。

# ビューで実施したい処理
df_sql = session.sql("SELECT * from IRIS")
df_sql = df_sql.select(col("SEPAL_LENGTH"), col("SEPAL_WIDTH"), col("CLASS"))
df_sql = df_sql.group_by("CLASS").median("SEPAL_LENGTH")

# ビューの作成
df_sql.create_or_replace_view(f"ML_DATABASE.PUBLIC.MEDIAN_VIEW")
[Row(status='View MEDIAN_VIEW successfully created.')]

Snowsightからできたビューをみると、SQLの定義が生成されていることが分かります。面白いですね。

できたビュー

10. データフレームの結合

JOINの一例です。結合の種類はhowで指定できます。

df_join = df_sql.join(df_median, ["CLASS"], how='left')
df_join.show()

そのほかのJOINの記述については、DataFrames の結合やAPIリファレンスのsnowflake.snowpark.DataFrame.joinに例が多く記載されているのでご確認頂ければと思います。

最後に

Snowpark PythonのDataFrameの操作を、実際にSnowflakeのデータベースに格納したデータを使って試してみました。

APIはPySparkと類似でとっつきやすいものなので、すぐに使えるようになると思います。

今回のようにローカル環境やSnowflake以外のアプリケーションでSnowparkを使って操作する際に、大きいデータをcollectしたりするとアプリケーションのメモリにデータが乗り切らずアプリケーションが落ちてしまうかもしれないので、Snowflakeで集約をして小さいデータを持ってくるように意識して使うとよいかもしれません。これはSnowparkに限らずPySparkなどの分散処理システムでも同様と思います。遅延評価とこの点は普段の Pythonスクリプトと少し異なる点ですが、大規模なデータに対して処理を実行する際の面白さを感じますね。

参考になりましたら幸いです。

そのた参考にしたもの