
dbt Cloud で incremental model を試してみた
はじめに
dbt Cloud で incremental model をについて整理しておきたく記事としました。特に incremental model で指定できる各戦略の動作を確認しています。
過去にもこちらの機能に関する記事はありますので、あわせてご参照ください。
前提条件
記事内で検証も含みますが、この際の動作は Snowflake を前提としています。
機能の概要
本機能については、以下に記載があります。
incremental model は、dbt のマテリアライゼーションのタイプの1つです。
初回実行時にはモデルの全データを変換してテーブルを構築しますが、2回目以降の実行では前回実行以降に新しく追加または変更されたデータのみを処理し、既存のテーブルに追加・更新するのが基本的な動作です。
incremental strategy
incremental model を使用する際は、新規や変更されたデータをどのようにターゲットテーブルに反映させるかを incremental strategy として指定します。この一覧は以下に記載があります。
ユーザー側でカスタムの戦略を定義することも可能ですが、まずはビルトインの以下の方式を使用することが基本となります。
- append
- merge
- delete+insert
- insert_overwrite
- microbatch
incremental strategy はモデルプロパティを定義する yml ファイルやモデル内の config ブロックで指定可能です。
models:
+incremental_strategy: "insert_overwrite"
{{
config(
materialized='incremental',
unique_key='date_day',
incremental_strategy='delete+insert',
...
)
}}
select ...
前提として、使用する DWH ごとに対応するincremental strategy の違いもある点にご注意ください。一覧は以下に記載があります。
試してみる
以降で各 incremental strategy を試しながら、基本的な動作を確認してみます。
事前準備
Snowflake 側で生成AIで作成した検証用のデータを用意しておきました。
-- テーブル1: ad_clicks
CREATE OR REPLACE TABLE ad_clicks (
click_id VARCHAR(50) PRIMARY KEY,
user_id VARCHAR(50),
click_timestamp TIMESTAMP_LTZ
);
-- ad_clicks テーブルにデータを挿入
INSERT INTO ad_clicks (click_id, user_id, click_timestamp) VALUES
('C-101', 'U-501', '2025-09-29 09:05:00'),
('C-102', 'U-502', '2025-09-29 10:35:00'),
('C-103', 'U-501', '2025-09-29 11:50:00');
append
こちらは最もシンプルな更新戦略の一つです。
incremental model の設定時は、対象テーブルの unique_key をオプションで設定可能なのですが、この指定がない場合のデフォルト動作となります。
append を使用すると、既存のデータを更新、削除することなく、新規や変更されたデータが対象のテーブルに挿入されます。
上記のため、ログデータなど、変更や削除が発生しないデータに利用できます。
先のサンプルデータをテーブルとして Snowflake で定義し、以下のモデルを定義しました。(生データから STG レイヤーを介して incremental model として作成しています)
{{
config(
materialized = 'incremental',
incremental_strategy = 'append'
)
}}
WITH ad_clicks AS (
SELECT
click_id,
user_id,
click_timestamp
FROM
{{ ref('stg_incremental_test_ad_clicks') }}
{% if is_incremental() %}
-- 前回の実行以降の新規レコードのみを選択
WHERE click_timestamp > (SELECT MAX(click_timestamp) FROM {{ this }})
{% endif %}
)
SELECT * FROM ad_clicks
ポイントは{% if is_incremental() %}
としている箇所で、is_incremental()
マクロでは、以下の条件に合致する場合True
を返し、このブロック内も実行対象としてコンパイル後に含まれます。上記は基本的な例ですが、ここで増分となるロジックを定義できます。
- データを追加するテーブルがデータベースにすでに存在する
- モデル実行時に
full-refresh
モードを使用してない - モデルのマテリアライズ方式が
materialized='incremental’
初回実行:テーブルのフル作成
モデルを初めて実行するとき、ターゲットとなるテーブルは存在しません。この場合、以下のクエリが実行され、ソースデータからテーブルをフルで作成します。
create or replace transient table dev_db.dbt_tyasuhara.mart_user_conversions_append
as (
WITH ad_clicks AS (
SELECT
click_id,
user_id,
click_timestamp
FROM
dev_db.dbt_tyasuhara.stg_incremental_test_ad_clicks
)
SELECT * FROM ad_clicks
)
2回目以降:増分更新で差分を処理
一度テーブルが作成された後、ソースデータに新しいレコードが追加されたとします。
-- ad_clicks テーブルに新しいデータを挿入
INSERT INTO ad_clicks (click_id, user_id, click_timestamp) VALUES
('C-104', 'U-504', '2025-09-30 09:30:00'),
('C-105', 'U-501', '2025-09-30 10:00:00');
この状態で再度モデルを実行すると以下の SQL が実行されます。
- 新規レコードの特定と一時ビューの作成
(SELECT MAX(click_timestamp) FROM <更新対象のテーブル>)
により、データが追加される予定の既存テーブルから、{% if is_incremental() %}
ブロックで指定のロジックに基づき、ここではclick_timestamp
(タイムスタンプの最大値)の現在の最大値を取得WHERE
句のフィルタにより、既存のターゲットテーブルのタイムスタンプよりも新しいレコードだけをソース側から抽出し、一時的なビューとして作成
create or replace temporary view dev_db.dbt_tyasuhara.mart_user_conversions_append__dbt_tmp
as (
WITH ad_clicks AS (
SELECT
click_id,
user_id,
click_timestamp
FROM
-- ソーステーブルが対象
dev_db.dbt_tyasuhara.stg_incremental_test_ad_clicks
-- 前回の実行以降の新規レコードのみを選択
WHERE click_timestamp > (SELECT MAX(click_timestamp) FROM dev_db.dbt_tyasuhara.mart_user_conversions_append)
)
SELECT * FROM ad_clicks
)
- ターゲットテーブルへのレコード追加
- 先の手順で作成された一時ビューに抽出された新規レコードを既存のターゲットテーブルに INSERT
insert into dev_db.dbt_tyasuhara.mart_user_conversions_append ("CLICK_ID", "USER_ID", "CLICK_TIMESTAMP")
(
select "CLICK_ID", "USER_ID", "CLICK_TIMESTAMP"
from dev_db.dbt_tyasuhara.mart_user_conversions_append__dbt_tmp
)
merge
unique_key
を基準に、ターゲットテーブルにキーが存在しないレコードは INSERT し、キーが存在するレコードは更新します。これにより、重複データの発生を防ぐことが可能です。
更新される可能性のあるマスターデータなどで使用できます。
先と同じモデルで設定部分のみ、以下のように変更し、動作を確認してみます。
具体的には、incremental_strategy = 'merge'
としターゲットテーブルのユニークキーのとなる項目をunique_key
で設定しています。複数のキーを指定可能です。
{{
config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key=['click_id']
)
}}
初回実行:テーブルのフル作成
モデルを初めて実行するとき、ターゲットとなるテーブルは存在しないため、ソースデータからテーブルをフルで作成します(append 時と変わりありません)。
2回目以降:増分更新で差分を処理
一度テーブルが作成された後、ソースデータに新しいレコードが追加されたとします。
この状態で再度モデルを実行すると以下の SQL が実行されます。
- 新規レコードの特定と一時ビューの作成
- ※append 時と変わりありません
- ターゲットテーブルへのレコード更新・追加(マージ)
- 先の手順で作成した一時ビュー(ソースデータ)と、既存のターゲットテーブルを
unique_key
(この場合はclick_id
)を基に結合し、MERGE 文を実行します
- 先の手順で作成した一時ビュー(ソースデータ)と、既存のターゲットテーブルを
merge into dev_db.dbt_tyasuhara.mart_user_conversions_merge as DBT_INTERNAL_DEST
using dev_db.dbt_tyasuhara.mart_user_conversions_merge__dbt_tmp as DBT_INTERNAL_SOURCE
on (
DBT_INTERNAL_SOURCE.click_id = DBT_INTERNAL_DEST.click_id
)
when matched then
-- 主キーが一致する場合、レコードを更新
update set
"CLICK_ID" = DBT_INTERNAL_SOURCE."CLICK_ID","USER_ID" = DBT_INTERNAL_SOURCE."USER_ID","CLICK_TIMESTAMP" = DBT_INTERNAL_SOURCE."CLICK_TIMESTAMP"
when not matched then
-- 主キーが一致しない場合、新しいレコードとして挿入
insert
("CLICK_ID", "USER_ID", "CLICK_TIMESTAMP")
values
("CLICK_ID", "USER_ID", "CLICK_TIMESTAMP")
delete+insert
こちらもunique_key
に基づいて、まずターゲットテーブルから一致するレコードを DELETE し、その後で新しいレコードを INSERT します。
delete+insert を試してみます。モデルの設定をincremental_strategy = 'delete+insert'
、ターゲットテーブルのユニークキーとなる項目をunique_key
で指定します。
{{
config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key=['click_id']
)
}}
初回実行:テーブルのフル作成
モデルを初めて実行するとき、ターゲットとなるテーブルは存在しないため、ソースデータからテーブルをフルで作成します(これまでの動作と変わりありません)。
2回目以降:増分更新で差分を処理
一度テーブルが作成された後、ソースデータに新しいレコードが追加されたとします。
この状態で再度モデルを実行すると以下の SQL が実行されます。
- 新規レコードの特定と一時ビューの作成
- ※これまでと変わりありません
- 続けて、先の手順で作成された一時ビューに存在する更新対象のレコードを、既存のターゲットテーブルから DELETE します
delete from dev_db.dbt_tyasuhara.mart_user_conversions_deleteinsert as DBT_INTERNAL_DEST
where (click_id) in (
select distinct click_id
from dev_db.dbt_tyasuhara.mart_user_conversions_deleteinsert__dbt_tmp as DBT_INTERNAL_SOURCE
)
- 一時ビューに抽出された新規レコードを既存のターゲットテーブルに INSERT します。これにより、削除されたレコードは新しいバージョンで置き換えられます
insert into dev_db.dbt_tyasuhara.mart_user_conversions_deleteinsert ("CLICK_ID", "USER_ID", "CLICK_TIMESTAMP")
(
select "CLICK_ID", "USER_ID", "CLICK_TIMESTAMP"
from dev_db.dbt_tyasuhara.mart_user_conversions_deleteinsert__dbt_tmp
)
insert_overwrite
Snowflake における、insert_overwrite は一般的な dbt の増分戦略とは異なるため注意します。
具体的には、insert_overwrite
は Snowflake では対象テーブルの TRUNCATE 、INSERT のように動作します。
dbt においては、一時的なビューで更新対象のレコードを抽出後、insert overwrite
により、対象テーブルが更新されるため、更新対象のレコードでターゲットテーブルが上書きされる点に注意が必要です。
insert_overwrite を試してみます。モデルの設定をincremental_strategy = 'insert_overwrite'
とします。insert_overwrite 戦略を設定する際には、unique_key
を指定する必要はありません。
{{
config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite'
)
}}
初回実行:テーブルのフル作成
初回実行時は他戦略同様、ソースデータからターゲットテーブルをフルで作成します。
2回目以降:増分更新で差分を処理
ソースデータに以下のレコードを追加します。
-- ad_clicks テーブルに新しいデータを挿入
INSERT INTO ad_clicks (click_id, user_id, click_timestamp) VALUES
('C-104', 'U-504', '2025-09-30 09:30:00'),
('C-105', 'U-501', '2025-09-30 10:00:00');
この状態で再度モデルを実行すると以下の SQL が実行されます。
- 新規レコードの特定と一時ビューの作成
- ※ここは、これまでと変わりありません
- 一時ビューに抽出された新規レコードを既存のターゲットテーブルに
insert overwrite
します
insert overwrite into dev_db.dbt_tyasuhara.mart_user_conversions_insert_overwrite
select *
from dev_db.dbt_tyasuhara.mart_user_conversions_insert_overwrite__dbt_tmp
更新後のテーブルを確認すると以下のように、新規レコードのみから構成されます。
>SELECT * FROM MART_USER_CONVERSIONS_INSERT_OVERWRITE;
+----------+---------+-------------------------------+
| CLICK_ID | USER_ID | CLICK_TIMESTAMP |
|----------+---------+-------------------------------|
| C-104 | U-504 | 2025-09-30 09:30:00.000 +0900 |
| C-105 | U-501 | 2025-09-30 10:00:00.000 +0900 |
+----------+---------+-------------------------------+
microbatch
dbt ver 1.9 以降で新たに追加された戦略です。microbatch は、特定の期間のレコードを効率的に更新できます。
主な特徴は以下の通りです。
event_time
プロパティを構成することで、ここで指定のカラムに基づき、処理が複数のクエリ(バッチ)に分割されるbatch_size
を設定することで、各バッチのサイズ(期間)を制御できる- ※デフォルトのサイズは
day
(1日分)
- Snowflake の場合:
- 実際の更新ロジックには、
delete+insert
が使用されます(指定したバッチ期間内の既存レコードをすべて削除し、その後、新しいレコードを追加) event_time
カラムに基づいて時間ベースのバッチ処理を行うためunique_key
の指定は必要ありません
- 実際の更新ロジックには、
また、後述しますが、バックフィルにも対応しており、頻繁に過去のデータが更新・修正される場合や特定の期間だけを再処理したい場合に便利なオプションです。
実際に試してみます。
microbatch 向けに以下のようなデータを用意しました。
-- ad_clicks テーブルにデータを挿入
INSERT INTO ad_clicks (click_id, user_id, click_timestamp) VALUES
('C-101', 'U-501', '2025-09-29 09:05:00'),
('C-102', 'U-502', '2025-09-29 10:35:00'),
('C-103', 'U-501', '2025-09-29 11:50:00'),
('C-091', 'U-503', '2025-09-25 15:20:00'),
('C-092', 'U-504', '2025-09-26 08:40:00'),
('C-093', 'U-501', '2025-09-26 14:10:00'),
('C-094', 'U-505', '2025-09-27 10:00:00'),
('C-095', 'U-502', '2025-09-27 16:30:00'),
('C-096', 'U-504', '2025-09-28 09:15:00'),
('C-097', 'U-503', '2025-09-28 12:40:00');
次に、dbt_project.yml
またはモデルファイル内で、microbatch
戦略を設定します。
begin
オプションで増分モデルのデータの開始点となるタイムスタンプ値を設定します。ここでは以下のように設定することで「2025-09-25 以降のデータを、click_timestamp カラムを基準に1日単位で処理する」設定としています。
また、その他の戦略と異なる特徴として、event_time
カラムに基づいて時間ベースのバッチ処理を行う方式のため、{% if is_incremental() %}
のような増分を検出するロジックは必要ありません。
{{
config(
materialized = 'incremental',
incremental_strategy = 'microbatch',
begin='2025-09-25',
batch_size='day',
event_time = 'click_timestamp'
)
}}
WITH ad_clicks AS (
SELECT
click_id,
user_id,
click_timestamp
FROM
{{ ref('stg_incremental_test_ad_clicks') }}
)
SELECT * FROM ad_clicks
各プロパティの詳細は以下をご参照ください。
注意点として、バッチ処理を正しく行うために、ソースモデルにも event_time プロパティを追加しておく必要があります。ここでは、ソースの使用と、前段に最低限のクレンジング処理を行うことを想定した STG レイヤーを設けているので、それぞれに対して設定します。
STG レイヤーのモデルのプロパティを設定
{{ config(
event_time='click_timestamp'
) }}
ソースファイル
sources:
- name: incremental_test
tables:
- name: ad_clicks
config:
event_time: 'click_timestamp'
初回実行
各種設定後、モデルを実行します。
実行のサマリログを確認すると、begin
で指定した2025-09-25 以降、日別でバッチが作成されていることがわかります。(9/30に実行しています)
07:16:38 1 of 1 START sql microbatch model dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
07:16:38 Batch 1 of 6 START batch 2025-09-25 of dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
07:16:39 Batch 1 of 6 OK created batch 2025-09-25 of dbt_tyasuhara.mart_user_conversions_microbatch [SUCCESS 1 in 1.41s]
07:16:39 Batch 2 of 6 START batch 2025-09-26 of dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
07:16:39 Batch 3 of 6 START batch 2025-09-27 of dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
07:16:39 Batch 4 of 6 START batch 2025-09-28 of dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
07:16:39 Batch 5 of 6 START batch 2025-09-29 of dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
07:16:43 Batch 2 of 6 OK created batch 2025-09-26 of dbt_tyasuhara.mart_user_conversions_microbatch [SUCCESS 1 in 3.42s]
07:16:43 Batch 3 of 6 OK created batch 2025-09-27 of dbt_tyasuhara.mart_user_conversions_microbatch [SUCCESS 2 in 3.56s]
07:16:43 Batch 5 of 6 OK created batch 2025-09-29 of dbt_tyasuhara.mart_user_conversions_microbatch [SUCCESS 3 in 3.79s]
07:16:43 Batch 4 of 6 OK created batch 2025-09-28 of dbt_tyasuhara.mart_user_conversions_microbatch [SUCCESS 2 in 3.86s]
07:16:43 Batch 6 of 6 START batch 2025-09-30 of dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
初回実行時のテーブル作成までにどのようなクエリが実行されるかを見てみます。
まずはじめに、この時点ではテーブルが存在しなかったため、最初のバッチ(2025-09-25)からなるテーブルが作成されます。
create or replace transient table dev_db.dbt_tyasuhara.mart_user_conversions_microbatch
as (
WITH ad_clicks AS (
SELECT
click_id,
user_id,
click_timestamp
FROM
(select * from dev_db.dbt_tyasuhara.stg_incremental_test_ad_clicks where
click_timestamp >= '2025-09-25 00:00:00+00:00' and click_timestamp < '2025-09-26 00:00:00+00:00')
)
SELECT * FROM ad_clicks
)
2つ目のバッチ(2025-09-26)からは、delete+insert
のロジックが適用されます。
- 各バッチの一時ビューを作成(以下は2025-09-26のバッチ)
- バッチ期間に該当するレコードがソーステーブルから抽出され、一時的なビューとして作成されます
create or replace temporary view dev_db.dbt_tyasuhara.mart_user_conversions_microbatch__dbt_tmp_20250926
as (
WITH ad_clicks AS (
SELECT
click_id,
user_id,
click_timestamp
FROM
(select * from dev_db.dbt_tyasuhara.stg_incremental_test_ad_clicks where
click_timestamp >= '2025-09-26 00:00:00+00:00' and click_timestamp < '2025-09-27 00:00:00+00:00')
)
SELECT * FROM ad_clicks
)
delete+insert
の実行- 最初に、ターゲットテーブルから、各バッチと同じ期間の既存レコードをすべて削除します
delete from dev_db.dbt_tyasuhara.mart_user_conversions_microbatch DBT_INTERNAL_TARGET
where (
DBT_INTERNAL_TARGET.click_timestamp >= to_timestamp_tz('2025-09-26 00:00:00+00:00')
and DBT_INTERNAL_TARGET.click_timestamp < to_timestamp_tz('2025-09-27 00:00:00+00:00')
)
- 続けて、一時ビューの新しいレコードが挿入されます
insert into dev_db.dbt_tyasuhara.mart_user_conversions_microbatch ("CLICK_ID", "USER_ID", "CLICK_TIMESTAMP")
(
select "CLICK_ID", "USER_ID", "CLICK_TIMESTAMP"
from dev_db.dbt_tyasuhara.mart_user_conversions_microbatch__dbt_tmp_20250926
)
上記が各バッチで実行日(2025-9-30のバッチ)まで行われます。
2回目以降
テーブルレコードを以下のように更新します。
-- 9月29日と9月30日の新しいレコードを挿入
INSERT INTO ad_clicks (click_id, user_id, click_timestamp) VALUES
('C-104', 'U-504', '2025-09-29 15:30:00'),
('C-105', 'U-501', '2025-09-30 10:00:00'),
('C-106', 'U-502', '2025-09-30 14:20:00');
-- 9月28日の既存レコード(C-096)を更新
UPDATE ad_clicks
SET
click_timestamp = '2025-09-30 17:31:24' -- 現在のタイムスタンプ
WHERE click_id = 'C-096';
この状態でモデルを実行します。
更新後のテーブルは以下の通りです。
>SELECT * FROM MART_USER_CONVERSIONS_MICROBATCH ORDER BY 1;
+----------+---------+-------------------------------+
| CLICK_ID | USER_ID | CLICK_TIMESTAMP |
|----------+---------+-------------------------------|
| C-091 | U-503 | 2025-09-25 15:20:00.000 +0900 |
| C-092 | U-504 | 2025-09-26 08:40:00.000 +0900 |
| C-093 | U-501 | 2025-09-26 14:10:00.000 +0900 |
| C-094 | U-505 | 2025-09-27 10:00:00.000 +0900 |
| C-095 | U-502 | 2025-09-27 16:30:00.000 +0900 |
| C-096 | U-504 | 2025-09-28 09:15:00.000 +0900 | #更新前のレコードが残っている
| C-096 | U-504 | 2025-09-30 17:31:24.000 +0900 | #更新後のレコードがINSERTされている
| C-097 | U-503 | 2025-09-28 12:40:00.000 +0900 |
| C-101 | U-501 | 2025-09-29 09:05:00.000 +0900 |
| C-102 | U-502 | 2025-09-29 10:35:00.000 +0900 |
| C-103 | U-501 | 2025-09-29 11:50:00.000 +0900 |
| C-104 | U-504 | 2025-09-29 15:30:00.000 +0900 |
| C-105 | U-501 | 2025-09-30 10:00:00.000 +0900 |
| C-106 | U-502 | 2025-09-30 14:20:00.000 +0900 |
+----------+---------+-------------------------------+
14 Row(s) produced. Time Elapsed: 1.147s
ポイントは以下の通りです。
- 「9月29日と9月30日の新しいレコードを挿入」として追加した3レコードは反映されている
- 「9月28日の既存レコード(C-096)を更新」としたレコードは、既存のレコードが削除されずに残っている
- 一方で更新後のレコードは追加されている
ログやクエリ履歴を確認すると「2025-09-29」と「2025-09-30」のバッチにわかれてそれぞれ処理されていることがわかります。
08:33:47 1 of 1 START sql microbatch model dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
08:33:47 Batch 1 of 2 START batch 2025-09-29 of dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
08:33:51 Batch 1 of 2 OK created batch 2025-09-29 of dbt_tyasuhara.mart_user_conversions_microbatch [SUCCESS 4 in 3.68s]
08:33:51 Batch 2 of 2 START batch 2025-09-30 of dbt_tyasuhara.mart_user_conversions_microbatch [RUN]
08:33:54 Batch 2 of 2 OK created batch 2025-09-30 of dbt_tyasuhara.mart_user_conversions_microbatch [SUCCESS 3 in 3.08s]
- 2025-09-29のバッチ
create or replace temporary view dev_db.dbt_tyasuhara.mart_user_conversions_microbatch__dbt_tmp_20250929
as (
WITH ad_clicks AS (
SELECT
click_id,
user_id,
click_timestamp
FROM
(select * from dev_db.dbt_tyasuhara.stg_incremental_test_ad_clicks where
click_timestamp >= '2025-09-29 00:00:00+00:00' and click_timestamp < '2025-09-30 00:00:00+00:00')
)
SELECT * FROM ad_clicks
)
- DELETE 文
delete from dev_db.dbt_tyasuhara.mart_user_conversions_microbatch DBT_INTERNAL_TARGET
where (
DBT_INTERNAL_TARGET.click_timestamp >= to_timestamp_tz('2025-09-29 00:00:00+00:00')
and DBT_INTERNAL_TARGET.click_timestamp < to_timestamp_tz('2025-09-30 00:00:00+00:00')
)
- INSERT 文
insert into dev_db.dbt_tyasuhara.mart_user_conversions_microbatch ("CLICK_ID", "USER_ID", "CLICK_TIMESTAMP")
(
select "CLICK_ID", "USER_ID", "CLICK_TIMESTAMP"
from dev_db.dbt_tyasuhara.mart_user_conversions_microbatch__dbt_tmp_20250929
)
上記について、microbatch のデフォルトの動作では、lookback オプションが「1」に設定されています。microbatch では、現在のタイムスタンプと設定されたlookback
に基づいてバッチが処理されます。より具体的には、lookback
により、何バッチ分過去に遡ってデータを再処理するかを指定できます。
先の実行では、デフォルト(1
)だったため、実行タイミングの前日(2025-9-29)までのバッチに遡ってデータが処理されます。
そのため、9月28日の既存レコードについては、この範囲外のため DELETE 処理から漏れてしまいました。データの特性上、過去XX日は変更の可能性があるような場合、それに合わせてlookback
の設定も必要です。
先の例では、9月28日分の変更が漏れたため、microbatch では以下のアプローチを取れます。
- full-refresh で全件更新
lookback
を適切な範囲に変更し再度実行--event-time-start
と--event-time-end
によるバックフィル
以降で、バックフィルについて見ていきます。
incremental model では通常、前回の実行以降の新しいデータが処理されますが、「ソースデータに誤りがあり、修正された過去のデータを反映させたい場合」など、過去のデータを再更新したい場面もあるかと思います。
従来の増分戦略では、full-refresh や増分ロジックを変更などの手間が必要でしたが、microbatch 戦略の場合、元より各バッチで独立して処理されるため、特定の期間のバッチのみを対象として再処理することが可能です。
具体的には、dbt コマンドに--event-time-start
と--event-time-end
のオプションを指定します。
これにより、指定された期間のバッチを独立したクエリとして処理できるため、容易にかつ効率的な処理が可能です。
先の例では、9月28日分の変更が漏れたため以下のように実行できます。
dbt run --select mart_user_conversions_microbatch --event-time-start "2025-09-28" --event-time-end "2025-09-29"
すでに更新後のレコードは追加済みなので、実行時は2025-09-28のバッチにより、更新前の2025-09-28の既存のレコードが削除されます。(更新されていない2025-09-28のレコードは再度INSERTされるのでそのまま)
>SELECT * FROM MART_USER_CONVERSIONS_MICROBATCH ORDER BY 1;
+----------+---------+-------------------------------+
| CLICK_ID | USER_ID | CLICK_TIMESTAMP |
|----------+---------+-------------------------------|
| C-091 | U-503 | 2025-09-25 15:20:00.000 +0900 |
| C-092 | U-504 | 2025-09-26 08:40:00.000 +0900 |
| C-093 | U-501 | 2025-09-26 14:10:00.000 +0900 |
| C-094 | U-505 | 2025-09-27 10:00:00.000 +0900 |
| C-095 | U-502 | 2025-09-27 16:30:00.000 +0900 |
| C-096 | U-504 | 2025-09-30 17:31:24.000 +0900 |
| C-097 | U-503 | 2025-09-28 12:40:00.000 +0900 |
| C-101 | U-501 | 2025-09-29 09:05:00.000 +0900 |
| C-102 | U-502 | 2025-09-29 10:35:00.000 +0900 |
| C-103 | U-501 | 2025-09-29 11:50:00.000 +0900 |
| C-104 | U-504 | 2025-09-29 15:30:00.000 +0900 |
| C-105 | U-501 | 2025-09-30 10:00:00.000 +0900 |
| C-106 | U-502 | 2025-09-30 14:20:00.000 +0900 |
+----------+---------+-------------------------------+
13 Row(s) produced. Time Elapsed: 1.442s
さいごに
dbt Cloud で incremental model の各戦略の動作を確認してみました。
こちらの内容が何かの参考になれば幸いです。