SnowparkでDataFrameの操作を試してみた
こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。
SnowflakeのPreview機能として、Snowparkという機能が公開されています。
先日、Visual Studio Codeでこの機能を動かせるようにセットアップを行ったので、今回は実際にDataFrameを操作して挙動と試してみたいと思います。また、今回は以下のドキュメントを参考に色々と触ってみようと思います。
なお、環境のセットアップについては以下のエントリに記載しています。
DataFrameの操作をしてみる
今回は簡単な例として、ある2テーブルをJOINして、その結果を別テーブルに書き戻すようなプログラムで色々試したいと思います。
テーブルの取得とJOIN
まずテーブルをDataFrameとして取得するには、以下のようにします。とても簡単ですね。
val configs = Map ( "URL" -> "https://<account>.snowflakecomputing.com:443", "USER" -> "<user name>", "PASSWORD" -> "<password>", "ROLE" -> "<role name>", "WAREHOUSE" -> "<warehouse name>", "DB" -> "OOTAKA_SANDBOX_DB", "SCHEMA" -> "PUBLIC" ) val session = Session.builder.configs(configs).create // NationテーブルのDataFrame val dfNation = session.table("snowflake_sample_data.tpch_sf1.nation")
今回はあえてセッション構築した際の「データベース」、「スキーマ」とは別のものを指定してみました。SQLと同様に.
で区切って明示的に[データベース名].[スキーマ名].[テーブル名]
と指定することで問題なく動くことが分かります。
更に今度は2テーブルを取得してJOINしてみます。
// NationテーブルとRegionテーブルのDataFrame val dfNation = session.table("snowflake_sample_data.tpch_sf1.nation") val dfRegion = session.table("snowflake_sample_data.tpch_sf1.region") // RegionKeyでJoinする val dfJoined = dfNation.join(dfRegion, dfNation.col("n_regionkey") === dfRegion.col("r_regionkey"))
これも特に違和感のないコードですね。Nationテーブルに対してRegionテーブルをJOINしており、その条件がcol
で指定したカラムによるJOIN条件です。
Snowflake上では何が起きているのか?
さて、ここまでの時点でSnowflake上では何が起きているのでしょうか?
一旦Scalaコードを実行して、クエリの履歴を見てみます。
SHOW PARAMETERS LIKE 'snowpark_lazy_analysis' select current_version();
実行されていたのは、上記の2クエリのみでした。どちらもメタデータ処理のみなので、ウェアハウスも起動していません。ここまではセッションの確立時に必ず実行されるようです。
この段階では、とくに何もする必要がないのでSnowflake上ではクエリは実行されないようですね。
JOINした結果を表示させてみる
では、続いて以下のようにコードを追加して、JOIN結果を表示させてみましょう。表示にはshow()
メソッドを利用します。
// NationテーブルとRegionテーブルのDataFrame val dfNation = session.table("snowflake_sample_data.tpch_sf1.nation") val dfRegion = session.table("snowflake_sample_data.tpch_sf1.region") // RegionKeyでJoinする val dfJoined = dfNation.join(dfRegion, dfNation.col("n_regionkey") === dfRegion.col("r_regionkey")) // Joinした結果を表示 dfJoined.show()
実行するとデバックコンソールに取得結果が表示されます。また、Snowflake上で履歴を確認すると以下のクエリが実行されたことが分かります。
SELECT * FROM ( SELECT * FROM (( SELECT "N_NATIONKEY" AS "N_NATIONKEY", "N_NAME" AS "N_NAME", "N_REGIONKEY" AS "N_REGIONKEY", "N_COMMENT" AS "N_COMMENT" FROM ( SELECT * FROM (snowflake_sample_data.tpch_sf1.nation))) AS SN_TEMP_OBJECT_1796244952 INNER JOIN ( SELECT "R_REGIONKEY" AS "R_REGIONKEY", "R_NAME" AS "R_NAME", "R_COMMENT" AS "R_COMMENT" FROM ( SELECT * FROM (snowflake_sample_data.tpch_sf1.region))) AS SN_TEMP_OBJECT_1888285858 ON ("N_REGIONKEY" = "R_REGIONKEY"))) LIMIT 10
ちょっと長いので整形してみましょう。
SELECT * FROM ( SELECT * FROM ( ( SELECT "N_NATIONKEY" AS "N_NATIONKEY", "N_NAME" AS "N_NAME", "N_REGIONKEY" AS "N_REGIONKEY", "N_COMMENT" AS "N_COMMENT" FROM ( SELECT * FROM (snowflake_sample_data.tpch_sf1.nation) ) ) AS SN_TEMP_OBJECT_1796244952 INNER JOIN ( SELECT "R_REGIONKEY" AS "R_REGIONKEY", "R_NAME" AS "R_NAME", "R_COMMENT" AS "R_COMMENT" FROM ( SELECT * FROM (snowflake_sample_data.tpch_sf1.region) ) ) AS SN_TEMP_OBJECT_1888285858 ON ("N_REGIONKEY" = "R_REGIONKEY") ) ) LIMIT 10
指定した2つのテーブルをINNER JOIN
してデータを取得しているのが分かります。
また、データ件数を抑制するために自動でLIMIT 10
も掛けられていることが分かります。不用意に大きなテーブルデータを取得しないようになっているのは安心ですね。(show
の引数に数値を指定することで、件数を変更することもできます
今回はshow()
を呼んだので、クエリが実行されウェアハウスも起動しました。
JOINした結果は表示しないで、テーブルに書き込む
最後に、JOINした結果は表示はさせずに、テーブルに書き込むとどうでしょうか?
// NationテーブルとRegionテーブルのDataFrame val dfNation = session.table("snowflake_sample_data.tpch_sf1.nation") val dfRegion = session.table("snowflake_sample_data.tpch_sf1.region") // RegionKeyでJoinする val dfJoined = dfNation.join(dfRegion, dfNation.col("n_regionkey") === dfRegion.col("r_regionkey")) // Joinした結果を表示 //dfJoined.show() // ←コメントアウトします // Joinした結果をテーブルに保存 dfJoined.write.mode(SaveMode.Overwrite).saveAsTable("joined_table")
Snowflake上の履歴では、以下のクエリが実行されていました。
CREATE OR REPLACE TABLE joined_table AS SELECT * FROM ( SELECT * FROM (( SELECT "N_NATIONKEY" AS "N_NATIONKEY", "N_NAME" AS "N_NAME", "N_REGIONKEY" AS "N_REGIONKEY", "N_COMMENT" AS "N_COMMENT" FROM ( SELECT * FROM (snowflake_sample_data.tpch_sf1.nation))) AS SN_TEMP_OBJECT_1249459065 INNER JOIN ( SELECT "R_REGIONKEY" AS "R_REGIONKEY", "R_NAME" AS "R_NAME", "R_COMMENT" AS "R_COMMENT" FROM ( SELECT * FROM (snowflake_sample_data.tpch_sf1.region))) AS SN_TEMP_OBJECT_267716345 ON ("N_REGIONKEY" = "R_REGIONKEY")))
整形すると、以下のようになります。
CREATE OR REPLACE TABLE joined_table AS SELECT * FROM ( SELECT * FROM ( ( SELECT "N_NATIONKEY" AS "N_NATIONKEY", "N_NAME" AS "N_NAME", "N_REGIONKEY" AS "N_REGIONKEY", "N_COMMENT" AS "N_COMMENT" FROM ( SELECT * FROM (snowflake_sample_data.tpch_sf1.nation) ) ) AS SN_TEMP_OBJECT_1249459065 INNER JOIN ( SELECT "R_REGIONKEY" AS "R_REGIONKEY", "R_NAME" AS "R_NAME", "R_COMMENT" AS "R_COMMENT" FROM ( SELECT * FROM (snowflake_sample_data.tpch_sf1.region) ) ) AS SN_TEMP_OBJECT_267716345 ON ("N_REGIONKEY" = "R_REGIONKEY") ) )
先程試した結果とほぼ同じSELECT結果を用いて、テーブルを作成しているのが分かりますね。(今回はLIMITが外れています)
まとめ
以上、SnowparkでDataFrameの操作を試してみました。
Snowparkの説明を読んだ際に「ワークロードをSnowflake内で直接実行できる」というのがよく理解できていなかったのですが、実際にDataFrameを操作することで感覚的にしっくりきました。
実際にSnowflakeで実行されたクエリと見比べながら試してみると、Scalaで実装したコードによって、自動的にクエリが構築されてSnowflake上で実行されることにより、クライアント側に不要な負荷をかけずにSnowflake側で処理を実行してくれているのが良く分かります。
また、SnowflakeのTask機能を利用してSQLベースで定期的なクエリ実行もできるのですが、Snowparkを利用することによって、別のプログラミング言語で定期的にクエリ実行をさせることもできそうですし、同時にSnowflakeとは別の処理も実行できるのが良さそうです。
たとえば、プログラムコードで他システムから外部ステージに連携元データを配置・加工などをしてから、Snowparkを利用してクエリ実行をさせるなどの処理もできそうですね。
使い方としては色々な方法・組み合わせがありそうですし、とても面白い機能ということが良く分かりました。
どなたかのお役に立てば幸いです。それでは!
おまけ
以下のブログ記事を読みましたが、とても興味深かったです。