[dbt] 「incremental」というMaterializationを使ってデータモデルを増分更新する

インクリといつまでも
2021.02.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

大阪オフィスの玉井です。

以前、dbtのデータモデルの「Materialization」という設定について検証しました。

上記で紹介していないMaterializationの種類に「incremental」というものがあります。これは、データモデルとしては、普通にテーブルとして生成するのですが、初回生成以降については、データの増分更新ができるようになります。

検証がちょっと面倒だったので、今回、別記事に切り出した形で、実際に検証してみたいと思います。

検証環境

  • macOS Catalina 10.15.7
  • dbt CLI 0.18.1
  • Google BigQuery

やってみた

前準備

下記の環境を準備しておきます(手軽にデータを増やせるので)。ただし、今回使用するDWHはBigQueryです。

ロードしたデータは下記の通り。これが今回の検証におけるローデータとなります。この時点で1560件あります(後で増やします)。

「incremental」モデルを作成する

モデルファイル

準備したローデータを参照して、「incremental」なデータモデルを作成します。クエリは下記の通り。

stg_stream_data.sql

{{
    config(
        materialized='incremental'
    )
}}

SELECT
_fivetran_synced
,_modified
,change
,price
,sector
,ticker_symbol

FROM `tamai-rei.kinesis.dev_stream_data`

{% if is_incremental() %}

  where _fivetran_synced > (select max(_fivetran_synced) from {{ this }})

{% endif %}

config

今回検証するMaterializationはincrementalなので、それをconfigで指定します。

is_incremental

ここがincrementalモデルのミソになります。incrementalモデルは増分更新なので、「どこからどこまでを増分するのか」を判断する基準となるロジックを組み込む必要があります。

要するに、「増分したレコードだけを特定する」必要があるわけです。方法はデータモデルによって千差万別ですが、今回は日付を使います。いまデータモデルに存在するレコードより新しいレコードだけを増分更新の対象にします。

まず、増分の基準には、WHERE句を使います。今回、データのロードにはFivetranを使っているため、Fivetranでロードした日付として_fivetran_syncedというカラムがあります。現在のデータモデルのカラムの最新日付をとり、それよりも新しい日付のレコードだけをデータモデルに対して増分更新するという仕組みです。

そして、このWHERE句は、is_incrementalという条件式がTrueの時だけ動くようになっています。is_incrementalは、下記の条件を満たした時にTrueとなります。

  • Materializationがincrementalである
  • データモデルがDWHに既に存在する(= 初回作成ではない)
  • --full-refreshのオプションが無い状態で実行されている

ざっくりいうと、データモデルの更新時のときだけ動作するようになる、という感じです。

モデルの実行

上記のデータモデルを初めて実行します。

$ dbt run --models stg_stream_data
Running with dbt=0.18.1
Found 6 models, 11 tests, 0 snapshots, 2 analyses, 326 macros, 0 operations, 1 seed file, 2 sources

15:40:56 | Concurrency: 4 threads (target='kinesis')
15:40:56 |
15:40:56 | 1 of 1 START incremental model kinesis.stg_stream_data............... [RUN]
15:40:59 | 1 of 1 OK created incremental model kinesis.stg_stream_data.......... [CREATE TABLE (1.6k rows, 71.9 KB processed) in 3.01s]
15:40:59 |
15:40:59 | Finished running 1 incremental model in 5.30s.

Completed successfully

Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

データモデルがテーブルとして生成されました。

実際に実行されたクエリですが、WHERE句のないSELECT文が、そのままCREATE TABLEされています。これは初回実行なので、is_incrementalがFalseとなったためです。

データを増やして、再度モデルを実行する

ローデータ側を増やしてみて、再度モデルを実行してみます。incrementalなので、データモデルは再構築ではなく、増分更新となるはずです。

Firehose側でデータを増やします。

Fivetranで同期し、ローデータ側が1560→2600に増えたことを確認します。

データモデルを再度実行します。CREATEではなくMERGEとなっていますね。

$ dbt run --models stg_stream_data
Running with dbt=0.18.1
Found 6 models, 11 tests, 0 snapshots, 2 analyses, 326 macros, 0 operations, 1 seed file, 2 sources

16:22:50 | Concurrency: 4 threads (target='kinesis')
16:22:50 |
16:22:50 | 1 of 1 START incremental model kinesis.stg_stream_data............... [RUN]
16:22:53 | 1 of 1 OK created incremental model kinesis.stg_stream_data.......... [MERGE (1.0k rows, 132.1 KB processed) in 2.79s]
16:22:53 |
16:22:53 | Finished running 1 incremental model in 5.64s.

Completed successfully

Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

では、DWH側のクエリ履歴を見てみましょう。

WHERE句内の条件式が動作し、増分したレコードだけマージされていますね。検証としては成功です。

つかいみち

「incremental」が向いているのは、下記のようなデータだと思いました(公式ドキュメントにも書いてあるけど…)。

  • データ量が純粋に積み上がっていく
    • 原則レコードの削除が無い
  • データ量がかなり大きい
    • 億単位〜
  • データ構造の変更が原則無い
    • カラムの変更など

アプリケーションのイベントデータなどが該当するのではないでしょうか。膨大に積み上がっていくデータの場合、毎度データモデルを再構築していては、DWHに多大な負荷がかかります。SnowflakeやBigQueryの場合は、コストも馬鹿にならなくなってきます。そういう時、モデルのMaterializationを「incremental」をすることで、増えた分だけをクエリの対象にし、DWHの負荷やコストを抑えることができます。

逆に、マスタデータのような、カラム等の仕様が変わる(ことが予想される)データには、「incremental」は向いていないでしょうね。

また、「incremental」は、増分を特定するために、モデル内に少々ロジックを組む必要があるため、データモデルのクエリの可読性は下がります。ここらへんはトレードオフなので、じっくり検討しましょう。

おわりに

「Materialization」についても、データ側の知見(どういう仕様で更新されていくのか等)が分かってないと、適切な選択をするのは難しいと思います。