Fivetran Platform Connectorを活用してSnowflake上でMARを自動検知する仕組みを作成してみた

Fivetran Platform Connectorを活用してSnowflake上でMARを自動検知する仕組みを作成してみた

2025.12.17

かわばたです。

表題のとおり、Fivetran Platform Connectorを活用してSnowflake上でMARを自動検知する仕組みを作成していきたいと思います。

【公式ドキュメント】
https://fivetran.com/docs/logs/fivetran-platform

対象読者

  • Fivetran Platform Connectorについて確認したい方
  • FivetranのMARを自動検知したい方

検証環境と事前準備

検証環境

  • Fivetran アカウント
  • Snowflakeトライアルアカウント Enterprise版

事前準備

Fivetran Platform Connectorのセットアップは公式ドキュメントと下記ブログが参考になりますのでご確認ください。

【公式ドキュメント】
https://fivetran.com/docs/logs/fivetran-platform/setup-guide

【参考ブログ】
https://dev.classmethod.jp/articles/05361328511a48aeea6202944a1a5f23f2ba43fa/

Destinationの設定も行います。
今回はSnowflakeで行いましたので、下記ブログが参考になるかと思います。
【参考ブログ】
https://dev.classmethod.jp/articles/fivetran-destination-setting-snowflake/

試してみた

INCREMENTAL_MARについて

INCREMENTAL_MARには、各宛先スキーマの毎日のアクティブ行 (MAR) の増分、関連付けられているテーブル、およびMARが計算される時刻が項目としてあります。
具体的には下記です。

カラム名 内容
destination_id MARが計算される宛先のID
free_type 無料のMARの場合、値は無料のMARの種類となり、有料のMARの場合、PAIDとなる
incremental_rows 接続に対して同期された現在の日付の新しい個別の主キーの数
connection_name MARが計算される接続の名前
measured_date MARが計算される日付(UTC 形式)
schema_name MARが計算される宛先スキーマ名
sync_type MARの計算対象となる同期がHISTORICALかINCREMENTALかを定義
table_name MARに関連付けられたテーブル名
_fivetran_synced Fivetranが最後に行を正常に同期したときのタイムスタンプ
updated_at MARが更新されたときのタイムスタンプ

VIEWの作成

当月の有料のMARを算出するVIEWを作成します。

CREATE OR REPLACE VIEW mar_summary_final_view AS
SELECT
    DATE_TRUNC('month', measured_date) AS measured_month,
    SUM(incremental_rows) AS total_monthly_active_rows
FROM
    FIVETRAN_DATABASE.FIVETRAN_LOG.INCREMENTAL_MAR
WHERE
    FREE_TYPE = 'PAID'
AND
    DATE_TRUNC('month', measured_date) = DATE_TRUNC('month', CURRENT_DATE())
GROUP BY
    measured_month;

結果を確認すると、下記のように当月のConnection別のMARを確認することができます。
※トライアル版で試しているため、FREE_TYPE = 'PAID'のロジックを入れずに出力していますがドキュメント上問題ない認識です。
2025-12-17_20h57_02

アラートの設定

total_monthly_active_rowsがしきい値を超えた場合にメールで通知するアラート設定を行います。

【ドキュメント】
https://docs.snowflake.com/ja/user-guide/notifications/email-stored-procedures

CREATE OR REPLACE ALERT high_mar_alert
    WAREHOUSE = COMPUTE_WH -- 実行するウェアハウスを指定
    SCHEDULE = 'USING CRON 0 2 * * * UTC' -- 毎日午前2時 (UTC) に実行(同期時間などを考慮する)
    IF (EXISTS (
        SELECT 1
        FROM mar_summary_final_view
        WHERE measured_month = DATE_TRUNC('month', CURRENT_DATE())
          AND total_monthly_active_rows > 100000 -- 監視したいMARのしきい値を設定
    ))
    THEN
        CALL SYSTEM$SEND_EMAIL(
            'mar_alert_email_integration',
            'your-email@example.com',
            '[Fivetran MAR Alert] 月間アクティブ行数が多いことが検出されました',
            '1つ以上のコネクタがMARしきい値を超えました。詳細はSnowflakeの「mar_summary_final_view」をご確認ください。'
        );

メールアドレスの部分はダミーを入れておりますので、ご自身のメールアドレスに置き換えてください。

下記で手動実行が可能です。

EXECUTE ALERT high_mar_alert;

下記のように無事にメールを通知することができました。
2025-12-17_21h08_51

補足情報として、アラートの停止と再開のコマンドは下記のとおりです。

-- 停止するコマンド
ALTER ALERT high_mar_alert SUSPEND;

-- 再開するコマンド
ALTER ALERT high_mar_alert RESUME;

dbt Packageのfivetran_log

dbt PackageにもFivetran Platform Connectorを活用したパッケージがあります。
fivetran_platform__mar_table_historyモデルは、テーブルの1か月の無料MAR、有料MAR、合計MARを表し、接続と送信先に関するデータも含まれています。

2025-12-17_21h37_21

【ドキュメント】
https://hub.getdbt.com/fivetran/fivetran_log/latest/

packages.ymlファイルに下記内容を記載し、dbt depsコマンドでインストールします。

packages:
  - package: fivetran/fivetran_log
    version: [">=2.5.0", "<2.6.0"]

下記のような形でインストールできました。
2025-12-17_21h39_30

今回はfivetran_platform__mar_table_historyだけ実行したいので、下記を実行しました。

dbt run --select +fivetran_platform__mar_table_history

2025-12-17_21h41_05

Snowflake側にも下記のように反映されました。
2025-12-17_21h43_03

これらのモデルを活用して、先ほどと同様にメール通知を行うこともできますのでぜひ試してみてください。

最後に

今回はVIEWを用いて作成しましたが、ダイナミックテーブルを用いてFivetranでデータが更新されたら自動的に更新されるような仕組みにしても良いと感じました。
非常にシンプルな形で作成したので、Connectionごとにアラート設定したり、スケジュールも月次・週次など応用できるので是非試してみてください!

この記事が何かの参考になれば幸いです!

この記事をシェアする

FacebookHatena blogX

関連記事