![[dbt] データウェアハウスにロードされているローデータを「sources」として定義する(と便利だよ)](https://devio2023-media.developers.io/wp-content/uploads/2020/08/d72f41d6eb099a8fa0ef2791ad2b18a7.png)
[dbt] データウェアハウスにロードされているローデータを「sources」として定義する(と便利だよ)
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
大阪オフィスの玉井です。
dbtはELTの「T」を行うツールです。つまり、DWHに対して、既にEとLが終わっている…言い換えると、DWHに対して、既にデータがロード済であることが(dbtを使用する上で)前提となっています。
ELTは、(見ればわかりますが)Tより先にLがあります。ローデータをゴニョゴニョ変換する前に、まずはそのままDWHにロードする、ということです。つまり、DWHにロード済のローデータが既にあることが、dbtを使用する上での前提になります。
dbtでは、この「DWHにロードされているローデータ」をsourcesとして定義することができます。そして定義したsourcesは、色々な形で利用することができます。今回は、そのsourcesを実際に使ってみました。
検証環境
- macOS Catalina
- dbt CLI 0.18.1
- Google BigQuery
やってみた
sourcesをプロジェクト上で定義する
DWHに下記のようなロード済のローデータがあったとします(ここから色々変換して、データマート用のテーブルとかを作りたいとする)。
こちらをdbtプロジェクト上でsourcesとして定義したいと思います。
sourcesを定義する
dbt側でsourcesを定義するには、Modelディレクトリ下のYAMLファイルに下記のように記述します。
sources:
- name: jaffle
database: tamai-rei
tables:
- name: raw_customer
- name: raw_order
ざっくり言うと、databaseはDB名、tablesはテーブル名を定義します。スキーマ名(BQの場合はデータセット名)はnameで定義した名前がそのまま使われます。しかし、schemaというパラメータでスキーマ名を明示的に定義することもできます。
定義したsourcesを他の場所から参照する
定義したsourcesは、modelファイル内等で参照することができます。
select
id as order_id,
user_id as customer_id,
order_date,
status
from {{ source('jaffle','raw_order') }}
DWH側を確認する
sourcesを定義して、modelからsourcesを参照する形をとった状態で、dbtを実行するとどうなるのでしょうか。dbt runを実施します。
modelファイル側のFROM句は、sourcesを参照するJinjaコードになっていましたが、DWH上では、sourcesで定義したローデータのテーブルを正しく参照するクエリとしてコンパイルされています。
sourcesをテストする
dbtは作成したmodelに対してテストを設定することができます。詳細は下記をどうぞ。
このテストは、sourcesにも設定することができます。sourcesで定義するのは、DWHにロードされてきたローデータなので、実質、ELTのLで入ってくるデータに対してテストを行うことができる、ということになります。
テストの書き方はmodelに対するものと同じです。
sources:
- name: jaffle
database: tamai-rei
tables:
- name: raw_customer
columns:
- name: id
tests:
- unique
- not_null
- name: raw_order
columns:
- name: id
tests:
- unique
- not_null
実際にテストを実行します。
$ dbt test Running with dbt=0.18.1 Found 5 models, 11 tests, 0 snapshots, 0 analyses, 155 macros, 0 operations, 0 seed files, 2 sources 19:40:12 | Concurrency: 4 threads (target='learn') 19:40:12 | 19:40:12 | 1 of 11 START test accepted_values_stg_orders_status__completed__shipped__returned__placed__return_pending [RUN] 19:40:12 | 2 of 11 START test assert_positive_value_for_total_amount............ [RUN] 19:40:12 | 3 of 11 START test not_null_stg_customers_customer_id................ [RUN] 19:40:12 | 4 of 11 START test not_null_stg_orders_order_id...................... [RUN] 19:40:14 | 4 of 11 PASS not_null_stg_orders_order_id............................ [PASS in 2.54s] 19:40:14 | 5 of 11 START test relationships_stg_orders_customer_id__customer_id__ref_stg_customers_ [RUN] 19:40:14 | 3 of 11 PASS not_null_stg_customers_customer_id...................... [PASS in 2.64s] 19:40:14 | 6 of 11 START test source_not_null_jaffle_raw_customer_id............ [RUN] 19:40:15 | 2 of 11 PASS assert_positive_value_for_total_amount.................. [PASS in 3.14s] 19:40:15 | 7 of 11 START test source_not_null_jaffle_raw_order_id............... [RUN] ...
sourcesとして定義したテーブルに対してテストクエリが実行されています。
sourcesをドキュメントに反映させる
dbtは、作成したデータモデルに関するドキュメントを自動生成することができます。詳細は下記をどうぞ。
当然、sourcesもドキュメントに含めることができます。これも、定義の仕方は従来と同じです。
sources:
- name: jaffle_shop
description: "BQに存在するパブリックデータからCTASして生成"
database: dbt-tutorial
tables:
- name: customers
description: "顧客データ(raw)"
columns:
- name: id
tests:
- unique
- not_null
- name: orders
description: "注文データ(raw)"
columns:
- name: id
tests:
- unique
- not_null
ドキュメントにsourcesのdescriptionが表示されています。
もちろん、DAGにも反映されます。
sourcesの「鮮度」をチェックする
DWHにロードするデータは様々ですが、定期的に新しいデータがロードされ続けるテーブルが存在するケースもあります。そういうケースの場合、sourcesに定義しているテーブルのデータの「鮮度」をdbtでチェックすることができます。
下記のように記述します。
sources:
- name: jaffle
description: "BQに存在するパブリックデータからCTASして生成"
database: tamai-rei
tables:
- name: raw_customer
description: "顧客データ(raw)"
columns:
- name: id
tests:
- unique
- not_null
- name: raw_order
loaded_at_field: _etl_loaded_at
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
description: "注文データ(raw)"
columns:
- name: id
tests:
- unique
- not_null
raw_orderに新しいパラメータを追加しています。
loaded_at_fieldは、鮮度の計算に使用するための時間データが入ったカラムを指定します。(後々クエリを見ればわかりますが)dbtはこのカラムのMAX値をとり、現在時刻との差を計ることで、データの鮮度をチェックします。ちなみに、timestampかつUTCじゃないとダメなので、そうじゃない場合はこの部分でキャストをかけるクエリを記述する必要があります。(loaded_at_field: "convert_timezone('UTC', 'Asia/Tokyo', _etl_loaded_at)など)。
freshness以降で、「現在時刻とデータの時刻がこれだけ離れていたら警告/エラーを出す」という定義をします。例えば、warn_after: {count: 12, period: hour}は「最新データが現在時刻より12時間以上離れていたら警告を出す」という感じです。
この状態で、dbt source snapshot-freshnessを実行すると、sourcesの鮮度を計ることができます。
$ dbt source snapshot-freshness Running with dbt=0.18.1 Found 5 models, 11 tests, 0 snapshots, 0 analyses, 155 macros, 0 operations, 0 seed files, 2 sources 20:35:05 | Concurrency: 4 threads (target='learn') 20:35:05 | 20:35:06 | 1 of 1 START freshness of jaffle.raw_order........................... [RUN] 20:35:09 | 1 of 1 WARN freshness of jaffle.raw_order............................ [WARN in 3.98s] 20:35:09 | Done.
WARNが出たので、このデータは現在時刻(コマンド実行時刻)から12時間以上経過している、ということになります。
DWH側では、下記のクエリが実行されます。
単発で実行するというよりは、ジョブとして定期的に実行し続けて、データの鮮度に問題ないかどうかウォッチする、という使い方が想定されます。
おわりに
dbtがいかに「ELT」を意識したツールになっているかということをひしひしと感じました。まずロードありきということで、そのままロードしたデータがDWHにある前提で、そのローデータを「sources」として定義し、そこから各種データモデルに派生させていく、という思想がツール越しに伝わってきました。要するに、「sources」として定義するべきデータがDWHに無いという場合、それはELTのEとLが正しく完了していないということになります。dbtを使用する = ELTのEとLを正しく実施する(整備する)…ということになるでしょう。















