Fivetranとdbtの統合機能「dbt transformations」を使ってみた

IWGPタッグチャンピオンレベルのタッグ
2020.12.10

大阪オフィスの玉井です。

Fivetranは、データ連携時に、(Fivetran側が)予め定義した構造にテーブルを変換します。その変換は、Fivetranの中で実行するのではなく、連携先のDBやDWH上でクエリを実行して行います(元データに影響が及ばない)。そして、Fivetranには、予め定義された変換クエリに追加する形で、別途ユーザーが定義したクエリを実行させることができます。この機能はTransformationと呼ばれています。

簡単なデータ変換であれば、FivetranのTransformation機能を使えばいいのですが、データ分析の規模が大きくなってくると、その分析に応じたデータ変換処理も複雑になってきます(複数の変換処理間に依存関係が出てくる等)。データ変換処理を管理するため、バージョン管理や自動テストの必要も出てきます。そういう時は、dbt(data build tool)というツールを使って、データ変換処理をいい感じに管理できます(概要は下記の記事を参照)。

そして、最近、dbtで作成したデータ変換処理(モデル)を、Fivetranと連携して直接実行できるようになりました。FivetranのTransformationをdbtで強化したような感じですね。今回はこれを実際に使ってみました。

注意

「dbt transformations」は、2020年12月現在、BETA版となります。

あったほうがいい予備知識

やってみたことの概要

超概要

  • FivetranでSalesforceデータをBigQueryに同期する
  • 上記のデータを元にしたdbt packageを、Fivetran上から動かす
    • Salesforceデータの分析に役立つテーブルが生成される

全体像

Fivetranがdbt自体をキックするのかな?と思っていたのですが、Fivetranが見るのはdbtプロジェクトのGitリポジトリで、そのリポジトリのdbtのコードをFivetran自身が実行する仕組みとなっています(厳密にいうと、Fivetranが、dbtプロジェクトのdeployment.ymlに書かれているdbtコマンドを実行する)。

説明

Fivetranが生成・同期するテーブルを前提として処理が組まれたSQLクエリ…これがdbtのパッケージとして既に用意されています。そのパッケージをインストールしたdbtプロジェクトを用意し、それをFivetranと統合して動作させます。つまり「Fivetran自体のSalesforce連携」と「Fivetranで連携したデータをもとに、dbtでさらにデータ変換」という処理…これら2つをFivetran上で行います。

参考(公式)

やってみる前の準備

Fivetranの準備

  • 何らかのDestination(何らかのDWHなど)を登録しておきます。
    • 今回はGoogle BigQueryを使用します。
  • ConnectorとしてSalesforceを登録しておきます。

似たようなことを下記の記事でやっていますので、参考にしてください。

dbtの準備

  • FivetranにDestinationとして登録したものと同じBigQuery(プロジェクトも同じ)を接続したProjectを用意しておきます。
  • ProjectのInitializeを実施しておきます。
  • 外部のGitリポジトリを使用します。
    • Fivetranが参照するため、dbt自体のリポジトリではなく、Github等を使用してください。
    • 今回はGithubのプライベートリポジトリを使用します。
  • Cloud版かCLI版か…どちらでも可
    • 今回はCloud版を使用

上記のdbtエントリもあわせてご覧ください。

実際にやってみた(とりあえず動作編)

[dbt] deployment.ymlを作成する

Fivetranにdbtプロジェクト(のGitリポジトリ)を登録するにあたり、絶対に必要なファイルは下記の2つです。

  • dbt_project.yml
  • deployment.yml

前者はInitialize時に作成されますが、後者は手動で用意する必要があります。ですので、普通に新規ファイル作成でdeployment.ymlを作成します。階層はdbt_project.ymlと同じにします(通常だと一番外側)。

記述する内容ですが、雛形が公式ドキュメントから落とせます(下記のSTEP 5)。

雛形の内容は下記の通りです。

jobs:
# Give each job a unique name. The name will be displayed in the Fivetran dashboard once your jobs are imported.
 - name: daily
   schedule: 0 12 * * * # Define when this job should run, using cron format. This example will run every day at 12:00pm (according to your warehouse timezone). For help with cron formatting, visit https://crontab.guru/.
   steps:
     - name: run models # Give each step in your job a name. This will enable you to track the steps in the logs.
       command: dbt run # Enter the dbt command that should run in this step. This example will run all your models. For a list of available commands visit https://docs.getdbt.com/reference/model-selection-syntax/.
     - name: test models
       command: dbt test # This example will run all your tests.

 - name: nightly
   schedule: 0 0 * * * # This example will run every day at midnight.
   steps:
     - name: run models
       command: dbt run --models my_first_model my_second_model # This example will run two specific models.

 - name: weekdays
   schedule: 30 7 * * 1-5 # The example will run every weekday at 7:30am.
   steps:
     - name: run models
       command: dbt run

 - name: every5minutes
   schedule: '*/5 * * * *' # The example will run every 5 minutes. Note that the cron string is quoted as otherwise it will be treated as invalid alias node (see https://yaml.org/spec/1.2/spec.html#*%20alias// for details)
   steps:
     - name: run models
       command: dbt run

Fivetranで実行したいdbtコマンドと、実行する頻度(cron式)を、ジョブという形で、このファイルに記述します。

  • name
    • ジョブの名前
  • schedule
    • ジョブの実行頻度(cron式)
  • steps
    • 実行するdbtコマンド
    • 複数記述可能

この時点では、ひとまず不要なコメントは消して、cron式の部分をシングルコートで囲み直し、後はそのままのものを用意しました。

[Fivetran] dbtプロジェクトを連携させる

予め用意しておいたdbtプロジェクトのGitリポジトリを、Fivetranに登録します。

FivetranのメニューからTransformationを選び、「Try dbt Transformation」をクリックします。

dbt Transformationの概要が表示されます。画面下の「Enable〜」をクリックします。

「dbtプロジェクトのGitリポジトリを準備しとけ」的なメッセージが出ます。「I am ready〜」をクリックします。

dbtプロジェクトのGitリポジトリに関する情報を登録する画面に遷移します。

  • Public Key
    • Fivetranが該当リポジトリを参照するための公開鍵(既に出力されている)
    • これをリポジトリ側に登録する(読み取り権限だけでOK)
  • Repository URL
    • dbtプロジェクトのGitリポジトリのURL
  • Default Schema
    • このdbtを(Fivetranから)実行させた際に生成するテーブルを格納するスキーマの名前
    • 今回はtest_fivetran_dbt としました。

ちなみにAdvanced Optionというのもあり、master以外のブランチを参照させたい場合は、こちらのオプションは使います(今回は未設定)。逆に言うと、このオプションを指定しない場合、Fivetranが参照するのはmasterなので、dbt側で何らかの編集した場合、必ずmasterにmergeしておく必要があります。

設定に成功すると、Fivetranのdbt Transformationの画面に遷移します。ここに、先程deployment.ymlに記述した内容が反映されます。4つのジョブを定義していたので、4つのジョブがそのまま表示されていますね。

[Fivetran] とりあえずdbt Transformationを一度実行してみる

まだそれらしいモデルは何も作成していませんが、一旦動作確認のため、ジョブを動かしてみたいと思います。

nightlyというジョブを動かしてみます。動かしたいジョブを選び、画面右上のENABLEDを押すと、ジョブが有効化され、予め定義した頻度に従ってジョブが定期実行されます。今回はすぐに結果が知りたいのでRUN NOWを押します。

コマンド後の実行結果を確認する前に、nightlyというジョブの内容を再確認しましょう。

 - name: nightly
   schedule: '0 0 * * *'
   steps:
     - name: run models
       command: dbt run --models my_first_model my_second_model

今、私はこのジョブをRUN NOWしました。これはつまり、dbt run --models my_first_model my_second_modelというコマンドが(Fivetran越しに)実行されたということになります。

my_first_modelmy_second_modeというモデルは、Initialize時に生成されるサンプルモデルです。中身はさておき、このコマンドが正常終了したということは、この2つのモデル(テーブル or ビュー)がBigQuery側に生成されているはずです。

確認しましょう。

先程指定したスキーマ(test_fivetran_dbt )が生成され、その下にmy_first_modelmy_second_modeが生成されていますね。Fivetranからdbtを実行させることに成功しました。

実際にやってみた(dbt Package使用編)

dbt Transformationの動作自体は成功しましたが、もう少し実践的な処理もやってみたいと思います。

dbtにはdbt Packageというものがあります。特定のデータソースに対して、処理が予め組まれたdbtプロジェクト…という感じのものです。Packageの一覧は下記にのっています。

このPackageの中には、「Fivetranで同期したデータから、さらに新しいデータを生成する」というものがあります。要するにFivetranの使用を前提としたPackageということです。

今回、予めSalesforceデータをFivetranで連携しています。ですので、その「Fivetranで同期したSalesforceデータ」を使用するPackageを導入して、それをFivetranから動かしてみたいと思います

[dbt] Packageをインストールする

Packageの入れ方を簡単にいうと、packages.ymlを作成→そこにインストールしたいPackageを記述→dbt depsというコマンドを実行…という感じです。

最終的には下記画像のようになります。

packages.ymlに書く内容ですが、Packageのドキュメントに必ずInstallationという見出しがあり、記述内容はそこに書いてあるので、それをそのままコピればOKです。

packages.ymlの準備ができたら、dbt depsを実行します。すると、packages.ymlに記述したPackageがインストールされます(ついでに依存関係のあるPackageもあわせてインストールされていました)。Packageはdbt_moduleというフォルダ下に入ります。

[dbt] deployment.ymlを編集する

これは必須作業ではないのですが、ちょっとシンプルに変えました。

  - name: test_salesforce
    schedule: '0 2 * * * '
    steps:
      - name: run models
        command: dbt run
      - name: test models
        command: dbt test

先程のサンプルモデル(my_first_modelmy_second_mode)は消して、プロジェクト全体でモデルがSalesforceのPackageしかない状態なので、シンプルにdbt rundbt testが行われるジョブを一つだけ、という形にしました。

[Fivetran] dbt Transformationのジョブを実行する

dbtの設定画面を再度開いて、何も変えずにSAVEすると、リポジトリが参照し直されて、ジョブ一覧が新しいものに反映されました。

というわけで、先程と同様にジョブをRUN NOWします。ジョブが完了すると、下記のようにジョブの実行結果が表示されます。

[Destination(BigQuery)] データを確認してみる

Packageが生成するテーブルや、その仕様(どういうモデルが生成されるのかなど)等はドキュメントに書かれています。ちなみにPackageのコード自体もGithubに上がっています。

Salesforce_sourceというPackageでベースのモデルを作った後、それを元にsalesforceというPackageで、本チャンのモデルを生成するという2段構成になっています。作成されるテーブルは最終的には4つで、商談における詳細データがすぐにわかるテーブル等が作成されるようです。

今回設定しているBigQuery側を確認したところ、ちゃんと4つのテーブルが生成されていました(この4つを作るための中間テーブルも複数あります)。

試しに、salesforce__owner_performanceというテーブルを見てみます。ドキュメントによれば、このテーブルは、1レコード = 1営業担当者で、営業一人一人の商談獲得率(失注率)とかが計算されたテーブルだそうです。

参考までに、どういうカラムがあるのかを載せておきます(ドキュメントにもありますが…)。その人の当月獲得金額とかが集計済なので、BIツール等で可視化するのがすごく楽そうなテーブルですね。

{
    "owner_id": "0055h000001AhI4AAK",
    "manager_id": null,
    "b_manager_id": null,
    "b_owner_id": "0055h000001AhI4AAK",
    "bookings_amount_closed_this_month": "75000",
    "bookings_amount_closed_this_quarter": "2375000",
    "total_number_bookings": "18",
    "total_bookings_amount": "3645000",
    "bookings_count_closed_this_month": "1",
    "bookings_count_closed_this_quarter": "10",
    "avg_bookings_amount": "202500",
    "largest_booking": "915000",
    "avg_days_to_close": "56.277777777777786",
    "l_manager_id": null,
    "l_owner_id": null,
    "lost_amount_this_month": null,
    "lost_amount_this_quarter": null,
    "total_number_lost": null,
    "total_lost_amount": null,
    "lost_count_this_month": null,
    "lost_count_this_quarter": null,
    "p_manager_id": null,
    "p_owner_id": "0055h000001AhI4AAK",
    "pipeline_created_amount_this_month": "2115000",
    "pipeline_created_amount_this_quarter": "2115000",
    "pipeline_created_forecast_amount_this_month": "1.0945E8",
    "pipeline_created_forecast_amount_this_quarter": "1.0945E8",
    "pipeline_count_created_this_month": "13",
    "pipeline_count_created_this_quarter": "13",
    "total_number_pipeline": "13",
    "total_pipeline_amount": "2115000",
    "total_pipeline_forecast_amount": "1.0945E8",
    "avg_pipeline_opp_amount": "162692",
    "largest_deal_in_pipeline": "675000",
    "avg_days_open": "-3.0",
    "owner_name": "tamai rei",
    "owner_city": null,
    "owner_state": null,
    "win_percent_this_month": "0",
    "win_percent_this_quarter": "0",
    "total_win_percent": "0"
  },

[Destination(BigQuery)] クエリの履歴も確認してみる

dbtはデータウェアハウス側で処理を行うため、データウェアハウス側の履歴を確認すれば、dbtがどういう処理をしていたかもわかります。

怒涛のクエリ祭り(?)ですね(g-capabilities〜というのは、Fivetran用に発行したBigQueryの権限です。私が直接クエリを実行するとtamai_reiという感じになります)。

ちなみに後半はdbt testが実行されています。つまり、生成されたデータに問題がないかどうかのテストクエリも走っています(これもPackageに予め組まれている)。

おわりに

Fvietranとdbtを組み合わせることで、主要なSaaS(SalesforceやZendesk、marketoなど)の分析がめちゃくちゃ楽になると思いました。気をつけないといけない点としては、DWH側にクエリがたくさん走るため、コストやパフォーマンスについて対策をしておく点でしょうか。