Fivetranとdbtの統合機能「dbt transformations」を使ってみた
大阪オフィスの玉井です。
Fivetranは、データ連携時に、(Fivetran側が)予め定義した構造にテーブルを変換します。その変換は、Fivetranの中で実行するのではなく、連携先のDBやDWH上でクエリを実行して行います(元データに影響が及ばない)。そして、Fivetranには、予め定義された変換クエリに追加する形で、別途ユーザーが定義したクエリを実行させることができます。この機能はTransformationと呼ばれています。
簡単なデータ変換であれば、FivetranのTransformation機能を使えばいいのですが、データ分析の規模が大きくなってくると、その分析に応じたデータ変換処理も複雑になってきます(複数の変換処理間に依存関係が出てくる等)。データ変換処理を管理するため、バージョン管理や自動テストの必要も出てきます。そういう時は、dbt(data build tool)というツールを使って、データ変換処理をいい感じに管理できます(概要は下記の記事を参照)。
そして、最近、dbtで作成したデータ変換処理(モデル)を、Fivetranと連携して直接実行できるようになりました。FivetranのTransformationをdbtで強化したような感じですね。今回はこれを実際に使ってみました。
注意
「dbt transformations」は、2020年12月現在、BETA版となります。
あったほうがいい予備知識
やってみたことの概要
超概要
- FivetranでSalesforceデータをBigQueryに同期する
- 上記のデータを元にしたdbt packageを、Fivetran上から動かす
- Salesforceデータの分析に役立つテーブルが生成される
全体像
Fivetranがdbt自体をキックするのかな?と思っていたのですが、Fivetranが見るのはdbtプロジェクトのGitリポジトリで、そのリポジトリのdbtのコードをFivetran自身が実行する仕組みとなっています(厳密にいうと、Fivetranが、dbtプロジェクトのdeployment.yml
に書かれているdbtコマンドを実行する)。
説明
Fivetranが生成・同期するテーブルを前提として処理が組まれたSQLクエリ…これがdbtのパッケージとして既に用意されています。そのパッケージをインストールしたdbtプロジェクトを用意し、それをFivetranと統合して動作させます。つまり「Fivetran自体のSalesforce連携」と「Fivetranで連携したデータをもとに、dbtでさらにデータ変換」という処理…これら2つをFivetran上で行います。
参考(公式)
やってみる前の準備
Fivetranの準備
- 何らかのDestination(何らかのDWHなど)を登録しておきます。
- 今回はGoogle BigQueryを使用します。
- ConnectorとしてSalesforceを登録しておきます。
- 無料で使える開発環境「Developer Edition」を使用します。
- Developer Edition サインアップ手順 | Developer Force Blog
似たようなことを下記の記事でやっていますので、参考にしてください。
dbtの準備
- FivetranにDestinationとして登録したものと同じBigQuery(プロジェクトも同じ)を接続したProjectを用意しておきます。
- ProjectのInitializeを実施しておきます。
- 外部のGitリポジトリを使用します。
- Fivetranが参照するため、dbt自体のリポジトリではなく、Github等を使用してください。
- 今回はGithubのプライベートリポジトリを使用します。
- Cloud版かCLI版か…どちらでも可
- 今回はCloud版を使用
上記のdbtエントリもあわせてご覧ください。
実際にやってみた(とりあえず動作編)
[dbt] deployment.yml
を作成する
Fivetranにdbtプロジェクト(のGitリポジトリ)を登録するにあたり、絶対に必要なファイルは下記の2つです。
dbt_project.yml
deployment.yml
前者はInitialize時に作成されますが、後者は手動で用意する必要があります。ですので、普通に新規ファイル作成でdeployment.ymlを作成します。階層はdbt_project.yml
と同じにします(通常だと一番外側)。
記述する内容ですが、雛形が公式ドキュメントから落とせます(下記のSTEP 5)。
雛形の内容は下記の通りです。
jobs: # Give each job a unique name. The name will be displayed in the Fivetran dashboard once your jobs are imported. - name: daily schedule: 0 12 * * * # Define when this job should run, using cron format. This example will run every day at 12:00pm (according to your warehouse timezone). For help with cron formatting, visit https://crontab.guru/. steps: - name: run models # Give each step in your job a name. This will enable you to track the steps in the logs. command: dbt run # Enter the dbt command that should run in this step. This example will run all your models. For a list of available commands visit https://docs.getdbt.com/reference/model-selection-syntax/. - name: test models command: dbt test # This example will run all your tests. - name: nightly schedule: 0 0 * * * # This example will run every day at midnight. steps: - name: run models command: dbt run --models my_first_model my_second_model # This example will run two specific models. - name: weekdays schedule: 30 7 * * 1-5 # The example will run every weekday at 7:30am. steps: - name: run models command: dbt run - name: every5minutes schedule: '*/5 * * * *' # The example will run every 5 minutes. Note that the cron string is quoted as otherwise it will be treated as invalid alias node (see https://yaml.org/spec/1.2/spec.html#*%20alias// for details) steps: - name: run models command: dbt run
Fivetranで実行したいdbtコマンドと、実行する頻度(cron式)を、ジョブという形で、このファイルに記述します。
name
- ジョブの名前
schedule
- ジョブの実行頻度(cron式)
steps
- 実行するdbtコマンド
- 複数記述可能
この時点では、ひとまず不要なコメントは消して、cron式の部分をシングルコートで囲み直し、後はそのままのものを用意しました。
[Fivetran] dbtプロジェクトを連携させる
予め用意しておいたdbtプロジェクトのGitリポジトリを、Fivetranに登録します。
FivetranのメニューからTransformationを選び、「Try dbt Transformation」をクリックします。
dbt Transformationの概要が表示されます。画面下の「Enable〜」をクリックします。
「dbtプロジェクトのGitリポジトリを準備しとけ」的なメッセージが出ます。「I am ready〜」をクリックします。
dbtプロジェクトのGitリポジトリに関する情報を登録する画面に遷移します。
- Public Key
- Fivetranが該当リポジトリを参照するための公開鍵(既に出力されている)
- これをリポジトリ側に登録する(読み取り権限だけでOK)
- Repository URL
- dbtプロジェクトのGitリポジトリのURL
- Default Schema
- このdbtを(Fivetranから)実行させた際に生成するテーブルを格納するスキーマの名前
- 今回は
test_fivetran_dbt
としました。
ちなみにAdvanced Optionというのもあり、master以外のブランチを参照させたい場合は、こちらのオプションは使います(今回は未設定)。逆に言うと、このオプションを指定しない場合、Fivetranが参照するのはmasterなので、dbt側で何らかの編集した場合、必ずmasterにmergeしておく必要があります。
設定に成功すると、Fivetranのdbt Transformationの画面に遷移します。ここに、先程deployment.yml
に記述した内容が反映されます。4つのジョブを定義していたので、4つのジョブがそのまま表示されていますね。
[Fivetran] とりあえずdbt Transformationを一度実行してみる
まだそれらしいモデルは何も作成していませんが、一旦動作確認のため、ジョブを動かしてみたいと思います。
nightly
というジョブを動かしてみます。動かしたいジョブを選び、画面右上のENABLEDを押すと、ジョブが有効化され、予め定義した頻度に従ってジョブが定期実行されます。今回はすぐに結果が知りたいのでRUN NOWを押します。
コマンド後の実行結果を確認する前に、nightly
というジョブの内容を再確認しましょう。
- name: nightly schedule: '0 0 * * *' steps: - name: run models command: dbt run --models my_first_model my_second_model
今、私はこのジョブをRUN NOWしました。これはつまり、dbt run --models my_first_model my_second_model
というコマンドが(Fivetran越しに)実行されたということになります。
my_first_model
とmy_second_mode
というモデルは、Initialize時に生成されるサンプルモデルです。中身はさておき、このコマンドが正常終了したということは、この2つのモデル(テーブル or ビュー)がBigQuery側に生成されているはずです。
確認しましょう。
先程指定したスキーマ(test_fivetran_dbt
)が生成され、その下にmy_first_model
とmy_second_mode
が生成されていますね。Fivetranからdbtを実行させることに成功しました。
実際にやってみた(dbt Package使用編)
dbt Transformationの動作自体は成功しましたが、もう少し実践的な処理もやってみたいと思います。
dbtにはdbt Packageというものがあります。特定のデータソースに対して、処理が予め組まれたdbtプロジェクト…という感じのものです。Packageの一覧は下記にのっています。
このPackageの中には、「Fivetranで同期したデータから、さらに新しいデータを生成する」というものがあります。要するにFivetranの使用を前提としたPackageということです。
今回、予めSalesforceデータをFivetranで連携しています。ですので、その「Fivetranで同期したSalesforceデータ」を使用するPackageを導入して、それをFivetranから動かしてみたいと思います
- dbt (data build tool) - Explore Packages - salesforce_source
- dbt (data build tool) - Explore Packages - salesforce
[dbt] Packageをインストールする
Packageの入れ方を簡単にいうと、packages.yml
を作成→そこにインストールしたいPackageを記述→dbt deps
というコマンドを実行…という感じです。
最終的には下記画像のようになります。
packages.yml
に書く内容ですが、Packageのドキュメントに必ずInstallationという見出しがあり、記述内容はそこに書いてあるので、それをそのままコピればOKです。
packages.yml
の準備ができたら、dbt deps
を実行します。すると、packages.yml
に記述したPackageがインストールされます(ついでに依存関係のあるPackageもあわせてインストールされていました)。Packageはdbt_module
というフォルダ下に入ります。
[dbt] deployment.yml
を編集する
これは必須作業ではないのですが、ちょっとシンプルに変えました。
- name: test_salesforce schedule: '0 2 * * * ' steps: - name: run models command: dbt run - name: test models command: dbt test
先程のサンプルモデル(my_first_model
とmy_second_mode
)は消して、プロジェクト全体でモデルがSalesforceのPackageしかない状態なので、シンプルにdbt run
とdbt test
が行われるジョブを一つだけ、という形にしました。
[Fivetran] dbt Transformationのジョブを実行する
dbtの設定画面を再度開いて、何も変えずにSAVEすると、リポジトリが参照し直されて、ジョブ一覧が新しいものに反映されました。
というわけで、先程と同様にジョブをRUN NOWします。ジョブが完了すると、下記のようにジョブの実行結果が表示されます。
[Destination(BigQuery)] データを確認してみる
Packageが生成するテーブルや、その仕様(どういうモデルが生成されるのかなど)等はドキュメントに書かれています。ちなみにPackageのコード自体もGithubに上がっています。
Salesforce_source
というPackageでベースのモデルを作った後、それを元にsalesforce
というPackageで、本チャンのモデルを生成するという2段構成になっています。作成されるテーブルは最終的には4つで、商談における詳細データがすぐにわかるテーブル等が作成されるようです。
今回設定しているBigQuery側を確認したところ、ちゃんと4つのテーブルが生成されていました(この4つを作るための中間テーブルも複数あります)。
試しに、salesforce__owner_performance
というテーブルを見てみます。ドキュメントによれば、このテーブルは、1レコード = 1営業担当者で、営業一人一人の商談獲得率(失注率)とかが計算されたテーブルだそうです。
参考までに、どういうカラムがあるのかを載せておきます(ドキュメントにもありますが…)。その人の当月獲得金額とかが集計済なので、BIツール等で可視化するのがすごく楽そうなテーブルですね。
{ "owner_id": "0055h000001AhI4AAK", "manager_id": null, "b_manager_id": null, "b_owner_id": "0055h000001AhI4AAK", "bookings_amount_closed_this_month": "75000", "bookings_amount_closed_this_quarter": "2375000", "total_number_bookings": "18", "total_bookings_amount": "3645000", "bookings_count_closed_this_month": "1", "bookings_count_closed_this_quarter": "10", "avg_bookings_amount": "202500", "largest_booking": "915000", "avg_days_to_close": "56.277777777777786", "l_manager_id": null, "l_owner_id": null, "lost_amount_this_month": null, "lost_amount_this_quarter": null, "total_number_lost": null, "total_lost_amount": null, "lost_count_this_month": null, "lost_count_this_quarter": null, "p_manager_id": null, "p_owner_id": "0055h000001AhI4AAK", "pipeline_created_amount_this_month": "2115000", "pipeline_created_amount_this_quarter": "2115000", "pipeline_created_forecast_amount_this_month": "1.0945E8", "pipeline_created_forecast_amount_this_quarter": "1.0945E8", "pipeline_count_created_this_month": "13", "pipeline_count_created_this_quarter": "13", "total_number_pipeline": "13", "total_pipeline_amount": "2115000", "total_pipeline_forecast_amount": "1.0945E8", "avg_pipeline_opp_amount": "162692", "largest_deal_in_pipeline": "675000", "avg_days_open": "-3.0", "owner_name": "tamai rei", "owner_city": null, "owner_state": null, "win_percent_this_month": "0", "win_percent_this_quarter": "0", "total_win_percent": "0" },
[Destination(BigQuery)] クエリの履歴も確認してみる
dbtはデータウェアハウス側で処理を行うため、データウェアハウス側の履歴を確認すれば、dbtがどういう処理をしていたかもわかります。
怒涛のクエリ祭り(?)ですね(g-capabilities〜というのは、Fivetran用に発行したBigQueryの権限です。私が直接クエリを実行するとtamai_reiという感じになります)。
ちなみに後半はdbt test
が実行されています。つまり、生成されたデータに問題がないかどうかのテストクエリも走っています(これもPackageに予め組まれている)。
おわりに
Fvietranとdbtを組み合わせることで、主要なSaaS(SalesforceやZendesk、marketoなど)の分析がめちゃくちゃ楽になると思いました。気をつけないといけない点としては、DWH側にクエリがたくさん走るため、コストやパフォーマンスについて対策をしておく点でしょうか。