Snowflake×dbtを試してみた~Part4:パイプライン構築編その2~ #SnowflakeDB #dbt

2021.12.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

※本エントリは、Snowflakeをより使いこなそう! Advent Calendar 2021の22日目の記事となります。

さがらです。

Snowflake公式のdbtと連携した時の機能を一通り試すことが出来るQUICKSTARTSに関して試してみた内容をまとめていきます。※この記事は「Part4:パイプライン構築編その2」となります。

この記事の内容について

Snowflake公式のQUICKSTARTSに、Accelerating Data Teams with dbt & SnowflakeというSnowflakeとdbtを組み合わせたときの利点を一通り試すことが出来るクイックスタートがあります。

こちらの内容について、以下の合計5本の構成で試してみた内容を書いていきます。

この記事では、「パイプライン構築編その2」ということで「10. dbt pipelines - Seeds」から「12. dbt pipelines - Facts」の内容についてまとめていきます。

10. dbt pipelines - Seeds

ここでは、トレーディングブックに関するcsv形式でデータを2つ用意し、dbt seed機能を使ってアップロードしていきます。ちなみにトレーディングブックの内容としては、AAPL株を売買し、支払/受取の現金を異なる通貨(USDとGBP)で記録しています。

csvデータの準備

まず、csvのデータを2つ作成していきます。

New Fileから、名前をseeds/manual_book1.csvに書き換え、新しく作成してください。※公式のページでは「data」という新しいフォルダを定義していましたが、この手順通りだと後のdbt seedコマンドが正しく動かないため注意してください。

作成したmanual_book1.csvに対して、以下の内容をコピーして貼り付け、右上のsaveを押してください。

Book,Date,Trader,Instrument,Action,Cost,Currency,Volume,Cost_Per_Share,Stock_exchange_name
B2020SW1,2021-03-03,Jeff A.,AAPL,BUY,-17420,GBP,200,87.1,NASDAQ
B2020SW1,2021-03-03,Jeff A.,AAPL,BUY,-320050,GBP,3700,86.5,NASDAQ
B2020SW1,2021-01-26,Jeff A.,AAPL,SELL,52500,GBP,-500,105,NASDAQ
B2020SW1,2021-01-22,Jeff A.,AAPL,BUY,-100940,GBP,980,103,NASDAQ
B2020SW1,2021-01-22,Nick Z.,AAPL,SELL,5150,GBP,-50,103,NASDAQ
B2020SW1,2019-08-31,Nick Z.,AAPL,BUY,-9800,GBP,100,98,NASDAQ
B2020SW1,2019-08-31,Nick Z.,AAPL,BUY,-1000,GBP,50,103,NASDAQ

続いて、もう1つのcsvデータを定義していきます。

New Fileから、名前をseeds/manual_book2.csvに書き換え、新しく作成してください。

作成したmanual_book2.csvに対して、以下の内容をコピーして貼り付け、右上のsaveを押してください。

Book,Date,Trader,Instrument,Action,Cost,Currency,Volume,Cost_Per_Share,Stock_exchange_name
B-EM1,2021-03-03,Tina M.,AAPL,BUY,-17420,EUR,200,87.1,NASDAQ
B-EM1,2021-03-03,Tina M.,AAPL,BUY,-320050,EUR,3700,86.5,NASDAQ
B-EM1,2021-01-22,Tina M.,AAPL,BUY,-100940,EUR,980,103,NASDAQ
B-EM1,2021-01-22,Tina M.,AAPL,BUY,-100940,EUR,980,103,NASDAQ
B-EM1,2019-08-31,Tina M.,AAPL,BUY,-9800,EUR,100,98,NASDAQ

dbt seedによるデータの取り込み

続いて、先程定義した2つのcsvのデータをSnowflakeに対してロードしていきます。

コマンドラインにdbt seedと入力し、実行してください。

※補足として、このdbt seedで数十万行のデータを読み込むことは可能ですが、大規模なデータセット用に作成されたものではないので、そのような場合はCOPY/SnowpipeなどSnowflakeで推奨されている方法でロードを行ってください。

下図のように表示されれば、実行完了です。

実際のSnowflakeでテーブルの有無を確認してみると、ユーザーのデフォルトのスキーマに対してロードされていることがわかります。

無事にロードが出来たら、今の状態をCommitしておきましょう

ロードしたデータをUNIONするmodelの定義・実行

続いて、dbt seedによりロードした2つのデータをUNIONするmodelを定義していきます。

New Fileから、名前をmodels/marts/core/intermediate/int_unioned_book.sqlに書き換えて、新しくファイルを作成します。

作成したint_unioned_book.sqlファイルに対して、以下のクエリをコピーして貼り付け、右上のsaveを押します。

with 
unioned as (
    {{ dbt_utils.union_relations(
        relations=[ref('manual_book1'), ref('manual_book2')]
    ) }}
 
),
 
renamed as (
    select      
        Book,
        Date as book_date,
        Trader,
        Instrument,
        Action as book_action,
        Cost,
        Currency,
        Volume,
        Cost_Per_Share,
        Stock_exchange_name
    from unioned 
)
 
select * from renamed

ここで、一度Compileを押してみてください。下図のようにUNIONが行われているクエリになっていることがわかると思います。

このUNION処理は、貼り付けたクエリのdbt_utils.union_relations()マクロにより作られています。このマクロに対して、UNIONしたいテーブルを指定するだけで、カラム名や型を考慮し、自動でUNIONを行うクエリに変換してくれます。

最後に、このmodelを実行しておきましょう。

コマンドラインでdbt run -m int_unioned_bookと入力し、実行してください。

実行後にSnowflakeでも確認してみると、適切にテーブルが作られていることがわかると思います。

11. dbt pipelines - Intermediate Part 2

ここでは、また別の中間modelを2つ定義していきます。

内容として、株式の購入・売却の記録に関する取引活動のログ情報はすでにあるのですが、株式が保有されている間の情報を日次で保持し、よりパフォーマンスを計測・確認しやすくするためのデータ生成を行うmodelを定義していきます。

1つ目のmodel定義

まず1つ目のmodelを定義していきます。

New Fileから、名前をmodels/marts/core/intermediate/int_daily_position.sqlに書き換えて、新しいファイルを作成します。

作成したら、下記のクエリをコピーして貼り付けて、右上のsaveを押します。

with 
stock_history as (
    select * from {{ ref('int_stock_history_major_currency') }} 
), 

unioned_book as (
    select * from {{ ref('int_unioned_book') }}
),

cst_market_days as (
    select distinct stock_date
        from stock_history
        where stock_history.stock_date >= (select min(book_date) as min_dt from  unioned_book)
),

joined as (
    select 
        cst_market_days.stock_date,
        unioned_book.trader,
        unioned_book.stock_exchange_name,
        unioned_book.instrument,
        unioned_book.book,
        unioned_book.currency,
        sum(unioned_book.volume) as total_shares
    from cst_market_days
    inner join unioned_book on unioned_book.book_date = cst_market_days.stock_date
    where unioned_book.book_date <= cst_market_days.stock_date
    {{ dbt_utils.group_by(6) }}
)

select * from joined

このクエリの途中で、dbt_utils.group_byというマクロを使用しています。通常GROUP BY句を書くと、対象のカラム名を全て指定しないといけないのですが、このマクロを使用することでSELECT句内の上から順番にマクロの引数で指定した数だけカラム名を取得するので、クエリのコード量を削減することが出来ます。

2つ目のmodel定義

New Fileから、名前をmodels/marts/core/intermediate/int_daily_position_with_trades.sqlに書き換えて、新しいファイルを作成します。

このint_daily_position_with_trades.sqlファイルに対して下記のクエリをコピーして貼り付けて、右上のsaveを押してください。

with unioned_book as (
    select * from {{ ref('int_unioned_book') }}
),
 
daily_position as (
    select * from {{ ref('int_daily_position') }}
),
 
unioned as (
    select 
        book,
        book_date,
        trader,
        instrument,
        book_action,
        cost, 
        currency,
        volume, 
        cost_per_share, 
        stock_exchange_name,
        sum(unioned_book.volume) 
            over(
                partition by 
                    instrument, 
                    stock_exchange_name, 
                    trader 
                order by 
                    unioned_book.book_date rows unbounded preceding) 
                        as total_shares
    from unioned_book  
 
    union all 
 
    select  
        book,
        stock_date as book_date, 
        trader, 
        instrument, 
        'HOLD' as book_action,
        0 as cost,
        currency, 
        0 as volume, 
        0 as cost_per_share,
        stock_exchange_name,
        total_shares
    from daily_position
    where (book_date,trader,instrument,book,stock_exchange_name) 
        not in 
        (select book_date,trader,instrument,book,stock_exchange_name
            from unioned_book
        )
)
 
select * from unioned

定義したmodelの実行

コマンドラインでdbt run --models int_unioned_book+と入力して、これまでに作成したmodelを実行します。+をmodel名の後ろにつけることで、対象のmodelの子に位置するmodelが順番にまとめて実行されます。

実行後、下図のように3つのmodelが実行されていればOKのです!

Commitと作成したデータへのクエリ

まずここまでの内容を一度Commitしておきましょう。私はset up intermediate model for daily performance analysisと入れておきました。

この上で、新しいStatementを追加し、下記SQLを貼り付けてスキーマ名を自身のものに変更し、Previewを押してみてください。

select * 
from pc_dbt_db.<dev_schema>_marts.int_daily_position_with_trades
where trader = 'Tina M.'
order by  book_date

下図のように、日々の売買実績の合計がTOTAL_SHAREとして確認できていればOKです!dbtでのデータ変換を通して、より実用性の高いデータを整えることが出来ました。

12. dbt pipelines - Facts

ここでは、最終的にBIツールなどで使用されるmodelを定義していきます。

11章までの作業で、取引履歴と株価の履歴に関するmodelを定義してきました。これらのmodelを活用して、Market ValueとPnL(損益のこと)が時間とともにどのように変化したかを示すモデルを作っていきましょう。

modelの定義

New Fileを押し、名前をmodels/marts/core/fct_trading_pnl.sqlに書き換えて新しくファイルを作成します。

これまで中間modelの定義はmodels/marts/core/intermediate/で行ってきましたが、このmodelはmodels/marts/core/の直下に定義していきます。coreの名の通り、これはビジネス上で使用するコアとなるデータを出すためのロジックを定義しているmodelであるためです。

fct_trading_pnl.sqlファイルが出来ましたら、下記のクエリをコピーして貼り付けて、右上のsaveを押します。

{{ 
config(
      tags = 'core'
      ) 
}}


with
daily_positions as (
    select * from {{ ref('int_daily_position_with_trades' )}}

),

stock_history as (
    select * from {{ ref('int_stock_history_major_currency') }}

),

joined as (
    select 
        daily_positions.instrument, 
        daily_positions.stock_exchange_name, 
        daily_positions.book_date, 
        daily_positions.trader, 
        daily_positions.volume,
        daily_positions.cost, 
        daily_positions.cost_per_share,
        daily_positions.currency,
        sum(cost) over(
                partition by 
                    daily_positions.instrument, 
                    daily_positions.stock_exchange_name, 
                    trader 
                order by
                    daily_positions.book_date rows unbounded preceding 
                    )
                as cash_cumulative,
       case when daily_positions.currency = 'GBP' then gbp_close
            when daily_positions.currency = 'EUR' then eur_close
            else 'Close'
       end AS close_price_matching_ccy, 
       daily_positions.total_shares  * close_price_matching_ccy as market_value, 
       daily_positions.total_shares  * close_price_matching_ccy + cash_cumulative as PnL
   from daily_positions
   inner join stock_history 
      on daily_positions.instrument = stock_history.company_symbol 
     and stock_history.stock_date = daily_positions.book_date 
     and daily_positions.stock_exchange_name = stock_history.stock_exchange_name
)

select * from joined

ファイルの保存ができたら、コマンドラインで実行してみましょう!

コマンドラインでdbt run --m fct_trading_pnlと入力し、実行してみます。(公式ページ上のコマンドdbt run -m fct_trading_pnl.sqlではファイルの拡張子まで含んでおり、正しく実行できません。)

増分更新を適用したmodelの定義

dbtでは中間modelを定義すること、テーブル作成時のクエリ時間を削減することが出来ます。

しかし、dbtでは増分更新を適用することが出来ます。これにより、modelが対象とするテーブルに対し追加されたレコードだけを追記していく処理になるため、都度テーブルの作り直しが発生しません。

dbtでの増分更新の説明と使い所については、下記の記事にまとまっていますので併せてご覧ください。

では、増分更新を適用させたmodelを作っていきましょう。

New Fileを押し、名前をmodels/marts/core/fct_trading_pnl_incremental.sqlに書き換えて新しくファイルを作成します。

fct_trading_pnl_incremental.sqlファイルが出来ましたら、下記のクエリをコピーして貼り付けて、右上のsaveを押します。

{{ 
config(
      materialized='incremental',
      unique_key= 'pk_key',
      tags = 'core'
      ) 
}}
 
 
with 
daily_positions as (
    select * from {{ ref('int_daily_position_with_trades' )}}
),
 
stock_history as (
    select * from {{ ref('int_stock_history_major_currency') }}
),
 
joined as (
    select 
 
        daily_positions.instrument, 
        daily_positions.stock_exchange_name, 
        daily_positions.book_date, 
        daily_positions.trader, 
        daily_positions.volume,
        daily_positions.cost, 
        daily_positions.cost_per_share,
        daily_positions.currency,
        sum(cost) over(
                partition by 
                    daily_positions.instrument, 
                    daily_positions.stock_exchange_name, 
                    trader 
                order by
                    daily_positions.book_date rows unbounded preceding 
                    )
                as cash_cumulative,
       case when daily_positions.currency = 'GBP' then gbp_close
            when daily_positions.currency = 'EUR' then eur_close
            else 'Close'
       end as close_price_matching_ccy, 
       daily_positions.total_shares  * close_price_matching_ccy as market_value, 
       daily_positions.total_shares  * close_price_matching_ccy + cash_cumulative as PnL
   from daily_positions
   inner join stock_history 
      on daily_positions.instrument = stock_history.company_symbol 
     and stock_history.stock_date = daily_positions.book_date 
     and daily_positions.stock_exchange_name = stock_history.stock_exchange_name
),
 
primary_key as (
 
    select 
 
        {{ dbt_utils.surrogate_key([
                'trader', 
                'instrument', 
                'book_date', 
                'stock_exchange_name',
                'PnL', 
            ]) }} as pk_key,
                *
 
    from joined 
)
 
select * from primary_key
 
{% if is_incremental() %}
  -- this filter will only be applied on an incremental run
   where book_date > (select max(book_date) from {{ this }})
 
{% endif %}

増分更新対応でどういった内容を変更したか少し触れておきます。

configunique_keyを追加しています。このキーの指定により、ターゲットテーブルの既存の行のユニークキーが、増分対象で変換されたレコードのいずれかと一致する場合、ターゲットテーブルの既存のレコードは更新されます。これにより、ソースデータ内の 1 つの行に対してターゲットテーブル内に複数の行が存在することがなくなります。

また、末尾のif is_incremental()で囲まれているクエリについては、増分更新として実行されているときのみ実行されるようになります。増分更新の時に限り、book_dateがより新しい日付であるレコードだけに絞り込む、ということを行っています。

では、定義した増分更新のmodelを実行してみます!

コマンドラインでdbt run -m fct_trading_pnl_incrementalと入力し、実行してください。

この実行されたmodelをSnowflakeから確認すると、create or replace transient tableコマンドが実行されていることがわかると思います。初回だからテーブルを作っているんですね。

続いて、もう一度コマンドラインでdbt run -m fct_trading_pnl_incrementalと入力し、実行してください。

この上で、実行されたmodelをSnowflakeから確認すると、一度TEMPORARYテーブルを作成後、merge intoコマンドが実行されているのがわかります。2回目以降だから増分更新が適用されているということですね!

一般的に増分更新と聞くと単純にレコードが増加していくパターンでしか使えない場合も多いのですが、このようにdbtの増分更新はユニークキーを指定するだけで、新規レコードのINSERTだけでなく既存レコードのUPDATEも行ってくれます。個人的に、増分更新のクエリを書いたり処理を作るのは大変な印象が強かったのですが、dbtの増分更新はとても簡単で便利な機能だと感じています!!

次回

Snowflakeをより使いこなそう! Advent Calendar 2021、次回の23日目では、「Snowflake×dbtを試してみた~Part5:テスト&Doc&デプロイ編~」というタイトルで執筆します。お楽しみに!