Snowflakeで定期的な差分更新が簡単に実現できる「Dynamic Table」をdbt経由で定義してみた

2023.11.13

さがらです。

Snowflakeで定期的な差分更新が簡単に実現できる「Dynamic Table」をdbt経由で定義してみたので、その内容を本記事でまとめてみます。

Dynamic Tableとは

Dynamic Tableとは、端的に説明するとCREATE DYNAMIC TABLE AS SELECT ...の構文で定義を行うと、指定した時間間隔(lag)で自動で前回更新時からの差分のみを更新してくれるSnowflake特有のテーブルです。※2023/11/13時点ではパブリックプレビューの機能です。

これまでのSnowflakeでこのような増分更新のパイプラインを実現しようとすると、「STREAMで対象のテーブルの差分を検知し、そのSTREAMを使って指定したテーブルにINSERT・UPDATE・DELETEを行うTaskを組む」ということをしないといけませんでした。正直ちょっと大変でした。(参考ブログ

この増分更新のパイプラインが、CREATE DYNAMIC TABLE AS SELECT ...の定義だけで簡単にできるようになったのが、Dynamic Tableの素晴らしい点です!更には、複数テーブルをJOINするパイプラインにも対応しております。

Dynamic Tableについてより詳しくは、下記の公式Docをご覧ください。

また、Dynamic Tableに関しては日本語でも記事が出ておりますのでこちらも参考になるはずです。

dbt経由で定義するDynamic Tableとは

この上で、今回試すのは「dbt経由で定義するDynamic Table」となります。

実はDynamic Tableは、リリース当初からdbt経由で定義できるようになっております!

dbtのドキュメントとしては、下記のドキュメントの「Dynamic tables」の章が該当します。各Modelごとに定義するのはもちろん、dbt_project.yml経由でフォルダレベルでまとめてDynamic Tableとして定義することも可能です。

dbtの公式ブログでも本機能について言及されたブログが出ております。特にtarget_lag_environment()というマクロを定義することにより、使用するtargetlagの値を動的に切りかえるテクニックなどが参考になります。

ちなみに、dbt経由でのDynamic Tableの定義についても、実はすでに日本語で記事もあります。この記事ではLambda Viewと呼ばれる差分更新を行うロジックを実装せずとも、Dynamic Tableならば1つModelとConfigを定義するだけで簡単に差分更新の処理が実装できることをわかりやすく書いております。

このようにdbt経由でDynamic Tableを定義することに関して情報はすでにいくつかあるのですが、改めて私も挙動を確かめてみたかったため、本記事で試して内容をまとめてみたいと思います。

やること

Snowflake上に、dbtのチュートリアルでよく使われるJaffle Shopのデータをロードしておきます。

顧客情報を持つcustomersテーブルと注文情報を持つordersテーブルをJOINするテーブルを、dbtでDynamic Tableとして定義してみて、レコードを追加したときの挙動を確かめてみようと思います。

検証環境

Snowflake : Enterpriseエディション dbt : dbt Cloud Enterpriseエディション dbtのバージョン:1.6 ※dbt CloudのEnvironmentで指定

Dynamic Tableを定義してみた

前提

まず、下図のようにcustomers.sqlを定義します。JOIN元となっているstg_customerscustomersのステージングモデル、stg_ordersordersのステージングモデルです。

customers.sqlに対しては、下図のようにcustomer_idカラムに対してテストの記述もあります。

Dynamic Tableの参照先となるテーブルでCHANGE_TRACKINGを有効化

Dynamic Tableは、参照先のテーブルでCHANGE_TRACKING = TRUEとなっていないといけません。デフォルトではFALSEとなっています。

そのため、今回使用するテーブルであるcustomersordersで、以下のクエリを実行してCHANGE_TRACKING = TRUEとしておきます。生データに該当するテーブルの変更となるため、使用するロールに注意しましょう。

alter table customers set change_tracking = true;
alter table orders set change_tracking = true;

この上でshow tablesなどのコマンドを実行し、change_trackingONとなっていればOKです。

Dynamic Tableの定義&初回構築

今回はdbt_project.yml上で特定のフォルダ化のModelに対してまとめてDynamic Tableの定義をしてみます。

dbt_project.ymlの該当箇所のみ抽出すると、下記のようになります。

models:
  jaffle_shop:
    staging:
      materialized: view
    marts:
      materialized: dynamic_table
      snowflake_warehouse: sagara_dbt_wh
      target_lag: '1 minutes'

各オプションについては、下記のように入力すればOKです。

  • materialized: dynamic_tableと記入
  • snowflake_warehouse: 使用したいウェアハウス名を記入
  • target_lag: num { seconds | minutes | hours | days }の書式で、どの時間間隔で参照先のデータに差分があるかチェックするか記入。
    • DOWNSTREAMと記入すると、参照先のテーブルがDynamic Tableであった場合に、その参照先のDynamic Tableが更新されたタイミングで、更新されるようになります。

この上で、一度dbt buildコマンドを実行すれば、Dynamic Tableが作られます。

dbt buildとして実行したので、定義したテストも実行されています。

Snowsight上での確認

Snowsight上で確認するとDynamic Tableの分類でcustomersテーブルができていることがわかります。

Graphタブでは、customersテーブルがどのテーブル・ビューからできているのかリネージを見ることができます。

Refresh Historyタブでは、cutomersテーブルの差分更新をいつ行ったのかを履歴として確認可能です。(target_lag: '1 minutes'としたので1分おきに実行されているように見えますが、ウェアハウスが動くのは差分を検知した時のみです。)

レコードを1行追加してみる

ここで、元となっているordersテーブルに対して1レコード追加したときにどのような挙動をするか、確認をしてみます。新しく追加するレコードの内容としては、customer_id20の顧客の新しい注文データがINSERTされた、というものになります。

insert into orders
    values
    (100,20,'2018-04-10','placed');

すると、更新を検知したために対象のDynamic TableでRefreshが行われました。

実行されたクエリプロファイルを見ると、すごいNodesで構成されたプロファイルが出てきました…(クエリ自体は合計2.2秒で終わっているんですけども)

実際に更新後のDynamic Tableを見ると、問題なく対象のcustomer_id20の顧客のみ、MOST_RECENT_ORDER_DATEが更新されていますね。customersのdbt上の定義ではJOINもGROUP BYを用いた集計も行っていたのですが、これらの処理があっても問題なく更新されていることがわかりました!

  • Before ※スクショ撮り忘れのためタイムトラベルしたクエリで失礼します…

  • After

しかし唯一ネックな点としては、Dynamic Tableの更新時にdbtのテストのクエリは実行されませんでした

一度Dynamic Tableを構築済みの状態で、target_lagを書き換えて、dbt buildを行うとどうなるか

ここで、dbt_project.ymlを下記のようにtarget_lag: '1 minutes'からtarget_lag: '24 hours'に書き換えたときに、dbt buildを行うとどうなるかを試してみます。

models:
  jaffle_shop:
    staging:
      materialized: view
    marts:
      materialized: dynamic_table
      snowflake_warehouse: sagara_dbt_dev_wh
      target_lag: '24 hours'

すると、Dynamic Tableのデータの更新は行われずに、Dynamic Table definition was altered. Lag metrics have been reset.と表示され、lagが24時間に切り替わったことだけがわかります。

一度Dynamic Tableを構築済みの状態で、レコード追加後にdbt buildをしてみる

最後に、一度Dynamic Tableを構築済みの状態で、レコード追加後にdbt buildをしてみます。上述のtarget_lag: '24 hours'の続きとなるため、検証途中に自動でデータが更新されることはありません。

下記のクエリを実行して、新しくレコードを追加します。

insert into orders
    values
    (101,30,'2018-04-11','placed');

この上で、dbt buildコマンドを実行してみます。

しかし、このdbt buildの実行時にはDynamic Tableのデータは更新されませんでしたdbt build時にはtarget_lagなどの設定値のみがALTERコマンドで見直されるようです。

検証後に感じた注意事項

dbt経由でDynamic Tableを定義・構築してみましたが、いくつか注意すべき点があると感じたのでまとめておきます。

開発環境でも問答無用に参照先のテーブルが更新されたら、開発環境のDynamic Tableがウェアハウスを用いて更新されてしまう

今回は基本的にdbt CloudのIDEからdbt buildなどのコマンドを実行していたのですが、開発環境などは関係なく、指定したtarget_lagの値に応じて更新処理が走ってしまいました。

具体的な対策の一つとしては、このdbtのブログからの引用ですが、targetによってtarget_lagの値を動的に変更するマクロの定義、などを行う必要があります。

  • マクロの定義
{% macro target_lag_environment() %}
{% set lag = '1 minute' if target.name == "prod" else '35 days' %}
{{ return(lag) }}
{% endmacro %}
  • マクロの適用:各ModelでのConfigの設定例
{{
config(
    materialized = 'dynamic_table',
    snowflake_warehouse = 'transforming',
    target_lag = target_lag_environment(),
    on_configuration_change = 'apply',
)
}}

dbtでDynamic Tableのテーブルにtestを定義していても、Dynamic Tableの更新時にtestが実行されない

これは当たり前といえばそうかもしれませんが、Dynamic Tableを一度定義するとSnowflake内で完結する処理になるため、対象のテーブルに対してdbtのtestを定義していても、Dynamic Tableの更新時にtestは実行されません。

Dynamic Tableに対してdbtで定義したtestを実行する場合には、定期的にDynamic Table向けのtestだけを回すジョブを定義するなど、何かしらの対策が必要となります。

最後に

Snowflakeで定期的な差分更新が簡単に実現できる「Dynamic Table」をdbt経由で定義し、色々な動作を試してみました。

dbtで定義することで、Dynamic Tableの定義元となるSQLをGit管理できますし、開発・本番と環境を分離して管理することも容易になります。

設定も難しくないため、dbt×Snowflakeをお使いの方にはぜひ試してほしいです!