Snowflake×dbtを試してみた~Part3:パイプライン構築編その1~ #SnowflakeDB #dbt

2021.12.21

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

※本エントリは、Snowflakeをより使いこなそう! Advent Calendar 2021の21日目の記事となります。

さがらです。

Snowflake公式のdbtと連携した時の機能を一通り試すことが出来るQUICKSTARTSに関して試してみた内容をまとめていきます。※この記事は「Part3:パイプライン構築編その1」となります。

この記事の内容について

Snowflake公式のQUICKSTARTSに、Accelerating Data Teams with dbt & SnowflakeというSnowflakeとdbtを組み合わせたときの利点を一通り試すことが出来るクイックスタートがあります。

こちらの内容について、以下の合計5本の構成で試してみた内容を書いていきます。

  • Part1:基本設定編
    • 「1.Overview」から「5. dbt Configuration」
  • Part2:Project設定編
    • 「6. dbt Project Configuration」
  • Part3:パイプライン構築編その1 ※この記事です。
    • 「7. Building dbt Data Pipelines」から「9. dbt pipelines - Intermediate」
  • Part4:パイプライン構築編その2 ※12/22に公開予定、公開後にリンクも正常に動作します。
    • 「10. dbt pipelines - Seeds」から「12. dbt pipelines - Facts」
  • Part5:テスト&Doc&デプロイ編 ※12/23に公開予定、公開後にリンクも正常に動作します。
    • 「13. dbt pipelines - Tests & Docs」から「16. Appendix」

この記事では、「パイプライン構築編その1」ということで「7. Building dbt Data Pipelines」から「9. dbt pipelines - Intermediate」の内容についてまとめていきます。

7. Building dbt Data Pipelines

ここでは、dbtを用いてどのようなパイプラインを構築していくかについて説明されています。公式の英文を引用すると、以下の4つに関するTransformを行うパイプラインを構築すると書いてあります。

  • Stock trading history(株式取引履歴)
  • Currency exchange rates(為替レート)
  • Trading books(取引帳簿)
  • Profit & Loss calculation(損益計算)

8. dbt pipelines - Sources & Staging

ここでは、使用するテーブルの定義と、今後のTransformのベースとなるステージングモデルを定義していきます。

使用するテーブルの定義

dbt sourcesを定義していきます。

sourcesを定義することで、記述するselect文のfrom句で{{ source(~~) }}と記述することで参照するテーブルを指定できるようになります。

まず、定義用のファイルを作ります。

先に作成していたmodels/staging/knoemaフォルダの配下に、knoema_sources.ymlというファイルを作成します。使用するデータとしては、日次の為替レートと日次の米国取引履歴に関するテーブルです。

ファイルが出来たら、以下の内容をコピーして貼り付けし、右上のsaveを押します。

※これは補足説明ですが先頭でversion: 2と記述しているのは、yamlファイルの構造に大きな変化がありその変化点がversion2のため、version:2の構造を使用することを明示的にしていることが理由です。詳細はこちらのDocをご確認ください。

version: 2

sources:
  - name: knoema_economy_data_atlas
    database: knoema_economy_data_atlas
    schema: economy
    tables:
      - name: exratescc2018
      - name: usindssp2020

ここで一度コミットしておきます。コミットメッセージはset up knoema source fileと入れておきます。※内容はsourceファイルを定義した、ということがわかれば何でもOKです。

ステージングモデルの定義(1つ目)

続いて、ステージングモデルの定義をしていきます。

ステージングモデルは、それが表すソースのテーブルと一対一の関係を持ちます。テーブルの粒度は同じですが、カラムは何らかの方法で名前を変えたり、キャストし直したり、一貫性のある有用なフォーマットに従うような処理を施しています。これらを最初に作成しておくことで、より複雑な変換を構築するための一貫した基盤を構築することができます。

ステージングモデルに関するdbtの見解については、こちらのページも併せてご確認ください。

では、ステージングモデルを作っていきます。まずはexratescc2018テーブルに関して作成します。

models/staging/knoemaフォルダの配下に、stg_knoema_fx_rates.sqlというファイルを作ります。

ファイルが出来たら、下記の内容をコピーして貼り付けして右上のsaveを押します。

with source as (
 
    select * from {{source('knoema_economy_data_atlas','exratescc2018') }}
 
), 
 
renamed as (
 
select 
 
    "Currency" as currency,
    "Currency Unit" as currency_unit,
    "Frequency" as frequency,
    "Date" as exchange_date,
    "Value" as exchange_value,
    "Indicator" as indicator,
    "Indicator Name" as indicator_name,
    'Knoema.FX Rates' as data_source_name
 
from source 
 
) 
 
select * from renamed

この上で、compileボタンを押してください。すると、stg_knoema_fx_rates.sqlで記述した{{source('knoema_economy_data_atlas','exratescc2018') }}knoema_economy_data_atlas.economy.exratescc2018に変化していることがわかると思います。

これが、事前に作成したknoema_sources.ymlファイルの効力です。

ここで、sourceを事前に定義することによるメリットについて2つ触れておきます。

まず1つ目、Lineageを押すと、対象のmodelがsourceとどう関連しているかをDAGの形式で表示してくれます。

続いて2つ目、見て頂いたとおりコードを変更することなく動的に参照先を変更できるため、開発環境と本番環境でデータベースやスキーマを分けていた場合でも同じコードを使用することが出来ますし、target jinja関数などを使用すれば、条件に沿って使用するデータベースを定義することも可能です。

ステージングモデルの定義(2つ目)

続いて、usindssp2020テーブルに関するステージングモデルを作成していきます。

models/staging/knoemaフォルダの配下に、stg_knoema_stock_history.sqlというファイルを作ります。

ファイルが出来たら、下記の内容をコピーして貼り付けして右上のsaveを押します。

with source as (

    select * from {{source('knoema_economy_data_atlas','usindssp2020')}}
), 

renamed as (

    select 

        "Company" as company,
        "Company Name" as company_name,
        "Company Symbol" as company_symbol,
        "Stock Exchange" as stock_exchange,
        "Stock Exchange Name" as stock_exchange_name,
        "Indicator" as indicator,
        "Indicator Name" as indicator_name,
        "Units" as units,
        "Scale" as scale, 
        "Frequency" as frequency, 
        "Date" as stock_date,
        "Value" as stock_value,
        'Knoema.Stock History' as data_source_name 

    from source 

) 

select * from renamed

定義したステージングモデルの実行

ここまでに2つのステージングモデルの定義を行ったので、早速実行してビューを作ってみましょう。

コマンドラインでdbt run -m staging.*と入力し、実行してください。-mの後に実行したいmodelがあるフォルダを指定して.*を付けると、その配下にあるmodelだけを実行してくれます。

実行後、下図のようにログが表示されればOKです。

正しく実行できたならば、その状態で一度Commitしておきましょう。コミットメッセージは任意ですが、set up staging modelsのように「ステージングモデルを定義したよ」ということがわかるように指定すると良いと思います。

続いて、モデルを実行して作られたビューに対してクエリを実行してみます。

下図の手順に沿って、新しいStatementを追加し、下記SQLを貼り付けてスキーマ名を自身のものに変更しPreviewを押してみてください。

select * 
  from pc_dbt_db.<dev_schema>_staging.stg_knoema_stock_history
 where company_symbol ='AAPL' 
   and stock_date ='2021-03-01'

現在のデータを見ると、終値(Close)、始値(Open)、高値(High)、安値(Low)が別の行として定義されています。

このため、下図のように列で分けて見ることができるように、次章以降で変換処理を構築していきます。

9. dbt pipelines - Intermediate

ここでは、目的のテーブルを出力するための中間テーブル/ビューを構築するmodelの定義と、自動生成されるDocについても少し確認していきます。

ピボットを用いた中間modelの定義

まず、新しいファイルを作ります。フォルダも併せて作るため、任意のフォルダからNew Fileを押して、入力欄を全てmodels/marts/core/intermediate/int_knoema_stock_history.sqlに書き換えた上で、ファイルを作成してください。

作成したファイルに対して、以下のクエリをコピーして貼り付けし、右上のsaveを押してください。

ここでは、dbt_utilspivotマクロを使って、データセットを行から列に転置する処理を書いています。また、列の値を動的にリストアップするために、get_columnsという別のdbt_utilsのマクロをネストしています。

with stock_history as (

    select * from {{ ref('stg_knoema_stock_history') }}
        where indicator_name in ('Close', 'Open','High','Low', 'Volume', 'Change %') 

),

pivoted as (

    select 
        company_symbol, 
        company_name, 
        stock_exchange_name, 
        stock_date, 
        data_source_name,
        {{ dbt_utils.pivot(
      column = 'indicator_name',
      values = dbt_utils.get_column_values(ref('stg_knoema_stock_history'), 'indicator_name'),
      then_value = 'stock_value'
            ) }}
    
    from stock_history
    group by company_symbol, company_name, stock_exchange_name, stock_date, data_source_name

)

select * from pivoted

この上でCompileボタンを押すと、このマクロがどういったSQLに変換されているかがわかります。具体的には、case文を駆使して該当する場合はその行のstock_valueの値を持ってきて、行数を圧縮するためにSUMで集計しているようですね。

続いて、dbt_utilsマクロを使わずにコードを書くとどうなるかを見てみましょう。同じフォルダに新規ファイルを作成します。任意のフォルダからNew Fileを押して、入力欄を全てmodels/marts/core/intermediate/int_knoema_stock_history_alt.sqlに書き換えた上で、ファイルを作成してください。

こちらについても作成したファイルに対して、以下のクエリをコピーして貼り付けし、右上のsaveを押してください。

こちらは、Snowflakeのpivot関数を使用した場合のクエリになります。

with stock_history as (
 
    select * from {{ ref('stg_knoema_stock_history') }}
        where indicator_name IN ('Close', 'Open','High','Low', 'Volume', 'Change %') 
 
),
 
pivoted as (
 
    select * from stock_history 
        pivot(sum(stock_value) for indicator_name in ('Close', 'Open','High','Low', 'Volume', 'Change %')
            ) as 
            p(
                company_symbol, 
                company_name, 
                stock_exchange_name, 
                stock_date, 
                data_source_name, 
                close,
                open,
                high,
                low,
                volume,change
            ) 
)
 
select * from pivoted

ここで、「dbt_utilspivotマクロ」と「Snowflakeのpivot関数」2つのパターンについてクエリを見てきました。いろいろな見方があると思いますが、以下の理由から「dbt_utilspivotマクロ」の方が汎用性が高いと思います。

  • pivotマクロとget_columnsマクロを組み合わせる場合、引数に対象列、model名、参照する値を持つ列、を入れるだけでよい
  • Snowflakeの関数の場合、値を1つ1つ指定しないといけないが、dbtのpivotマクロの場合は動的に値を取得してピボットしてくれる

ref()について

先程のピボットに関するクエリの中で、select * from {{ ref('stg_knoema_stock_history') }}のように、ref()という記述があったと思います。このref()を使用することにより、動的に参照するテーブルを変えることが出来ます。

dbtでmodelを記述する際は、基本的に参照先のテーブル名を直接指定することはバッドプラクティスです。ref()やsource()を使用しましょう!

ピボットを用いた中間modelの実行

先程作成したmodelを実行してみます。

コマンドラインで、dbt run -m +int_knoema_stock_historyと入力し実行しましょう。+を入れておくことで、対象のmodelの親modelも併せて実行されます。

実行後、下図のように親modelも実行されていることがわかると思います。

ここで、modelを介して作られたテーブルに対してクエリを発行してみましょう。dbt Cloud上、あるいはSnowflake上で、下記のクエリをコピーして、スキーマ名を自身のものに変更した上で実行してみてください。

SELECT * 
  FROM pc_dbt_db.<dev_schema>_marts.int_knoema_stock_history
 WHERE company_symbol = 'AAPL'
   AND stock_date = '2021-03-01'

狙い通り、1行でClose、Low、Open、Highが並んでいればOKです!

為替レートに関する中間modelの定義

続いて、為替レートに関する中間modelを定義していきます。

まず、新しいファイルを作成します。New Fileを押して、models/marts/core/intermediate/int_fx_rates.sqlに書き換えた上で新しいファイルを作成します。

ファイルが出来たら、以下のクエリをコピーして貼り付けて、右上のsaveを押しましょう。

{{ 
config(
    materialized='view', 
      tags=["hourly"]
    ) 
}}
 
select * from {{ ref('stg_knoema_fx_rates') }} 
 where indicator_name = 'Close' 
   and frequency = 'D' 
   and exchange_date > '2016-01-01'

この貼り付けたコードで見るべきはconfigとその中身です。

まずconfigについて、このmodelに対するオプションを設定することが出来ます。

  • materialized:対象のmodel実行後、どのオブジェクトで生成するかを定義する。dbt_project.ymlでも定義がされている場合には、このmodel内でのconfigの内容が優先されます。
  • tags:対象のmodelに対してタグを付与できます。タグは、プロジェクトのうちの一部を実行するために用いたり、modelをグループ化する際に便利です。

configについての設定は、こちらのDocも併せてご覧ください。

為替レートに関する中間modelの実行

ということで、早速定義した中間modelを実行してみます。

コマンドラインで、dbt run -m tag:hourlyと入力して実行してください。-m tag:で、指定したtagに関係するmodelだけを実行可能です。

実行すると、先程タグで指定したmodelだけ実行されているのがわかると思います。

為替とトレード履歴を併せるmodelの定義

続いて、これまでに作成した2つのmodelを用いたmodelを作っていきます。

まず、ファイルを作成します。New Fileを押して、models/marts/core/intermediate/int_stock_history_major_currency.sqlに書き換えた上で新しいファイルを作成します。

続いて、下記のクエリをコピーして貼り付けし、右上のsaveを押します。

with
stock_history as (
    select * from {{ ref('int_knoema_stock_history')}}
),
 
fx_rates as (
    select * from {{ ref('int_fx_rates') }}
),
 
fx_rates_gdp as (
    select * from fx_rates
        where currency = 'USD/GBP'   
),
 
fx_rates_eur as (
    select * from fx_rates
        where currency = 'USD/EUR' 
),
 
joined as (
    select 
        stock_history.*,
        fx_rates_gdp.exchange_value * stock_history."Open" as gbp_open,       
        fx_rates_gdp.exchange_value * stock_history."High" as gbp_high,     
        fx_rates_gdp.exchange_value * stock_history."Low" as gbp_low,   
        fx_rates_gdp.exchange_value * stock_history."Close" as gbp_close,     
        fx_rates_eur.exchange_value * stock_history."Open" as eur_open,       
        fx_rates_eur.exchange_value * stock_history."High" as eur_high,     
        fx_rates_eur.exchange_value *stock_history."Low" as eur_low,
        fx_rates_eur.exchange_value * stock_history."Close" as eur_close    
    from stock_history
    left join fx_rates_gdp on stock_history.stock_date = fx_rates_gdp.exchange_date
    left join fx_rates_eur on stock_history.stock_date = fx_rates_eur.exchange_date
)
 
select * from joined

続いて、このmodelについて親modelと併せて実行してみます。 コマンドラインにdbt run --model +int_stock_history_major_currencyと入力し、実行してください。

親モデル含めて、5つのモデルが実行されていればOKです!

ドキュメントの生成

dbtは、Snowflakeのinformation_schemaと同様にdbtプロジェクトの情報を取り込んで、各種modelやデータの情報を持つ静的なWebページを生成することができます。このプロジェクトに関するドキュメント、と言ってもよいでしょう。

このドキュメントは、各カラム、タグ、テストだけでなく、ソースコードに関するすべての重要な情報が含まれているため、社内のチームと情報を共有するためにはとても優れています。インタラクティブなDAGを提供するので、modelのリネージ全体像を見ることができます。

更に、コマンド一つで生成可能なため、運用負荷もかかりません。

では、早速ドキュメントを生成していきましょう。

コマンドラインでdbt docs generateと入力し、実行してください。

完了したら、画面左上のview docsが押せるようになったはずです。こちらを押してみてください。

すると、対象のプロジェクトに関するドキュメントが別タブで立ち上がると思います。

下図のようにmodelごとのカラム情報や所有者、テーブルのサイズを確認したり、右下のアイコンを押すことで、リネージを確認することも可能です。

おまけ:9章までの内容のコミット

これは公式ページ上には載ってないのですが、中間モデルを生成したということで一度Commitしておきましょう。

次回

Snowflakeをより使いこなそう! Advent Calendar 2021、次回の22日目では、「Snowflake×dbtを試してみた~Part4:パイプライン構築編その2~」というタイトルで執筆します。お楽しみに!