dbtでJinjaを利用して柔軟なデータモデルを開発する

二礼二拍手一礼
2020.08.18

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

大阪オフィス所属だが現在は奈良県でリモートワーク中の玉井です。

dbtは、ELTのTをソフトウェア(またはアプリケーション)と同じように開発することができるサービスです。しかも、SQLのSELECT文さえ分かっていれば、もう早速使うことができてしまいます。

ただし、SQLはいわゆる宣言型言語で、柔軟なデータモデルを作るためには限界があります。そういう時のために、dbtはjinjaという言語が使えるようになっています。

今回はjinjaのチュートリアルを「実際にやってみつつ」、どういう事ができるかをご紹介したいと思います。

そもそもdbtとは?という方へ

下記をご覧ください。

Jinjaに関する公式情報

本家

dbt

本記事では下記に記載されているものを実際にやってみます。

やってみた

Projectを新規作成する

今回の作業用に新規Projectを用意します。具体的な方法は上記の別記事をどうぞ。ちなみに今回はSnowflakeを接続します。

csvをロードする

今回は下記のCSVファイルを使用します。

「え~DWHにCSVを入れるの面倒だなあ」って思ったんですが、dbtには便利な機能があります。dbt seedを使えば、dbtから直接csvデータを接続しているDWHにロードすることができます。

dataディレクトリ下にraw_payments.csvを作成します。

上記のcsvデータをそのまま貼り付けます。

画面下部のRunsからdbt seedを実行します。

DWHにcsvデータがロードされました。

Jinjaを使わずに(SQLオンリー)モデルを作成する

raw_payments.csvというデータは、支払方法が付与された注文データです。このデータを見ると、同一オーダー(order_id)に対して異なる支払方法(payments_method)が存在しています。

このデータから「各支払方法別にどれだけお金が支払われているかを、オーダー毎に集計する」というモデルを作成したいとします。dbtでこれをSQLのみでやろうとすると、下記のようになります。modelsディレクトリ下にorder_payment_method_amounts.sqlを作成して、下記のクエリを書きます。

select
order_id,
sum(case when payment_method = 'bank_transfer' then amount end) as bank_transfer_amount,
sum(case when payment_method = 'credit_card' then amount end) as credit_card_amount,
sum(case when payment_method = 'gift_card' then amount end) as gift_card_amount,
sum(amount) as total_amount
from {{ ref('raw_payments') }}
group by 1

dbtはモデル(クエリ)をその場で実行して検証することができます。下記はこのモデルの実行結果です。

オーダー毎に集計できているので、このクエリ自体に問題はありません。しかし、このモデルを定期的に運用していくとなると、例えば元データのカラム名が変われば、このクエリも変更しないといけません。また、新しい支払方法が増えた時も、追記する必要がありますよね。その追記内容もCASE文をコピペして対応すると思われるので、ミスに気づきにくくなりますし、そもそもCASE文が羅列されたクエリは可読性が悪いです。

…とまあ、このモデルの運用は、かなり難儀であることがわかります。

Jinjaを使ってモデルを作成する

上記のままだとしんどいので、Jinjaを使用して、より良いコードに書き換えていきましょう。

For文を使用する

まず、冗長なCASE文をFor文でまとめます。

select
order_id,
{% for payment_method in ["bank_transfer", "credit_card", "gift_card"] %}
sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_amount,
{% endfor %}
sum(amount) as total_amount
from {{ ref('raw_payments') }}
group by 1

モデルに表示したい3つの支払方法を記述して、その分だけCASE文をループで記述するようにします。これで、SELECT句のpayment_methodに関しては、1行で済むようになりました。

変数の記述を冒頭に記述する

大体のプログラミング言語は、冒頭で変数を宣言すると思います。dbtでもそういう風にしましょう。

{% set payment_methods = ["bank_transfer", "credit_card", "gift_card"] %}

select
order_id,
{% for payment_method in payment_methods %}
sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_amount,
{% endfor %}
sum(amount) as total_amount
from {{ ref('raw_payments') }}
group by 1

SELECT句の末尾にカンマが入らないように処理する

上記の場合、ループの外側にsum(amount) as total_amountが固定で記述されています。SELECT句の最後に必ずこのカラムが必要であれば良いのですが、そうじゃない場合(SELECT句がFor文で終わる場合)、最後の記述の最後にカンマが入っていると、SQLの文法的にエラーになります。

dbtでは、IF文とloop.lastという定義を組み合わせることで、末尾のカンマを回避することができます。

{% set payment_methods = ["bank_transfer", "credit_card", "gift_card"] %}

select
order_id,
{% for payment_method in payment_methods %}
sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_amount
{% if not loop.last %},{% endif %}
{% endfor %}
from {{ ref('raw_payments') }}
group by 1

「もしループが最後じゃない場合はカンマを入れる(最後の場合は入れない)」という風になっていますね。

Whitespace Controlについて

これまで書いてきたJinjaを実行しても、冒頭のSQLオンリーのモデルと同じ結果(チュートリアルの便宜上、total_amountは無くしていますが)が得られます。

しかし、このコードの場合、コンパイル後のクエリは、めっちゃスペースが入るようになっています。

実は、Jinjaには「Whitespace Control」という概念があります。詳細はさておき、下記のように記述し直すと、コンパイル後の空白を無くすことができます。

{%- set payment_methods = ["bank_transfer", "credit_card", "gift_card"] -%}

select
order_id,
{%- for payment_method in payment_methods %}
sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_amount
{%- if not loop.last %},{% endif -%}
{% endfor %}
from {{ ref('raw_payments') }}
group by 1

For文の開始部分やIF文の前後にハイフンが入っていますね。このコードをコンパイルすると、スッキリしたクエリになります。

JinjaのWhitespace Controlの詳細については下記をどうぞ。

空白の調整はコード内容によるため、試行錯誤が必要ですが、可読性が重要なのはモデル側(ユーザーが読むほう)であり、コンパイル後のクエリにそこまで気合を入れる必要はそこまでないはずです。Whitespace Controlに入れ込むのは、ほどほどにしましょう。

マクロを使用する

ここまでJinjaの基本的な記述をやってきました。しかし、今のところ、集計する支払方法の種別はハードコーディングしています。

この部分を動的にするため、マクロを使ってみます。マクロは何回でも呼び出せるコードで、ざっくりいうと関数みたいなものです。複数のモデルにまたがって必要な処理を記述するとき等に便利です。

ひとまず、支払方法をベタ書きしている部分を、マクロで外出ししてみます。macrosディレクトリ下にget_payment_methods.sqlを作成し、下記のコードを書いてみます。

{% macro get_payment_methods() %}
{{ return(["bank_transfer", "credit_card", "gift_card"]) }}
{% endmacro %}

現時点では、引数無しの、returnで3つの支払方法の固定値を返すだけのマクロになっています。

先程のモデルを、このマクロを呼び出すように編集します。

{%- set payment_methods = get_payment_methods() -%}

select
order_id,
{%- for payment_method in payment_methods %}
sum(case when payment_method = '{{payment_method}}' then amount end) as {{payment_method}}_amount
{%- if not loop.last %},{% endif -%}
{% endfor %}
from {{ ref('raw_payments') }}
group by 1

冒頭で支払方法の値を固定記述していた部分が、マクロを呼び出して取得するように変わりました。

マクロを使って値を動的に取得する

今までハードコーディングしていた支払方法の値の取得部分を、いよいよ動的にしたいと思います。

ロジックとしては「まず実際に支払方法の値を全て取得する」→「取得した値の分だけ、モデル側でCASE文を生成する」という形にします。

「元データの『支払方法』の値を全て取得する」処理を記述する

言い換えれば、元データのpayment_methodのユニーク値を全て取得するということですが、これは下記のクエリで実現できます。

select distinct
payment_method
from {{ ref('raw_payments') }}
order by 1

これを、マクロに組み込んでみます。run_queryというメソッドを使用して、変数に格納したクエリを実行することができます。

{% macro get_payment_methods() %}

{% set payment_methods_query %}
select distinct
payment_method
from {{ ref('raw_payments') }}
order by 1
{% endset %}

{% set results = run_query(payment_methods_query) %}

{{ log(results, info=True) }}

{{ return([]) }}

{% endmacro %}

上記では、クエリの実行結果をresultsという変数に入れて、ログに出力しています。実際に実行してログを見てみましょう。

実は、resultsに格納されているのは、クエリの結果ではなく、Agate Tableというオブジェクトでした。

実際に結果を取得するには、このAgate Tableから、実際に値を取り出す必要があります。

{% macro get_payment_methods() %}

{% set payment_methods_query %}
select distinct
payment_method
from {{ ref('raw_payments') }}
order by 1
{% endset %}

{% set results = run_query(payment_methods_query) %}

{% if execute %}
{# Return the first column #}
{% set results_list = results.columns[0].values() %}
{% else %}
{% set results_list = [] %}
{% endif %}

{{ return(results_list) }}

{% endmacro %}

注目ポイントは{% if execute %}です。「処理がexecuteだったら~」という処理なのですが、これはどういう意味なのでしょうか。

このIF文を無くした状態にしてみます。すると、実行するまでもなくコンパイルエラーになりました。

dbtは、裏側でProject下のファイルをスキャンしたりしている(らしい)のですが、スキャンしている時は実際にデータを取りに行くわけじゃないので、「データを取りに行ったけど無い」という状態のため(厳密にはそういう風なコードになっている時点で)エラーとなります。この状態はexecute == falseです。

ですので、モデルを実際に作成するときやテストを実行するとき等、実際にこのコードが走る、execute == Trueな時だけ、実際にデータを格納するコードが実行されるようにする必要があります。

マクロの各処理をモジュール化する

最後の仕上げとして、マクロを2つの処理に分割します。こうすることで、別モデルからの利用もしやすくなります。

{% macro get_column_values(column_name, relation) %}

{% set relation_query %}
select distinct
{{ column_name }}
from {{ relation }}
order by 1
{% endset %}

{% set results = run_query(relation_query) %}

{% if execute %}
{# Return the first column #}
{% set results_list = results.columns[0].values() %}
{% else %}
{% set results_list = [] %}
{% endif %}

{{ return(results_list) }}

{% endmacro %}


{% macro get_payment_methods() %}

{{ return(get_column_values('payment_method', ref('raw_payments'))) }}

{% endmacro %}

支払方法(のカラム)の値一覧を取得するget_payment_methodsと、指定したカラムの値一覧を取得するget_column_valuesに分割しました。後者を利用することで、似たような処理を別データに利用することができます。

dbt runする

せっかくなので、ここまでやってきたモデルを実際に実行してみましょう。

Snowflakeにデータモデル(今回はビュー)が生成されました。このProjectを本番デプロイすれば、このモデルを定期的に運用することができます。

おわりに

Jinjaを使うことで柔軟なデータ変換処理ができるのはもちろんのこと、より読みやすく、より楽に、コードを開発していくことができますね。