Snowflakeで定期的な差分更新が簡単に実現できる「Dynamic Table」をdbt経由で定義してみた
さがらです。
Snowflakeで定期的な差分更新が簡単に実現できる「Dynamic Table」をdbt経由で定義してみたので、その内容を本記事でまとめてみます。
Dynamic Tableとは
Dynamic Tableとは、端的に説明するとCREATE DYNAMIC TABLE AS SELECT ...
の構文で定義を行うと、指定した時間間隔(lag
)で自動で前回更新時からの差分のみを更新してくれるSnowflake特有のテーブルです。※2023/11/13時点ではパブリックプレビューの機能です。
これまでのSnowflakeでこのような増分更新のパイプラインを実現しようとすると、「STREAMで対象のテーブルの差分を検知し、そのSTREAMを使って指定したテーブルにINSERT・UPDATE・DELETEを行うTaskを組む」ということをしないといけませんでした。正直ちょっと大変でした。(参考ブログ)
この増分更新のパイプラインが、CREATE DYNAMIC TABLE AS SELECT ...
の定義だけで簡単にできるようになったのが、Dynamic Tableの素晴らしい点です!更には、複数テーブルをJOINするパイプラインにも対応しております。
Dynamic Tableについてより詳しくは、下記の公式Docをご覧ください。
また、Dynamic Tableに関しては日本語でも記事が出ておりますのでこちらも参考になるはずです。
dbt経由で定義するDynamic Tableとは
この上で、今回試すのは「dbt経由で定義するDynamic Table」となります。
実はDynamic Tableは、リリース当初からdbt経由で定義できるようになっております!
dbtのドキュメントとしては、下記のドキュメントの「Dynamic tables」の章が該当します。各Modelごとに定義するのはもちろん、dbt_project.yml
経由でフォルダレベルでまとめてDynamic Tableとして定義することも可能です。
dbtの公式ブログでも本機能について言及されたブログが出ております。特にtarget_lag_environment()
というマクロを定義することにより、使用するtarget
でlag
の値を動的に切りかえるテクニックなどが参考になります。
ちなみに、dbt経由でのDynamic Tableの定義についても、実はすでに日本語で記事もあります。この記事ではLambda Viewと呼ばれる差分更新を行うロジックを実装せずとも、Dynamic Tableならば1つModelとConfigを定義するだけで簡単に差分更新の処理が実装できることをわかりやすく書いております。
このようにdbt経由でDynamic Tableを定義することに関して情報はすでにいくつかあるのですが、改めて私も挙動を確かめてみたかったため、本記事で試して内容をまとめてみたいと思います。
やること
Snowflake上に、dbtのチュートリアルでよく使われるJaffle Shopのデータをロードしておきます。
顧客情報を持つcustomers
テーブルと注文情報を持つorders
テーブルをJOINするテーブルを、dbtでDynamic Tableとして定義してみて、レコードを追加したときの挙動を確かめてみようと思います。
検証環境
Snowflake : Enterpriseエディション dbt : dbt Cloud Enterpriseエディション dbtのバージョン:1.6 ※dbt CloudのEnvironmentで指定
Dynamic Tableを定義してみた
前提
まず、下図のようにcustomers.sql
を定義します。JOIN元となっているstg_customers
はcustomers
のステージングモデル、stg_orders
はorders
のステージングモデルです。
customers.sql
に対しては、下図のようにcustomer_id
カラムに対してテストの記述もあります。
Dynamic Tableの参照先となるテーブルでCHANGE_TRACKINGを有効化
Dynamic Tableは、参照先のテーブルでCHANGE_TRACKING = TRUE
となっていないといけません。デフォルトではFALSE
となっています。
そのため、今回使用するテーブルであるcustomers
とorders
で、以下のクエリを実行してCHANGE_TRACKING = TRUE
としておきます。生データに該当するテーブルの変更となるため、使用するロールに注意しましょう。
alter table customers set change_tracking = true; alter table orders set change_tracking = true;
この上でshow tables
などのコマンドを実行し、change_tracking
がON
となっていればOKです。
Dynamic Tableの定義&初回構築
今回はdbt_project.yml
上で特定のフォルダ化のModelに対してまとめてDynamic Tableの定義をしてみます。
dbt_project.yml
の該当箇所のみ抽出すると、下記のようになります。
models: jaffle_shop: staging: materialized: view marts: materialized: dynamic_table snowflake_warehouse: sagara_dbt_wh target_lag: '1 minutes'
各オプションについては、下記のように入力すればOKです。
materialized
:dynamic_table
と記入snowflake_warehouse
: 使用したいウェアハウス名を記入target_lag
:num { seconds | minutes | hours | days }
の書式で、どの時間間隔で参照先のデータに差分があるかチェックするか記入。DOWNSTREAM
と記入すると、参照先のテーブルがDynamic Tableであった場合に、その参照先のDynamic Tableが更新されたタイミングで、更新されるようになります。
この上で、一度dbt build
コマンドを実行すれば、Dynamic Tableが作られます。
dbt build
として実行したので、定義したテストも実行されています。
Snowsight上での確認
Snowsight上で確認するとDynamic Tableの分類でcustomers
テーブルができていることがわかります。
Graph
タブでは、customers
テーブルがどのテーブル・ビューからできているのかリネージを見ることができます。
Refresh History
タブでは、cutomers
テーブルの差分更新をいつ行ったのかを履歴として確認可能です。(target_lag: '1 minutes'
としたので1分おきに実行されているように見えますが、ウェアハウスが動くのは差分を検知した時のみです。)
レコードを1行追加してみる
ここで、元となっているorders
テーブルに対して1レコード追加したときにどのような挙動をするか、確認をしてみます。新しく追加するレコードの内容としては、customer_id
が20
の顧客の新しい注文データがINSERTされた、というものになります。
insert into orders values (100,20,'2018-04-10','placed');
すると、更新を検知したために対象のDynamic TableでRefreshが行われました。
実行されたクエリプロファイルを見ると、すごいNodesで構成されたプロファイルが出てきました…(クエリ自体は合計2.2秒で終わっているんですけども)
実際に更新後のDynamic Tableを見ると、問題なく対象のcustomer_id
が20
の顧客のみ、MOST_RECENT_ORDER_DATE
が更新されていますね。customers
のdbt上の定義ではJOINもGROUP BYを用いた集計も行っていたのですが、これらの処理があっても問題なく更新されていることがわかりました!
- Before ※スクショ撮り忘れのためタイムトラベルしたクエリで失礼します…
- After
しかし唯一ネックな点としては、Dynamic Tableの更新時にdbtのテストのクエリは実行されませんでした。
一度Dynamic Tableを構築済みの状態で、target_lagを書き換えて、dbt buildを行うとどうなるか
ここで、dbt_project.yml
を下記のようにtarget_lag: '1 minutes'
からtarget_lag: '24 hours'
に書き換えたときに、dbt build
を行うとどうなるかを試してみます。
models: jaffle_shop: staging: materialized: view marts: materialized: dynamic_table snowflake_warehouse: sagara_dbt_dev_wh target_lag: '24 hours'
すると、Dynamic Tableのデータの更新は行われずに、Dynamic Table definition was altered. Lag metrics have been reset.
と表示され、lag
が24時間に切り替わったことだけがわかります。
一度Dynamic Tableを構築済みの状態で、レコード追加後にdbt buildをしてみる
最後に、一度Dynamic Tableを構築済みの状態で、レコード追加後にdbt buildをしてみます。上述のtarget_lag: '24 hours'
の続きとなるため、検証途中に自動でデータが更新されることはありません。
下記のクエリを実行して、新しくレコードを追加します。
insert into orders values (101,30,'2018-04-11','placed');
この上で、dbt build
コマンドを実行してみます。
しかし、このdbt build
の実行時にはDynamic Tableのデータは更新されませんでした。dbt build
時にはtarget_lag
などの設定値のみがALTER
コマンドで見直されるようです。
検証後に感じた注意事項
dbt経由でDynamic Tableを定義・構築してみましたが、いくつか注意すべき点があると感じたのでまとめておきます。
開発環境でも問答無用に参照先のテーブルが更新されたら、開発環境のDynamic Tableがウェアハウスを用いて更新されてしまう
今回は基本的にdbt CloudのIDEからdbt build
などのコマンドを実行していたのですが、開発環境などは関係なく、指定したtarget_lag
の値に応じて更新処理が走ってしまいました。
具体的な対策の一つとしては、このdbtのブログからの引用ですが、target
によってtarget_lag
の値を動的に変更するマクロの定義、などを行う必要があります。
- マクロの定義
{% macro target_lag_environment() %} {% set lag = '1 minute' if target.name == "prod" else '35 days' %} {{ return(lag) }} {% endmacro %}
- マクロの適用:各ModelでのConfigの設定例
{{ config( materialized = 'dynamic_table', snowflake_warehouse = 'transforming', target_lag = target_lag_environment(), on_configuration_change = 'apply', ) }}
dbtでDynamic Tableのテーブルにtestを定義していても、Dynamic Tableの更新時にtestが実行されない
これは当たり前といえばそうかもしれませんが、Dynamic Tableを一度定義するとSnowflake内で完結する処理になるため、対象のテーブルに対してdbtのtestを定義していても、Dynamic Tableの更新時にtestは実行されません。
Dynamic Tableに対してdbtで定義したtestを実行する場合には、定期的にDynamic Table向けのtestだけを回すジョブを定義するなど、何かしらの対策が必要となります。
最後に
Snowflakeで定期的な差分更新が簡単に実現できる「Dynamic Table」をdbt経由で定義し、色々な動作を試してみました。
dbtで定義することで、Dynamic Tableの定義元となるSQLをGit管理できますし、開発・本番と環境を分離して管理することも容易になります。
設定も難しくないため、dbt×Snowflakeをお使いの方にはぜひ試してほしいです!