この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
さがらです。
dbtvaultのSnowflakeのサンプルデータ(TPC-H)を用いたWorked exmpleを試してみたので、その内容をまとめてみます。
より具体的には、以下のリンク先の内容に取り組んでいきます。
dbtvaultとは
改めてdbtvaultとは、dbtでData Vault 2.0を用いたモデリングを行うためのdbt packageです。
そして、このdbtvaultに採用されているData Vaultとは、ざっくり言うと「データの数や仕様に変動があっても変更履歴を柔軟に保持し、特定日時のデータをいつでも検索できるモデリング手法」です。
Data Vaultのメリット
ここでData Vaultのメリットについて、2つまとめておきます。
1つ目として、特定日時のデータをいつでも検索できます。
Data Vaultでは基本的に、Hubs、Links、Satellitesといった要素に分けてテーブルを管理していくのですが、いずれのテーブルも各レコードがロードされた日時を1つのカラムとして定義し、primary keyが新しい値を持つレコードが挿入されたり、既存のprimary keyのレコードが更新されたときに、1レコード追加される仕様となっています。この仕様によりData Vaultで構築したあとは基本的に全履歴を保持するため、いつでも特定日時のデータを検索できるようになり、過去のある時点のデータを用いた分析や監査対応を容易に可能にします。
2つ目として、複数のシステムから入ってくるデータの変更履歴を柔軟に保持できます。
システムやビジネスの変化によりスキーマ定義の変更が発生しても、Data Vaultを採用していると柔軟に対応可能です。
例えば、あるECサイトシステムのデータ基盤で、会員関連で新しい情報を付与した場合には、
- 新しい情報と既存の情報の関係性を確認
- 確認した関係性に応じて、会員のprimary keyを持つHubsに紐づくSatellitesテーブルを新しく増やしたり、既存のSatellitesテーブルに対してカラムを追加する、といった対応を行う(Staging Layerも一部書き換え)
といった工程を行えば、すぐに既存のデータ基盤に追加が可能です。
dbvaultの必要性
前述しましたData Vaultなのですが、良くも悪くも多くのテーブルに分かれてしまう特徴があります。それぞれのテーブルを構築するための処理は似た物が多いのですが、構築するテーブルの数が増えてしまうと処理の開発にも時間がかかり、少し処理を書き間違えてエラーが発生してしまうというトラブルに繋がる可能性はあります。
そんな時に、dbtvaultの出番です。
dbtvaultはdbtで使用できるpackageの1つなのですが、Hubs、Links、SatellitesなどのData Vaultの要素に該当するテーブルを構築するためのマクロが定義されているdbt packageです。
本記事で紹介するサンプルコードを見ていただくとわかるのですが、dbtvaultはメタデータを定義し、そのメタデータをdbtvaultのマクロの引数に指定するだけで、Data Vaultの各要素に当たるテーブルを構成することが出来ます。宣言的な記述だけでData Vaultでモデリングするための処理を開発できるので非常に楽です。
また、dbtの特徴でもあるmodel間のリネージやドキュメント化もありますので、構築するテーブルの数が多くなっても管理がしやすくなります。
こういった事由から、Data Vaultでモデリングする際にはdbtvaultを使用することで、効率よく安定した開発を行うことが出来ます。
参考リンク
まず、本記事ではSnowflakeのサンプルデータを用いたWorked exampleを試しますが、BigQueryを使用されている方も多いと思います。
BigQueryでdbtvaultを使用される場合には、下記のリンク先が参考になりますので、ぜひご覧ください。
また、適用しているモデリング手法であるData Vaultについては、1つの記事では詳細を書ききれない(私もまだ全て理解できていない)ので、私が参考にしたリンクも載せておきます。
Getting Started
ここからは、実際にdbtvaultのWorked exampleの内容に沿って進めていきます。
まずはGetting Startedです。
今回実施する内容について
SnowflakeのサンプルデータであるTPC-Hデータセットを用いて、Data Vault 2.0のモデリング手法を用いたデータウェアハウスの開発を、dbtvaultのマクロを使用して行っていきます。
より詳細には、以下の内容を取り組んでいきます。
- dbtプロジェクトのセットアップ
- TPC-Hデータセットの内容についてプロファイリングし、Data Vaultのアーキテクチャにマッピングする方法を探る
- Raw Staging Layerを作成する
- Raw Staging Layerに対してハッシュ化などの処理を行う(Hashed Staging Layer)
- Hashed Staging Layer上のmodelを用いてdbtvaultのマクロを実行し、Hubs、 Links、 Satellites、Transactional Linksに該当するテーブルを構築する
使用するdbtvaultのマクロについては別途チュートリアルもありますので、こちらも併せてご覧ください。(※本記事で試すWorked exampleとは別階層のページなので、ご注意ください。)
Project setup
次はProject setupです。
dbtプロジェクトや使用するSnowflakeアカウントでデータベースを作成し、準備をしていきます。
使用するWorked exampleのVersion
今回試すWorked exampleは定期的に更新されています。今回はVersion5.3を使用します。
こちらのリポジトリからはZipでのダウンロードしか出来なかったため、一度ダウンロードして解凍後に私のGitHub上のリモートリポジトリにプッシュした上で、対象のリモートリポジトリをdbt Cloudを繋げて使用していきます。
Snowflakeのトライアルアカウントのセットアップ
このWorked exampleはSnowflakeのトライアルアカウントを使用しているため、事前にセットアップしておきます。
Snowflakeのトライアルアカウントのセットアップが出来たら、データベースとウェアハウスをSYSADMINロールで作成します。以下のクエリを実行すればOKです。
-- データベースの作成
USE ROLE SYSADMIN;
CREATE DATABASE DV_PROTOTYPE_DB;
-- ウェアハウスの作成
USE ROLE SYSADMIN;
CREATE WAREHOUSE DV_PROTOTYPE_WH
WITH
WAREHOUSE_SIZE = 'XSMALL'
WAREHOUSE_TYPE = 'STANDARD'
AUTO_SUSPEND = 60 // 300もいらないため、60に変更
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 1
SCALING_POLICY = 'STANDARD';
使用するdbtバージョン
参考までに、dbt関係で使用するバージョンを記しておきます。
- dbt Cloud:v1.1.58.33
- dbt:1.1 ※Environmentsから指定
- dbtvault:0.8.3
dbtプロジェクトのセットアップ
前述しましたリポジトリとSnowflakeアカウントを紐づけて、dbtプロジェクトをセットアップします。プロジェクトのセットアップ手順はこちらの記事とほぼ同じ内容のため手順は割愛し、注意点だけ記します。
Snowflake Settings
では、前述したリソースを使うように設定してください。
リポジトリの設定では、事前にダウンロードしたZipファイルの内容を一度プッシュしたリモートリポジトリを設定します。
dbtプロジェクトの準備が出来たら、IDEからdbt deps
コマンドを実行して、dbtvaultをインストールします。
dbt_project.ymlの内容確認
dbt_project.yml
の内容を確認しておきます。
グローバル変数について
modelに関する設定の前に、vars
でグローバル変数が定義されています。
- LOAD_DATE
- 検証のためにデータ量を削減するため、raw_staging層ではこの変数で指定した日付のレコードに絞り込んでロードします
- TPCH_SIZE
- SnowflakeではTPC-Hのサンプルデータについて、データ量に併せて1、10、100、1000、10000と分かれてスキーマ用意されています
- デフォルトでは「10」が設定されています
vars:
load_date: '1992-01-08'
tpch_size: 10 #1, 10, 100, 1000, 10000
modelsセクションについて
models:
の中では、modelファイルを管理するサブフォルダごとにタグと適切なmaterializationを指定しています。
各modelファイル用のMetadataについて
各modelでdbtvaultを使うためのMetadata(マクロに渡す引数)については、各modelファイルで定義しています。
dbtvaultでの、各マクロと必要なMetadataについての詳細は、こちらのページもご覧ください。
Profiling TPC-H
次はProfiling TPC-Hです。
このWorked exampleでは、サンプルデータとしてSnowflakeのTPC-Hというデータセットを使用しています。
どのモデリング手法でも同様だと思いますが、Data Vaultを採用する場合であってもモデリング対象のデータの特徴を押さえていないとモデリングは出来ません。そのため、モデリングを適用するデータの特徴を押さえておくことはとても重要です。
TPC-Hのスキーマ構成
Date関係のフィールド
このTPC-Hのデータセットには、合計4つの日付フィールドがあります。
このうち3つはLINEITEM
テーブルにあります。
SHIPDATE
COMMITDATE
RECEIPTDATE
そして、もう1つはORDERSテーブルにあります。
ORDERDATE
実際に入ってくる日付の特性としては、卸売業のサンプルデータということもあり、ほぼ時系列でORDERDATE
➟SHIPDATE
➟RECEIPTDATE
➟COMMITDATE
の順で登録がされています。(一部のデータではCOMMITDATE
がこの順番に沿っていないケースがあるようです。)
Relationships
続いて、TPC-HデータセットのRelationshipsについて見ていきます。スキーマ上の各テーブルがどのフィールドで結合可能であるか、テーブル間のリレーションは1対1・n対1・1対n・n対nのどれに該当するか、関係性を確認することはData Vaultでもモデリングする際にも重要なステップです。
ORDERSテーブルとSUPPLIERテーブル
dbtvaultのページからの引用ですが、LINEITEM
テーブルとORDERS
テーブルをSUPPLIER
テーブルと内部結合して、注文とサプライヤの関係を調べ、各注文の異なるサプライヤを数えてリレーションを確認したようです。
この結果、ORDERS
テーブル対SUPPLIER
テーブルは、1対nの関係であることが判明したとのことです。1つの注文には、異なるサプライヤからの部品が含まれることがあるようです。
CUSTOMERSテーブルとORDERSテーブル
こちらもdbtvaultのページからの引用ですが、ORDERS
テーブルとCUSTOMER
テーブルの左外部結合を行ったところ、注文のない顧客が複数存在することが判明しているようです。
Transactional Linksのためのview生成
Transactional Linksを作るためには、トランザクション処理に関するテーブルデータが必要なのですが、元のTPC-Hのデータセットでは該当するテーブルがないようです。
そのため、今回はTransactional Linksの検証のためにraw_transactions
という擬似的にトランザクション処理のレコードを持つビューを用意します。このビューでは、ORDERS
テーブルに対してCUSTOMER
テーブルを左外部結合して以下のカラムを定義します。
- Customer key
- Order key
- Order date
- Total price
- 別名「Amount」、概要のOrderで支払われた合計金額を意味する
- Type
- 顧客に対する借方・貸方を意味する
CR
またはDR
を持つカラム
- 顧客に対する借方・貸方を意味する
- Transaction Date
- Order Dateに20日間を加えたもので、注文から20日後に支払われたことを意味します
- Transaction number
- Order key、Customer key、Order date を連結し、24桁の数値にした計算カラム(24桁に至らない場合は
0
で埋める)
- Order key、Customer key、Order date を連結し、24桁の数値にした計算カラム(24桁に至らない場合は
Creating the stage layers
次は、Creating the stage layersです。
その名の通りステージング層にあたるテーブルやビューを生成していきます。
作成するステージング層のイメージ
Raw Staging LayerとHashed Staging Layerの2種類を作成していきます。
Raw Staging Layerについて
まずRaw Staging Layerについて、どういったテーブルを作成していくか確認します。ver5.3のリポジトリでは、models/raw_stage
フォルダに3つの.sqlファイルが格納されています。
raw_orders
raw_orders.sql
では、TPC-Hの各テーブルをJOINして、注文に関する各カラムを持つワイドテーブルを作成します。
raw_orders.sql (クリックして展開)
SELECT
a.L_ORDERKEY AS ORDERKEY,
a.L_PARTKEY AS PARTKEY ,
a.L_SUPPKEY AS SUPPLIERKEY,
a.L_LINENUMBER AS LINENUMBER,
a.L_QUANTITY AS QUANTITY,
a.L_EXTENDEDPRICE AS EXTENDEDPRICE,
a.L_DISCOUNT AS DISCOUNT,
a.L_TAX AS TAX,
a.L_RETURNFLAG AS RETURNFLAG,
a.L_LINESTATUS AS LINESTATUS,
a.L_SHIPDATE AS SHIPDATE,
a.L_COMMITDATE AS COMMITDATE,
a.L_RECEIPTDATE AS RECEIPTDATE,
a.L_SHIPINSTRUCT AS SHIPINSTRUCT,
a.L_SHIPMODE AS SHIPMODE,
a.L_COMMENT AS LINE_COMMENT,
b.O_CUSTKEY AS CUSTOMERKEY,
b.O_ORDERSTATUS AS ORDERSTATUS,
b.O_TOTALPRICE AS TOTALPRICE,
b.O_ORDERDATE AS ORDERDATE,
b.O_ORDERPRIORITY AS ORDERPRIORITY,
b.O_CLERK AS CLERK,
b.O_SHIPPRIORITY AS SHIPPRIORITY,
b.O_COMMENT AS ORDER_COMMENT,
c.C_NAME AS CUSTOMER_NAME,
c.C_ADDRESS AS CUSTOMER_ADDRESS,
c.C_NATIONKEY AS CUSTOMER_NATION_KEY,
c.C_PHONE AS CUSTOMER_PHONE,
c.C_ACCTBAL AS CUSTOMER_ACCBAL,
c.C_MKTSEGMENT AS CUSTOMER_MKTSEGMENT,
c.C_COMMENT AS CUSTOMER_COMMENT,
d.N_NAME AS CUSTOMER_NATION_NAME,
d.N_REGIONKEY AS CUSTOMER_REGION_KEY,
d.N_COMMENT AS CUSTOMER_NATION_COMMENT,
e.R_NAME AS CUSTOMER_REGION_NAME,
e.R_COMMENT AS CUSTOMER_REGION_COMMENT
FROM {{ source('tpch_sample', 'ORDERS') }} AS b
LEFT JOIN {{ source('tpch_sample', 'LINEITEM') }} AS a
ON a.L_ORDERKEY = b.O_ORDERKEY
LEFT JOIN {{ source('tpch_sample', 'CUSTOMER') }} AS c
ON b.O_CUSTKEY = c.C_CUSTKEY
LEFT JOIN {{ source('tpch_sample', 'NATION') }} AS d
ON c.C_NATIONKEY = d.N_NATIONKEY
LEFT JOIN {{ source('tpch_sample', 'REGION') }} AS e
ON d.N_REGIONKEY = e.R_REGIONKEY
LEFT JOIN {{ source('tpch_sample', 'PART') }} AS g
ON a.L_PARTKEY = g.P_PARTKEY
LEFT JOIN {{ source('tpch_sample', 'SUPPLIER') }} AS h
ON a.L_SUPPKEY = h.S_SUPPKEY
LEFT JOIN {{ source('tpch_sample', 'NATION') }} AS j
ON h.S_NATIONKEY = j.N_NATIONKEY
LEFT JOIN {{ source('tpch_sample', 'REGION') }} AS k
ON j.N_REGIONKEY = k.R_REGIONKEY
WHERE b.O_ORDERDATE = TO_DATE('{{ var('load_date') }}')
raw_inventory
raw_inventory.sql
では、TPC-Hの各テーブルをJOINして、在庫に関するカラムを持つワイドテーブルを作成します。
raw_inventory.sql (クリックして展開)
SELECT
a.PS_PARTKEY AS PARTKEY,
a.PS_SUPPKEY AS SUPPLIERKEY,
a.PS_AVAILQTY AS AVAILQTY,
a.PS_SUPPLYCOST AS SUPPLYCOST,
a.PS_COMMENT AS PART_SUPPLY_COMMENT,
b.S_NAME AS SUPPLIER_NAME,
b.S_ADDRESS AS SUPPLIER_ADDRESS,
b.S_NATIONKEY AS SUPPLIER_NATION_KEY,
b.S_PHONE AS SUPPLIER_PHONE,
b.S_ACCTBAL AS SUPPLIER_ACCTBAL,
b.S_COMMENT AS SUPPLIER_COMMENT,
c.P_NAME AS PART_NAME,
c.P_MFGR AS PART_MFGR,
c.P_BRAND AS PART_BRAND,
c.P_TYPE AS PART_TYPE,
c.P_SIZE AS PART_SIZE,
c.P_CONTAINER AS PART_CONTAINER,
c.P_RETAILPRICE AS PART_RETAILPRICE,
c.P_COMMENT AS PART_COMMENT,
d.N_NAME AS SUPPLIER_NATION_NAME,
d.N_COMMENT AS SUPPLIER_NATION_COMMENT,
d.N_REGIONKEY AS SUPPLIER_REGION_KEY,
e.R_NAME AS SUPPLIER_REGION_NAME,
e.R_COMMENT AS SUPPLIER_REGION_COMMENT
FROM {{ source('tpch_sample', 'PARTSUPP') }} AS a
LEFT JOIN {{ source('tpch_sample', 'SUPPLIER') }} AS b
ON a.PS_SUPPKEY = b.S_SUPPKEY
LEFT JOIN {{ source('tpch_sample', 'PART') }} AS c
ON a.PS_PARTKEY = c.P_PARTKEY
LEFT JOIN {{ source('tpch_sample', 'NATION') }} AS d
ON b.S_NATIONKEY = d.N_NATIONKEY
LEFT JOIN {{ source('tpch_sample', 'REGION') }} AS e
ON d.N_REGIONKEY = e.R_REGIONKEY
JOIN {{ ref('raw_orders') }} AS f
ON a.PS_PARTKEY = f.PARTKEY AND a.PS_SUPPKEY=f.SUPPLIERKEY
ORDER BY a.PS_PARTKEY, a.PS_SUPPKEY
raw_transactions
raw_transactions.sql
では、前章「Transactional Linksのためのview生成」で触れたように、Transactional Linksを作成するために、顧客からの注文に対していくつかの計算を行ったトランザクションに関するレコードを保持します。
raw_transactions.sql (クリックして展開)
SELECT
b.O_ORDERKEY AS ORDER_ID,
b.O_CUSTKEY AS CUSTOMER_ID,
b.O_ORDERDATE AS ORDER_DATE,
DATEADD(DAY, 20, b.O_ORDERDATE) AS TRANSACTION_DATE,
TO_NUMBER(RPAD(CONCAT(b.O_ORDERKEY, b.O_CUSTKEY, TO_CHAR(b.O_ORDERDATE, 'YYYYMMDD')), 24, '0')) AS TRANSACTION_NUMBER,
b.O_TOTALPRICE AS AMOUNT,
CAST(
CASE ABS(MOD(RANDOM(1), 2)) + 1
WHEN 1 THEN 'DR'
WHEN 2 THEN 'CR'
END AS VARCHAR(2)) AS TYPE
FROM {{ source('tpch_sample', 'ORDERS') }} AS b
LEFT JOIN {{ source('tpch_sample', 'CUSTOMER') }} AS c
ON b.O_CUSTKEY = c.C_CUSTKEY
WHERE b.O_ORDERDATE = TO_DATE('{{ var('load_date') }}')
Raw Staging Layerの生成処理の実行
続いて、前述した3つのモデルを実行し、Raw Staging Layerを生成します。
このWorked exampleのリポジトリでは、dbt_project.yml
において各modelsのサブフォルダごとにタグ付けがされています。そのため、Raw Staging Layerを生成するには、以下のコマンドを実行すればOKです。
dbt run -s tag:raw
materialized: view
としているため、すぐに終わります。
Hashed Staging Layerについて
先程Raw Staging Layerとして3つviewを生成しましたが、これだけではHubs、Links、SatellitesといったRaw Vault層の生成は出来ません。
具体的には、主キーのHash値、Hash Diff、既存カラムを用いたデータソース名やレコードの有効開始日などを定義するderived columns、などを新しくカラムとして追加する必要があります。これらは既存レコードと追加予定レコードの重複を避けることや、レコードの有効日を持つことでいつでもある日時点のデータを取得することに役立ちます。
そして、これらのカラムを生成するために、dbtvaultでは専用のマクロが用意されています。このマクロを使って、Raw Vault層向けの変換を行うステージング層となるHashed Staging Layerを生成していきます。
Hashed Staging Layerで定義するmodelについて
Hashed Staging Layerでは、v_stg_orders.sql
、v_stg_inventory.sql
、v_stg_transactions.sql
の3つのmodelを定義します。
それぞれRaw Staging Layerのraw_orders.sql
とraw_inventory.sql
、raw_transactions.sql
をソースとして使用した上で、viewとして生成されます。各modelでは前述した通り、主キーのHash値、Hash Diff、既存カラムを用いたデータソース名やレコードの有効開始日などを定義するderived columns、などをカラムとして追加します。
下記に各modelのコードを記載しておきますが、中身の説明は次章で行います。
v_stg_orders.sql (クリックして展開)
{%- set yaml_metadata -%}
source_model: 'raw_orders'
derived_columns:
CUSTOMER_KEY: 'CUSTOMERKEY'
NATION_KEY: 'CUSTOMER_NATION_KEY'
REGION_KEY: 'CUSTOMER_REGION_KEY'
RECORD_SOURCE: '!TPCH-ORDERS'
EFFECTIVE_FROM: 'ORDERDATE'
hashed_columns:
CUSTOMER_PK: 'CUSTOMER_KEY'
LINK_CUSTOMER_NATION_PK:
- 'CUSTOMER_KEY'
- 'CUSTOMER_NATION_KEY'
CUSTOMER_NATION_PK: 'CUSTOMER_NATION_KEY'
CUSTOMER_REGION_PK: 'CUSTOMER_REGION_KEY'
NATION_PK: 'CUSTOMER_NATION_KEY'
REGION_PK: 'CUSTOMER_REGION_KEY'
NATION_REGION_PK:
- 'CUSTOMER_NATION_KEY'
- 'CUSTOMER_REGION_KEY'
ORDER_PK: 'ORDERKEY'
ORDER_CUSTOMER_PK:
- 'CUSTOMER_KEY'
- 'ORDERKEY'
LINEITEM_PK:
- 'ORDERKEY'
- 'LINENUMBER'
LINK_LINEITEM_ORDER_PK:
- 'ORDERKEY'
- 'ORDERKEY'
- 'LINENUMBER'
PART_PK: 'PARTKEY'
SUPPLIER_PK: 'SUPPLIERKEY'
INVENTORY_PK:
- 'PARTKEY'
- 'SUPPLIERKEY'
INVENTORY_ALLOCATION_PK:
- 'LINENUMBER'
- 'PARTKEY'
- 'SUPPLIERKEY'
CUSTOMER_HASHDIFF:
is_hashdiff: true
columns:
- 'CUSTOMER_KEY'
- 'CUSTOMER_NAME'
- 'CUSTOMER_ADDRESS'
- 'CUSTOMER_PHONE'
- 'CUSTOMER_ACCBAL'
- 'CUSTOMER_MKTSEGMENT'
- 'CUSTOMER_COMMENT'
CUSTOMER_NATION_HASHDIFF:
is_hashdiff: true
columns:
- 'CUSTOMER_NATION_NAME'
- 'CUSTOMER_NATION_COMMENT'
- 'CUSTOMER_NATION_KEY'
CUSTOMER_REGION_HASHDIFF:
is_hashdiff: true
columns:
- 'CUSTOMER_REGION_NAME'
- 'CUSTOMER_REGION_COMMENT'
- 'CUSTOMER_REGION_KEY'
LINEITEM_HASHDIFF:
is_hashdiff: true
columns:
- 'ORDERKEY'
- 'LINENUMBER'
- 'COMMITDATE'
- 'DISCOUNT'
- 'EXTENDEDPRICE'
- 'LINESTATUS'
- 'LINE_COMMENT'
- 'QUANTITY'
- 'RECEIPTDATE'
- 'RETURNFLAG'
- 'SHIPDATE'
- 'SHIPINSTRUCT'
- 'SHIPMODE'
- 'TAX'
ORDER_HASHDIFF:
is_hashdiff: true
columns:
- 'ORDERKEY'
- 'CLERK'
- 'ORDERDATE'
- 'ORDERPRIORITY'
- 'ORDERSTATUS'
- 'ORDER_COMMENT'
- 'SHIPPRIORITY'
- 'TOTALPRICE'
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{% set source_model = metadata_dict['source_model'] %}
{% set derived_columns = metadata_dict['derived_columns'] %}
{% set hashed_columns = metadata_dict['hashed_columns'] %}
WITH staging AS (
{{ dbtvault.stage(include_source_columns=true,
source_model=source_model,
derived_columns=derived_columns,
hashed_columns=hashed_columns,
ranked_columns=none) }}
)
SELECT *,
TO_DATE('{{ var('load_date') }}') AS LOAD_DATE
FROM staging
v_stg_inventory.sql (クリックして展開)
{%- set yaml_metadata -%}
source_model: 'raw_inventory'
derived_columns:
NATION_KEY: 'SUPPLIER_NATION_KEY'
REGION_KEY: 'SUPPLIER_REGION_KEY'
RECORD_SOURCE: '!TPCH-INVENTORY'
hashed_columns:
SUPPLIER_PK: 'SUPPLIERKEY'
SUPPLIER_NATION_PK: 'SUPPLIER_NATION_KEY'
SUPPLIER_REGION_PK: 'SUPPLIER_REGION_KEY'
REGION_PK: 'SUPPLIER_REGION_KEY'
NATION_PK: 'SUPPLIER_NATION_KEY'
NATION_REGION_PK:
- 'SUPPLIER_NATION_KEY'
- 'SUPPLIER_REGION_KEY'
LINK_SUPPLIER_NATION_PK:
- 'SUPPLIERKEY'
- 'SUPPLIER_NATION_KEY'
PART_PK: 'PARTKEY'
INVENTORY_PK:
- 'PARTKEY'
- 'SUPPLIERKEY'
SUPPLIER_HASHDIFF:
is_hashdiff: true
columns:
- 'SUPPLIERKEY'
- 'SUPPLIER_ACCTBAL'
- 'SUPPLIER_ADDRESS'
- 'SUPPLIER_PHONE'
- 'SUPPLIER_COMMENT'
- 'SUPPLIER_NAME'
PART_HASHDIFF:
is_hashdiff: true
columns:
- 'PARTKEY'
- 'PART_BRAND'
- 'PART_COMMENT'
- 'PART_CONTAINER'
- 'PART_MFGR'
- 'PART_NAME'
- 'PART_RETAILPRICE'
- 'PART_SIZE'
- 'PART_TYPE'
SUPPLIER_REGION_HASHDIFF:
is_hashdiff: true
columns:
- 'SUPPLIER_REGION_KEY'
- 'SUPPLIER_REGION_COMMENT'
- 'SUPPLIER_REGION_NAME'
SUPPLIER_NATION_HASHDIFF:
is_hashdiff: true
columns:
- 'SUPPLIER_NATION_KEY'
- 'SUPPLIER_NATION_COMMENT'
- 'SUPPLIER_NATION_NAME'
INVENTORY_HASHDIFF:
is_hashdiff: true
columns:
- 'PARTKEY'
- 'SUPPLIERKEY'
- 'AVAILQTY'
- 'SUPPLYCOST'
- 'PART_SUPPLY_COMMENT'
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{% set source_model = metadata_dict['source_model'] %}
{% set derived_columns = metadata_dict['derived_columns'] %}
{% set hashed_columns = metadata_dict['hashed_columns'] %}
WITH staging AS (
{{ dbtvault.stage(include_source_columns=true,
source_model=source_model,
derived_columns=derived_columns,
hashed_columns=hashed_columns,
ranked_columns=none) }}
)
SELECT *,
TO_DATE('{{ var('load_date') }}') AS LOAD_DATE,
TO_DATE('{{ var('load_date') }}') AS EFFECTIVE_FROM
FROM staging
v_stg_transactions.sql (クリックして展開)
{%- set yaml_metadata -%}
source_model: 'raw_transactions'
derived_columns:
RECORD_SOURCE: '!RAW_TRANSACTIONS'
LOAD_DATE: DATEADD(DAY, 1, TRANSACTION_DATE)
EFFECTIVE_FROM: 'TRANSACTION_DATE'
hashed_columns:
TRANSACTION_PK:
- 'CUSTOMER_ID'
- 'TRANSACTION_NUMBER'
CUSTOMER_PK: 'CUSTOMER_ID'
ORDER_PK: 'ORDER_ID'
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{% set source_model = metadata_dict['source_model'] %}
{% set derived_columns = metadata_dict['derived_columns'] %}
{% set hashed_columns = metadata_dict['hashed_columns'] %}
{{ dbtvault.stage(include_source_columns=true,
source_model=source_model,
derived_columns=derived_columns,
hashed_columns=hashed_columns,
ranked_columns=none) }}
Hashed Staging Layer向けのマクロを用いたmodelの定義
続いて、Hashed Staging Layerを生成するためのdbtvaultのマクロを用いて、各modelを定義していきます。
まず、Hashed Staging Layerを生成するためのマクロですが、dbtvault.stage
というマクロです。このマクロに対して必要なメタデータを提供することでソースに設定したテーブルから、主キーのHash値、Hash Diff、derived columns、を生成することが出来ます。
{{ dbtvault.stage(include_source_columns=true,
source_model=source_model,
derived_columns=derived_columns,
hashed_columns=hashed_columns,
ranked_columns=none) }}
続いて、v_stg_transactions.sql
の内容を元に、このマクロをどうやって使うのか見ていきます。
{%- set yaml_metadata -%}
source_model: 'raw_transactions'
derived_columns:
RECORD_SOURCE: '!RAW_TRANSACTIONS'
LOAD_DATE: DATEADD(DAY, 1, TRANSACTION_DATE)
EFFECTIVE_FROM: 'TRANSACTION_DATE'
hashed_columns:
TRANSACTION_HK:
- 'CUSTOMER_ID'
- 'TRANSACTION_NUMBER'
CUSTOMER_HK: 'CUSTOMER_ID'
ORDER_HK: 'ORDER_ID'
{%- endset -%}
{% set metadata_dict = fromyaml(yaml_metadata) %}
{% set source_model = metadata_dict['source_model'] %}
{% set derived_columns = metadata_dict['derived_columns'] %}
{% set hashed_columns = metadata_dict['hashed_columns'] %}
{{ dbtvault.stage(include_source_columns=true,
source_model=source_model,
derived_columns=derived_columns,
hashed_columns=hashed_columns,
ranked_columns=none) }}
まず、dbtvault.stage
に渡すためのメタデータのセットについて、{%- set yaml_metadata -%}
としてyamlで定義します。この上で、このyaml内で各メタデータをどう定義しているかを見ていきます。
source_model
では、どのmodelを使用してHashed Staging Layerを構築するのかを定義するため、Raw Staging Layerで該当するmodel名を入れます。
derived_columns
では、Raw Vault層に必要なカラムを定義します。v_stg_transactions.sql
においては、以下3つのカラムを定義しています。
RECORD_SOURCE
:ソースデータの名称を入れます。!RAW_TRANSACTIONS
のように、1文字目に!
を入れることで、任意の文字列を定義できます。LOAD_DATE
:対象のレコードがロードされた日付を持つカラムを指定します。※このv_stg_transactions.sql
ではDATEADD関数を使用していますが、これは「トランザクション処理の翌日をロードされた日とする」と疑似的に指定しているだけです。EFFECTIVE_FROM
:あるレコードにおけるビジネス上の有効期限の開始日を保持するカラムを指定します。
derived_columns:
RECORD_SOURCE: '!RAW_TRANSACTIONS'
LOAD_DATE: DATEADD(DAY, 1, TRANSACTION_DATE)
EFFECTIVE_FROM: 'TRANSACTION_DATE'
hashed_columns
では、主にハッシュ化を行うカラムを定義していきます。
v_stg_transactions.sql
においては、以下3つのカラムを定義しています。
TRANSACTION_HK
:raw_transactions.sql
に存在するCUSTOMER_ID
とTRANSACTION_NUMBER
を連結した、新しいハッシュ化したカラムとして定義しています。CUSTOMER_HK
とORDER_HK
:どちらも単一のカラムをHash化しているので、対象のカラム名だけを定義しています。
hashed_columns:
TRANSACTION_HK:
- 'CUSTOMER_ID'
- 'TRANSACTION_NUMBER'
CUSTOMER_HK: 'CUSTOMER_ID'
ORDER_HK: 'ORDER_ID'
ここまででv_stg_transactions.sql
におけるメタデータのセットの説明としては以上となります。
もう一つhashed_columns
について、Hash Diffの説明のためにv_stg_orders.sql
で定義されている内容も見ていきます。
CUSTOMER_HASHDIFF
:columns
には対象のテーブルでレコードの差分を適切に検知できるカラムを指定します。また、is_hashdiff: true
とすることで、指定したカラムにおいて自動的にアルファソートが適用された上でHash化されます。
hashed_columns:
CUSTOMER_HASHDIFF:
is_hashdiff: true
columns:
- 'CUSTOMERKEY'
- 'CUSTOMER_NAME'
- 'CUSTOMER_ADDRESS'
- 'CUSTOMER_PHONE'
- 'CUSTOMER_ACCBAL'
- 'CUSTOMER_MKTSEGMENT'
- 'CUSTOMER_COMMENT'
Hash周りのプラクティスについては、dbtvaultもドキュメントを用意しているため、こちらも併せてご覧ください。
Hashed Staging Layerの生成処理の実行
前述した3つのモデルを実行し、Hashed Staging Layerを生成します。
このWorked exampleのリポジトリでは、dbt_project.yml
において各modelsのサブフォルダごとにタグ付けがされています。そのため、Hashed Staging Layerを生成するには、以下のコマンドを実行すればOKです。
dbt run -s +tag:stage
materialized: view
としているため、すぐに終わります。
+
をつけているため、上流に位置するRaw Staging Layerのviewも改めて生成されました。
ここで、dbtvaultのマクロを使ったHashed Staging Layerでは、Raw Staging Layerとどういった差があるのかを、raw_transactions.sql
とv_stg_transactions.sql
で生成されたviewの中身を見て確認してみます。
まずは、raw_transactions.sql
により生成されたviewの中身は下図のようになっています。
続いて、dbtvault.stage
を適用したraw_transactions.sql
により生成されたviewの中身を見てみます。すると、raw_transactions.sql
内でメタデータとして設定した各カラムが追加されていることがわかります。
Loading the vault
これまでの工程で、Raw Vault層の生成に必要なカラムも追加したviewがステージング層に整いました。これを用いて、Raw Vault層へのロード処理を行っていきます。
Hubs
まずは、Hubsです。基本的に、モデリング対象のテーブル群に存在するPrimary Key1つごとに、Hubsのテーブルを生成します。
Data VaultにおいてHubsは、Data Vaultのモデリングで中核となる構成要素の1つです。一般的には以下の4つのカラムで構成されますが、場合によってはnatural keyが複数列必要な場合もあるため、これ以上のカラムを持つこともあります。
- primary/hash key(またはsurrogate key)
- natural key(business keyとも呼ばれる)をハッシュ化した値を保持します
- natural key
- 顧客IDや注文番号など、レコードを一意に識別するための情報(複数列の場合もある)
- load date または load date timestamp
- 各レコードが最初にロードされた日時
- source for the record
- 各レコードがどこから生まれたデータを示す
Hubsのデプロイ
このWorked exampleのリポジトリでは、dbt_project.yml
において各modelsのサブフォルダごとにタグ付けがされています。そのため、Hubsに該当するオブジェクトを生成するには、以下のコマンドを実行すればOKです。
dbt run -s tag:hub
models/raw_vault
フォルダ全体にmaterialized: incremental
が設定されているためテーブルが作成されますが、XSのウェアハウスでも数秒で終わるかと思います。
ここで改めて、Hubsに該当するテーブルを生成するdbtvaultのマクロdbtvault.hub
の使い方について確認してみます。
まず、dbtvault.hub
では、5つのメタデータを使用します。
source_model
- このmodelで対象とするprimary keyとnatural keyを持つmodel名を指定します(あるprimary keyが複数のmodelで定義されている場合は、複数のmodelを指定可能)
src_pk
source_model
で指定したmodelのうち、対象のHubssテーブルにおいてprimary keyとなるカラムを指定します
src_nk
source_model
で指定したmodelのうち、対象のHubssテーブルにおいてnatural keyとなるカラムを指定します(natural keyが複数のカラムで構成されている場合は、複数のカラムを指定可能)
src_ldts
source_model
で指定したmodelのうち、対象のHubssテーブルにおいて各レコードのロード日時を持つカラムを指定します
src_source
source_model
で指定したmodelのうち、対象のHubssテーブルにおいて各レコードのソースデータ名を持つカラムを指定します
この上で、2つのmodelを例に、どういった記述を行って、どんなデータが作られているかを見てみます。
hub_lineitem.sql
では、natural keyが2つあるため、src_nk = ["LINENUMBER", "ORDERKEY"]
といった形でメタデータを定義しています。
{%- set source_model = "v_stg_orders" -%}
{%- set src_pk = "LINEITEM_PK" -%}
{%- set src_nk = ["LINENUMBER", "ORDERKEY"] -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}
{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model) }}
hub_nation.sql
では、対象とするprimary keyNATION_PK
が、v_stg_orders.sql
とv_stg_inventory.sql
それぞれで定義されているため、source_model = ["v_stg_orders", "v_stg_inventory"]
としてメタデータを定義しています。
{%- set source_model = ["v_stg_orders", "v_stg_inventory"] -%}
{%- set src_pk = "NATION_PK" -%}
{%- set src_nk = "NATION_KEY" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}
{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model) }}
Links
次はLinksです。
Linksは、Data Vaultのもう一つの基本要素です。Linksでは、Hubs間をつなげるためのテーブルを生成します。
Linksは以下の要素を持ちます。
- primary key
- Linksテーブルにおけるprimary key。このLinksがつなげるHubsのnatural keyを連結しハッシュ化したもの
- Foreign keys holding the primary key for each Hub referenced in the Link
- LinksからHubsを参照するための、各Hubsのprimary key。このLinksが参照するHubsの数に応じて、2つ以上のprimary keyを1つ1つカラムとして保持する
- load date または load date timestamp
- 各レコードが最初にロードされた日時
- source for the record
- 各レコードがどこから生まれたデータを示す
Linksのデプロイ
このWorked exampleのリポジトリでは、dbt_project.yml
において各modelsのサブフォルダごとにタグ付けがされています。そのため、Linksに該当するオブジェクトを生成するには、以下のコマンドを実行すればOKです。
dbt run -s tag:link
models/raw_vault
フォルダ全体にmaterialized: incremental
が設定されているためテーブルが作成されますが、XSのウェアハウスでも数十秒で終わるかと思います。
ここで改めて、Linksに該当するテーブルを生成するdbtvaultのマクロdbtvault.link
の使い方について確認してみます。
まず、dbtvault.link
では、5つのメタデータを使用します。
source_model
- このmodelの生成元となるmodel名を指定します(このLinksが対象とするHubsが複数のmodelから定義されている場合は、複数のmodelを指定可能)
src_pk
source_model
で指定したmodelのうち、対象のLinksテーブルにおけるprimary keyとなるカラムを指定します
src_fk
source_model
で指定したmodelのうち、このLinksテーブルが繋がりを示すHubsのprimary keyとなるカラムを指定します(複数指定可能)
src_ldts
source_model
で指定したmodelのうち、対象のLinksテーブルの各レコードのロード日時を持つカラムを指定します
src_source
source_model
で指定したmodelのうち、対象のLinksテーブルの各レコードのソースデータ名を持つカラムを指定します
この上で、1つのmodelを例に、どういった記述を行って、どんなデータが作られているかを見てみます。
link_nation_region.sql
は、hub_nation.sql
とhub_region.sql
により生成されるHubsのテーブルを繋げるLinksテーブルを生成します。src_pk
には、事前にHashed Staging Layerに該当するv_stg_orders.sql
とv_stg_inventory.sql
でhashed_columns
として定義していたNATION_REGION_PK
を指定します。src_fk
には、対象のHubsとなるhub_nation.sql
とhub_region.sql
のprimary keyであるNATION_PK
とREGION_PK
を指定します。
{%- set source_model = ["v_stg_orders", "v_stg_inventory"] -%}
{%- set src_pk = "NATION_REGION_PK" -%}
{%- set src_fk = ["NATION_PK", "REGION_PK"] -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}
{{ dbtvault.link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts,
src_source=src_source, source_model=source_model) }}
Satellites
次に、Satellitesです。前述のHubsとLinksではキー情報しか持っていませんでしたが、Satellitesでは、キー以外のカラム全てを保持し、Slowly Changing Dimensionの方式で変更履歴まで保持します。
Satellitesは以下の要素を持ちます。
- primary key
- Satellitesテーブルにおけるprimary key。基本的にはnatural keyをハッシュ化した値
- Hash Diff
- 後述のpayloadとprimary keyを連結してハッシュ化した値。これにより、各レコードの値が1つでも変わったらHash Diffの値が変わることになるため、レコードの変更を検知出来ます
- payload
- キー情報ではない、具体的なデータを保持する。例えば、顧客データの場合には顧客キー以外の、氏名、生年月日、国籍、年齢、性別、などが該当する
- effectivity date
- 一般的に
EFFECTIVE_FROM
というカラム名で定義されるもの。各レコードの有効開始日を保持します
- 一般的に
- load date または load date timestamp
- 各レコードが最初にロードされた日時
- source for the record
- 各レコードがどこから生まれたデータを示す
ここで、「effectivity date」と「load date」どちらも持つ必要があるの?と疑問に感じた方もいると思います。
dbtvaultのドキュメントからの引用ですが、「load date」は、レコードがデータベースにロードされた時刻です。「effectivity date」は、ビジネス上のイベントが発生してからレコードがロードのためにデータベースに到着するまでにバッチ処理の遅延がある場合、異なる値を保持する可能性があります。両方のカラムを持つことで、データ的にロードされたとき、ビジネス上の有効となったとき、をそれぞれ知ることが出来ます。
Satellitesのデプロイ
このWorked exampleのリポジトリでは、dbt_project.yml
において各modelsのサブフォルダごとにタグ付けがされています。そのため、Satellitesに該当するオブジェクトを生成するには、以下のコマンドを実行すればOKです。
dbt run -s tag:satellite
models/raw_vault
フォルダ全体にmaterialized: incremental
が設定されているためテーブルが作成されますが、XSのウェアハウスでも数十秒で終わるかと思います。
ここで改めて、Linksに該当するテーブルを生成するdbtvaultのマクロdbtvault.sat
の使い方について確認してみます。
まず、dbtvault.sat
では、7つのメタデータを使用します。
source_model
- このmodelの生成元となるmodelを指定します。基本的には、作成するSatellitesに紐づくHubsの生成元となっているmodelを指定します
src_pk
source_model
で指定したmodelのうち、対象のSatellitesテーブルにおいてprimary keyとなるカラムを指定します
src_hashdiff
source_model
で指定したmodelのうち、対象のSatellitesテーブルのレコード変更を検知できるHash Diffのカラムを指定します
src_payload
source_model
で指定したmodelのうち、対象のSatellitesテーブルのキー以外の具体的なデータを持つカラムを指定します(複数指定可能)
src_eff
source_model
で指定したmodelのうち、対象のSatellitesテーブルの各レコードの有効開始日を持つカラムを指定します
src_ldts
source_model
で指定したmodelのうち、対象のSatellitesテーブルの各レコードのロード日時を持つカラムを指定します
src_source
source_model
で指定したmodelのうち、対象のSatellitesテーブルの各レコードのソースデータ名を持つカラムを指定します
この上で、1つのmodelを例に、どういった記述を行って、どんなデータが作られているかを見てみます。
sat_order_cust_nation_details.sql
は、CUSTOMER
テーブルのNATIONKEY
に紐づくNATION
テーブルの変更履歴を保持するためのSatellitesテーブルを生成します。
src_hashdiff
には、v_stg_orders.sql
で定義したCUSTOMER_REGION_HASHDIFF
を指定します。CUSTOMER_REGION_HASHDIFF
は、NATION
テーブルのprimary keyであるCUSTOMER_NATION_KEY
と、具体的なデータを保持するカラムであるCUSTOMER_NATION_NAME
とCUSTOMER_NATION_COMMENT
、この3つのカラムを連結した値をHash化としたものです。
{%- set source_model = "v_stg_orders" -%}
{%- set src_pk = "CUSTOMER_PK" -%}
{%- set src_hashdiff = "CUSTOMER_NATION_HASHDIFF" -%}
{%- set src_payload = ["CUSTOMER_NATION_NAME", "CUSTOMER_NATION_COMMENT"] -%}
{%- set src_eff = "EFFECTIVE_FROM" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}
{{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,
src_payload=src_payload, src_eff=src_eff,
src_ldts=src_ldts, src_source=src_source,
source_model=source_model) }}
Transactional Links
最後に、Transactional Linksです。
Transactional Linksは、トランザクション処理されたデータやセンサーデータなど、一度処理が完了したら基本的に更新しないデータを管理するための概念です。Linksと同じレベルの概念になりますが、キー情報だけでなくトランザクションに関するデータも持つことが出来る概念です。
Transactional Linksは以下の要素を持ちます。
- primary key
- Transactional Linksテーブルにおけるprimary key。基本的には対象のトランザクション処理のIDやnumberを持つカラムが該当する。もしこういったIDやnumberがない場合には、外部キーを連結しハッシュ化したものをprimary keyとする
- Foreign keys holding the primary key for each Hub referenced in the Transactional Link
- Transaction Linksから参照する各Hubsのprimary keyを保持する(参照するHubsの数に応じて、2つ以上指定可能)
- payload
- キー情報ではない、金額、種類、日付、ハッシュ化されていない取引番号など、トランザクションそのものに関するデータ
- effectivity date
- 一般的に
EFFECTIVE_FROM
というカラム名で定義されるもの。各レコードで実際にトランザクションが行われた日時を保持する
- 一般的に
- load date または load date timestamp
- 各レコードが最初にロードされた日時
- source for the record
- 各レコードがどこから生まれたデータを示す
Transactional Linksのデプロイ
このWorked exampleのリポジトリでは、dbt_project.yml
において各modelsのサブフォルダごとにタグ付けがされています。そのため、Transactional Linksに該当するオブジェクトを生成するには、以下のコマンドを実行すればOKです。
dbt run -s tag:t_link
models/raw_vault
フォルダ全体にmaterialized: incremental
が設定されているためテーブルが作成されますが、XSのウェアハウスでも数秒で終わるかと思います。
ここで改めて、Linksに該当するテーブルを生成するdbtvaultのマクロdbtvault.t_link
の使い方について確認してみます。
まず、dbtvault.t_link
では、7つのメタデータを使用します。必要なメタデータを見ると、dbtvault.link
とdbtvault.sat
を組み合わせたイメージですね。
source_model
- このmodelの生成元となるmodelを指定します。基本的には、作成するTransactional Linkの元となるHashed Staging Layerのmodelを指定します。
src_pk
source_model
で指定したmodelのうち、対象のTransactional Linksテーブルにおいてprimary keyとなるカラムを指定します
src_fk
source_model
で指定したmodelのうち、対象のTransactional LinksテーブルとつなげるHubsのprimary keyであるカラムを指定します
src_payload
source_model
で指定したmodelのうち、対象のTransactional Linksテーブルのキー以外の具体的なデータを持つカラムを指定します(複数指定可能)
src_eff
source_model
で指定したmodelのうち、対象のTransactional Linksテーブルの各レコードの有効開始日を持つカラムを指定します
src_ldts
source_model
で指定したmodelのうち、対象のTransactional Linksテーブルの各レコードのロード日時を持つカラムを指定します
src_source
source_model
で指定したmodelのうち、対象のTransactional Linksテーブルの各レコードのソースデータ名を持つカラムを指定します
この上で、1つのmodelを例に、どういった記述を行って、どんなデータが作られているかを見てみます。
t_link_transactions.sql
は、raw_transactions.sql
に対して各種ハッシュ値を追加したv_stg_transactions.sqlを
ベースにして、hub_customer.sql
とhub_order.sql
の2つのHubsとつながるTransactional Linksのテーブルを生成します。
{%- set source_model = "v_stg_transactions" -%}
{%- set src_pk = "TRANSACTION_PK" -%}
{%- set src_fk = ["CUSTOMER_PK", "ORDER_PK"] -%}
{%- set src_payload = ["TRANSACTION_NUMBER", "TRANSACTION_DATE", "TYPE", "AMOUNT"] -%}
{%- set src_eff = "EFFECTIVE_FROM" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}
{{ dbtvault.t_link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts,
src_payload=src_payload, src_eff=src_eff,
src_source=src_source, source_model=source_model) }}
load_dateを変えて挙動を確かめてみる
最後にload_date
を1992-01-08
から1992-01-09
に変えて、どういった変更があるかを見てみます。
この上で、dbt run
を実行します。全てのmodelを実行するため、XSのウェアハウスの場合には1分程度かかると思います。
例えば、hub_customer.sql
を見ると、hub_customer__dbt_tmp
というtemporary tableを作成したあと、hub_customer
テーブルにinsertされているのがわかります。これは、models/raw_vault
フォルダ内のmodelは全てmaterialized: incremental
が指定されており、2回目以降の実行は差分がinsertされるためです。
dbtvaultで構築したスキーマに対するサンプルクエリ
これで一通りのモデリングを終えましたが、たくさんテーブルがあってどうクエリを書けばいいのかわかりづらいところもあると思います。
そこで私の自作ですが、Worked Exampleで生成されたテーブルを用いて、どういったクエリを書けば目的のデータが得られるか、例を載せておきます。
- 元の
PART
テーブルについて、最新のレコードがほしい- 記述するクエリ:partについて、HubsとSatellitesをJOINする。各Satellitesは、primary keyに対して最新の
load_date
でフィルタリングする
- 記述するクエリ:partについて、HubsとSatellitesをJOINする。各Satellitesは、primary keyに対して最新の
// 「hub_part」に対して、「部品の詳細情報を持つSatellites」をJOINする
select
hub_part.partkey,
sat_inv_part_details.part_name,
sat_inv_part_details.part_mfgr,
sat_inv_part_details.part_brand,
sat_inv_part_details.part_type,
sat_inv_part_details.part_size,
sat_inv_part_details.part_name,
sat_inv_part_details.part_container,
sat_inv_part_details.part_retailprice,
sat_inv_part_details.part_comment,
sat_inv_part_details.effective_from,
sat_inv_part_details.load_date
from
hub_part
inner join
sat_inv_part_details
on
hub_part.part_pk = sat_inv_part_details.part_pk
where
sat_inv_part_details.load_date = (
select
max(sat_inv_part_details_2.load_date)
from
sat_inv_part_details as sat_inv_part_details_2
where
hub_part.part_pk = sat_inv_part_details_2.part_pk
)
order by
partkey;
- 1992年1月9日時点の注文一覧について、どの国に在籍するどんな氏名の会員が注文しているか知りたい
- 記述するクエリ:orderとcustomer、それぞれでHubsとSatellitesをJOINし、orderとcustomerはLinksを用いてJOINする。各Satellitesは、primary keyに対して1992年1月9日以前で最新の
load_date
でフィルタリングする
- 記述するクエリ:orderとcustomer、それぞれでHubsとSatellitesをJOINし、orderとcustomerはLinksを用いてJOINする。各Satellitesは、primary keyに対して1992年1月9日以前で最新の
// 「hub_order」に対して、「注文の詳細情報を持つSatellites」をJOINする
with order_19920109 as (
select
hub_order.order_pk,
hub_order.orderkey,
sat_order_order_details.orderstatus,
sat_order_order_details.totalprice,
sat_order_order_details.orderdate
from
hub_order
inner join
sat_order_order_details
on
hub_order.order_pk = sat_order_order_details.order_pk
where
sat_order_order_details.load_date = (
select
max(sat_order_order_details_2.load_date)
from
sat_order_order_details as sat_order_order_details_2
where
hub_order.order_pk = sat_order_order_details_2.order_pk
and sat_order_order_details_2.load_date < '10-Jan-1992'
)
),
//「hub_customer」に対して、「ユーザーの氏名を持つSatellites」「ユーザーの国情報を持つSatellites」をJOINする
cust_nation_19920109 as (
select
hub_customer.customer_pk,
sat_order_customer_details.customer_name,
sat_order_cust_nation_details.customer_nation_name,
sat_order_cust_nation_details.load_date
from
hub_customer
inner join
sat_order_customer_details
on
hub_customer.customer_pk = sat_order_customer_details.customer_pk
inner join
sat_order_cust_nation_details
on
hub_customer.customer_pk = sat_order_cust_nation_details.customer_pk
where
sat_order_customer_details.load_date = (
select
max(sat_order_customer_details_2.load_date)
from
sat_order_customer_details as sat_order_customer_details_2
where
hub_customer.customer_pk = sat_order_customer_details_2.customer_pk
and sat_order_customer_details_2.load_date < '10-Jan-1992'
)
and sat_order_cust_nation_details.load_date = (
select
max(sat_order_cust_nation_details_2.load_date)
from
sat_order_cust_nation_details as sat_order_cust_nation_details_2
where
hub_customer.customer_pk = sat_order_cust_nation_details_2.customer_pk
and sat_order_cust_nation_details_2.load_date < '10-Jan-1992'
)
)
// linksである「link_customer_order」を介して、「hub_order」と「hub_customer」をJOINする
select
order_19920109.orderkey,
order_19920109.orderstatus,
order_19920109.totalprice,
order_19920109.orderdate,
cust_nation_19920109.customer_name,
cust_nation_19920109.customer_nation_name
from
order_19920109
left join
link_customer_order
on
order_19920109.order_pk = link_customer_order.order_pk
left join
cust_nation_19920109
on
cust_nation_19920109.customer_pk = link_customer_order.customer_pk
order by
orderkey;
dbtvaultで他にできること
今回はdbtvault公式のWorked Exampleを試しましたが、Data Vaultの他の概念を実装するためのdbtマクロがdbtvaultパッケージに含まれています。
- Effectivity Satellites
- Multi-Active Satellites
- Extended Tracking Satellites
- As of Date Tables
- Point In Time (PIT) tables
- Bridge Tables
これらのマクロがどういった時に役立つか、少し触れておきたいと思います。
例えば、前述のサンプルクエリの内容を見るとわかると思いますが、Data Vaultでモデリングした場合、JOIN句が多くなり、WHERE句もサブクエリを用いたものが複数必要で、クエリの内容が複雑になりがちです。
この問題に対して、WHERE句にサブクエリを複数回用いるところはPIT tablesを定義することでサブクエリの記述回数は1回で済みますし、JOIN句が多くなるところはBridge Tablesを定義することでJOIN対象のテーブルを減らすことが出来ます。
これらの概念が実装されるレイヤーはBusiness Vaultとも呼ばれます。Business Vault層では、汎用的な指標を算出するビジネスロジックをSatellitesとして事前定義することもあります。
流石にこれらの機能全てを1つの記事では取り組めない(現状、私も全て理解できていないw)ので、今後試してブログに出来たらなとは考えています…
最後に
dbtvaultのSnowflakeのサンプルデータ(TPC-H)を用いたWorked exmpleを試してみました。
今回は開発済のコードを持つリポジトリを使って実行しただけですが、Data Vaultのモデリングを自分で1からSQLで構築しようとすると、どこかしらでミスをするだろうな…と感じました。 その点dbtvaultがあれば、必要なメタデータを定義し、各種マクロにそのメタデータを渡すだけでData Vaultのモデリングが出来るため、機械的な実装ができると感じました。
一方で、1からSQLで構築する場合でも、dbtvaultを使う場合でも、モデリング対象のデータの整理がとても重要だと改めて感じました。データの整理をきちんと行った上でRaw Staging Layerでテーブルの定義を行い、Hashed Staging Layerで必要なハッシュ値を正しく定義する、この工程が大事ですね。
改めて、この記事を通してdbtvaultの理解に少しでもつながっていると嬉しいです!他のdbtvaultの機能のブログは、いずれ書く…はずですw