
FivetranのREST APIを用いてAirflowと接続し同期処理を行ってみた
かわばたです。
表題のとおり、FivetranのREST APIを用いてAirflowと接続し同期処理を行います。
特にAirflowはFivetranが提供する公式のAirflowプロバイダを使用することができるので、そちらを活用していきます。
【公式ドキュメント】
対象読者
- FivetranでAPI接続について興味のある方
- 特にAirflowとの接続に興味のある方
検証環境と今回行わないこと
検証環境
- Fivetranアカウント
- Docker Compose
今回行わないこと
- Docker Composeのインストールとセットアップ
【参考ドキュメント】
- Airflowのセットアップ
【参考ドキュメント】
Fivetranプロバイダのインストール(Docker)
docker-compose.yamlがあるフォルダに、requirements.txtという名前のファイルを作成し、下記内容を記述します。
airflow-provider-fivetran-async
こちらは参考のドキュメントにも記載がありますが、Airflow2.2以上で利用可能です。
【参考】
【以前のバージョン(現在は開発が止まっている)】
- コンテナをビルドし直して起動します。
# まずは現在のコンテナを停止
docker compose down
# --build をつけて起動
docker compose up --build -d

Fivetran APIキーの取得
【Fivetran REST API で同期を実行し Webhook で通知する】
上記記事を参考にAPIキーを取得しました。
-
Fivetranダッシュボードで
ユーザー名 > API Keyをクリックします。

-
Generate API keyをクリックしキーを生成します。

Generate API keyをクリックすると、下記画面がポップアップします。

Generateをクリックすると、キーが生成されます。

Airflow接続のセットアップ
- Airflow UIにログインし、上部メニューの
Admin > Connectionsを開きます。

+Add Connectionボタンをクリックして、新しい接続を追加します。

- 以下のようなポップアップ画面が出るので、
- Connection ID:fivetran_defaultとしてください
- Connection Type:Fivetranを選択してください
- Fivetran API Key:先ほど生成したキーをご入力ください
- Fivetran API Secret:先ほど生成したキーをご入力ください

- すべて入力ができましたら、以下のように登録されます。

DAGの作成 (サンプルコード)
fivetran_provider_async.operatorsを使用して、DAGを生成します。
どのコネクタへ同期するかを指定するために、connector_idを確認します。
- Fivetranの
Connectionsから、該当のコネクタを選択し、Fivetran connection idをコピーしておきます。

- dagsフォルダに、以下の内容でPythonファイルを作成しました。
import pendulum
from airflow.decorators import dag
from fivetran_provider_async.operators import FivetranOperator
# FivetranのUIで確認したConnector IDに書き換えてください
YOUR_FIVETRAN_CONNECTOR_ID = "コピーしたConnector IDをここに貼り付け" # 例: 'perform_cautious'
@dag(
dag_id="fivetran_sync_example",
schedule=None, # 手動実行
start_date=pendulum.datetime(2024, 1, 1, tz="Asia/Tokyo"),
catchup=False,
tags=["fivetran", "example"],
)
def fivetran_sync_dag():
"""
Fivetranの同期ジョブを実行し、完了まで待機するDAG
"""
FivetranOperator(
task_id="trigger_fivetran_sync",
# ステップ3で 'fivetran_default' 以外で登録した場合に指定
# fivetran_conn_id="fivetran_default",
# 同期したいFivetranコネクタのID
connector_id=YOUR_FIVETRAN_CONNECTOR_ID,
# Trueにすると、Fivetran側で同期が完了するまでこのタスクが 'running' 状態になります。
# これにより、後続タスク(例:dbtの実行)を正しく待機させることができます。
wait_for_completion=True,
)
# DAGをインスタンス化
fivetran_sync_dag()
-
AirflowUIのDagsから作成したDagを選択し、
Triggerをクリックします。

-
以下画面がポップアップするので、
Triggerをクリックします。

すると、以下のような画面に遷移し、無事成功しました。

Fivetran側も確認し無事に同期処理が終わっていました。

最後に
Astronomer社とFivetranによって管理されているFivetran用の新しいAirflowプロバイダーを使用しました。
今回は接続するだけでしたが、機能もいくつかあるので試してみたいです。
この記事が何かの参考になれば幸いです!








