SnowparkでDataFrameの操作を試してみた

2021.06.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!DA(データアナリティクス)事業本部 インテグレーション部の大高です。

SnowflakeのPreview機能として、Snowparkという機能が公開されています。

先日、Visual Studio Codeでこの機能を動かせるようにセットアップを行ったので、今回は実際にDataFrameを操作して挙動と試してみたいと思います。また、今回は以下のドキュメントを参考に色々と触ってみようと思います。

なお、環境のセットアップについては以下のエントリに記載しています。

DataFrameの操作をしてみる

今回は簡単な例として、ある2テーブルをJOINして、その結果を別テーブルに書き戻すようなプログラムで色々試したいと思います。

テーブルの取得とJOIN

まずテーブルをDataFrameとして取得するには、以下のようにします。とても簡単ですね。

val configs = Map (
  "URL" -> "https://<account>.snowflakecomputing.com:443",
  "USER" -> "<user name>",
  "PASSWORD" -> "<password>",
  "ROLE" -> "<role name>",
  "WAREHOUSE" -> "<warehouse name>",
  "DB" -> "OOTAKA_SANDBOX_DB",
  "SCHEMA" -> "PUBLIC"
)
val session = Session.builder.configs(configs).create

// NationテーブルのDataFrame
val dfNation = session.table("snowflake_sample_data.tpch_sf1.nation")

今回はあえてセッション構築した際の「データベース」、「スキーマ」とは別のものを指定してみました。SQLと同様に.で区切って明示的に[データベース名].[スキーマ名].[テーブル名]と指定することで問題なく動くことが分かります。

更に今度は2テーブルを取得してJOINしてみます。

// NationテーブルとRegionテーブルのDataFrame
val dfNation = session.table("snowflake_sample_data.tpch_sf1.nation")
val dfRegion = session.table("snowflake_sample_data.tpch_sf1.region")

// RegionKeyでJoinする
val dfJoined = dfNation.join(dfRegion, dfNation.col("n_regionkey") === dfRegion.col("r_regionkey"))

これも特に違和感のないコードですね。Nationテーブルに対してRegionテーブルをJOINしており、その条件がcolで指定したカラムによるJOIN条件です。

Snowflake上では何が起きているのか?

さて、ここまでの時点でSnowflake上では何が起きているのでしょうか?

一旦Scalaコードを実行して、クエリの履歴を見てみます。

SHOW PARAMETERS LIKE 'snowpark_lazy_analysis'
select current_version();

実行されていたのは、上記の2クエリのみでした。どちらもメタデータ処理のみなので、ウェアハウスも起動していません。ここまではセッションの確立時に必ず実行されるようです。

この段階では、とくに何もする必要がないのでSnowflake上ではクエリは実行されないようですね。

JOINした結果を表示させてみる

では、続いて以下のようにコードを追加して、JOIN結果を表示させてみましょう。表示にはshow()メソッドを利用します。

// NationテーブルとRegionテーブルのDataFrame
val dfNation = session.table("snowflake_sample_data.tpch_sf1.nation")
val dfRegion = session.table("snowflake_sample_data.tpch_sf1.region")

// RegionKeyでJoinする
val dfJoined = dfNation.join(dfRegion, dfNation.col("n_regionkey") === dfRegion.col("r_regionkey"))

// Joinした結果を表示
dfJoined.show()

実行するとデバックコンソールに取得結果が表示されます。また、Snowflake上で履歴を確認すると以下のクエリが実行されたことが分かります。

SELECT  *  FROM ( SELECT  *  FROM (( SELECT "N_NATIONKEY" AS "N_NATIONKEY", "N_NAME" AS "N_NAME", "N_REGIONKEY" AS "N_REGIONKEY", "N_COMMENT" AS "N_COMMENT" FROM ( SELECT  *  FROM (snowflake_sample_data.tpch_sf1.nation))) AS SN_TEMP_OBJECT_1796244952 INNER JOIN ( SELECT "R_REGIONKEY" AS "R_REGIONKEY", "R_NAME" AS "R_NAME", "R_COMMENT" AS "R_COMMENT" FROM ( SELECT  *  FROM (snowflake_sample_data.tpch_sf1.region))) AS SN_TEMP_OBJECT_1888285858 ON ("N_REGIONKEY" = "R_REGIONKEY"))) LIMIT 10

ちょっと長いので整形してみましょう。

SELECT
  *
FROM
(
  SELECT
    *
  FROM
  (
    (
      SELECT
        "N_NATIONKEY" AS "N_NATIONKEY",
        "N_NAME" AS "N_NAME",
        "N_REGIONKEY" AS "N_REGIONKEY",
        "N_COMMENT" AS "N_COMMENT"
      FROM
      (
        SELECT
          *
        FROM
          (snowflake_sample_data.tpch_sf1.nation)
      )
    ) AS SN_TEMP_OBJECT_1796244952
    INNER JOIN
    (
      SELECT
        "R_REGIONKEY" AS "R_REGIONKEY",
        "R_NAME" AS "R_NAME",
        "R_COMMENT" AS "R_COMMENT"
      FROM
      (
        SELECT
          *
        FROM
          (snowflake_sample_data.tpch_sf1.region)
      )
    ) AS SN_TEMP_OBJECT_1888285858
    ON ("N_REGIONKEY" = "R_REGIONKEY")
  )
)
LIMIT 10

指定した2つのテーブルをINNER JOINしてデータを取得しているのが分かります。

また、データ件数を抑制するために自動でLIMIT 10も掛けられていることが分かります。不用意に大きなテーブルデータを取得しないようになっているのは安心ですね。(showの引数に数値を指定することで、件数を変更することもできます

今回はshow()を呼んだので、クエリが実行されウェアハウスも起動しました。

JOINした結果は表示しないで、テーブルに書き込む

最後に、JOINした結果は表示はさせずに、テーブルに書き込むとどうでしょうか?

// NationテーブルとRegionテーブルのDataFrame
val dfNation = session.table("snowflake_sample_data.tpch_sf1.nation")
val dfRegion = session.table("snowflake_sample_data.tpch_sf1.region")

// RegionKeyでJoinする
val dfJoined = dfNation.join(dfRegion, dfNation.col("n_regionkey") === dfRegion.col("r_regionkey"))

// Joinした結果を表示
//dfJoined.show()  // ←コメントアウトします

// Joinした結果をテーブルに保存
dfJoined.write.mode(SaveMode.Overwrite).saveAsTable("joined_table")

Snowflake上の履歴では、以下のクエリが実行されていました。

CREATE  OR  REPLACE  TABLE joined_table AS  SELECT  *  FROM ( SELECT  *  FROM (( SELECT "N_NATIONKEY" AS "N_NATIONKEY", "N_NAME" AS "N_NAME", "N_REGIONKEY" AS "N_REGIONKEY", "N_COMMENT" AS "N_COMMENT" FROM ( SELECT  *  FROM (snowflake_sample_data.tpch_sf1.nation))) AS SN_TEMP_OBJECT_1249459065 INNER JOIN ( SELECT "R_REGIONKEY" AS "R_REGIONKEY", "R_NAME" AS "R_NAME", "R_COMMENT" AS "R_COMMENT" FROM ( SELECT  *  FROM (snowflake_sample_data.tpch_sf1.region))) AS SN_TEMP_OBJECT_267716345 ON ("N_REGIONKEY" = "R_REGIONKEY")))

整形すると、以下のようになります。

CREATE  OR  REPLACE  TABLE joined_table AS
SELECT
  *
FROM
(
  SELECT
    *
  FROM
  (
    (
      SELECT
        "N_NATIONKEY" AS "N_NATIONKEY",
        "N_NAME" AS "N_NAME",
        "N_REGIONKEY" AS "N_REGIONKEY",
        "N_COMMENT" AS "N_COMMENT"
      FROM
      (
        SELECT
          *
        FROM
          (snowflake_sample_data.tpch_sf1.nation)
      )
    ) AS SN_TEMP_OBJECT_1249459065
    INNER JOIN
    (
      SELECT
        "R_REGIONKEY" AS "R_REGIONKEY",
        "R_NAME" AS "R_NAME",
        "R_COMMENT" AS "R_COMMENT"
      FROM
      (
        SELECT
          *
        FROM
          (snowflake_sample_data.tpch_sf1.region)
      )
    ) AS SN_TEMP_OBJECT_267716345
    ON ("N_REGIONKEY" = "R_REGIONKEY")
  )
)

先程試した結果とほぼ同じSELECT結果を用いて、テーブルを作成しているのが分かりますね。(今回はLIMITが外れています)

まとめ

以上、SnowparkでDataFrameの操作を試してみました。

Snowparkの説明を読んだ際に「ワークロードをSnowflake内で直接実行できる」というのがよく理解できていなかったのですが、実際にDataFrameを操作することで感覚的にしっくりきました。

実際にSnowflakeで実行されたクエリと見比べながら試してみると、Scalaで実装したコードによって、自動的にクエリが構築されてSnowflake上で実行されることにより、クライアント側に不要な負荷をかけずにSnowflake側で処理を実行してくれているのが良く分かります。

また、SnowflakeのTask機能を利用してSQLベースで定期的なクエリ実行もできるのですが、Snowparkを利用することによって、別のプログラミング言語で定期的にクエリ実行をさせることもできそうですし、同時にSnowflakeとは別の処理も実行できるのが良さそうです。

たとえば、プログラムコードで他システムから外部ステージに連携元データを配置・加工などをしてから、Snowparkを利用してクエリ実行をさせるなどの処理もできそうですね。

使い方としては色々な方法・組み合わせがありそうですし、とても面白い機能ということが良く分かりました。

どなたかのお役に立てば幸いです。それでは!

おまけ

以下のブログ記事を読みましたが、とても興味深かったです。