
dbt platformから各種ログをFivetranでSnowflakeに連携してみた
かわばたです。
表題のとおり、dbt platformから各種ログをFivetranでSnowflakeに連携を試してみます。
UIから取得する方法や、APIを用いて取得する方法がありますが、今回はFivetranを利用してより簡単にログを取得していきます。
対象読者
- dbt platformのログについて興味がある方
- Fivetranでdbt platformのログを取得する方法について関心のある方
検証環境と事前準備
検証環境
- SnowflakeトライアルアカウントEnterpriseプラン
- dbt platformアカウントEnterpriseプラン
- Fivetranアカウント
事前準備
- Snowflakeに各種データベース等作成します。
-- ========================================
-- Fivetran 用の環境セットアップ
-- ========================================
-- 1. 専用データベースの作成
CREATE DATABASE IF NOT EXISTS FIVETRAN_DB;
-- 2. dbt Cloud データ用スキーマの作成
CREATE SCHEMA IF NOT EXISTS FIVETRAN_DB.DBT_CLOUD;
-- 3. Fivetran 専用ウェアハウスの作成(オプション)
CREATE WAREHOUSE IF NOT EXISTS FIVETRAN_WH
WITH WAREHOUSE_SIZE = 'XSMALL'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE;
-- 4. Fivetran 専用ロールの作成
CREATE ROLE IF NOT EXISTS FIVETRAN_ROLE;
-- 5. 権限の付与
GRANT USAGE ON DATABASE FIVETRAN_DB TO ROLE FIVETRAN_ROLE;
GRANT USAGE ON SCHEMA FIVETRAN_DB.DBT_CLOUD TO ROLE FIVETRAN_ROLE;
GRANT CREATE TABLE ON SCHEMA FIVETRAN_DB.DBT_CLOUD TO ROLE FIVETRAN_ROLE;
GRANT CREATE VIEW ON SCHEMA FIVETRAN_DB.DBT_CLOUD TO ROLE FIVETRAN_ROLE;
GRANT USAGE ON WAREHOUSE FIVETRAN_WH TO ROLE FIVETRAN_ROLE;
-- 6. ロールをユーザーに付与
GRANT ROLE FIVETRAN_ROLE TO USER KAWABATA;
- FivetranのDestination(接続先)の設定
【参考ブログ】
dbt platformログについて
Fivetranのコネクタで連携できるログ情報を下記にまとめます。
※非推奨とありますが、これはdbtドキュメント上に記載されております。dbt platformにはAPI v2とAPI v3がありFivetranは基本API v2を使用していそうです。非推奨の項目はAPI v3の使用を推奨していますが、Fivetranのドキュメントに使用しているAPIバージョンの記載が確認できなかったので、利用する場合は考慮して使用するか事前にサポートに問い合わせが必要と思います。

【公式ドキュメント】
コネクタの作成と同期
Fivetranの公式ドキュメントに従って、コネクタを作成していきます。
【セットアップガイド】
サービストークンを作成
dbt platformでサービストークンを作成します。
ホーム画面左下のAccount Settingsを選択します。

メニューバーのService tokensを選択し、+Create Service tokenを選択します。

ポップアップした画面にToken nameとPermission setを入力します。
※teamプランはRead-only、EnterpriseプランはAccount Viewerを選択します。

下記のようにService tokenが出るので、メモしておきます。

コネクタの作成
ConnectionsのAdd connectionを選択します。

遷移した画面でdbtと検索しdbt Cloudを選択します。
※dbt platformに名称変更がありましたがまだ反映されていないようで、旧名称のdbt Cloudを選択しましょう。

Select a destinationで事前に準備していたdestinationを選択します。

遷移した画面でスキーマ名とdbt platformのリージョンとさきほど作成したService tokenを入力しSave&Testを押下します。

下記画面で取得するスキーマを選択します。

選択後、スキーマを取得するためにデータ通信を行いスキーマの情報を得ます。
※コネクタによって取得する量が異なるためすぐに終わるケースもあれば3時間以上かかるケースもあります。
取得したスキーマの情報からSnowflakeに連携するスキーマを選択します。

スキーマやカラムが変わったときにどこまで許容するかを選択します。
- Allow all
- すべての新しいスキーマ、テーブル、列を許可
- Allow columns
- 新しいスキーマと構成可能なテーブルはブロックしますが、新しい列は許可
- Block all
- すべての新しいスキーマ、構成可能なテーブルと列をブロック

ここまできたら初回の同期を進めていきます。

同期が終了しました。

Snowflakeにも目的のスキーマが出力されていました。

まとめ
今回はdbt platformの各種ログをFivetranで取得しました。
APIを介して取得することも可能ですが、もしFivetranを活用していればGUIベースでポチポチと選択するだけで同期まですぐに完了することができました。
APIを利用する場合はAirflowなどのオーケストレーションツールの活用か、Pythonモデルを用いてJobを設定する形になるかと思います。
この記事が何かの参考になれば幸いです!









