Snowflake で処理を実現する方法とサンプル(2024-02版)
とーかみです。
Snowflake ではデータを SQL で参照するだけでなく、前処理や加工を行うこともできます。
このような処理を実装するにあたって Snowflake のどんな機能が使えるかをまとめました。
説明にあたってのサンプルテーブル
以降のサンプルクエリで使用するテーブルは以下のものを使用します。
- データベース :
sample_db
- スキーマ :
sample_db.sample_schema
- サンプルデータ :
snowflake_sample_data
内のnation
とregion
- 仮想ウェアハウス :
compute_wh
CREATE DATABASE sample_db;
CREATE SCHEMA sample_db.sample_schema;
CREATE TABLE sample_db.sample_schema.nation AS
SELECT * FROM snowflake_sample_data.tpch_sf1.nation;
CREATE TABLE sample_db.sample_schema.region AS
SELECT * FROM snowflake_sample_data.tpch_sf1.region;
View / Materialized View / Secure View
まずは View です。データ量や加工する量(複雑さ)が少ない場合は View で十分であることも多いと思います。
また、加工内容の検証のために暫定的に View で実現する場合もあります。
Snowflake の View には、標準の View (ビュー)の他に Materialized View (マテリアライズドビュー)と Secure View (セキュアビュー)があります。
View
View はクエリ内容をテーブルのように見せるものです。(他の多くのデータベースエンジンで利用できるものと同じです)
毎回同じようなクエリでアクセスするようなテーブルに対しては View を作成しておくとアクセスがシンプルにできます。
例えば、列や行を絞ったり、コード値とマスタテーブルを結合して名称を解決するような場合です。
作成と参照のサンプルクエリは以下のようになります。
-- 作成
CREATE VIEW sample_db.sample_schema.my_view AS
SELECT
n.n_nationkey,
n.n_name,
n.n_regionkey,
r.r_name,
n.n_comment
FROM sample_db.sample_schema.nation n
LEFT JOIN sample_db.sample_schema.region r
ON n.n_regionkey = r.r_regionkey;
-- 参照
SELECT * FROM sample_db.sample_schema.my_view;
Materialized View
View がアクセスするときに元のテーブルにデータを参照しに行くのに対し、 Materialized View は事前に作っておいた計算済みのレコード(データセット)に対してアクセスするものです。(他の多くのデータベースエンジンで利用できるものと同じです)
事前に計算したレコードにアクセスするためパフォーマンスが向上できるメリットがあり、事前計算した結果を保存しておくためのストレージコストや、事前計算された状態を維持するための再計算の処理コストがかかるデメリットがあります。
アクセスできるデータサイズを減らしたり、データ更新頻度が低くアクセス頻度が高い場合に高い効果を発揮します。
Snowflake の Materialized View の特徴はデータの再計算(更新)が自動で行われる点です。
(データベースエンジンごとで自動/スケジュール設定など特徴があります)
注意点として、Materialized view では JOIN のような 複数テーブルを参照するものは作れません。
作成と参照のサンプルクエリは以下のようになります。
-- 作成
CREATE MATERIALIZED VIEW sample_db.sample_schema.my_materialized_view AS
SELECT
n.n_nationkey,
n.n_name,
n.n_regionkey,
-- r.r_name,
n.n_comment
FROM sample_db.sample_schema.nation n
-- LEFT JOIN sample_db.sample_schema.region r
-- ON n.n_regionkey = r.r_regionkey
;
-- 参照
SELECT * FROM sample_db.sample_schema.my_materialized_view;
また、 Snowflake ではクエリ結果のキャッシュによりパフォーマンスを向上しています。
通常の View 、キャッシュされたクエリ結果、 Materialized View との比較についても公式のユーザーガイドの以下のページに記載されています。
Secure View
Secure View は、 View の定義や中間データをユーザーに公開したくない場合に SECURE
オプションを付与して作成する View です。
単純な View にも Materialized view にも適用できます。
(Materialized view の場合は CREATE SECURE MATERIALIZED VIEW
のように指定します)
利用イメージは以下の記事をご参照ください。
注意点は上記の記事にもありますが、 Snowflake のオプティマイザが適用されずパフォーマンス低下やコスト増加につながるデメリットがあります。
作成と参照のサンプルクエリは以下のようになります。
-- 作成
CREATE SECURE VIEW sample_db.sample_schema.my_secure_view AS
SELECT
n.n_nationkey,
n.n_name,
n.n_regionkey,
r.r_name,
n.n_comment
FROM sample_db.sample_schema.nation n
LEFT JOIN sample_db.sample_schema.region r
ON n.n_regionkey = r.r_regionkey;
-- 参照
SELECT * FROM sample_db.sample_schema.my_secure_view;
View に関するユーザーガイド
Dynamic Table (動的テーブル)
Dynamic Table は、View のように加工後のデータの形式を宣言的に記述しながらデータ加工を実現する機能です。
ベースとなるテーブル(複数も可)の更新を定期的に宛先のテーブルに反映します。
ストリームデータのように細かい更新が積み重なるテーブルの反映では特に効果を発揮します。
この記事の執筆時点では日本語ページではプレビュー扱いで記載されていますが以下のリリースノートに記載の通り 2024/4/29 に一般公開(GA)されています。
ユーザーガイドによると Dynamic Table を利用するのに適しているのは以下のような場合です。
- データの依存関係を追跡してデータのリフレッシュを管理するコードの記述を望まない。
- ストリームやタスクの複雑さは必要ない、または避けたい。
- 複数のベーステーブルのクエリ結果を具体化する必要があります。
- ETL パイプラインを介してデータを変換するために、複数のテーブルを構築する必要がある。
- 粒度の細かいリフレッシュスケジュールのコントロールは必要なく、パイプラインのターゲットデータの鮮度を指定することを希望する。
- ストアドプロシージャ、 フルリフレッシュでサポートされる非決定性関数 にリスト されていない 非決定性関数、または 外部関数 など、サポートされていない動的クエリ構造を使用する必要がない。または、外部テーブルである動的テーブル、ストリームまたはマテリアライズドビューのソースを使用する必要がある。
作成と参照のサンプルクエリは以下のようになります。
-- 作成
CREATE OR REPLACE DYNAMIC TABLE sample_db.sample_schema.my_dynamic_table
TARGET_LAG = '1 minute'
WAREHOUSE = compute_wh
AS
SELECT
n.n_nationkey,
n.n_name,
n.n_regionkey,
r.r_name,
n.n_comment
FROM sample_db.sample_schema.nation n
LEFT JOIN sample_db.sample_schema.region r
ON n.n_regionkey = r.r_regionkey;
-- 参照
SELECT * FROM sample_db.sample_schema.my_dynamic_table;
また、 Dynamic Table と Materialized View の比較についても以下に記載があります。
ストリームデータのように細かく更新が行われる場合、 Materialized View での自動更新は余分な負荷がかかってしまうため、 Dynamic Table のような一定の時間間隔で更新していくような方式が最適になるという理解をしています。
逆に言えば日次で更新されるようなデータであれば Dynamic Table ではなく Materialized View の方が適しているのだと思います。
タスク / ストアドプロシージャ / Snowflake スクリプト
処理のスケジューリングや処理ロジックを定義する方法としてタスク、ストアドプロシージャ、Snowflake スクリプトがあります。
まずはこれらの関係性を整理します。
- タスク
- スケジューリングを設定できます
- 時間起動、イベント起動(他のタスクとの前後関係)
- 次の方法で処理内容を記述できます
- 単一の SQL
- ストアドプロシージャ
- Snowflake スクリプト
- 単一の SQL で記述できるものは直接インラインで定義します
- 複雑な処理の記述が必要な場合は、ストアドプロシージャや Snowflake スクリプトで定義しておき、それをタスクから呼び出します
- スケジューリングを設定できます
- ストアドプロシージャ
- 一連の処理ロジックを手続き型の形で定義するためのものです
- タスクからの呼び出しまたは手動実行ができます
- 次の方法で処理内容を記述できます
- Java
- JavaScript
- Python
- Scala
- Snowflake スクリプト(SQL)
- Snowflake スクリプト
- SQL の形式で処理内容を記述できます
BEGIN
~END
のような形式で一連の SQL を実行できます
- SQL の形式で処理内容を記述できます
タスク(Task)
タスクは以下の方法で実行するように設定できます。
- 手動
- スケジュール
- トリガー
- トリガータスクと呼ばれ、ストリームに入ったデータ変更イベントをもとに実行するように設定するものです
タスクの設定変更や状態変更は Snowsight (Web コンソール)から行うこともできます。
ストリームとは
ストリームオブジェクトは、挿入、更新、削除などのテーブルに加えられたデータ操作言語(DML)の変更、および各変更に関するメタデータを記録し、変更されたデータを使用してアクションを実行できるようにします。このプロセスは、変更データキャプチャ(CDC)と呼ばれます。このトピックでは、ストリームを使用した変更データキャプチャの重要な概念を紹介します。
ストリームは、レコードの変化を記録する変更データキャプチャ(CDC ; Change Data Capture)を実現する機能です。
具体的な動作イメージは以下のブログに記載されているので参考にしていただければと思います。
手動実行するタスク
動作確認用に ID 、タイムスタンプ、カウンターを持つテーブルを作成します。
CREATE TABLE sample_db.sample_schema.sample_counter (
id STRING, -- ID列
timestamp TIMESTAMP_NTZ, -- タイムスタンプ
counter INTEGER NOT NULL -- カウンター
);
このテーブルに対し、レコードを追加するタスクを作成します。
CREATE TASK sample_db.sample_schema.task_01
WAREHOUSE = 'COMPUTE_WH'
AS
/* タスクで実行する処理↓ */
INSERT INTO sample_db.sample_schema.sample_counter (id, timestamp, counter)
VALUES ('A', CURRENT_TIMESTAMP, 1)
/* タスクで実行する処理↑ */
;
タスクを手動実行するには以下のようなクエリを実行します。
EXECUTE TASK sample_db.sample_schema.task_01;
テーブルのレコードを確認すると、タスクにより追加されたレコードが確認できました。
SELECT * FROM sample_db.sample_schema.sample_counter;
追加されるレコードイメージ
ID | TIMESTAMP | COUNTER |
---|---|---|
A | 2025-01-23 04:05:06.789 | 1 |
タスクのスケジュール
タスクのスケジュールは分単位の時間間隔または cron 式で設定します。
時間間隔の設定のb亜愛、サポートされる最大値は 11520 (8日間)です。
CREATE TASK sample_db.sample_schema.task_02
WAREHOUSE = 'COMPUTE_WH'
SCHEDULE = '5 MINUTE' -- スケジュールを指定(時間間隔指定)
-- SCHEDULE = 'USING CRON 0 * * * * Asia/Tokyo' -- スケジュールを指定(cron 式での指定も可能)
AS
INSERT INTO sample_db.sample_schema.sample_counter (id, timestamp, counter)
VALUES ('B', CURRENT_TIMESTAMP, 1)
;
作成時は一時停止状態のため、再開する必要があります。
ALTER TASK sample_db.sample_schema.task_02 RESUME; -- 再開
-- ALTER TASK sample_db.sample_schema.task_02 SUSPEND; -- 一時停止
再開し、設定したスケジュールによってレコードが追加されていることが確認できます。
別のタスクの後に起動するタスク
タスク間の依存関係を定義することで、タスクグラフを構成することができます。
AFTER
ステートメントを用いて設定します。
AFTER
ステートメントには複数のタスクを指定できるので、待ち合わせのようなタスクを設定することもできます。
注意点として、最初に起動されるタスクのスケジュールは一時停止状態である必要があります。
task_01
のタスクの後に動くタスク task_01_after
を設定する場合は以下のようなクエリになります。
CREATE TASK sample_db.sample_schema.task_01_after
WAREHOUSE = 'COMPUTE_WH'
AFTER sample_db.sample_schema.task_01 -- task_01 の後に起動
AS
UPDATE sample_db.sample_schema.sample_counter
SET counter = counter + 1
WHERE id = 'A';
;
AFTER
ステートメントは、起動トリガーとしての有効/無効の状態を持つため、作成後は再開する必要があります。
ALTER TASK sample_db.sample_schema.task_01_after RESUME; -- 再開
-- ALTER TASK sample_db.sample_schema.task_01_after SUSPEND; -- 一時停止
以下のクエリで task_01
のタスクを手動起動すると、後続の task_01_after
も動作してレコードが更新されていることが確認できます。
EXECUTE TASK sample_db.sample_schema.task_01;
タスクの依存関係は Snowsight 上でも以下のようにグラフを確認することができます。
ストリームからトリガーするタスク
ストリームの動作確認
テーブルに対応するストリームを作成します。
CREATE STREAM sample_db.sample_schema.counter_table_changes
ON TABLE sample_db.sample_schema.sample_counter;
手動実行用のタスクを実行してレコードを更新します。
EXECUTE TASK sample_db.sample_schema.task_01;
ストリームの中身を確認すると、レコードの変更が記録されていることが確認できました。
task_01_after
の処理も含めて COUNTER
が 2
のレコードが INSERT
として 1 行だけ記録されていたため、厳密なレコード操作を記録するのではなく、レコードの変化を記録しているようです。
SELECT * FROM sample_db.sample_schema.counter_table_changes;
ストリームの中身を参照しただけでは、ストリームに記録されたレコードは消費されません。
次に、レコードを消費してみます。
消費するために反映先のテーブルを作成します。
CREATE TABLE sample_db.sample_schema.sample_counter_target (
id STRING,
timestamp TIMESTAMP_NTZ,
counter INTEGER NOT NULL,
metadata_action VARCHAR, -- INSERT/DELETE/UPDATE などのアクション種別
metadata_is_update BOOLEAN, -- UPDATEの場合か否か
metadata_row_id VARCHAR -- Snowflakeが付与する行ID
);
ストリームをもとに反映先のテーブルにレコードを更新します。
INSERT INTO sample_db.sample_schema.sample_counter_target
SELECT * FROM sample_db.sample_schema.counter_table_changes;
再度ストリームの中身を確認すると、消費されレコードが空になっていることが確認できます。
SELECT * FROM sample_db.sample_schema.counter_table_changes; -- 再度ストリームの中身を確認
SELECT * FROM sample_db.sample_schema.sample_counter_target; -- 反映先テーブルを確認
タスクの作成
ストリームの動作を確認した際の SQL を用いて、ストリームからトリガーするタスクを作成します。
CREATE TASK sample_db.sample_schema.task_03
WAREHOUSE = 'COMPUTE_WH'
WHEN system$stream_has_data('sample_db.sample_schema.counter_table_changes')
AS
INSERT INTO sample_db.sample_schema.sample_counter_target
SELECT * FROM sample_db.sample_schema.counter_table_changes
;
同様に起動トリガーとしての有効/無効の状態を持つため、作成後は再開する必要があります。
ALTER TASK sample_db.sample_schema.task_03 RESUME; -- 再開
-- ALTER TASK sample_db.sample_schema.task_03 SUSPEND; -- 一時停止
検証時と同様に手動実行用のタスクを実行してレコードを更新します。
EXECUTE TASK sample_db.sample_schema.task_01;
WHEN
ステートメントで指定した内容は、デフォルトでは 30 秒間隔で確認されるため、その時間が経過後に確認するとストリームが空になっており、反映先テーブルにレコードが反映されていることが確認できます。
SELECT * FROM sample_db.sample_schema.counter_table_changes; -- 再度ストリームの中身を確認
SELECT * FROM sample_db.sample_schema.sample_counter_target; -- 反映先テーブルを確認
また、 WHEN
ステートメントは AND
や OR
を用いて複数の条件を組み合わせて評価することができるます。
ストアドプロシージャ
一連の処理ロジックを手続き型の形で定義するためのものです。
今回は Python でロジックを記述しました。
テーブル名を引数として受け取り、レコード件数を返すサンプルを作成します。
CREATE PROCEDURE sample_db.sample_schema.procedure_01_count_record(tableName VARCHAR)
RETURNS NUMBER
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'count_record'
AS
$$
import logging
logger = logging.getLogger("python_logger")
from snowflake.snowpark.functions import col
def count_record(session, table_name):
try:
logger.info(f"target table : {table_name}")
df = session.table(table_name) # レコード件数を取得
record_count = df.count()
return record_count
except Exception as e:
logger.error("Error counting records for table '{}': {}".format(table_name, str(e)))
return -1
$$
;
以下の用に引数としてテーブル名を渡して実行すると、テーブルのレコード件数が取得できます。
CALL sample_db.sample_schema.procedure_01_count_record('sample_db.sample_schema.nation');
ログはイベントテーブルから確認ができます。イベントテーブルの事前設定とログレベルの設定が必要です。
また、実行後にログがイベントテーブルに記録されるまでに数分ラグがあるため注意が必要です。
どのような処理ができるか
今回のサンプルではクエリ結果を値として返していますが、テーブル形式のデータを返したりすることもできます。
公式ドキュメントにいくつかサンプルが載っているので参考にしてください。
Python 以外のプログラム言語についても同様に記載があります。
Snowflake スクリプト
Snowflakeスクリプト(SQL)でストアドプロシージャを定義する場合は次のように記述します。
プロシージャに渡す引数を使う部分は :tableName
のように :
を頭につけます。
CREATE OR REPLACE PROCEDURE sample_db.sample_schema.procedure_02_count_record(tableName VARCHAR)
RETURNS TABLE (row_count NUMBER)
LANGUAGE SQL
AS
DECLARE
res RESULTSET DEFAULT (SELECT count(*) as row_count FROM TABLE(:tableName));
BEGIN
RETURN TABLE(res);
END;
Snowflake スクリプトも条件分岐やループなどを組み合わせながらロジックを組むことができます。
詳細は以下のページを参照してください。
その他
ユーザー定義関数(UDF)
ユーザー定義関数を作成して独自の処理を関数として定義しておくと、複数の箇所で使用するような処理を共通化することができます。
UDF は単一の値を返す「スカラー関数(UDF)」とテーブル形式の値を返す「表関数(UDTF)」があります。
UDF では SQL 以外に Java 、 JavaScript 、 Python 、 Scala で記述することができます。
スカラー関数
スカラー関数を SQL で定義する場合、以下のように RETURNS
として計算した単一の値を返す形で記述します。
CREATE FUNCTION area_of_circle(radius FLOAT)
RETURNS FLOAT
AS
$$
pi() * radius * radius
$$
;
スカラー関数は SELECT 句などカラムや値を指定する部分に使用することができます。
SELECT area_of_circle(10);
表関数
表関数を SQL で定義する場合、以下のように RETURNS
としてカラムとデータ型、そしてデータを作成するクエリを記述することでテーブル形式で返すことができます。
CREATE FUNCTION t()
RETURNS TABLE(msg VARCHAR)
AS
$$
SELECT 'Hello'
UNION
SELECT 'World'
$$;
表関数は FROM 句などテーブルとして参照する部分に使用することができます。
SELECT msg
FROM TABLE(t())
ORDER BY msg;
ストアドプロシージャとユーザー定義関数のどちらを選択するか
ストアドプロシージャとユーザー定義関数のどちらを選択するかについて、以下のページに解説があります。
ストアドプロシージャの目的 | ユーザー定義関数の目的 |
---|---|
通常、 SQL ステートメントを実行して管理操作を実行します。ストアドプロシージャの本文では、値(エラーインジケーターなど)を明示的に返すことが許可されていますが、必須ではありません。 | 値を計算して返します。関数は、式を指定することによって常に明示的に値を返します。たとえば、 JavaScript UDF の本文には、値を返す return ステートメントが必要です。 |
外部関数
Snowflake 外の API を呼び出すような処理も外部関数として作成することができます。
UDF やストアドプロシージャでは外部ライブラリを利用したい場合に限界があったり、外部サービスと連携する処理が難しい場合があるため、外部関数を利用します。
外部で動作する機械学習モデルを利用するような使い方もできます。
具体的な実装イメージは以下の記事を参考にしてください。
Snowpipe
Snowpipe は断続的にデータをロードするための方法で、ストレージ統合と組み合わせることで例えば Amazon S3 の特定のパスに置かれたファイルをテーブルに取り込み反映するような処理が実現できます。
ストリームデータを断続的に Snowflake に取り込みたい場合や、 Snowflake ストリームや動的テーブルと組み合わせてイベント駆動の非同期実行パイプラインを作成する場合に活躍します。
設定イメージは以下の記事を参考にしてください。
ワークシート
ワークシートは、 Snowsight 上で SQL クエリや Python コードを実行できる機能です。
一時的な作業として利用する機会が多いと思いますが、ワークシートをストアドプロシージャにすることもできます。
ストアドプロシージャの作成クエリ内に処理を直接記述する場合、シンタックスハイライトが効かないため、ワークシートで開発する方が見やすくなるメリットもあります。
以下の記事に手順が紹介されていますので、参考にしてください。
データ共有
異なる領域へのデータ転送のみであればデータ共有が利用できます。
詳細は以下のページを参照してください。
API
Snowflake の API を使用して外部から SQL を実行することもできます。
API には SQL REST API や Python API などいくつか種類があります。
SQL API
Python API
サードパーティツール
dbt などのサードパーティツールで処理・変換内容を定義・管理することもできます。
サポートしているパートナーは以下のページに記載しています。
また、この記事でも紹介した Dynamic Table は dbt 経由で定義することもできます。
まとめ
方法をまとめました。
さまざまな選択肢があり組み合わせることもあると思いますが、プログラムと同様にデータパイプラインが複雑になるほど管理やトレースが大変になります。
Snowflake 内のリソースを Terraform で IaC として管理するパターンもあると思いますので、リソースの管理方法は全体で考える必要があります。
メダリオンアーキテクチャのようにデータの処理段階を定義して複雑になりすぎない形でデータパイプライン環境を目指していきましょう。