Snowflake の外部リネージに Fivetran による連携元のテーブルを追加してみた

Snowflake の外部リネージに Fivetran による連携元のテーブルを追加してみた

2026.02.18

はじめに

2026年1月のアップデートで、Snowsight 上で以前から表示できたリネージ図に、外部データソースと宛先を含めることができるようになりました。こちらを試してみた内容を本記事でまとめてみます。

https://docs.snowflake.com/ja/release-notes/2026/other/2026-01-16-external-lineage

アップデートの概要

本機能については、以下に記載があります。

https://docs.snowflake.com/ja/user-guide/external-lineage

OpenLineage フレームワークを活用しており、Snowflake が提供する REST エンドポイント(/api/v2/lineage/external-lineage)を通じてリクエストを送ることで OpenLineage 互換のイベントを受け入れ、Snowsight 上に表示できるように構成できます。

OpenLineage との統合機能を持つツールの他、手動リクエストを送信してリネージを確立することも可能です。
例として、Snowflake 側で任意の宛先テーブル(ここではTEST_DB.PUBLIC.TEST_TARGET)と、以下のような JSON を用意します。ここでは外部のソースを使用し、宛先を Snowflake テーブルとする設定です。

{
  "eventType": "COMPLETE",
  "eventTime": "2026-02-18T12:00:00.000Z",
  "producer": "https://example.com/manual-test",
  "schemaURL": "https://openlineage.io/spec/0-0-1/OpenLineage.json",
  "job": {
    "namespace": "manual-test",
    "name": "pg_to_snowflake_test"
  },
  "run": {
    "runId": "11111111-1111-1111-1111-111111111111"
  },
  "inputs": [
    {
      "namespace": "postgres://dummy-server:5432",
      "name": "sales.public.orders"
    }
  ],
  "outputs": [
    {
      "namespace": "snowflake://{組織}-{アカウント}",
      "name": "TEST_DB.PUBLIC.TEST_TARGET"
    }
  ]
}

権限のあるユーザーで指定のエンドポイントにリクエストを送信します。

curl -X POST \
  -H "Authorization: Bearer $SNOWFLAKE_TOKEN" \
  -H "Content-Type: application/json" \
  -d @lineage.json \
  https://{組織}-{アカウント}.snowflakecomputing.com/api/v2/lineage/external-lineage

すると下図のようにリネージ図に外部ソースが表示されます。

image

ポイントや主な制約は以下です。

  • データとイベントの制約
    • COMPLETE イベントのみをサポート
    • 執筆時点で Snowflake は OpenLineage バージョン2 をサポートしていません
    • カラムレベルのリネージはサポートされていません
  • 接続に関する制約
    • リネージは必ず「Snowflakeオブジェクト」と「外部オブジェクト」の組み合わせである必要があります
      • 外部オブジェクト同士やSnowflake オブジェクト同士のリネージをこの API で作成することはできません,
    • 入力か出力の少なくともどちらか一方が Snowflake のテーブルやビューである必要があります
  • 容量と制限
    • 同じアカウントに保存できるイベントは 10,000個までという制限があります
    • 外部リネージイベントの保持期間は 1年間です
    • データセットの完全修飾名(FQN)は1,000文字を超えることはできません

検証内容

ここでは、以下の手順で外部リネージの追加を試してみます。

構成要素

  • データソース:Amazon RDS for PostgreSQL
  • データパイプラインツール:Fivetran
  • DWH:Snowflake

実装手順

  1. Fivetran で Webhook を作成し、同期完了イベントを Amazon API Gateway に送信
  2. API Gateway 経由で AWS Lambda 関数をトリガー
  3. Lambda 関数内で以下を実行
    • ソーステーブル(RDS PostgreSQL)のスキーマ・テーブル情報を取得
    • 宛先テーブル(Snowflake)のスキーマ・テーブル情報を取得
    • ※いずれも Fivetran の API から取得
  4. 取得した情報を使用して、Snowflake の外部リネージ API エンドポイントにリクエストを送信

前提条件

ここでは、Fivetran の単一のコネクションを想定しています。この場合、ソースとなる PostgreSQL のデータベース名、宛先となる Snowflake のデータベース名は固定となるため、環境変数経由で与えました。

事前準備

Fivetran で RDS for PostgreSQL のデータを Snowflake にロード

PostgreSQL インスタンスを作成し Fivetran で初期同期を行います。こちらの設定手順は以下に記載があります。同期方法は Logical replication としました。

https://fivetran.com/docs/connectors/databases/postgresql/rds-setup-guide

サンプルデータベースとテーブルは以下の通り作成しました。

CREATE DATABASE sampledb;
--テーブル作成:users
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100)
);
--レコードの追加
INSERT INTO users (name, email) VALUES ('John Doe', 'john.doe@example.com');
INSERT INTO users (name, email) VALUES ('Jane Smith', 'jane.smith@example.com');
INSERT INTO users (name, email) VALUES ('Mike Brown', 'mike.brown@example.com');

--テーブル作成:orders
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_name VARCHAR(100),
    product_name VARCHAR(100),
    amount NUMERIC(10,2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
--レコードの追加
INSERT INTO orders (customer_name, product_name, amount)
VALUES
('Alice', 'Laptop', 1200.00),
('Bob', 'Keyboard', 150.50),
('Charlie', 'Monitor', 320.99);

設定後、Snowflake 側にテーブルが作成されることを確認しておきます。

image 1

Fivetran:API キーの取得

API 経由でソースとターゲットに関する情報を取得するため、Fivetran 側で API キーを発行します。基本的な手順は以下をご参照ください。

https://dev.classmethod.jp/articles/fivetran-api-sync-webhook-notification/

キー発行後、以下のような問い合わせを行うとソースと宛先のスキーマとテーブル情報を取得できます。

curl -s -H "Authorization: Basic $AUTH_HEADER" \
https://api.fivetran.com/v1/connectors/$CONNECTOR_ID/schemas \
| jq -r '
  .data.schemas
  | to_entries[]
  | .key as $src_schema
  | .value.name_in_destination as $dest_schema
  | .value.tables
  | to_entries[]
  | "\($src_schema).\(.key)  ->  \($dest_schema).\(.value.name_in_destination)"
'

出力

public.orders  ->  postgres_rds_public.orders
public.users  ->  postgres_rds_public.users

Snowflake:API 接続用ユーザーを作成

Snowflake 側では API 認証用のユーザーが必要となるので、以下の手順で用意しました。ポイントとして、外部リネージの作成・削除用に専用のアカウント権限が必要となります。
認証にはプログラムアクセストークンを使用しました。API での利用は以下の記事をご参照ください。

https://dev.classmethod.jp/articles/snowflake-sql-api-pat-try/

なお、Lambda 関数からのアクセス用にネットワークポリシーは広めな設定としている点にご注意ください。

USE ROLE SECURITYADMIN;

-- サービスユーザーを作成(外部接続用)
CREATE USER IF NOT EXISTS ext_integration_user
    TYPE = SERVICE
    COMMENT = '外部システム連携用サービスユーザー';

-- サービスユーザー用のロールを作成
CREATE ROLE ext_lineage_role;

-- アカウントレベルの外部リネージ権限
GRANT INGEST LINEAGE ON ACCOUNT TO ROLE ext_lineage_role;
GRANT DELETE LINEAGE ON ACCOUNT TO ROLE ext_lineage_role;

GRANT ROLE ext_lineage_role TO USER ext_integration_user;
GRANT ROLE ext_lineage_role TO ROLE SYSADMIN;

-- デフォルトロールを設定
ALTER USER ext_integration_user SET DEFAULT_ROLE = ext_lineage_role;

-- ネットワークポリシー
CREATE OR REPLACE NETWORK POLICY ext_integration_user_net_policy
    ALLOWED_IP_LIST = ('0.0.0.0/0');

--ユーザーにネットワークポリシーを適用
ALTER USER ext_integration_user SET NETWORK_POLICY = ext_integration_user_net_policy;
ALTER USER ext_integration_user UNSET NETWORK_POLICY;

-- PATを生成
ALTER USER ext_integration_user ADD PROGRAMMATIC ACCESS TOKEN  api_token
  days_to_expiry = 365 -- アクセストークンの有効期限:最大365日
  role_restriction = ext_lineage_role; 

Fivetran:Webhook を作成

Fivetran では Webhook によるイベント通知の構成が可能です。これにより、例えば同期完了(sync_end)をイベントに通知を送信できます。

以下の手順で Webhook を作成しました。

#group id の取得
curl -X GET https://api.fivetran.com/v1/groups \
-H "Content-Type: application/json" \
-H "Authorization: Basic $AUTH_HEADER"

# Webhookの作成
# URL: API Gatewayのエンドポイント
# FIVETRAN_SECRET: 任意の文字列
curl -X POST "https://api.fivetran.com/v1/webhooks/group/${GROUP_ID}" \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic $AUTH_HEADER" \
  -d "{
    \"url\": \"${URL}\",
    \"events\": [\"sync_end\"],
    \"active\": true,
    \"secret\": \"${FIVETRAN_SECRET}\"
  }"

Webhook 作成後は、IDが出力されるので、メモしておくとテストも可能です。

# Webhookのテスト
WEBHOOK_ID="XXXXX"

curl -X POST \
  -H "Authorization: Basic $AUTH_HEADER" \
  -H "Content-Type: application/json" \
  -d '{
        "event": "sync_end"
      }' \
  https://api.fivetran.com/v1/webhooks/${WEBHOOK_ID}/test

出力

{"code":"Success","message":"Webhook test completed successfully","data":{"succeed":true,"status":200,"message":"{\"message\": \"Processed sync_end\"}"}}

AWS側:API Gateway・Lambda 関数の作成

こちらの基本的な手順は以下をご参照ください。

https://dev.classmethod.jp/articles/fivetran-api-sync-webhook-notification/

https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/services-apigateway.html

Lambda 関数は以下の処理を実装しています。

  1. Fivetran Webhook の署名検証
  2. Fivetran API からソーステーブルと宛先テーブルのマッピング情報を取得
  3. OpenLineage 形式のペイロード生成
    • テーブル単位で個別のリネージイベントを作成
    • runId と job.name をテーブルごとにユニーク化することで、各テーブルのリネージを個別に追加
  4. Snowflake の外部リネージ API への送信

なお、本記事では検証を簡素化するため、トークン等を環境変数に直接設定していますが、本番環境では AWS Secrets Manager 等を使用ください。

import os
import json
import base64
import urllib.request
import urllib.error
import hmac
import hashlib

# =========================================================
# Fivetran 署名検証
# =========================================================
def verify_signature(secret, body, signature):
    computed = hmac.new(
        secret.encode(),
        body.encode(),
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(computed, signature)

# =========================================================
# Snowflake送信
# =========================================================
def send_lineage_to_snowflake(payload_json):
    account = os.environ["SNOWFLAKE_ACCOUNT"]
    token = os.environ["SNOWFLAKE_TOKEN"]

    url = f"https://{account}.snowflakecomputing.com/api/v2/lineage/external-lineage"

    data_bytes = json.dumps(payload_json).encode("utf-8")

    req = urllib.request.Request(
        url,
        data=data_bytes,
        method="POST"
    )

    req.add_header("Authorization", f"Bearer {token}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")

    try:
        with urllib.request.urlopen(req) as response:
            response_body = response.read().decode()
            print("✅ Snowflake success response:")
            print(response_body)
            return response_body

    except urllib.error.HTTPError as e:
        error_body = e.read().decode()
        print("❌ Snowflake HTTP Error:", e.code)
        print("❌ Snowflake Response Body:")
        print(error_body)
        raise

# =========================================================
# Lambda Handler
# =========================================================
def lambda_handler(event, context):

    print("✅ Raw event:", json.dumps(event))

    # ===== 環境変数 =====
    api_key = os.environ["FIVETRAN_API_KEY"]
    api_secret = os.environ["FIVETRAN_API_SECRET"]
    connector_id = os.environ["CONNECTOR_ID"]
    webhook_secret = os.environ["FIVETRAN_WEBHOOK_SECRET"]

    source_db_name = os.environ["SOURCE_DB_NAME"]
    source_host = os.environ["SOURCE_DB_HOST"]

    snowflake_account = os.environ["SNOWFLAKE_ACCOUNT"]
    snowflake_database = os.environ["SNOWFLAKE_DATABASE"]

    # ===== body取得 =====
    body = event.get("body", "")

    if event.get("isBase64Encoded"):
        body = base64.b64decode(body).decode()

    headers = event.get("headers", {})
    signature = headers.get("x-fivetran-signature-256")

    if not signature:
        return {"statusCode": 401, "body": "Missing signature"}

    if not verify_signature(webhook_secret, body, signature):
        return {"statusCode": 401, "body": "Invalid signature"}

    payload = json.loads(body)

    if payload.get("event") != "sync_end":
        return {"statusCode": 200, "body": "Ignored"}

    sync_id = payload.get("sync_id")
    event_time = payload.get("created")
    connector_name = payload.get("connector_name")

    if not sync_id or not event_time:
        return {"statusCode": 400, "body": "Missing sync_id or event_time"}

    print(f"✅ Processing sync_id: {sync_id}")

    # =====================================================
    # Fivetran schemas API取得
    # =====================================================
    credentials = f"{api_key}:{api_secret}"
    encoded_credentials = base64.b64encode(credentials.encode()).decode()

    schemas_url = f"https://api.fivetran.com/v1/connectors/{connector_id}/schemas"

    req = urllib.request.Request(schemas_url)
    req.add_header("Authorization", f"Basic {encoded_credentials}")

    with urllib.request.urlopen(req) as response:
        result = json.loads(response.read().decode())

    schemas = result["data"]["schemas"]

    # =====================================================
    # ✅ 1テーブル = 1イベント
    # ✅ runId テーブル単位
    # ✅ job.name もテーブル単位でユニーク
    # =====================================================
    for src_schema_name, schema_data in schemas.items():

        dest_schema = schema_data["name_in_destination"]

        for table_name, table_data in schema_data["tables"].items():

            if not table_data["enabled"]:
                continue

            dest_table = table_data["name_in_destination"]

            table_run_id = f"{sync_id}_{dest_table.upper()}"

            # job名をユニーク化
            unique_job_name = f"{connector_name}_{dest_schema}_{dest_table}".lower()

            lineage_payload = {
                "eventType": "COMPLETE",
                "eventTime": event_time,
                "producer": "https://aws-lambda-fivetran-lineage",
                "schemaURL": "https://openlineage.io/spec/1-0-0/OpenLineage.json",
                "job": {
                    "namespace": "fivetran",
                    "name": unique_job_name
                },
                "run": {
                    "runId": table_run_id
                },
                "inputs": [
                    {
                        "namespace": source_host,
                        "name": f"{source_db_name}.{src_schema_name}.{table_name}"
                    }
                ],
                "outputs": [
                    {
                        "namespace": f"snowflake://{snowflake_account}",
                        "name": f"{snowflake_database.upper()}.{dest_schema.upper()}.{dest_table.upper()}"
                    }
                ]
            }

            print("✅ Sending lineage for table:")
            print(json.dumps(lineage_payload, indent=2))

            send_lineage_to_snowflake(lineage_payload)

    return {
        "statusCode": 200,
        "body": "Lineage sent successfully"
    }

Fivetran で同期を実行

各種設定ができたので、Fivetran で同期を実行します。下図の2テーブルが更新されます。

image 2

同期が完了すると、Webhook が起動し API Gateway 側で同期完了のイベントを受け取り Lambda 関数が起動します。

Lambda 関数では Fivetran の API からコネクションに関するソース・宛先のテーブル情報を取得し、Snowflake の送信する OpenLineage 準拠の JSON スキーマを作成し、リクエストを送信します。

image 3

実行完了後、Snowflake 側を確認すると下図のように外部のソーステーブルを含むリネージ図を確認することができました。

  • Ordersテーブル

image 4

  • Usersテーブル

image 5

新規テーブルの追加

データソース側で新規テーブルを追加し、同期とあわせてリネージ図も反映されるか確認します。

-- PostgreSQLで新規テーブルを作成: customersテーブル
\c sampledb

CREATE TABLE public.customers (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO public.customers (name, email)
VALUES
    ('Alice Johnson', 'alice@example.com'),
    ('Bob Smith', 'bob@example.com'),
    ('Charlie Brown', 'charlie@example.com');

Fivetran で同期を行います。テーブルの追加を確認できます。

image 6

Fivetran のスキーマ側でもテーブルの追加を確認できます。

image 7

Snowflake 側を確認すると、Lambda 関数により新規テーブルについても、自動で外部リネージが追加されていることを確認できました。

image 8

外部リネージの削除

追加した外部リネージは、以下のようなリクエストで現在 API 経由でのみ削除できます。

curl -i -X DELETE \
  -H "Authorization: Bearer $SNOWFLAKE_TOKEN" \
  -H "Accept: application/json" \
  "https://{SNOWFLAKE_ACCOUNT}.snowflakecomputing.com/api/v2/lineage/external-lineage?sourceNamespace=postgres%3A%2F%2F{RDS_ENDPOINT}%3A5432&sourceName=sampledb.public.orders&sourceDatasetType=External%20Node&targetNamespace=snowflake%3A%2F%2F{SNOWFLAKE_ACCOUNT}&targetName=FIVETRAN_DATABASE.POSTGRES_RDS_PUBLIC.ORDERS&targetDatasetType=External%20Node"

さいごに

簡単な設定としましたが Snowflake の外部リネージ機能を試してみました。
こちらの内容がどなたかの参考になれば幸いです。

この記事をシェアする

FacebookHatena blogX

関連記事