dbtvaultのSnowflakeサンプルデータを用いたWorked exampleを試してみた

2022.08.26

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

さがらです。

dbtvaultのSnowflakeのサンプルデータ(TPC-H)を用いたWorked exmpleを試してみたので、その内容をまとめてみます。

より具体的には、以下のリンク先の内容に取り組んでいきます。

dbtvaultとは

改めてdbtvaultとは、dbtでData Vault 2.0を用いたモデリングを行うためのdbt packageです。

そして、このdbtvaultに採用されているData Vaultとは、ざっくり言うと「データの数や仕様に変動があっても変更履歴を柔軟に保持し、特定日時のデータをいつでも検索できるモデリング手法」です。

Data Vaultのメリット

ここでData Vaultのメリットについて、2つまとめておきます。

1つ目として、特定日時のデータをいつでも検索できます。

Data Vaultでは基本的に、Hubs、Links、Satellitesといった要素に分けてテーブルを管理していくのですが、いずれのテーブルも各レコードがロードされた日時を1つのカラムとして定義し、primary keyが新しい値を持つレコードが挿入されたり、既存のprimary keyのレコードが更新されたときに、1レコード追加される仕様となっています。この仕様によりData Vaultで構築したあとは基本的に全履歴を保持するため、いつでも特定日時のデータを検索できるようになり、過去のある時点のデータを用いた分析や監査対応を容易に可能にします。

2つ目として、複数のシステムから入ってくるデータの変更履歴を柔軟に保持できます。

システムやビジネスの変化によりスキーマ定義の変更が発生しても、Data Vaultを採用していると柔軟に対応可能です。

例えば、あるECサイトシステムのデータ基盤で、会員関連で新しい情報を付与した場合には、

  • 新しい情報と既存の情報の関係性を確認
  • 確認した関係性に応じて、会員のprimary keyを持つHubsに紐づくSatellitesテーブルを新しく増やしたり、既存のSatellitesテーブルに対してカラムを追加する、といった対応を行う(Staging Layerも一部書き換え)

といった工程を行えば、すぐに既存のデータ基盤に追加が可能です。

dbvaultの必要性

前述しましたData Vaultなのですが、良くも悪くも多くのテーブルに分かれてしまう特徴があります。それぞれのテーブルを構築するための処理は似た物が多いのですが、構築するテーブルの数が増えてしまうと処理の開発にも時間がかかり、少し処理を書き間違えてエラーが発生してしまうというトラブルに繋がる可能性はあります。

そんな時に、dbtvaultの出番です。

dbtvaultはdbtで使用できるpackageの1つなのですが、Hubs、Links、SatellitesなどのData Vaultの要素に該当するテーブルを構築するためのマクロが定義されているdbt packageです。

本記事で紹介するサンプルコードを見ていただくとわかるのですが、dbtvaultはメタデータを定義し、そのメタデータをdbtvaultのマクロの引数に指定するだけで、Data Vaultの各要素に当たるテーブルを構成することが出来ます。宣言的な記述だけでData Vaultでモデリングするための処理を開発できるので非常に楽です。

また、dbtの特徴でもあるmodel間のリネージやドキュメント化もありますので、構築するテーブルの数が多くなっても管理がしやすくなります。

こういった事由から、Data Vaultでモデリングする際にはdbtvaultを使用することで、効率よく安定した開発を行うことが出来ます。

参考リンク

まず、本記事ではSnowflakeのサンプルデータを用いたWorked exampleを試しますが、BigQueryを使用されている方も多いと思います。

BigQueryでdbtvaultを使用される場合には、下記のリンク先が参考になりますので、ぜひご覧ください。

また、適用しているモデリング手法であるData Vaultについては、1つの記事では詳細を書ききれない(私もまだ全て理解できていない)ので、私が参考にしたリンクも載せておきます。

Getting Started

ここからは、実際にdbtvaultのWorked exampleの内容に沿って進めていきます。

まずはGetting Startedです。

今回実施する内容について

SnowflakeのサンプルデータであるTPC-Hデータセットを用いて、Data Vault 2.0のモデリング手法を用いたデータウェアハウスの開発を、dbtvaultのマクロを使用して行っていきます。

より詳細には、以下の内容を取り組んでいきます。

  • dbtプロジェクトのセットアップ
  • TPC-Hデータセットの内容についてプロファイリングし、Data Vaultのアーキテクチャにマッピングする方法を探る
  • Raw Staging Layerを作成する
  • Raw Staging Layerに対してハッシュ化などの処理を行う(Hashed Staging Layer)
  • Hashed Staging Layer上のmodelを用いてdbtvaultのマクロを実行し、Hubs、 Links、 Satellites、Transactional Linksに該当するテーブルを構築する

使用するdbtvaultのマクロについては別途チュートリアルもありますので、こちらも併せてご覧ください。(※本記事で試すWorked exampleとは別階層のページなので、ご注意ください。)

Project setup

次はProject setupです。

dbtプロジェクトや使用するSnowflakeアカウントでデータベースを作成し、準備をしていきます。

使用するWorked exampleのVersion

今回試すWorked exampleは定期的に更新されています。今回はVersion5.3を使用します。

こちらのリポジトリからはZipでのダウンロードしか出来なかったため、一度ダウンロードして解凍後に私のGitHub上のリモートリポジトリにプッシュした上で、対象のリモートリポジトリをdbt Cloudを繋げて使用していきます。

Snowflakeのトライアルアカウントのセットアップ

このWorked exampleはSnowflakeのトライアルアカウントを使用しているため、事前にセットアップしておきます。

Snowflakeのトライアルアカウントのセットアップが出来たら、データベースとウェアハウスをSYSADMINロールで作成します。以下のクエリを実行すればOKです。

-- データベースの作成
USE ROLE SYSADMIN;

CREATE DATABASE DV_PROTOTYPE_DB;


-- ウェアハウスの作成
USE ROLE SYSADMIN;

CREATE WAREHOUSE DV_PROTOTYPE_WH 
WITH
  WAREHOUSE_SIZE = 'XSMALL' 
  WAREHOUSE_TYPE = 'STANDARD'
  AUTO_SUSPEND = 60 // 300もいらないため、60に変更
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 1
  SCALING_POLICY = 'STANDARD';

使用するdbtバージョン

参考までに、dbt関係で使用するバージョンを記しておきます。

  • dbt Cloud:v1.1.58.33
  • dbt:1.1 ※Environmentsから指定
  • dbtvault:0.8.3

dbtプロジェクトのセットアップ

前述しましたリポジトリとSnowflakeアカウントを紐づけて、dbtプロジェクトをセットアップします。プロジェクトのセットアップ手順はこちらの記事とほぼ同じ内容のため手順は割愛し、注意点だけ記します。

Snowflake Settingsでは、前述したリソースを使うように設定してください。

リポジトリの設定では、事前にダウンロードしたZipファイルの内容を一度プッシュしたリモートリポジトリを設定します。

dbtプロジェクトの準備が出来たら、IDEからdbt depsコマンドを実行して、dbtvaultをインストールします。

dbt_project.ymlの内容確認

dbt_project.ymlの内容を確認しておきます。

グローバル変数について

modelに関する設定の前に、varsでグローバル変数が定義されています。

  • LOAD_DATE
    • 検証のためにデータ量を削減するため、raw_staging層ではこの変数で指定した日付のレコードに絞り込んでロードします
  • TPCH_SIZE
    • SnowflakeではTPC-Hのサンプルデータについて、データ量に併せて1、10、100、1000、10000と分かれてスキーマ用意されています
    • デフォルトでは「10」が設定されています
vars:
  load_date: '1992-01-08'
  tpch_size: 10 #1, 10, 100, 1000, 10000

modelsセクションについて

models:の中では、modelファイルを管理するサブフォルダごとにタグと適切なmaterializationを指定しています。

各modelファイル用のMetadataについて

各modelでdbtvaultを使うためのMetadata(マクロに渡す引数)については、各modelファイルで定義しています。

dbtvaultでの、各マクロと必要なMetadataについての詳細は、こちらのページもご覧ください。

Profiling TPC-H

次はProfiling TPC-Hです。

このWorked exampleでは、サンプルデータとしてSnowflakeのTPC-Hというデータセットを使用しています。

どのモデリング手法でも同様だと思いますが、Data Vaultを採用する場合であってもモデリング対象のデータの特徴を押さえていないとモデリングは出来ません。そのため、モデリングを適用するデータの特徴を押さえておくことはとても重要です。

TPC-Hのスキーマ構成

Date関係のフィールド

このTPC-Hのデータセットには、合計4つの日付フィールドがあります。

このうち3つはLINEITEMテーブルにあります。

  • SHIPDATE
  • COMMITDATE
  • RECEIPTDATE

そして、もう1つはORDERSテーブルにあります。

  • ORDERDATE

実際に入ってくる日付の特性としては、卸売業のサンプルデータということもあり、ほぼ時系列でORDERDATESHIPDATERECEIPTDATECOMMITDATEの順で登録がされています。(一部のデータではCOMMITDATEがこの順番に沿っていないケースがあるようです。)

Relationships

続いて、TPC-HデータセットのRelationshipsについて見ていきます。スキーマ上の各テーブルがどのフィールドで結合可能であるか、テーブル間のリレーションは1対1・n対1・1対n・n対nのどれに該当するか、関係性を確認することはData Vaultでもモデリングする際にも重要なステップです。

ORDERSテーブルとSUPPLIERテーブル

dbtvaultのページからの引用ですが、LINEITEMテーブルとORDERSテーブルをSUPPLIERテーブルと内部結合して、注文とサプライヤの関係を調べ、各注文の異なるサプライヤを数えてリレーションを確認したようです。

この結果、ORDERSテーブル対SUPPLIERテーブルは、1対nの関係であることが判明したとのことです。1つの注文には、異なるサプライヤからの部品が含まれることがあるようです。

CUSTOMERSテーブルとORDERSテーブル

こちらもdbtvaultのページからの引用ですが、ORDERSテーブルとCUSTOMERテーブルの左外部結合を行ったところ、注文のない顧客が複数存在することが判明しているようです。

Transactional Linksのためのview生成

Transactional Linksを作るためには、トランザクション処理に関するテーブルデータが必要なのですが、元のTPC-Hのデータセットでは該当するテーブルがないようです。

そのため、今回はTransactional Linksの検証のためにraw_transactionsという擬似的にトランザクション処理のレコードを持つビューを用意します。このビューでは、ORDERSテーブルに対してCUSTOMERテーブルを左外部結合して以下のカラムを定義します。

  • Customer key
  • Order key
  • Order date
  • Total price
    • 別名「Amount」、概要のOrderで支払われた合計金額を意味する
  • Type
    • 顧客に対する借方・貸方を意味するCRまたはDRを持つカラム
  • Transaction Date
    • Order Dateに20日間を加えたもので、注文から20日後に支払われたことを意味します
  • Transaction number
    • Order key、Customer key、Order date を連結し、24桁の数値にした計算カラム(24桁に至らない場合は0で埋める)

Creating the stage layers

次は、Creating the stage layersです。

その名の通りステージング層にあたるテーブルやビューを生成していきます。

作成するステージング層のイメージ

Raw Staging LayerHashed Staging Layerの2種類を作成していきます。

Raw Staging Layerについて

まずRaw Staging Layerについて、どういったテーブルを作成していくか確認します。ver5.3のリポジトリでは、models/raw_stageフォルダに3つの.sqlファイルが格納されています。

raw_orders

raw_orders.sqlでは、TPC-Hの各テーブルをJOINして、注文に関する各カラムを持つワイドテーブルを作成します。

raw_orders.sql (クリックして展開)
SELECT
    a.L_ORDERKEY AS ORDERKEY,
    a.L_PARTKEY AS PARTKEY ,
    a.L_SUPPKEY AS SUPPLIERKEY,
    a.L_LINENUMBER AS LINENUMBER,
    a.L_QUANTITY AS QUANTITY,
    a.L_EXTENDEDPRICE AS EXTENDEDPRICE,
    a.L_DISCOUNT AS DISCOUNT,
    a.L_TAX AS TAX,
    a.L_RETURNFLAG AS RETURNFLAG,
    a.L_LINESTATUS AS LINESTATUS,
    a.L_SHIPDATE AS SHIPDATE,
    a.L_COMMITDATE AS COMMITDATE,
    a.L_RECEIPTDATE AS RECEIPTDATE,
    a.L_SHIPINSTRUCT AS SHIPINSTRUCT,
    a.L_SHIPMODE AS SHIPMODE,
    a.L_COMMENT AS LINE_COMMENT,
    b.O_CUSTKEY AS CUSTOMERKEY,
    b.O_ORDERSTATUS AS ORDERSTATUS,
    b.O_TOTALPRICE AS TOTALPRICE,
    b.O_ORDERDATE AS ORDERDATE,
    b.O_ORDERPRIORITY AS ORDERPRIORITY,
    b.O_CLERK AS CLERK,
    b.O_SHIPPRIORITY AS SHIPPRIORITY,
    b.O_COMMENT AS ORDER_COMMENT,
    c.C_NAME AS CUSTOMER_NAME,
    c.C_ADDRESS AS CUSTOMER_ADDRESS,
    c.C_NATIONKEY AS CUSTOMER_NATION_KEY,
    c.C_PHONE AS CUSTOMER_PHONE,
    c.C_ACCTBAL AS CUSTOMER_ACCBAL,
    c.C_MKTSEGMENT AS CUSTOMER_MKTSEGMENT,
    c.C_COMMENT AS CUSTOMER_COMMENT,
    d.N_NAME AS CUSTOMER_NATION_NAME,
    d.N_REGIONKEY AS CUSTOMER_REGION_KEY,
    d.N_COMMENT AS CUSTOMER_NATION_COMMENT,
    e.R_NAME AS CUSTOMER_REGION_NAME,
    e.R_COMMENT AS CUSTOMER_REGION_COMMENT
FROM {{ source('tpch_sample', 'ORDERS') }} AS b
LEFT JOIN {{ source('tpch_sample', 'LINEITEM') }} AS a
    ON a.L_ORDERKEY = b.O_ORDERKEY
LEFT JOIN {{ source('tpch_sample', 'CUSTOMER') }} AS c
    ON b.O_CUSTKEY  = c.C_CUSTKEY
LEFT JOIN {{ source('tpch_sample', 'NATION') }} AS d
    ON c.C_NATIONKEY  = d.N_NATIONKEY
LEFT JOIN {{ source('tpch_sample', 'REGION') }} AS e
    ON d.N_REGIONKEY  = e.R_REGIONKEY
LEFT JOIN {{ source('tpch_sample', 'PART') }} AS g
    ON a.L_PARTKEY = g.P_PARTKEY
LEFT JOIN {{ source('tpch_sample', 'SUPPLIER') }} AS h
    ON a.L_SUPPKEY = h.S_SUPPKEY
LEFT JOIN {{ source('tpch_sample', 'NATION') }} AS j
    ON h.S_NATIONKEY = j.N_NATIONKEY
LEFT JOIN {{ source('tpch_sample', 'REGION') }} AS k
    ON j.N_REGIONKEY = k.R_REGIONKEY
WHERE b.O_ORDERDATE = TO_DATE('{{ var('load_date') }}')

raw_inventory

raw_inventory.sqlでは、TPC-Hの各テーブルをJOINして、在庫に関するカラムを持つワイドテーブルを作成します。

raw_inventory.sql (クリックして展開)
SELECT
    a.PS_PARTKEY AS PARTKEY,
    a.PS_SUPPKEY AS SUPPLIERKEY,
    a.PS_AVAILQTY AS AVAILQTY,
    a.PS_SUPPLYCOST AS SUPPLYCOST,
    a.PS_COMMENT AS PART_SUPPLY_COMMENT,
    b.S_NAME AS SUPPLIER_NAME,
    b.S_ADDRESS AS SUPPLIER_ADDRESS,
    b.S_NATIONKEY AS SUPPLIER_NATION_KEY,
    b.S_PHONE AS SUPPLIER_PHONE,
    b.S_ACCTBAL AS SUPPLIER_ACCTBAL,
    b.S_COMMENT AS SUPPLIER_COMMENT,
    c.P_NAME AS PART_NAME,
    c.P_MFGR AS PART_MFGR,
    c.P_BRAND AS PART_BRAND,
    c.P_TYPE AS PART_TYPE,
    c.P_SIZE AS PART_SIZE,
    c.P_CONTAINER AS PART_CONTAINER,
    c.P_RETAILPRICE AS PART_RETAILPRICE,
    c.P_COMMENT AS PART_COMMENT,
    d.N_NAME AS SUPPLIER_NATION_NAME,
    d.N_COMMENT AS SUPPLIER_NATION_COMMENT,
    d.N_REGIONKEY AS SUPPLIER_REGION_KEY,
    e.R_NAME AS SUPPLIER_REGION_NAME,
    e.R_COMMENT AS SUPPLIER_REGION_COMMENT
FROM {{ source('tpch_sample', 'PARTSUPP') }} AS a
LEFT JOIN {{ source('tpch_sample', 'SUPPLIER') }} AS b
    ON a.PS_SUPPKEY = b.S_SUPPKEY
LEFT JOIN {{ source('tpch_sample', 'PART') }} AS c
    ON a.PS_PARTKEY = c.P_PARTKEY
LEFT JOIN {{ source('tpch_sample', 'NATION') }} AS d
    ON b.S_NATIONKEY = d.N_NATIONKEY
LEFT JOIN {{ source('tpch_sample', 'REGION') }} AS e
    ON d.N_REGIONKEY = e.R_REGIONKEY
JOIN {{ ref('raw_orders') }} AS f
    ON a.PS_PARTKEY = f.PARTKEY AND a.PS_SUPPKEY=f.SUPPLIERKEY
ORDER BY a.PS_PARTKEY, a.PS_SUPPKEY

raw_transactions

raw_transactions.sqlでは、前章「Transactional Linksのためのview生成」で触れたように、Transactional Linksを作成するために、顧客からの注文に対していくつかの計算を行ったトランザクションに関するレコードを保持します。

raw_transactions.sql (クリックして展開)
SELECT
    b.O_ORDERKEY AS ORDER_ID,
    b.O_CUSTKEY AS CUSTOMER_ID,
    b.O_ORDERDATE AS ORDER_DATE,
    DATEADD(DAY, 20, b.O_ORDERDATE) AS TRANSACTION_DATE,
    TO_NUMBER(RPAD(CONCAT(b.O_ORDERKEY, b.O_CUSTKEY, TO_CHAR(b.O_ORDERDATE, 'YYYYMMDD')),  24, '0')) AS TRANSACTION_NUMBER,
    b.O_TOTALPRICE AS AMOUNT,
    CAST(
    CASE ABS(MOD(RANDOM(1), 2)) + 1
        WHEN 1 THEN 'DR'
        WHEN 2 THEN 'CR'
    END AS VARCHAR(2)) AS TYPE
FROM {{ source('tpch_sample', 'ORDERS') }}  AS b
LEFT JOIN {{ source('tpch_sample', 'CUSTOMER') }} AS c
    ON b.O_CUSTKEY = c.C_CUSTKEY
WHERE b.O_ORDERDATE = TO_DATE('{{ var('load_date') }}')

Raw Staging Layerの生成処理の実行

続いて、前述した3つのモデルを実行し、Raw Staging Layerを生成します。

このWorked exampleのリポジトリでは、dbt_project.ymlにおいて各modelsのサブフォルダごとにタグ付けがされています。そのため、Raw Staging Layerを生成するには、以下のコマンドを実行すればOKです。

dbt run -s tag:raw

materialized: viewとしているため、すぐに終わります。

Hashed Staging Layerについて

先程Raw Staging Layerとして3つviewを生成しましたが、これだけではHubs、Links、SatellitesといったRaw Vault層の生成は出来ません。

具体的には、主キーのHash値、Hash Diff、既存カラムを用いたデータソース名やレコードの有効開始日などを定義するderived columns、などを新しくカラムとして追加する必要があります。これらは既存レコードと追加予定レコードの重複を避けることや、レコードの有効日を持つことでいつでもある日時点のデータを取得することに役立ちます。

そして、これらのカラムを生成するために、dbtvaultでは専用のマクロが用意されています。このマクロを使って、Raw Vault層向けの変換を行うステージング層となるHashed Staging Layerを生成していきます。

Hashed Staging Layerで定義するmodelについて

Hashed Staging Layerでは、v_stg_orders.sqlv_stg_inventory.sqlv_stg_transactions.sqlの3つのmodelを定義します。

それぞれRaw Staging Layerのraw_orders.sqlraw_inventory.sqlraw_transactions.sqlをソースとして使用した上で、viewとして生成されます。各modelでは前述した通り、主キーのHash値、Hash Diff、既存カラムを用いたデータソース名やレコードの有効開始日などを定義するderived columns、などをカラムとして追加します。

下記に各modelのコードを記載しておきますが、中身の説明は次章で行います。

v_stg_orders.sql (クリックして展開)
{%- set yaml_metadata -%}
source_model: 'raw_orders'
derived_columns:
  CUSTOMER_KEY: 'CUSTOMERKEY'
  NATION_KEY: 'CUSTOMER_NATION_KEY'
  REGION_KEY: 'CUSTOMER_REGION_KEY'
  RECORD_SOURCE: '!TPCH-ORDERS'
  EFFECTIVE_FROM: 'ORDERDATE'
hashed_columns:
  CUSTOMER_PK: 'CUSTOMER_KEY'
  LINK_CUSTOMER_NATION_PK:
    - 'CUSTOMER_KEY'
    - 'CUSTOMER_NATION_KEY'
  CUSTOMER_NATION_PK: 'CUSTOMER_NATION_KEY'
  CUSTOMER_REGION_PK: 'CUSTOMER_REGION_KEY'
  NATION_PK: 'CUSTOMER_NATION_KEY'
  REGION_PK: 'CUSTOMER_REGION_KEY'
  NATION_REGION_PK:
    - 'CUSTOMER_NATION_KEY'
    - 'CUSTOMER_REGION_KEY'
  ORDER_PK: 'ORDERKEY'
  ORDER_CUSTOMER_PK:
    - 'CUSTOMER_KEY'
    - 'ORDERKEY'
  LINEITEM_PK:
    - 'ORDERKEY'
    - 'LINENUMBER'
  LINK_LINEITEM_ORDER_PK:
    - 'ORDERKEY'
    - 'ORDERKEY'
    - 'LINENUMBER'
  PART_PK: 'PARTKEY'
  SUPPLIER_PK: 'SUPPLIERKEY'
  INVENTORY_PK:
    - 'PARTKEY'
    - 'SUPPLIERKEY'
  INVENTORY_ALLOCATION_PK:
    - 'LINENUMBER'
    - 'PARTKEY'
    - 'SUPPLIERKEY'
  CUSTOMER_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'CUSTOMER_KEY'
      - 'CUSTOMER_NAME'
      - 'CUSTOMER_ADDRESS'
      - 'CUSTOMER_PHONE'
      - 'CUSTOMER_ACCBAL'
      - 'CUSTOMER_MKTSEGMENT'
      - 'CUSTOMER_COMMENT'
  CUSTOMER_NATION_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'CUSTOMER_NATION_NAME'
      - 'CUSTOMER_NATION_COMMENT'
      - 'CUSTOMER_NATION_KEY'
  CUSTOMER_REGION_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'CUSTOMER_REGION_NAME'
      - 'CUSTOMER_REGION_COMMENT'
      - 'CUSTOMER_REGION_KEY'
  LINEITEM_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'ORDERKEY'
      - 'LINENUMBER'
      - 'COMMITDATE'
      - 'DISCOUNT'
      - 'EXTENDEDPRICE'
      - 'LINESTATUS'
      - 'LINE_COMMENT'
      - 'QUANTITY'
      - 'RECEIPTDATE'
      - 'RETURNFLAG'
      - 'SHIPDATE'
      - 'SHIPINSTRUCT'
      - 'SHIPMODE'
      - 'TAX'
  ORDER_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'ORDERKEY'
      - 'CLERK'
      - 'ORDERDATE'
      - 'ORDERPRIORITY'
      - 'ORDERSTATUS'
      - 'ORDER_COMMENT'
      - 'SHIPPRIORITY'
      - 'TOTALPRICE'
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{% set source_model = metadata_dict['source_model'] %}

{% set derived_columns = metadata_dict['derived_columns'] %}

{% set hashed_columns = metadata_dict['hashed_columns'] %}

WITH staging AS (
{{ dbtvault.stage(include_source_columns=true,
                  source_model=source_model,
                  derived_columns=derived_columns,
                  hashed_columns=hashed_columns,
                  ranked_columns=none) }}
)

SELECT *,
       TO_DATE('{{ var('load_date') }}') AS LOAD_DATE
FROM staging
v_stg_inventory.sql (クリックして展開)
{%- set yaml_metadata -%}
source_model: 'raw_inventory'
derived_columns:
  NATION_KEY: 'SUPPLIER_NATION_KEY'
  REGION_KEY: 'SUPPLIER_REGION_KEY'
  RECORD_SOURCE: '!TPCH-INVENTORY'
hashed_columns:
  SUPPLIER_PK: 'SUPPLIERKEY'
  SUPPLIER_NATION_PK: 'SUPPLIER_NATION_KEY'
  SUPPLIER_REGION_PK: 'SUPPLIER_REGION_KEY'
  REGION_PK: 'SUPPLIER_REGION_KEY'
  NATION_PK: 'SUPPLIER_NATION_KEY'
  NATION_REGION_PK:
    - 'SUPPLIER_NATION_KEY'
    - 'SUPPLIER_REGION_KEY'
  LINK_SUPPLIER_NATION_PK:
    - 'SUPPLIERKEY'
    - 'SUPPLIER_NATION_KEY'
  PART_PK: 'PARTKEY'
  INVENTORY_PK:
    - 'PARTKEY'
    - 'SUPPLIERKEY'
  SUPPLIER_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'SUPPLIERKEY'
      - 'SUPPLIER_ACCTBAL'
      - 'SUPPLIER_ADDRESS'
      - 'SUPPLIER_PHONE'
      - 'SUPPLIER_COMMENT'
      - 'SUPPLIER_NAME'
  PART_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'PARTKEY'
      - 'PART_BRAND'
      - 'PART_COMMENT'
      - 'PART_CONTAINER'
      - 'PART_MFGR'
      - 'PART_NAME'
      - 'PART_RETAILPRICE'
      - 'PART_SIZE'
      - 'PART_TYPE'
  SUPPLIER_REGION_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'SUPPLIER_REGION_KEY'
      - 'SUPPLIER_REGION_COMMENT'
      - 'SUPPLIER_REGION_NAME'
  SUPPLIER_NATION_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'SUPPLIER_NATION_KEY'
      - 'SUPPLIER_NATION_COMMENT'
      - 'SUPPLIER_NATION_NAME'
  INVENTORY_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'PARTKEY'
      - 'SUPPLIERKEY'
      - 'AVAILQTY'
      - 'SUPPLYCOST'
      - 'PART_SUPPLY_COMMENT'
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{% set source_model = metadata_dict['source_model'] %}

{% set derived_columns = metadata_dict['derived_columns'] %}

{% set hashed_columns = metadata_dict['hashed_columns'] %}


WITH staging AS (
{{ dbtvault.stage(include_source_columns=true,
                  source_model=source_model,
                  derived_columns=derived_columns,
                  hashed_columns=hashed_columns,
                  ranked_columns=none) }}
)

SELECT *, 
       TO_DATE('{{ var('load_date') }}') AS LOAD_DATE,
       TO_DATE('{{ var('load_date') }}') AS EFFECTIVE_FROM
FROM staging
v_stg_transactions.sql (クリックして展開)
{%- set yaml_metadata -%}
source_model: 'raw_transactions'
derived_columns:
  RECORD_SOURCE: '!RAW_TRANSACTIONS'
  LOAD_DATE: DATEADD(DAY, 1, TRANSACTION_DATE)
  EFFECTIVE_FROM: 'TRANSACTION_DATE'
hashed_columns:
  TRANSACTION_PK:
    - 'CUSTOMER_ID'
    - 'TRANSACTION_NUMBER'
  CUSTOMER_PK: 'CUSTOMER_ID'
  ORDER_PK: 'ORDER_ID'
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{% set source_model = metadata_dict['source_model'] %}

{% set derived_columns = metadata_dict['derived_columns'] %}

{% set hashed_columns = metadata_dict['hashed_columns'] %}

{{ dbtvault.stage(include_source_columns=true,
                  source_model=source_model,
                  derived_columns=derived_columns,
                  hashed_columns=hashed_columns,
                  ranked_columns=none) }}

Hashed Staging Layer向けのマクロを用いたmodelの定義

続いて、Hashed Staging Layerを生成するためのdbtvaultのマクロを用いて、各modelを定義していきます。

まず、Hashed Staging Layerを生成するためのマクロですが、dbtvault.stageというマクロです。このマクロに対して必要なメタデータを提供することでソースに設定したテーブルから、主キーのHash値、Hash Diff、derived columns、を生成することが出来ます。

{{ dbtvault.stage(include_source_columns=true,
                  source_model=source_model,
                  derived_columns=derived_columns,
                  hashed_columns=hashed_columns,
                  ranked_columns=none) }}

続いて、v_stg_transactions.sqlの内容を元に、このマクロをどうやって使うのか見ていきます。

{%- set yaml_metadata -%}
source_model: 'raw_transactions'
derived_columns:
  RECORD_SOURCE: '!RAW_TRANSACTIONS'
  LOAD_DATE: DATEADD(DAY, 1, TRANSACTION_DATE)
  EFFECTIVE_FROM: 'TRANSACTION_DATE'
hashed_columns:
  TRANSACTION_HK:
    - 'CUSTOMER_ID'
    - 'TRANSACTION_NUMBER'
  CUSTOMER_HK: 'CUSTOMER_ID'
  ORDER_HK: 'ORDER_ID'
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{% set source_model = metadata_dict['source_model'] %}

{% set derived_columns = metadata_dict['derived_columns'] %}

{% set hashed_columns = metadata_dict['hashed_columns'] %}

{{ dbtvault.stage(include_source_columns=true,
                  source_model=source_model,
                  derived_columns=derived_columns,
                  hashed_columns=hashed_columns,
                  ranked_columns=none) }}

まず、dbtvault.stageに渡すためのメタデータのセットについて、{%- set yaml_metadata -%}としてyamlで定義します。この上で、このyaml内で各メタデータをどう定義しているかを見ていきます。

source_modelでは、どのmodelを使用してHashed Staging Layerを構築するのかを定義するため、Raw Staging Layerで該当するmodel名を入れます。

derived_columnsでは、Raw Vault層に必要なカラムを定義します。v_stg_transactions.sqlにおいては、以下3つのカラムを定義しています。

  • RECORD_SOURCE:ソースデータの名称を入れます。!RAW_TRANSACTIONSのように、1文字目に!を入れることで、任意の文字列を定義できます。
  • LOAD_DATE:対象のレコードがロードされた日付を持つカラムを指定します。※このv_stg_transactions.sqlではDATEADD関数を使用していますが、これは「トランザクション処理の翌日をロードされた日とする」と疑似的に指定しているだけです。
  • EFFECTIVE_FROM:あるレコードにおけるビジネス上の有効期限の開始日を保持するカラムを指定します。
derived_columns:
    RECORD_SOURCE: '!RAW_TRANSACTIONS'
    LOAD_DATE: DATEADD(DAY, 1, TRANSACTION_DATE)
    EFFECTIVE_FROM: 'TRANSACTION_DATE'

hashed_columnsでは、主にハッシュ化を行うカラムを定義していきます。

v_stg_transactions.sqlにおいては、以下3つのカラムを定義しています。

  • TRANSACTION_HKraw_transactions.sqlに存在するCUSTOMER_IDTRANSACTION_NUMBERを連結した、新しいハッシュ化したカラムとして定義しています。
  • CUSTOMER_HKORDER_HK:どちらも単一のカラムをHash化しているので、対象のカラム名だけを定義しています。
hashed_columns:
  TRANSACTION_HK:
    - 'CUSTOMER_ID'
    - 'TRANSACTION_NUMBER'
  CUSTOMER_HK: 'CUSTOMER_ID'
  ORDER_HK: 'ORDER_ID'

ここまででv_stg_transactions.sqlにおけるメタデータのセットの説明としては以上となります。

もう一つhashed_columnsについて、Hash Diffの説明のためにv_stg_orders.sqlで定義されている内容も見ていきます。

  • CUSTOMER_HASHDIFFcolumnsには対象のテーブルでレコードの差分を適切に検知できるカラムを指定します。また、is_hashdiff: trueとすることで、指定したカラムにおいて自動的にアルファソートが適用された上でHash化されます。
hashed_columns:
  CUSTOMER_HASHDIFF:
    is_hashdiff: true
    columns:
      - 'CUSTOMERKEY'
      - 'CUSTOMER_NAME'
      - 'CUSTOMER_ADDRESS'
      - 'CUSTOMER_PHONE'
      - 'CUSTOMER_ACCBAL'
      - 'CUSTOMER_MKTSEGMENT'
      - 'CUSTOMER_COMMENT'

Hash周りのプラクティスについては、dbtvaultもドキュメントを用意しているため、こちらも併せてご覧ください。

Hashed Staging Layerの生成処理の実行

前述した3つのモデルを実行し、Hashed Staging Layerを生成します。

このWorked exampleのリポジトリでは、dbt_project.ymlにおいて各modelsのサブフォルダごとにタグ付けがされています。そのため、Hashed Staging Layerを生成するには、以下のコマンドを実行すればOKです。

dbt run -s +tag:stage

materialized: viewとしているため、すぐに終わります。

+をつけているため、上流に位置するRaw Staging Layerのviewも改めて生成されました。

ここで、dbtvaultのマクロを使ったHashed Staging Layerでは、Raw Staging Layerとどういった差があるのかを、raw_transactions.sqlv_stg_transactions.sqlで生成されたviewの中身を見て確認してみます。

まずは、raw_transactions.sqlにより生成されたviewの中身は下図のようになっています。

続いて、dbtvault.stageを適用したraw_transactions.sqlにより生成されたviewの中身を見てみます。すると、raw_transactions.sql内でメタデータとして設定した各カラムが追加されていることがわかります。

Loading the vault

これまでの工程で、Raw Vault層の生成に必要なカラムも追加したviewがステージング層に整いました。これを用いて、Raw Vault層へのロード処理を行っていきます。

Hubs

まずは、Hubsです。基本的に、モデリング対象のテーブル群に存在するPrimary Key1つごとに、Hubsのテーブルを生成します。

Data VaultにおいてHubsは、Data Vaultのモデリングで中核となる構成要素の1つです。一般的には以下の4つのカラムで構成されますが、場合によってはnatural keyが複数列必要な場合もあるため、これ以上のカラムを持つこともあります。

  • primary/hash key(またはsurrogate key)
    • natural key(business keyとも呼ばれる)をハッシュ化した値を保持します
  • natural key
    • 顧客IDや注文番号など、レコードを一意に識別するための情報(複数列の場合もある)
  • load date または load date timestamp
    • 各レコードが最初にロードされた日時
  • source for the record
    • 各レコードがどこから生まれたデータを示す

Hubsのデプロイ

このWorked exampleのリポジトリでは、dbt_project.ymlにおいて各modelsのサブフォルダごとにタグ付けがされています。そのため、Hubsに該当するオブジェクトを生成するには、以下のコマンドを実行すればOKです。

dbt run -s tag:hub

models/raw_vaultフォルダ全体にmaterialized: incrementalが設定されているためテーブルが作成されますが、XSのウェアハウスでも数秒で終わるかと思います。

ここで改めて、Hubsに該当するテーブルを生成するdbtvaultのマクロdbtvault.hubの使い方について確認してみます。

まず、dbtvault.hubでは、5つのメタデータを使用します。

  • source_model
    • このmodelで対象とするprimary keyとnatural keyを持つmodel名を指定します(あるprimary keyが複数のmodelで定義されている場合は、複数のmodelを指定可能)
  • src_pk
    • source_modelで指定したmodelのうち、対象のHubssテーブルにおいてprimary keyとなるカラムを指定します
  • src_nk
    • source_modelで指定したmodelのうち、対象のHubssテーブルにおいてnatural keyとなるカラムを指定します(natural keyが複数のカラムで構成されている場合は、複数のカラムを指定可能)
  • src_ldts
    • source_modelで指定したmodelのうち、対象のHubssテーブルにおいて各レコードのロード日時を持つカラムを指定します
  • src_source
    • source_modelで指定したmodelのうち、対象のHubssテーブルにおいて各レコードのソースデータ名を持つカラムを指定します

この上で、2つのmodelを例に、どういった記述を行って、どんなデータが作られているかを見てみます。

hub_lineitem.sqlでは、natural keyが2つあるため、src_nk = ["LINENUMBER", "ORDERKEY"]といった形でメタデータを定義しています。

{%- set source_model = "v_stg_orders" -%}
{%- set src_pk = "LINEITEM_PK" -%}
{%- set src_nk = ["LINENUMBER", "ORDERKEY"] -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}

{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
                src_source=src_source, source_model=source_model) }}

hub_nation.sqlでは、対象とするprimary keyNATION_PKが、v_stg_orders.sqlv_stg_inventory.sqlそれぞれで定義されているため、source_model = ["v_stg_orders", "v_stg_inventory"]としてメタデータを定義しています。

{%- set source_model = ["v_stg_orders", "v_stg_inventory"] -%}
{%- set src_pk = "NATION_PK" -%}
{%- set src_nk = "NATION_KEY" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}

{{ dbtvault.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
                src_source=src_source, source_model=source_model) }}

Links

次はLinksです。

Linksは、Data Vaultのもう一つの基本要素です。Linksでは、Hubs間をつなげるためのテーブルを生成します。

Linksは以下の要素を持ちます。

  • primary key
    • Linksテーブルにおけるprimary key。このLinksがつなげるHubsのnatural keyを連結しハッシュ化したもの
  • Foreign keys holding the primary key for each Hub referenced in the Link
    • LinksからHubsを参照するための、各Hubsのprimary key。このLinksが参照するHubsの数に応じて、2つ以上のprimary keyを1つ1つカラムとして保持する
  • load date または load date timestamp
    • 各レコードが最初にロードされた日時
  • source for the record
    • 各レコードがどこから生まれたデータを示す

Linksのデプロイ

このWorked exampleのリポジトリでは、dbt_project.ymlにおいて各modelsのサブフォルダごとにタグ付けがされています。そのため、Linksに該当するオブジェクトを生成するには、以下のコマンドを実行すればOKです。

dbt run -s tag:link

models/raw_vaultフォルダ全体にmaterialized: incrementalが設定されているためテーブルが作成されますが、XSのウェアハウスでも数十秒で終わるかと思います。

ここで改めて、Linksに該当するテーブルを生成するdbtvaultのマクロdbtvault.linkの使い方について確認してみます。

まず、dbtvault.linkでは、5つのメタデータを使用します。

  • source_model
    • このmodelの生成元となるmodel名を指定します(このLinksが対象とするHubsが複数のmodelから定義されている場合は、複数のmodelを指定可能)
  • src_pk
    • source_modelで指定したmodelのうち、対象のLinksテーブルにおけるprimary keyとなるカラムを指定します
  • src_fk
    • source_modelで指定したmodelのうち、このLinksテーブルが繋がりを示すHubsのprimary keyとなるカラムを指定します(複数指定可能)
  • src_ldts
    • source_modelで指定したmodelのうち、対象のLinksテーブルの各レコードのロード日時を持つカラムを指定します
  • src_source
    • source_modelで指定したmodelのうち、対象のLinksテーブルの各レコードのソースデータ名を持つカラムを指定します

この上で、1つのmodelを例に、どういった記述を行って、どんなデータが作られているかを見てみます。

link_nation_region.sqlは、hub_nation.sqlhub_region.sqlにより生成されるHubsのテーブルを繋げるLinksテーブルを生成します。src_pkには、事前にHashed Staging Layerに該当するv_stg_orders.sqlv_stg_inventory.sqlhashed_columnsとして定義していたNATION_REGION_PKを指定します。src_fkには、対象のHubsとなるhub_nation.sqlhub_region.sqlのprimary keyであるNATION_PKREGION_PKを指定します。

{%- set source_model = ["v_stg_orders", "v_stg_inventory"] -%}
{%- set src_pk = "NATION_REGION_PK" -%}
{%- set src_fk = ["NATION_PK", "REGION_PK"] -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}

{{ dbtvault.link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts,
                 src_source=src_source, source_model=source_model) }}

Satellites

次に、Satellitesです。前述のHubsとLinksではキー情報しか持っていませんでしたが、Satellitesでは、キー以外のカラム全てを保持し、Slowly Changing Dimensionの方式で変更履歴まで保持します。

Satellitesは以下の要素を持ちます。

  • primary key
    • Satellitesテーブルにおけるprimary key。基本的にはnatural keyをハッシュ化した値
  • Hash Diff
    • 後述のpayloadとprimary keyを連結してハッシュ化した値。これにより、各レコードの値が1つでも変わったらHash Diffの値が変わることになるため、レコードの変更を検知出来ます
  • payload
    • キー情報ではない、具体的なデータを保持する。例えば、顧客データの場合には顧客キー以外の、氏名、生年月日、国籍、年齢、性別、などが該当する
  • effectivity date
    • 一般的にEFFECTIVE_FROMというカラム名で定義されるもの。各レコードの有効開始日を保持します
  • load date または load date timestamp
    • 各レコードが最初にロードされた日時
  • source for the record
    • 各レコードがどこから生まれたデータを示す

ここで、「effectivity date」と「load date」どちらも持つ必要があるの?と疑問に感じた方もいると思います。

dbtvaultのドキュメントからの引用ですが、「load date」は、レコードがデータベースにロードされた時刻です。「effectivity date」は、ビジネス上のイベントが発生してからレコードがロードのためにデータベースに到着するまでにバッチ処理の遅延がある場合、異なる値を保持する可能性があります。両方のカラムを持つことで、データ的にロードされたとき、ビジネス上の有効となったとき、をそれぞれ知ることが出来ます。

Satellitesのデプロイ

このWorked exampleのリポジトリでは、dbt_project.ymlにおいて各modelsのサブフォルダごとにタグ付けがされています。そのため、Satellitesに該当するオブジェクトを生成するには、以下のコマンドを実行すればOKです。

dbt run -s tag:satellite

models/raw_vaultフォルダ全体にmaterialized: incrementalが設定されているためテーブルが作成されますが、XSのウェアハウスでも数十秒で終わるかと思います。

ここで改めて、Linksに該当するテーブルを生成するdbtvaultのマクロdbtvault.satの使い方について確認してみます。

まず、dbtvault.satでは、7つのメタデータを使用します。

  • source_model
    • このmodelの生成元となるmodelを指定します。基本的には、作成するSatellitesに紐づくHubsの生成元となっているmodelを指定します
  • src_pk
    • source_modelで指定したmodelのうち、対象のSatellitesテーブルにおいてprimary keyとなるカラムを指定します
  • src_hashdiff
    • source_modelで指定したmodelのうち、対象のSatellitesテーブルのレコード変更を検知できるHash Diffのカラムを指定します
  • src_payload
    • source_modelで指定したmodelのうち、対象のSatellitesテーブルのキー以外の具体的なデータを持つカラムを指定します(複数指定可能)
  • src_eff
    • source_modelで指定したmodelのうち、対象のSatellitesテーブルの各レコードの有効開始日を持つカラムを指定します
  • src_ldts
    • source_modelで指定したmodelのうち、対象のSatellitesテーブルの各レコードのロード日時を持つカラムを指定します
  • src_source
    • source_modelで指定したmodelのうち、対象のSatellitesテーブルの各レコードのソースデータ名を持つカラムを指定します

この上で、1つのmodelを例に、どういった記述を行って、どんなデータが作られているかを見てみます。

sat_order_cust_nation_details.sqlは、CUSTOMERテーブルのNATIONKEYに紐づくNATIONテーブルの変更履歴を保持するためのSatellitesテーブルを生成します。

src_hashdiffには、v_stg_orders.sqlで定義したCUSTOMER_REGION_HASHDIFFを指定します。CUSTOMER_REGION_HASHDIFFは、NATIONテーブルのprimary keyであるCUSTOMER_NATION_KEYと、具体的なデータを保持するカラムであるCUSTOMER_NATION_NAMECUSTOMER_NATION_COMMENT、この3つのカラムを連結した値をHash化としたものです。

{%- set source_model = "v_stg_orders" -%}
{%- set src_pk = "CUSTOMER_PK" -%}
{%- set src_hashdiff = "CUSTOMER_NATION_HASHDIFF" -%}
{%- set src_payload = ["CUSTOMER_NATION_NAME", "CUSTOMER_NATION_COMMENT"] -%}
{%- set src_eff = "EFFECTIVE_FROM" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}

{{ dbtvault.sat(src_pk=src_pk, src_hashdiff=src_hashdiff,
                src_payload=src_payload, src_eff=src_eff,
                src_ldts=src_ldts, src_source=src_source,
                source_model=source_model) }}

Transactional Links

最後に、Transactional Linksです。

Transactional Linksは、トランザクション処理されたデータやセンサーデータなど、一度処理が完了したら基本的に更新しないデータを管理するための概念です。Linksと同じレベルの概念になりますが、キー情報だけでなくトランザクションに関するデータも持つことが出来る概念です。

Transactional Linksは以下の要素を持ちます。

  • primary key
    • Transactional Linksテーブルにおけるprimary key。基本的には対象のトランザクション処理のIDやnumberを持つカラムが該当する。もしこういったIDやnumberがない場合には、外部キーを連結しハッシュ化したものをprimary keyとする
  • Foreign keys holding the primary key for each Hub referenced in the Transactional Link
    • Transaction Linksから参照する各Hubsのprimary keyを保持する(参照するHubsの数に応じて、2つ以上指定可能)
  • payload
    • キー情報ではない、金額、種類、日付、ハッシュ化されていない取引番号など、トランザクションそのものに関するデータ
  • effectivity date
    • 一般的にEFFECTIVE_FROMというカラム名で定義されるもの。各レコードで実際にトランザクションが行われた日時を保持する
  • load date または load date timestamp
    • 各レコードが最初にロードされた日時
  • source for the record
    • 各レコードがどこから生まれたデータを示す

Transactional Linksのデプロイ

このWorked exampleのリポジトリでは、dbt_project.ymlにおいて各modelsのサブフォルダごとにタグ付けがされています。そのため、Transactional Linksに該当するオブジェクトを生成するには、以下のコマンドを実行すればOKです。

dbt run -s tag:t_link

models/raw_vaultフォルダ全体にmaterialized: incrementalが設定されているためテーブルが作成されますが、XSのウェアハウスでも数秒で終わるかと思います。

ここで改めて、Linksに該当するテーブルを生成するdbtvaultのマクロdbtvault.t_linkの使い方について確認してみます。

まず、dbtvault.t_linkでは、7つのメタデータを使用します。必要なメタデータを見ると、dbtvault.linkdbtvault.satを組み合わせたイメージですね。

  • source_model
    • このmodelの生成元となるmodelを指定します。基本的には、作成するTransactional Linkの元となるHashed Staging Layerのmodelを指定します。
  • src_pk
    • source_modelで指定したmodelのうち、対象のTransactional Linksテーブルにおいてprimary keyとなるカラムを指定します
  • src_fk
    • source_modelで指定したmodelのうち、対象のTransactional LinksテーブルとつなげるHubsのprimary keyであるカラムを指定します
  • src_payload
    • source_modelで指定したmodelのうち、対象のTransactional Linksテーブルのキー以外の具体的なデータを持つカラムを指定します(複数指定可能)
  • src_eff
    • source_modelで指定したmodelのうち、対象のTransactional Linksテーブルの各レコードの有効開始日を持つカラムを指定します
  • src_ldts
    • source_modelで指定したmodelのうち、対象のTransactional Linksテーブルの各レコードのロード日時を持つカラムを指定します
  • src_source
    • source_modelで指定したmodelのうち、対象のTransactional Linksテーブルの各レコードのソースデータ名を持つカラムを指定します

この上で、1つのmodelを例に、どういった記述を行って、どんなデータが作られているかを見てみます。

t_link_transactions.sqlは、raw_transactions.sqlに対して各種ハッシュ値を追加したv_stg_transactions.sqlをベースにして、hub_customer.sqlhub_order.sqlの2つのHubsとつながるTransactional Linksのテーブルを生成します。

{%- set source_model = "v_stg_transactions" -%}
{%- set src_pk = "TRANSACTION_PK" -%}
{%- set src_fk = ["CUSTOMER_PK", "ORDER_PK"] -%}
{%- set src_payload = ["TRANSACTION_NUMBER", "TRANSACTION_DATE", "TYPE", "AMOUNT"] -%}
{%- set src_eff = "EFFECTIVE_FROM" -%}
{%- set src_ldts = "LOAD_DATE" -%}
{%- set src_source = "RECORD_SOURCE" -%}

{{ dbtvault.t_link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts,
                   src_payload=src_payload, src_eff=src_eff,
                   src_source=src_source, source_model=source_model) }}

load_dateを変えて挙動を確かめてみる

最後にload_date1992-01-08から1992-01-09に変えて、どういった変更があるかを見てみます。

この上で、dbt runを実行します。全てのmodelを実行するため、XSのウェアハウスの場合には1分程度かかると思います。

例えば、hub_customer.sqlを見ると、hub_customer__dbt_tmpというtemporary tableを作成したあと、hub_customerテーブルにinsertされているのがわかります。これは、models/raw_vaultフォルダ内のmodelは全てmaterialized: incrementalが指定されており、2回目以降の実行は差分がinsertされるためです。

dbtvaultで構築したスキーマに対するサンプルクエリ

これで一通りのモデリングを終えましたが、たくさんテーブルがあってどうクエリを書けばいいのかわかりづらいところもあると思います。

そこで私の自作ですが、Worked Exampleで生成されたテーブルを用いて、どういったクエリを書けば目的のデータが得られるか、例を載せておきます。

  • 元のPARTテーブルについて、最新のレコードがほしい
    • 記述するクエリ:partについて、HubsとSatellitesをJOINする。各Satellitesは、primary keyに対して最新のload_dateでフィルタリングする
// 「hub_part」に対して、「部品の詳細情報を持つSatellites」をJOINする
select 
  hub_part.partkey,
  sat_inv_part_details.part_name,
  sat_inv_part_details.part_mfgr,
  sat_inv_part_details.part_brand,
  sat_inv_part_details.part_type,
  sat_inv_part_details.part_size,
  sat_inv_part_details.part_name,
  sat_inv_part_details.part_container,
  sat_inv_part_details.part_retailprice,
  sat_inv_part_details.part_comment,
  sat_inv_part_details.effective_from,
  sat_inv_part_details.load_date
from 
  hub_part
inner join
  sat_inv_part_details
on
  hub_part.part_pk = sat_inv_part_details.part_pk
where
  sat_inv_part_details.load_date = (
    select
      max(sat_inv_part_details_2.load_date)
    from
      sat_inv_part_details as sat_inv_part_details_2
    where
      hub_part.part_pk = sat_inv_part_details_2.part_pk
  )
order by
  partkey;

  • 1992年1月9日時点の注文一覧について、どの国に在籍するどんな氏名の会員が注文しているか知りたい
    • 記述するクエリ:orderとcustomer、それぞれでHubsとSatellitesをJOINし、orderとcustomerはLinksを用いてJOINする。各Satellitesは、primary keyに対して1992年1月9日以前で最新のload_dateでフィルタリングする
// 「hub_order」に対して、「注文の詳細情報を持つSatellites」をJOINする
with order_19920109 as (
  select
    hub_order.order_pk,
    hub_order.orderkey,
    sat_order_order_details.orderstatus,
    sat_order_order_details.totalprice,
    sat_order_order_details.orderdate
  from
    hub_order
  inner join
    sat_order_order_details
  on
    hub_order.order_pk = sat_order_order_details.order_pk
  where
    sat_order_order_details.load_date = (
      select
        max(sat_order_order_details_2.load_date)
      from
        sat_order_order_details as sat_order_order_details_2
      where
        hub_order.order_pk = sat_order_order_details_2.order_pk
        and sat_order_order_details_2.load_date < '10-Jan-1992'
    )
), 
//「hub_customer」に対して、「ユーザーの氏名を持つSatellites」「ユーザーの国情報を持つSatellites」をJOINする
cust_nation_19920109 as (
  select
    hub_customer.customer_pk,
    sat_order_customer_details.customer_name,
    sat_order_cust_nation_details.customer_nation_name,
    sat_order_cust_nation_details.load_date
  from
    hub_customer
  inner join
    sat_order_customer_details
  on
    hub_customer.customer_pk = sat_order_customer_details.customer_pk
  inner join
    sat_order_cust_nation_details
  on
    hub_customer.customer_pk = sat_order_cust_nation_details.customer_pk
  where
    sat_order_customer_details.load_date = (
      select
        max(sat_order_customer_details_2.load_date)
      from
        sat_order_customer_details as sat_order_customer_details_2
      where
        hub_customer.customer_pk = sat_order_customer_details_2.customer_pk
        and sat_order_customer_details_2.load_date < '10-Jan-1992'
    )
    and sat_order_cust_nation_details.load_date = (
      select
        max(sat_order_cust_nation_details_2.load_date)
      from
        sat_order_cust_nation_details as sat_order_cust_nation_details_2
      where
        hub_customer.customer_pk = sat_order_cust_nation_details_2.customer_pk
        and sat_order_cust_nation_details_2.load_date < '10-Jan-1992'
    )
)
// linksである「link_customer_order」を介して、「hub_order」と「hub_customer」をJOINする
select
  order_19920109.orderkey,
  order_19920109.orderstatus,
  order_19920109.totalprice,
  order_19920109.orderdate,
  cust_nation_19920109.customer_name,
  cust_nation_19920109.customer_nation_name
from
  order_19920109
left join
  link_customer_order
on
  order_19920109.order_pk = link_customer_order.order_pk
left join
  cust_nation_19920109
on
  cust_nation_19920109.customer_pk = link_customer_order.customer_pk
order by
  orderkey;

dbtvaultで他にできること

今回はdbtvault公式のWorked Exampleを試しましたが、Data Vaultの他の概念を実装するためのdbtマクロがdbtvaultパッケージに含まれています。

これらのマクロがどういった時に役立つか、少し触れておきたいと思います。

例えば、前述のサンプルクエリの内容を見るとわかると思いますが、Data Vaultでモデリングした場合、JOIN句が多くなり、WHERE句もサブクエリを用いたものが複数必要で、クエリの内容が複雑になりがちです。

この問題に対して、WHERE句にサブクエリを複数回用いるところはPIT tablesを定義することでサブクエリの記述回数は1回で済みますし、JOIN句が多くなるところはBridge Tablesを定義することでJOIN対象のテーブルを減らすことが出来ます。

これらの概念が実装されるレイヤーはBusiness Vaultとも呼ばれます。Business Vault層では、汎用的な指標を算出するビジネスロジックをSatellitesとして事前定義することもあります。

流石にこれらの機能全てを1つの記事では取り組めない(現状、私も全て理解できていないw)ので、今後試してブログに出来たらなとは考えています…

最後に

dbtvaultのSnowflakeのサンプルデータ(TPC-H)を用いたWorked exmpleを試してみました。

今回は開発済のコードを持つリポジトリを使って実行しただけですが、Data Vaultのモデリングを自分で1からSQLで構築しようとすると、どこかしらでミスをするだろうな…と感じました。 その点dbtvaultがあれば、必要なメタデータを定義し、各種マクロにそのメタデータを渡すだけでData Vaultのモデリングが出来るため、機械的な実装ができると感じました。

一方で、1からSQLで構築する場合でも、dbtvaultを使う場合でも、モデリング対象のデータの整理がとても重要だと改めて感じました。データの整理をきちんと行った上でRaw Staging Layerでテーブルの定義を行い、Hashed Staging Layerで必要なハッシュ値を正しく定義する、この工程が大事ですね。

改めて、この記事を通してdbtvaultの理解に少しでもつながっていると嬉しいです!他のdbtvaultの機能のブログは、いずれ書く…はずですw