Snowflake & dbt Cloudハンズオン実践 #4: 『実践編1(ソース設定&ステージングモデル作成)』 #snowflakeDB #dbt

2024.01.31

アライアンス事業部 エンジニアグループ モダンデータスタック(MDS)チームの しんや です。

Snowflakeが展開しているサイト『Snowflake Quickstarts』では、Snowflake単体、またSnowflakeと他サービスとの連携について実戦形式で手を動かしながら学んでいけるコンテンツが多数公開されています。

その中の1つ『Accelerating Data Teams with Snowflake and dbt Cloud Hands On Lab(Snowflake と dbt Cloud ハンズオン ラボを使用してデータ チームを加速する)』は、dbt CloudとSnowflakeを連携させる形で、Snowflakeのデータを使ってdbt Cloudでデータ変換の処理を作り上げていく流れを学ぶことが出来る非常に参考になるコンテンツです。

当エントリ及び一連のエントリ群では、この一連の手順を実際に手を動かしながら進めた記録をまとめて行こうと思います。

第4弾の当エントリでは『実践編1(ソース設定&ステージングモデル作成)』パートについて実践内容を紹介します。

一連の内容を1本にまとめようとするとめちゃくちゃボリュームが大きくなる内容でしたので、以下の形でそれぞれ分けて紹介していこうと思います。
#1: Snowflake環境準備編 [Snowflake QuickStarts: Step01-04]
#2: dbt Cloud IDE探索編 [Snowflake QuickStarts: Step05]
#3: dbt Cloud 基本構造紹介編 [Snowflake QuickStarts: Step06]
#4: dbt Cloud 実践編1(ソース設定&ステージングモデル作成) [Snowflake QuickStarts: Step07]
#5: dbt Cloud 実践編2(シード&マテリアライゼーション) [Snowflake QuickStarts: Step08]
#6: dbt Cloud 実践編3(マートモデルの作成) [Snowflake QuickStarts: Step09]
#7: dbt Cloud 実践編4(テスト&ドキュメント) [Snowflake QuickStarts: Step10]
#8: dbt Cloud 実践編5(デプロイ) [Snowflake QuickStarts: Step11]
#9: Snowsightダッシュボード可視化編 [Snowflake QuickStarts: Step12]

目次

 

Step07. ソース(source)とステージング層モデル

このステップでは、ソースデータに関する設定と「ステージング層」のモデルについて見ていきます。

dbtのソース(source)を使用することで、、dbtプロジェクト内でデータウェアハウスの生データに名前を付けて記述することができ、生データから変換されたモデルへのリネージを確立することができます。メリットはそれだけでなく、dbtでソースを定義すると、変換後のモデルだけでなく、ソースに対しても同じテストとドキュメンテーションのベストプラクティスを適用することができます。

ステージング・モデルはソース・テーブルと1対1の関係にあります。ここで、名前の変更やキャストの変更、列をより使いやすい形式に変更するなどの簡単な変換を行います。

ステージング層のモデルを構築することで、下流のモデリングのための明確な基礎ができ、モジュール式のDRY(Don't Repeat Yourself)モデリングが可能になります。ロジックが変更された場合、同じサブクエリで5つのモデルを更新するのではなく、1つのモデルで更新することができます。

今回のステップ用のブランチを作成し、開発スタートです。

 

ソースの作成

ここではまず、TPCH_SF1スキーマの ordersテーブルと lineitemテーブルを対象として変換処理を行いたいと思います。この2テーブルに対するソース定義を作成します。

models/staging/tpch/フォルダ配下にtpch_sources.ymlというファイルを作成、以下の内容で保存。このファイルでは、データの取得元データベース (snowflake_sample_data)、スキーマ (tpch_sf1)、ビルドと変換を行うテーブルが定義されています。各テーブル名の下には、ソースに適用した説明とテストが記載されていますが、これについては後のテストとドキュメントのセクションで詳しく説明します。

models/staging/tpch/tpch_sources.yml

version: 2
sources:
  - name: tpch
    description: source tpch data
    database: snowflake_sample_data
    schema: tpch_sf1
    tables:
      - name: orders
        description: main order tracking table
        
        columns:
          - name: o_orderkey
            description: SF*1,500,000 are sparsely populated
            tests: 
              - unique
              - not_null

      - name: lineitem
        description: main lineitem table
        columns:
          - name: l_orderkey
            description: Foreign Key to O_ORDERKEY
            tests:
              - relationships:
                  to: source('tpch', 'orders')
                  field: o_orderkey

ステージング層のモデルを作成

次いで、上記で定義した2つのデータソースを参照する形でステージング・モデルを作成します。ステージング・モデルと対応するソース・テーブルの間には1対1のリレーションシップがあるため、ここでは2つのステージング・モデルを構築します。

ordersテーブルにおけるステージング層モデルをstg_tpch_orders.sqlというファイルで定義します。作成・配置するフォルダはmodels/staging/tpch/とします。
(相対パス:models/staging/tpch/stg_tpch_orders.sql)

ここで行ったことは、ソース関数を使用してソースデータをモデルに取り込み、カラムの名前を変更することだけです。

これは、このデータを参照する必要がある他のすべてのモデルの出発点として機能し、プロジェクト全体で命名が一貫した状態を維持します。

models/staging/tpch/stg_tpch_orders.sql

with source as (

    select * from {{ source('tpch', 'orders') }}

),

renamed as (

    select

        o_orderkey as order_key,
        o_custkey as customer_key,
        o_orderstatus as status_code,
        o_totalprice as total_price,
        o_orderdate as order_date,
        o_orderpriority as priority_code,
        o_clerk as clerk_name,
        o_shippriority as ship_priority,
        o_comment as comment

    from source

)

select * from renamed

2つ目のモデル(lineitem)に関しても上記同様の流れで作成します。
(相対パス:models/staging/tpch/stg_tpch_line_items.sql)

11〜14行目に記載しているdbt_utils.surrogate_keyは、マクロにリストされたカラムを使用してorder_item_keyと呼ぶハッシュ化されたサロゲートキーを作成します。このサロゲートキーを実装することで、一意なカラムを得ることができます。サロゲート・キー・マクロの動作の詳細については下記を参照してください。

models/staging/tpch/stg_tpch_line_items.sql

with source as (

    select * from {{ source('tpch', 'lineitem') }}

),

renamed as (

    select
    
        {{ dbt_utils.surrogate_key(
            ['l_orderkey', 
            'l_linenumber']) }}
                as order_item_key,
        l_orderkey as order_key,
        l_partkey as part_key,
        l_suppkey as supplier_key,
        l_linenumber as line_number,
        l_quantity as quantity,
        l_extendedprice as extended_price,
        l_discount as discount_percentage,
        l_tax as tax_rate,
        l_returnflag as return_flag,
        l_linestatus as status_code,
        l_shipdate as ship_date,
        l_commitdate as commit_date,
        l_receiptdate as receipt_date,
        l_shipinstruct as ship_instructions,
        l_shipmode as ship_mode,
        l_comment as comment

    from source

)

select * from renamed

 

ソース関数

ここで、各ステージング・モデルの最初のバイトで、生のデータ・ソースを参照するために使用しているソース関数(source function)について説明します。

ハードコードされたデータベース参照の代わりにソース関数を使用する理由はいくつかありますが、ここで注目すべき理由の1つは、ソースデータベースオブジェクトとステージングモデルの間に依存関係を作成することです。ここでソース関数を使った定義をしておくことで、後述の情報参照、またハンズオン自体の後半でデータ・リネージを見ていく際に非常に重要になります。ソースを定義し、ソース関数で参照することで、ソースの上に構築するプロジェクトの他のモデルと同様に、ソースをテストし、文書化することができます。また、ソースがデータベースやスキーマを変更した場合、そのソースが使用される可能性のあるすべてのモデルを更新するのではなく、tpch_sources.ymlファイルを更新するだけで済むようになります。

 

ステージング層モデルのビルド

上記手順でステージング層のモデルの作成が出来たので、モデルを実行してみます。コマンド:dbt runで、プロジェクト内のすべてのモデル(2つの新しいステージングモデルと既存のサンプルモデルを含む)を実行します。正常に完了し、実行結果のすべてのモデルの横に緑色のチェックマークが表示されればOKです。

ここで大事なことは、モデルを再実行しても出力は最初の実行後とまったく同じになるということです。これはべき等プロセス(idempotent process)の例であり、プロセスが何回実行されても、プロセスの各実行後の出力は同じであることを意味します。dbtの重要な考え方の一つは、べき等なワークフローを維持することです。

Snowflakeの環境側でデータベース要素の確認をしてみます。新しいモデルに対応する要素が作成されていることが確認出来ました。

ちなみにこの時点での要素を可視化したリネージは以下のような状態であることが確認出来ます。ソースに関する情報が2つあり、それぞれのソースを参照したモデルが作成されている、という状況ですね。

以上で、当該ステップは完了です。コミット&プッシュで編集結果をGitリポジトリに反映しておきます。

 

まとめ

という訳で、Snowflake QuickStarts『Accelerating Data Teams with Snowflake and dbt Cloud Hands On Lab』実践第4弾、dbt Cloud実践編1(ソース設定&ステージングモデル作成)に関する内容の紹介でした。

次エントリ(第5弾)ではdbt Cloudでのモデル作成実践編その2(シーズ&マテリアライゼーション)に関するパートを進めていきます。