Snowflake の外部リネージに Fivetran による連携元のテーブルを追加してみた
はじめに
2026年1月のアップデートで、Snowsight 上で以前から表示できたリネージ図に、外部データソースと宛先を含めることができるようになりました。こちらを試してみた内容を本記事でまとめてみます。
アップデートの概要
本機能については、以下に記載があります。
OpenLineage フレームワークを活用しており、Snowflake が提供する REST エンドポイント(/api/v2/lineage/external-lineage)を通じてリクエストを送ることで OpenLineage 互換のイベントを受け入れ、Snowsight 上に表示できるように構成できます。
OpenLineage との統合機能を持つツールの他、手動リクエストを送信してリネージを確立することも可能です。
例として、Snowflake 側で任意の宛先テーブル(ここではTEST_DB.PUBLIC.TEST_TARGET)と、以下のような JSON を用意します。ここでは外部のソースを使用し、宛先を Snowflake テーブルとする設定です。
{
"eventType": "COMPLETE",
"eventTime": "2026-02-18T12:00:00.000Z",
"producer": "https://example.com/manual-test",
"schemaURL": "https://openlineage.io/spec/0-0-1/OpenLineage.json",
"job": {
"namespace": "manual-test",
"name": "pg_to_snowflake_test"
},
"run": {
"runId": "11111111-1111-1111-1111-111111111111"
},
"inputs": [
{
"namespace": "postgres://dummy-server:5432",
"name": "sales.public.orders"
}
],
"outputs": [
{
"namespace": "snowflake://{組織}-{アカウント}",
"name": "TEST_DB.PUBLIC.TEST_TARGET"
}
]
}
権限のあるユーザーで指定のエンドポイントにリクエストを送信します。
curl -X POST \
-H "Authorization: Bearer $SNOWFLAKE_TOKEN" \
-H "Content-Type: application/json" \
-d @lineage.json \
https://{組織}-{アカウント}.snowflakecomputing.com/api/v2/lineage/external-lineage
すると下図のようにリネージ図に外部ソースが表示されます。

ポイントや主な制約は以下です。
- データとイベントの制約
- COMPLETE イベントのみをサポート
- 執筆時点で Snowflake は OpenLineage バージョン2 をサポートしていません
- カラムレベルのリネージはサポートされていません
- 接続に関する制約
- リネージは必ず「Snowflakeオブジェクト」と「外部オブジェクト」の組み合わせである必要があります
- 外部オブジェクト同士やSnowflake オブジェクト同士のリネージをこの API で作成することはできません,
- 入力か出力の少なくともどちらか一方が Snowflake のテーブルやビューである必要があります
- リネージは必ず「Snowflakeオブジェクト」と「外部オブジェクト」の組み合わせである必要があります
- 容量と制限
- 同じアカウントに保存できるイベントは 10,000個までという制限があります
- 外部リネージイベントの保持期間は 1年間です
- データセットの完全修飾名(FQN)は1,000文字を超えることはできません
検証内容
ここでは、以下の手順で外部リネージの追加を試してみます。
構成要素
- データソース:Amazon RDS for PostgreSQL
- データパイプラインツール:Fivetran
- DWH:Snowflake
実装手順
- Fivetran で Webhook を作成し、同期完了イベントを Amazon API Gateway に送信
- API Gateway 経由で AWS Lambda 関数をトリガー
- Lambda 関数内で以下を実行
- ソーステーブル(RDS PostgreSQL)のスキーマ・テーブル情報を取得
- 宛先テーブル(Snowflake)のスキーマ・テーブル情報を取得
- ※いずれも Fivetran の API から取得
- 取得した情報を使用して、Snowflake の外部リネージ API エンドポイントにリクエストを送信
前提条件
ここでは、Fivetran の単一のコネクションを想定しています。この場合、ソースとなる PostgreSQL のデータベース名、宛先となる Snowflake のデータベース名は固定となるため、環境変数経由で与えました。
事前準備
Fivetran で RDS for PostgreSQL のデータを Snowflake にロード
PostgreSQL インスタンスを作成し Fivetran で初期同期を行います。こちらの設定手順は以下に記載があります。同期方法は Logical replication としました。
サンプルデータベースとテーブルは以下の通り作成しました。
CREATE DATABASE sampledb;
--テーブル作成:users
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
);
--レコードの追加
INSERT INTO users (name, email) VALUES ('John Doe', 'john.doe@example.com');
INSERT INTO users (name, email) VALUES ('Jane Smith', 'jane.smith@example.com');
INSERT INTO users (name, email) VALUES ('Mike Brown', 'mike.brown@example.com');
--テーブル作成:orders
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_name VARCHAR(100),
product_name VARCHAR(100),
amount NUMERIC(10,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
--レコードの追加
INSERT INTO orders (customer_name, product_name, amount)
VALUES
('Alice', 'Laptop', 1200.00),
('Bob', 'Keyboard', 150.50),
('Charlie', 'Monitor', 320.99);
設定後、Snowflake 側にテーブルが作成されることを確認しておきます。

Fivetran:API キーの取得
API 経由でソースとターゲットに関する情報を取得するため、Fivetran 側で API キーを発行します。基本的な手順は以下をご参照ください。
キー発行後、以下のような問い合わせを行うとソースと宛先のスキーマとテーブル情報を取得できます。
curl -s -H "Authorization: Basic $AUTH_HEADER" \
https://api.fivetran.com/v1/connectors/$CONNECTOR_ID/schemas \
| jq -r '
.data.schemas
| to_entries[]
| .key as $src_schema
| .value.name_in_destination as $dest_schema
| .value.tables
| to_entries[]
| "\($src_schema).\(.key) -> \($dest_schema).\(.value.name_in_destination)"
'
出力
public.orders -> postgres_rds_public.orders
public.users -> postgres_rds_public.users
Snowflake:API 接続用ユーザーを作成
Snowflake 側では API 認証用のユーザーが必要となるので、以下の手順で用意しました。ポイントとして、外部リネージの作成・削除用に専用のアカウント権限が必要となります。
認証にはプログラムアクセストークンを使用しました。API での利用は以下の記事をご参照ください。
なお、Lambda 関数からのアクセス用にネットワークポリシーは広めな設定としている点にご注意ください。
USE ROLE SECURITYADMIN;
-- サービスユーザーを作成(外部接続用)
CREATE USER IF NOT EXISTS ext_integration_user
TYPE = SERVICE
COMMENT = '外部システム連携用サービスユーザー';
-- サービスユーザー用のロールを作成
CREATE ROLE ext_lineage_role;
-- アカウントレベルの外部リネージ権限
GRANT INGEST LINEAGE ON ACCOUNT TO ROLE ext_lineage_role;
GRANT DELETE LINEAGE ON ACCOUNT TO ROLE ext_lineage_role;
GRANT ROLE ext_lineage_role TO USER ext_integration_user;
GRANT ROLE ext_lineage_role TO ROLE SYSADMIN;
-- デフォルトロールを設定
ALTER USER ext_integration_user SET DEFAULT_ROLE = ext_lineage_role;
-- ネットワークポリシー
CREATE OR REPLACE NETWORK POLICY ext_integration_user_net_policy
ALLOWED_IP_LIST = ('0.0.0.0/0');
--ユーザーにネットワークポリシーを適用
ALTER USER ext_integration_user SET NETWORK_POLICY = ext_integration_user_net_policy;
ALTER USER ext_integration_user UNSET NETWORK_POLICY;
-- PATを生成
ALTER USER ext_integration_user ADD PROGRAMMATIC ACCESS TOKEN api_token
days_to_expiry = 365 -- アクセストークンの有効期限:最大365日
role_restriction = ext_lineage_role;
Fivetran:Webhook を作成
Fivetran では Webhook によるイベント通知の構成が可能です。これにより、例えば同期完了(sync_end)をイベントに通知を送信できます。
以下の手順で Webhook を作成しました。
#group id の取得
curl -X GET https://api.fivetran.com/v1/groups \
-H "Content-Type: application/json" \
-H "Authorization: Basic $AUTH_HEADER"
# Webhookの作成
# URL: API Gatewayのエンドポイント
# FIVETRAN_SECRET: 任意の文字列
curl -X POST "https://api.fivetran.com/v1/webhooks/group/${GROUP_ID}" \
-H "Content-Type: application/json" \
-H "Authorization: Basic $AUTH_HEADER" \
-d "{
\"url\": \"${URL}\",
\"events\": [\"sync_end\"],
\"active\": true,
\"secret\": \"${FIVETRAN_SECRET}\"
}"
Webhook 作成後は、IDが出力されるので、メモしておくとテストも可能です。
# Webhookのテスト
WEBHOOK_ID="XXXXX"
curl -X POST \
-H "Authorization: Basic $AUTH_HEADER" \
-H "Content-Type: application/json" \
-d '{
"event": "sync_end"
}' \
https://api.fivetran.com/v1/webhooks/${WEBHOOK_ID}/test
出力
{"code":"Success","message":"Webhook test completed successfully","data":{"succeed":true,"status":200,"message":"{\"message\": \"Processed sync_end\"}"}}
AWS側:API Gateway・Lambda 関数の作成
こちらの基本的な手順は以下をご参照ください。
Lambda 関数は以下の処理を実装しています。
- Fivetran Webhook の署名検証
- Fivetran API からソーステーブルと宛先テーブルのマッピング情報を取得
- OpenLineage 形式のペイロード生成
- テーブル単位で個別のリネージイベントを作成
runIdとjob.nameをテーブルごとにユニーク化することで、各テーブルのリネージを個別に追加
- Snowflake の外部リネージ API への送信
なお、本記事では検証を簡素化するため、トークン等を環境変数に直接設定していますが、本番環境では AWS Secrets Manager 等を使用ください。
import os
import json
import base64
import urllib.request
import urllib.error
import hmac
import hashlib
# =========================================================
# Fivetran 署名検証
# =========================================================
def verify_signature(secret, body, signature):
computed = hmac.new(
secret.encode(),
body.encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(computed, signature)
# =========================================================
# Snowflake送信
# =========================================================
def send_lineage_to_snowflake(payload_json):
account = os.environ["SNOWFLAKE_ACCOUNT"]
token = os.environ["SNOWFLAKE_TOKEN"]
url = f"https://{account}.snowflakecomputing.com/api/v2/lineage/external-lineage"
data_bytes = json.dumps(payload_json).encode("utf-8")
req = urllib.request.Request(
url,
data=data_bytes,
method="POST"
)
req.add_header("Authorization", f"Bearer {token}")
req.add_header("Content-Type", "application/json")
req.add_header("Accept", "application/json")
try:
with urllib.request.urlopen(req) as response:
response_body = response.read().decode()
print("✅ Snowflake success response:")
print(response_body)
return response_body
except urllib.error.HTTPError as e:
error_body = e.read().decode()
print("❌ Snowflake HTTP Error:", e.code)
print("❌ Snowflake Response Body:")
print(error_body)
raise
# =========================================================
# Lambda Handler
# =========================================================
def lambda_handler(event, context):
print("✅ Raw event:", json.dumps(event))
# ===== 環境変数 =====
api_key = os.environ["FIVETRAN_API_KEY"]
api_secret = os.environ["FIVETRAN_API_SECRET"]
connector_id = os.environ["CONNECTOR_ID"]
webhook_secret = os.environ["FIVETRAN_WEBHOOK_SECRET"]
source_db_name = os.environ["SOURCE_DB_NAME"]
source_host = os.environ["SOURCE_DB_HOST"]
snowflake_account = os.environ["SNOWFLAKE_ACCOUNT"]
snowflake_database = os.environ["SNOWFLAKE_DATABASE"]
# ===== body取得 =====
body = event.get("body", "")
if event.get("isBase64Encoded"):
body = base64.b64decode(body).decode()
headers = event.get("headers", {})
signature = headers.get("x-fivetran-signature-256")
if not signature:
return {"statusCode": 401, "body": "Missing signature"}
if not verify_signature(webhook_secret, body, signature):
return {"statusCode": 401, "body": "Invalid signature"}
payload = json.loads(body)
if payload.get("event") != "sync_end":
return {"statusCode": 200, "body": "Ignored"}
sync_id = payload.get("sync_id")
event_time = payload.get("created")
connector_name = payload.get("connector_name")
if not sync_id or not event_time:
return {"statusCode": 400, "body": "Missing sync_id or event_time"}
print(f"✅ Processing sync_id: {sync_id}")
# =====================================================
# Fivetran schemas API取得
# =====================================================
credentials = f"{api_key}:{api_secret}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
schemas_url = f"https://api.fivetran.com/v1/connectors/{connector_id}/schemas"
req = urllib.request.Request(schemas_url)
req.add_header("Authorization", f"Basic {encoded_credentials}")
with urllib.request.urlopen(req) as response:
result = json.loads(response.read().decode())
schemas = result["data"]["schemas"]
# =====================================================
# ✅ 1テーブル = 1イベント
# ✅ runId テーブル単位
# ✅ job.name もテーブル単位でユニーク
# =====================================================
for src_schema_name, schema_data in schemas.items():
dest_schema = schema_data["name_in_destination"]
for table_name, table_data in schema_data["tables"].items():
if not table_data["enabled"]:
continue
dest_table = table_data["name_in_destination"]
table_run_id = f"{sync_id}_{dest_table.upper()}"
# job名をユニーク化
unique_job_name = f"{connector_name}_{dest_schema}_{dest_table}".lower()
lineage_payload = {
"eventType": "COMPLETE",
"eventTime": event_time,
"producer": "https://aws-lambda-fivetran-lineage",
"schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json",
"job": {
"namespace": "fivetran",
"name": unique_job_name
},
"run": {
"runId": table_run_id
},
"inputs": [
{
"namespace": source_host,
"name": f"{source_db_name}.{src_schema_name}.{table_name}"
}
],
"outputs": [
{
"namespace": f"snowflake://{snowflake_account}",
"name": f"{snowflake_database.upper()}.{dest_schema.upper()}.{dest_table.upper()}"
}
]
}
print("✅ Sending lineage for table:")
print(json.dumps(lineage_payload, indent=2))
send_lineage_to_snowflake(lineage_payload)
return {
"statusCode": 200,
"body": "Lineage sent successfully"
}
Fivetran で同期を実行
各種設定ができたので、Fivetran で同期を実行します。下図の2テーブルが更新されます。

同期が完了すると、Webhook が起動し API Gateway 側で同期完了のイベントを受け取り Lambda 関数が起動します。
Lambda 関数では Fivetran の API からコネクションに関するソース・宛先のテーブル情報を取得し、Snowflake の送信する OpenLineage 準拠の JSON スキーマを作成し、リクエストを送信します。

実行完了後、Snowflake 側を確認すると下図のように外部のソーステーブルを含むリネージ図を確認することができました。
- Ordersテーブル

- Usersテーブル

新規テーブルの追加
データソース側で新規テーブルを追加し、同期とあわせてリネージ図も反映されるか確認します。
-- PostgreSQLで新規テーブルを作成: customersテーブル
\c sampledb
CREATE TABLE public.customers (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO public.customers (name, email)
VALUES
('Alice Johnson', 'alice@example.com'),
('Bob Smith', 'bob@example.com'),
('Charlie Brown', 'charlie@example.com');
Fivetran で同期を行います。テーブルの追加を確認できます。

Fivetran のスキーマ側でもテーブルの追加を確認できます。

Snowflake 側を確認すると、Lambda 関数により新規テーブルについても、自動で外部リネージが追加されていることを確認できました。

外部リネージの削除
追加した外部リネージは、以下のようなリクエストで現在 API 経由でのみ削除できます。
curl -i -X DELETE \
-H "Authorization: Bearer $SNOWFLAKE_TOKEN" \
-H "Accept: application/json" \
"https://{SNOWFLAKE_ACCOUNT}.snowflakecomputing.com/api/v2/lineage/external-lineage?sourceNamespace=postgres%3A%2F%2F{RDS_ENDPOINT}%3A5432&sourceName=sampledb.public.orders&sourceDatasetType=External%20Node&targetNamespace=snowflake%3A%2F%2F{SNOWFLAKE_ACCOUNT}&targetName=FIVETRAN_DATABASE.POSTGRES_RDS_PUBLIC.ORDERS&targetDatasetType=External%20Node"
さいごに
簡単な設定としましたが Snowflake の外部リネージ機能を試してみました。
こちらの内容がどなたかの参考になれば幸いです。







