[dbt] データウェアハウスにロードされているローデータを「sources」として定義する(と便利だよ)

どっちかっていうと醤油派
2021.01.26

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

大阪オフィスの玉井です。

dbtはELTの「T」を行うツールです。つまり、DWHに対して、既にEとLが終わっている…言い換えると、DWHに対して、既にデータがロード済であることが(dbtを使用する上で)前提となっています。

ELTは、(見ればわかりますが)Tより先にLがあります。ローデータをゴニョゴニョ変換する前に、まずはそのままDWHにロードする、ということです。つまり、DWHにロード済のローデータが既にあることが、dbtを使用する上での前提になります。

dbtでは、この「DWHにロードされているローデータ」をsourcesとして定義することができます。そして定義したsourcesは、色々な形で利用することができます。今回は、そのsourcesを実際に使ってみました。

検証環境

  • macOS Catalina
  • dbt CLI 0.18.1
  • Google BigQuery

やってみた

sourcesをプロジェクト上で定義する

DWHに下記のようなロード済のローデータがあったとします(ここから色々変換して、データマート用のテーブルとかを作りたいとする)。

こちらをdbtプロジェクト上でsourcesとして定義したいと思います。

sourcesを定義する

dbt側でsourcesを定義するには、Modelディレクトリ下のYAMLファイルに下記のように記述します。

sources:
    - name: jaffle
      database: tamai-rei
      tables:
        - name: raw_customer
        - name: raw_order

ざっくり言うと、databaseはDB名、tablesはテーブル名を定義します。スキーマ名(BQの場合はデータセット名)はnameで定義した名前がそのまま使われます。しかし、schemaというパラメータでスキーマ名を明示的に定義することもできます。

定義したsourcesを他の場所から参照する

定義したsourcesは、modelファイル内等で参照することができます。

stg_orders.sql

select
    id as order_id,
    user_id as customer_id,
    order_date,
    status

from {{ source('jaffle','raw_order') }}

DWH側を確認する

sourcesを定義して、modelからsourcesを参照する形をとった状態で、dbtを実行するとどうなるのでしょうか。dbt runを実施します。

modelファイル側のFROM句は、sourcesを参照するJinjaコードになっていましたが、DWH上では、sourcesで定義したローデータのテーブルを正しく参照するクエリとしてコンパイルされています。

sourcesをテストする

dbtは作成したmodelに対してテストを設定することができます。詳細は下記をどうぞ。

このテストは、sourcesにも設定することができます。sourcesで定義するのは、DWHにロードされてきたローデータなので、実質、ELTのLで入ってくるデータに対してテストを行うことができる、ということになります。

テストの書き方はmodelに対するものと同じです。

sources:
    - name: jaffle
      database: tamai-rei
      tables:
        - name: raw_customer
          columns:
            - name: id
              tests:
                - unique
                - not_null
        - name: raw_order
          columns:
            - name: id
              tests:
                - unique
                - not_null

実際にテストを実行します。

$ dbt test
Running with dbt=0.18.1
Found 5 models, 11 tests, 0 snapshots, 0 analyses, 155 macros, 0 operations, 0 seed files, 2 sources

19:40:12 | Concurrency: 4 threads (target='learn')
19:40:12 |
19:40:12 | 1 of 11 START test accepted_values_stg_orders_status__completed__shipped__returned__placed__return_pending [RUN]
19:40:12 | 2 of 11 START test assert_positive_value_for_total_amount............ [RUN]
19:40:12 | 3 of 11 START test not_null_stg_customers_customer_id................ [RUN]
19:40:12 | 4 of 11 START test not_null_stg_orders_order_id...................... [RUN]
19:40:14 | 4 of 11 PASS not_null_stg_orders_order_id............................ [PASS in 2.54s]
19:40:14 | 5 of 11 START test relationships_stg_orders_customer_id__customer_id__ref_stg_customers_ [RUN]
19:40:14 | 3 of 11 PASS not_null_stg_customers_customer_id...................... [PASS in 2.64s]
19:40:14 | 6 of 11 START test source_not_null_jaffle_raw_customer_id............ [RUN]
19:40:15 | 2 of 11 PASS assert_positive_value_for_total_amount.................. [PASS in 3.14s]
19:40:15 | 7 of 11 START test source_not_null_jaffle_raw_order_id............... [RUN]
...

sourcesとして定義したテーブルに対してテストクエリが実行されています。

sourcesをドキュメントに反映させる

dbtは、作成したデータモデルに関するドキュメントを自動生成することができます。詳細は下記をどうぞ。

当然、sourcesもドキュメントに含めることができます。これも、定義の仕方は従来と同じです。

sources:
    - name: jaffle_shop
      description: "BQに存在するパブリックデータからCTASして生成"
      database: dbt-tutorial
      tables:
        - name: customers
          description: "顧客データ(raw)"
          columns:
            - name: id
              tests:
                - unique
                - not_null
        - name: orders
          description: "注文データ(raw)"
          columns:
            - name: id
              tests:
                - unique
                - not_null

ドキュメントにsourcesdescriptionが表示されています。

もちろん、DAGにも反映されます。

sourcesの「鮮度」をチェックする

DWHにロードするデータは様々ですが、定期的に新しいデータがロードされ続けるテーブルが存在するケースもあります。そういうケースの場合、sourcesに定義しているテーブルのデータの「鮮度」をdbtでチェックすることができます。

下記のように記述します。

sources:
    - name: jaffle
      description: "BQに存在するパブリックデータからCTASして生成"
      database: tamai-rei
      tables:
        - name: raw_customer
          description: "顧客データ(raw)"
          columns:
            - name: id
              tests:
                - unique
                - not_null
        - name: raw_order
          loaded_at_field: _etl_loaded_at
          freshness:
            warn_after: {count: 12, period: hour}
            error_after: {count: 24, period: hour}
          description: "注文データ(raw)"
          columns:
            - name: id
              tests:
                - unique
                - not_null

raw_orderに新しいパラメータを追加しています。

loaded_at_fieldは、鮮度の計算に使用するための時間データが入ったカラムを指定します。(後々クエリを見ればわかりますが)dbtはこのカラムのMAX値をとり、現在時刻との差を計ることで、データの鮮度をチェックします。ちなみに、timestampかつUTCじゃないとダメなので、そうじゃない場合はこの部分でキャストをかけるクエリを記述する必要があります。(loaded_at_field: "convert_timezone('UTC', 'Asia/Tokyo', _etl_loaded_at)など)。

freshness以降で、「現在時刻とデータの時刻がこれだけ離れていたら警告/エラーを出す」という定義をします。例えば、warn_after: {count: 12, period: hour}は「最新データが現在時刻より12時間以上離れていたら警告を出す」という感じです。

この状態で、dbt source snapshot-freshnessを実行すると、sourcesの鮮度を計ることができます。

$ dbt source snapshot-freshness
Running with dbt=0.18.1
Found 5 models, 11 tests, 0 snapshots, 0 analyses, 155 macros, 0 operations, 0 seed files, 2 sources

20:35:05 | Concurrency: 4 threads (target='learn')
20:35:05 |
20:35:06 | 1 of 1 START freshness of jaffle.raw_order........................... [RUN]
20:35:09 | 1 of 1 WARN freshness of jaffle.raw_order............................ [WARN in 3.98s]
20:35:09 | Done.

WARNが出たので、このデータは現在時刻(コマンド実行時刻)から12時間以上経過している、ということになります。

DWH側では、下記のクエリが実行されます。

単発で実行するというよりは、ジョブとして定期的に実行し続けて、データの鮮度に問題ないかどうかウォッチする、という使い方が想定されます。

おわりに

dbtがいかに「ELT」を意識したツールになっているかということをひしひしと感じました。まずロードありきということで、そのままロードしたデータがDWHにある前提で、そのローデータを「sources」として定義し、そこから各種データモデルに派生させていく、という思想がツール越しに伝わってきました。要するに、「sources」として定義するべきデータがDWHに無いという場合、それはELTのEとLが正しく完了していないということになります。dbtを使用する = ELTのEとLを正しく実施する(整備する)…ということになるでしょう。