dbt Coreのis_incrementalマクロの値でモデルが参照するテーブルを切り替える

is_incremental()マクロの値によって処理するモデルを切り替えるモデルを実装してみました。
2024.01.15

データアナリティクス事業本部 機械学習チームの鈴木です。

incrementalモデルの運用を想定したときに、通常の実行とテーブル再構築の場合で参照するモデルを切り替えられるのか知りたいことがありました。dbtのマクロとJinjaの機能を使えば実現できそうでしたが、実際に調査・検証してみて注意点がありそうか確認したのでまとめました。

確認したいこと

Incrementalモデルで以下のような切り替えができるかどうかを確認しました。

検証イメージ

例えばIncrementalモデルが参照するテーブルに格納されているテーブルのデータが、あるときのスナップショットを積み重ねているような場合、随時増分更新する分には良いものの、何かしらの理由でテーブルを再構築したい場合にはスナップショット間のデータの重複をユニークキーで識別して上手く解消し最新の状態にする必要があります。

JinjaではIfの機能があるため、FROM句で指定するテーブルを通常実行かテーブル再構築かで切り替えることによりこのようなケースにも対応できないかと考えました。

なお、dbtのガイドとしてはIncrementalモデルで参照する2つのモデルを切り替えるような実装の言及はありませんでした。

以下のようにis_incremental()マクロが返す値を元に増分か全量かを切り替える実装のみ紹介されています。

-- 上記ガイドより2024/1/15に引用

{{
    config(
        materialized='incremental'
    )
}}

select
    *,
    my_slow_function(my_column)

from raw_app_data.events

{% if is_incremental() %}

  -- this filter will only be applied on an incremental run
  -- (uses > to include records whose timestamp occurred since the last run of this model)
  where event_time > (select max(event_time) from {{ this }})

{% endif %}

今回は{% if is_incremental() %}のブロックだけではなく{% else %}のブロックを使うことで参照するテーブルを切り替えられるか簡単に確認しますが、ガイドで言及されていない実装であることにはご留意ください。

is_incrementalマクロについて

Understanding the is_incremental() macro』によると、以下の全てが満たされるときにis_incremental()マクロはTrueを返します。

  • データを追加するテーブルがデータベースにすでに存在するとき
  • dbtがフルリフレッシュモードで実行されていないとき
  • マテリアライズがincrementalのとき

マクロの実装自体は、以下で確認できます。

検証の準備

環境について

以下のバージョンで検証をしました。

  • Python: 3.10.13
  • dbt Core: 1.7.4
  • dbt-athena-community: 1.7.1

私はdbt-athenaをよく使うため、このアダプターを使いました。コミュニティ版のアダプターですが、今回は検証したいものがdbt自体の機能なのでほかのアダプターでも同じになるかなと思います。

用意したモデル

検証用に、以下の3つのモデルを作成しました。

is_incremental()の値により、異なるモデルが参照されます。

value_check_model.sql

-- depends_on: {{ ref('true_value') }}

{{ config(materialized='incremental') }}

  select is_incremental_value


{% if is_incremental() %}

  from {{ref('true_value')}}

{% else %}

  from {{ref('false_value')}}

{% endif %}

ifによる分岐はJinjaの文法により実装しました。

上記のvalue_check_modelモデルは以下のモデルを参照するようにしました。

true_value.sql

{{ config(materialized='view') }}

select 'is_incremental() is True' as is_incremental_value

false_value.sql

{{ config(materialized='view') }}

select 'is_incremental() is False' as is_incremental_value

なお、dbt-athenaのIncrementalモデルのデフォルトはinsert_overwriteです。

モデル実行の結果確認

普段の運用を想定して、3パターンでどのようなレコードが作成されるか確認しました。

1. テーブルがないとき

以下のようにモデルを実行しました。

dbt run

マクロの返す値はFalseであることが分かります。

テーブルがないとき

2. テーブルが存在してフルリフレッシュモードでないとき

以下のようにモデルを実行しました。

dbt run

マクロの返す値はTrueであることが分かります。

テーブルが存在してフルリフレッシュモードでないとき

3. フルリフレッシュモードのとき

以下のようにフルリフレッシュモードで実行しました。

dbt run --full-refresh

マクロの返す値はFalseであることが分かります。

フルリフレッシュモードのとき

ドキュメント上での見え方

リネージ

以下のように、dbtプロジェクトでドキュメントを生成し、サーバーを起動してブラウザからアクセスしました。

# ドキュメントの生成
dbt docs generate

# サーバーの起動 
dbt docs serve --port 8001

リネージを確認すると以下のようになっていました。is_incremental()の値で分岐しているもののリネージは両方に依存するように見えるので注意が必要そうです。ここはモデルの説明に詳細を書いておくのが良いでしょう。

生成されたリネージ

なお、value_check_modelモデルではdbtが依存関係を理解できるように、依存関係を指定しました。この記載をしないとドキュメントを生成する際にエラーとなってしまいました。

% dbt docs generate
09:56:21  Running with dbt=1.7.4
09:56:21  Registered adapter: athena=1.7.1
09:56:21  Found 3 models, 0 sources, 0 exposures, 0 metrics, 423 macros, 0 groups, 0 semantic models

(略)

Runtime Error
  Compilation Error in model value_check_model (models/example/value_check_model.sql)
    dbt was unable to infer all dependencies for the model "value_check_model".
    This typically happens when ref() is placed within a conditional block.
    
    To fix this, add the following hint to the top of the model "value_check_model":
    
    -- depends_on: {{ ref('true_value') }}

この指定方法については、以下に説明がありました。

最後に

Incrementalモデルの運用を想定して、is_incremental()の返す値をいくつかのよくある場合で確認してみました。

また、JinjaのIfを使ってref()で参照するモデルが切り替えられるモデルを実装しました。モデルで依存関係を明示しないとドキュメント作成時にエラーになることも確認できました。

参考になりましたら幸いです。