FivetranのREST APIを用いてAirflowと接続し同期処理を行ってみた

FivetranのREST APIを用いてAirflowと接続し同期処理を行ってみた

2025.10.24

かわばたです。

表題のとおり、FivetranのREST APIを用いてAirflowと接続し同期処理を行います。
特にAirflowはFivetranが提供する公式のAirflowプロバイダを使用することができるので、そちらを活用していきます。

【公式ドキュメント】
https://fivetran.com/docs/rest-api/api-tools

対象読者

  • FivetranでAPI接続について興味のある方
  • 特にAirflowとの接続に興味のある方

検証環境と今回行わないこと

検証環境

  • Fivetranアカウント
  • Docker Compose

今回行わないこと

  • Docker Composeのインストールとセットアップ
    【参考ドキュメント】

https://docs.docker.com/compose/install/

  • Airflowのセットアップ

【参考ドキュメント】
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Fivetranプロバイダのインストール(Docker)

  1. docker-compose.yamlがあるフォルダに、requirements.txtという名前のファイルを作成し、下記内容を記述します。
airflow-provider-fivetran-async

こちらは参考のドキュメントにも記載がありますが、Airflow2.2以上で利用可能です。

【参考】

https://github.com/astronomer/airflow-provider-fivetran-async

【以前のバージョン(現在は開発が止まっている)】

https://github.com/fivetran/airflow-provider-fivetran

  1. コンテナをビルドし直して起動します。
# まずは現在のコンテナを停止
docker compose down

# --build をつけて起動
docker compose up --build -d

2025-10-23_16h44_15

Fivetran APIキーの取得

【Fivetran REST API で同期を実行し Webhook で通知する】
https://dev.classmethod.jp/articles/fivetran-api-sync-webhook-notification/

上記記事を参考にAPIキーを取得しました。

  1. Fivetranダッシュボードでユーザー名 > API Keyをクリックします。
    2025-10-23_16h54_15

  2. Generate API keyをクリックしキーを生成します。
    2025-10-23_16h55_55

Generate API keyをクリックすると、下記画面がポップアップします。
2025-10-23_16h57_03

  1. Generateをクリックすると、キーが生成されます。
    2025-10-23_16h59_54

Airflow接続のセットアップ

  1. Airflow UIにログインし、上部メニューのAdmin > Connectionsを開きます。

2025-10-23_17h39_41

  1. +Add Connectionボタンをクリックして、新しい接続を追加します。

2025-10-23_17h40_36

  1. 以下のようなポップアップ画面が出るので、
  • Connection ID:fivetran_defaultとしてください
  • Connection Type:Fivetranを選択してください
  • Fivetran API Key:先ほど生成したキーをご入力ください
  • Fivetran API Secret:先ほど生成したキーをご入力ください

2025-10-23_19h03_33

  1. すべて入力ができましたら、以下のように登録されます。

2025-10-23_23h27_43

DAGの作成 (サンプルコード)

fivetran_provider_async.operatorsを使用して、DAGを生成します。
どのコネクタへ同期するかを指定するために、connector_idを確認します。

  1. FivetranのConnectionsから、該当のコネクタを選択し、Fivetran connection idをコピーしておきます。

2025-10-23_23h10_18

  1. dagsフォルダに、以下の内容でPythonファイルを作成しました。
import pendulum
from airflow.decorators import dag
from fivetran_provider_async.operators import FivetranOperator

# FivetranのUIで確認したConnector IDに書き換えてください
YOUR_FIVETRAN_CONNECTOR_ID = "コピーしたConnector IDをここに貼り付け"  # 例: 'perform_cautious'

@dag(
    dag_id="fivetran_sync_example",
    schedule=None,  # 手動実行
    start_date=pendulum.datetime(2024, 1, 1, tz="Asia/Tokyo"),
    catchup=False,
    tags=["fivetran", "example"],
)
def fivetran_sync_dag():
    """
    Fivetranの同期ジョブを実行し、完了まで待機するDAG
    """

    FivetranOperator(
        task_id="trigger_fivetran_sync",

        # ステップ3で 'fivetran_default' 以外で登録した場合に指定
        # fivetran_conn_id="fivetran_default", 

        # 同期したいFivetranコネクタのID
        connector_id=YOUR_FIVETRAN_CONNECTOR_ID,

        # Trueにすると、Fivetran側で同期が完了するまでこのタスクが 'running' 状態になります。
        # これにより、後続タスク(例:dbtの実行)を正しく待機させることができます。
        wait_for_completion=True,

    )

# DAGをインスタンス化
fivetran_sync_dag()
  1. AirflowUIのDagsから作成したDagを選択し、Triggerをクリックします。
    2025-10-23_23h17_08

  2. 以下画面がポップアップするので、Triggerをクリックします。
    2025-10-23_23h11_16

すると、以下のような画面に遷移し、無事成功しました。
2025-10-23_23h16_38

Fivetran側も確認し無事に同期処理が終わっていました。

2025-10-23_23h12_52

最後に

Astronomer社とFivetranによって管理されているFivetran用の新しいAirflowプロバイダーを使用しました。
今回は接続するだけでしたが、機能もいくつかあるので試してみたいです。
この記事が何かの参考になれば幸いです!

この記事をシェアする

FacebookHatena blogX

関連記事