
Fivetran Platform Connectorを活用してSnowflake上でMARを自動検知する仕組みを作成してみた
かわばたです。
表題のとおり、Fivetran Platform Connectorを活用してSnowflake上でMARを自動検知する仕組みを作成していきたいと思います。
【公式ドキュメント】
対象読者
- Fivetran Platform Connectorについて確認したい方
- FivetranのMARを自動検知したい方
検証環境と事前準備
検証環境
- Fivetran アカウント
- Snowflakeトライアルアカウント Enterprise版
事前準備
Fivetran Platform Connectorのセットアップは公式ドキュメントと下記ブログが参考になりますのでご確認ください。
【公式ドキュメント】
【参考ブログ】
Destinationの設定も行います。
今回はSnowflakeで行いましたので、下記ブログが参考になるかと思います。
【参考ブログ】
試してみた
INCREMENTAL_MARについて
INCREMENTAL_MARには、各宛先スキーマの毎日のアクティブ行 (MAR) の増分、関連付けられているテーブル、およびMARが計算される時刻が項目としてあります。
具体的には下記です。
| カラム名 | 内容 |
|---|---|
| destination_id | MARが計算される宛先のID |
| free_type | 無料のMARの場合、値は無料のMARの種類となり、有料のMARの場合、PAIDとなる |
| incremental_rows | 接続に対して同期された現在の日付の新しい個別の主キーの数 |
| connection_name | MARが計算される接続の名前 |
| measured_date | MARが計算される日付(UTC 形式) |
| schema_name | MARが計算される宛先スキーマ名 |
| sync_type | MARの計算対象となる同期がHISTORICALかINCREMENTALかを定義 |
| table_name | MARに関連付けられたテーブル名 |
| _fivetran_synced | Fivetranが最後に行を正常に同期したときのタイムスタンプ |
| updated_at | MARが更新されたときのタイムスタンプ |
VIEWの作成
当月の有料のMARを算出するVIEWを作成します。
CREATE OR REPLACE VIEW mar_summary_final_view AS
SELECT
DATE_TRUNC('month', measured_date) AS measured_month,
SUM(incremental_rows) AS total_monthly_active_rows
FROM
FIVETRAN_DATABASE.FIVETRAN_LOG.INCREMENTAL_MAR
WHERE
FREE_TYPE = 'PAID'
AND
DATE_TRUNC('month', measured_date) = DATE_TRUNC('month', CURRENT_DATE())
GROUP BY
measured_month;
結果を確認すると、下記のように当月のConnection別のMARを確認することができます。
※トライアル版で試しているため、FREE_TYPE = 'PAID'のロジックを入れずに出力していますがドキュメント上問題ない認識です。

アラートの設定
total_monthly_active_rowsがしきい値を超えた場合にメールで通知するアラート設定を行います。
【ドキュメント】
CREATE OR REPLACE ALERT high_mar_alert
WAREHOUSE = COMPUTE_WH -- 実行するウェアハウスを指定
SCHEDULE = 'USING CRON 0 2 * * * UTC' -- 毎日午前2時 (UTC) に実行(同期時間などを考慮する)
IF (EXISTS (
SELECT 1
FROM mar_summary_final_view
WHERE measured_month = DATE_TRUNC('month', CURRENT_DATE())
AND total_monthly_active_rows > 100000 -- 監視したいMARのしきい値を設定
))
THEN
CALL SYSTEM$SEND_EMAIL(
'mar_alert_email_integration',
'your-email@example.com',
'[Fivetran MAR Alert] 月間アクティブ行数が多いことが検出されました',
'1つ以上のコネクタがMARしきい値を超えました。詳細はSnowflakeの「mar_summary_final_view」をご確認ください。'
);
メールアドレスの部分はダミーを入れておりますので、ご自身のメールアドレスに置き換えてください。
下記で手動実行が可能です。
EXECUTE ALERT high_mar_alert;
下記のように無事にメールを通知することができました。

補足情報として、アラートの停止と再開のコマンドは下記のとおりです。
-- 停止するコマンド
ALTER ALERT high_mar_alert SUSPEND;
-- 再開するコマンド
ALTER ALERT high_mar_alert RESUME;
dbt Packageのfivetran_log
dbt PackageにもFivetran Platform Connectorを活用したパッケージがあります。
fivetran_platform__mar_table_historyモデルは、テーブルの1か月の無料MAR、有料MAR、合計MARを表し、接続と送信先に関するデータも含まれています。

【ドキュメント】
packages.ymlファイルに下記内容を記載し、dbt depsコマンドでインストールします。
packages:
- package: fivetran/fivetran_log
version: [">=2.5.0", "<2.6.0"]
下記のような形でインストールできました。

今回はfivetran_platform__mar_table_historyだけ実行したいので、下記を実行しました。
dbt run --select +fivetran_platform__mar_table_history

Snowflake側にも下記のように反映されました。

これらのモデルを活用して、先ほどと同様にメール通知を行うこともできますのでぜひ試してみてください。
最後に
今回はVIEWを用いて作成しましたが、ダイナミックテーブルを用いてFivetranでデータが更新されたら自動的に更新されるような仕組みにしても良いと感じました。
非常にシンプルな形で作成したので、Connectionごとにアラート設定したり、スケジュールも月次・週次など応用できるので是非試してみてください!
この記事が何かの参考になれば幸いです!








