Fivetran Teleport Sync で RDS for Oracle のデータを Snowflake に同期してみた #Fivetran

2024.04.22

Fivetran で Amazon RDS for Oracle のデータを Fivetran Teleport Sync で Snowflake に同期してみましたので、その際の手順や挙動について記事にしました。
※なお、RDS for Oracle に対する Fivetran Teleport Sync による同期は 2024/4/22 時点で Beta のためご注意ください。

RDS for Oracle との連携

Fivetran で RDS Oracle をデータソースとする場合、以下のポイントに注意します。なお、こちらは 2024/4/22 時点の情報のためご注意ください。

前提条件

  • Fivetran
    • Enterprise 以上のエディションが必要
  • Oracle 12c またはそれ以上のバージョン

接続方法

  • 直接接続
    • データベースに対して直接 Fivetran の IP CIDR を許可する
    • SSL/TLS を使用した DB インスタンスへの接続の暗号化が有効であること
  • SSH トンネルによる接続
    • インスタンスに直接接続できない場合のオプション
    • パブリックアクセス可能な踏み台サーバーを使用
  • Private networking による接続
    • インスタンスに直接接続できない場合のオプション
    • Business Critical plan が必要
    • サポートするリージョンにリソースがあること

同期方法

RDS for Oracle では、2種類の同期方法が提供されています。主な特徴は以下です。

  • LogMiner
    • Oracle データベースのユーティリティの一部である LogMiner を使用して、ソース表内の変更された行を検出
    • テーブルに関連付けられたDDL イベント (GRANTを除く) が発生した場合は、テーブルを再インポートする必要があります
    • 仮想列はサポートされていません
      • 仮想列を含む DDL イベントが発生すると、テーブルが自動的に再同期されます
    • ARCHIVELOG モードが有効化されている必要があります
      • バックアップ保持期間を 0 より大きい値に設定して自動バックアップを有効にすると、ARCHIVELOG モードが自動的に有効化されます
    • 少なくとも 24 時間以上の REDOログの保持が必要です
      • Fivetran の推奨は 7日間
    • サプリメンタルロギングの有効化が必要
      • 主キーを含む行が変更されることが予想される場合は、主キーのサプリメンタルロギングを有効にします
  • Fivetran Teleport Sync
    • 執筆時点で Beta
    • LogMiner による同期方法が使用できない(Fivetran にデータベースの変更ログへのアクセスを提供できない)場合のオプション
    • ADD COLUMN DDL 操作をサポート
      • 他のすべてのテーブルまたは列の変更では、テーブルが自動的に再同期されます
    • サポートされるテーブルカラムのデータ型と主キーのデータ型
    • 推奨事項
      • 主キーを持つテーブルを同期したい場合は、すべてのテーブルの組み合わせたサイズが100 GBを超えないようにすることが推奨されています
      • 主キーを持たないテーブルを同期したい場合は、すべてのテーブルの組み合わせたサイズが10 GBを超えないようにすることが推奨されています

検証環境

以下の環境を使用しています。

  • 宛先
    • Snowflake
  • データソース
    • Amazon RDS for Oracle Database 21c
      • Oracle Database Standard Edition Two
      • Oracle 21.0.0.0.ru-2024-01.rur-2024-01.r1
    • インスタンスタイプ:db.t3.small
    • SSL オプションを含むオプショングループとの関連付けを実施済み
  • 接続方法
    • 直接接続
  • 同期方法
    • Fivetran Teleport Sync

事前準備

手元のクライアント(Oracle SQL*Plus)からアクセス可能な検証用の RDS for Oracle を構築し、以下の通り Snowfalke に連携するデータを用意します。

  • ユーザー(スキーマ)を作成
CREATE USER test identified by password ;
GRANT CREATE SESSION,CREATE TABLE,UNLIMITED TABLESPACE to test;
  • test ユーザーで接続し、テーブルを作成
CREATE TABLE test.employees (
    employee_id NUMBER(4,0) PRIMARY KEY,
    first_name VARCHAR2(50),
    last_name VARCHAR2(50),
    hire_date DATE,
    salary NUMBER
);
--レコード追加
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (1, 'John', 'Doe', TO_DATE('2022-04-20', 'YYYY-MM-DD'), 50000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (2, 'Jane', 'Smith', TO_DATE('2022-04-21', 'YYYY-MM-DD'), 60000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (3, 'Michael', 'Johnson', TO_DATE('2022-04-22', 'YYYY-MM-DD'), 70000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (4, 'Emily', 'Brown', TO_DATE('2022-04-23', 'YYYY-MM-DD'), 55000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (5, 'David', 'Wilson', TO_DATE('2022-04-24', 'YYYY-MM-DD'), 65000);
SQL> SELECT * FROM test.employees;

EMPLOYEE_ID FIRST_NAME      LAST_NAME       HIRE_DATE   SALARY
----------- --------------- --------------- ---------- -------
          1 John            Doe             22-04-20     50000
          2 Jane            Smith           22-04-21     60000
          3 Michael         Johnson         22-04-22     70000
          4 Emily           Brown           22-04-23     55000
          5 David           Wilson          22-04-24     65000

Fivetran 用のユーザーを作成

CREATE USER fivetran IDENTIFIED BY password;
GRANT CREATE SESSION TO "FIVETRAN";

スキーマへの閲覧権限を付与

GRANT SELECT ANY TABLE TO "FIVETRAN";

Fivetran の設定

Destination

データの連携先には Snowflake を使用しました。Snowflake を Destination に設定する方法は、以下の記事をご参照ください。

Connector

コネクタに Oracle RDS を追加します。

  • 接続情報
  • 接続方法
    • Connect directly を指定
  • 同期方法
    • Fivetran Teleport Sync を指定

ここで、Fivetran の IP アドレスのリストが表示されるので、セキュリティグループで指定のポートに対するインバウンド通信を許可しておきます。
Fivetran IP Addresses | Fivetran

接続設定は以上になります。接続テスト後、同期したいテーブルを指定、どの程度まで変更をキャプチャするかの確認画面が表示されるので、任意のオプションを選択します。ここでは「Allow all」を選択し [Continue] をクリックします。
注意点として、接続方法として「直接接続」を使用する場合 SSL を使用しないと失敗します。

以上で設定は完了です。初期同期が完了すると、Snowflake 側でテーブルが作成されデータも同期されます。

主キーありテーブルにおけるスキーマの自動移行の検証

Fivetran では、対応するデータソースや同期方法など条件を満たせば、データソース側のテーブルスキーマが変更された場合でも、自動的にその変更を宛先側に反映します。

RDS for Oracle に対する Fivetran Teleport Sync 時に、どのようにこの変更が反映されるかも確認しておきます。

レコードの追加

はじめに、データソース側で以下の通り、レコードを追加してみます。

-- レコードを追加する
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (6, 'Eva', 'Martinez', TO_DATE('2022-04-28', 'YYYY-MM-DD'), 55000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (7, 'Frank', 'Garcia', TO_DATE('2022-04-29', 'YYYY-MM-DD'), 65000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (8, 'Grace', 'Lopez', TO_DATE('2022-04-30', 'YYYY-MM-DD'), 75000);
COMMIT;

Fivetran の同期が完了し、Snowflakeで確認すると、下図のようにレコードが追加されます。

レコードの変更

[employee_id] が 1 のレコードについて、first_name 列を変更してみます。

UPDATE test.employees
SET first_name = 'UpdatedFirstName'
WHERE employee_id = 1;
COMMIT;

同期完了後、Snowflakeで確認すると、変更がキャプチャされていることが確認できます。

レコードの削除

[employee_id] が 2 のレコードを削除してみます。

DELETE FROM test.employees
WHERE employee_id = 2;
COMMIT;

同期完了後、Snowflake で確認すると「_FIVETRAN_DELETED」が True に変更され論理削除が実施されていることを確認できます。

列の追加

テーブル構造の変化を伴う列の追加を行います。

ALTER TABLE test.employees
ADD (birthday DATE);
-- データを取得する
SQL> SELECT * FROM test.employees FETCH FIRST 1 ROWS ONLY;

EMPLOYEE_ID FIRST_NAME LAST_NAME  HIRE_DATE  SALARY BIRTHDAY
----------- ---------- ---------- ---------- ------ ----------
          1 UpdatedFir Doe        22-04-20    50000
            stName

Snowflake側では、レコードを追加後、テーブルに反映されました。

-- レコードを追加する
INSERT INTO employees (employee_id, first_name, last_name, hire_date, salary, birthday) 
VALUES (109, 'Sophia', 'Miller', TO_DATE('2023-05-01', 'YYYY-MM-DD'), 70000, TO_DATE('1990-07-15', 'YYYY-MM-DD'));
COMMIT;

Snowflake 側

列の削除

[Last_Name] 列を削除してみます。

ALTER TABLE employees
DROP COLUMN last_name;

Fivetran 側でスキーマを確認すると、下図のように表示され無くなります。

同期後、Snowflakeで確認すると、下図のように変更が反映されています。

ソースから削除された列は、宛先にそのまま保持される代わりに、宛先の対応する列の値には Null が書き込まれます。すでに削除済みのレコード(_FIVETRAN_DELETED が True)は変更がキャプチャされないので、そのままの値が残っています。

この時のログを確認すると、forced_resync_table イベントとして記録されています。

Logs - forced_resync_table | Fivetran

列の名前を変更

既存列の名称を変更してみます。

--first_name 列の名前を name に変更
ALTER TABLE employees
RENAME COLUMN first_name TO name;

同期完了後、Snowflake で確認すると、変更後の列名を持つ新しい列が追加されています。変更前の列は、値が NULL に変更されます。

また、列名の変更時もforced_resync_table イベントとして記録されていました。

テーブルの追加

同期済みのテーブルと同じスキーマにテーブルを追加してみます。

CREATE TABLE products (
  product_id NUMBER(3,0) PRIMARY KEY,
  product_name VARCHAR2(255),
  price NUMBER(10, 2),
  category VARCHAR2(100)
);
-- レコードを追加する
INSERT INTO products (product_id, product_name, price, category) VALUES (1, 'Tablet', 500.00, 'Electronics');
INSERT INTO products (product_id, product_name, price, category) VALUES (2, 'Headphones', 99.99, 'Electronics');
INSERT INTO products (product_id, product_name, price, category) VALUES (3, 'Blender', 79.99, 'Kitchen Appliances');
INSERT INTO products (product_id, product_name, price, category) VALUES (4, 'Bookshelf', 199.99, 'Furniture');
INSERT INTO products (product_id, product_name, price, category) VALUES (5, 'Notebook', 14.99, 'Office Supplies');
COMMIT;
-- データを取得する
SQL> SELECT * FROM products;

PRODUCT_ID PRODUCT_NAME            PRICE CATEGORY
---------- -------------------- -------- --------------------
         1 Tablet                $500.00 Electronics
         2 Headphones             $99.99 Electronics
         3 Blender                $79.99 Kitchen Appliances
         4 Bookshelf             $199.99 Furniture
         5 Notebook               $14.99 Office Supplies

Fivetranでもすぐに変更が反映されます。

同期完了後、Snowflake 側でも自動的にテーブルの追加を確認できます。

レコードも確認できます。

テーブルの削除

さいごにテーブルを削除してみます。

 DROP TABLE test.products;

Fivetran では、下図のような形で変更が反映されています。

ただし、Snowflake(Destination)側では、テーブルの削除までは反映されません。ログでは下図のようになっています。

主キーのないテーブルでのスキーマ自動移行の検証

次に、主キーのないテーブルでの挙動も見ておきます。ドキュメントでは以下に記載があります。特に、UPDATE 時の挙動が主キーありの場合と異なります。
Oracle data pipeline for analytics infrastructure | Fivetran

主キーのないテーブルを作成します。

CREATE TABLE example_table (
  id NUMBER(3,0),
  name VARCHAR2(255),
  description VARCHAR2(500)
);
-- レコードを追加する
INSERT INTO example_table (id, name, description) VALUES (1, 'First Record', 'This is the description for the first record.');
INSERT INTO example_table (id, name, description) VALUES (2, 'Second Record', 'This is the description for the second record.');
INSERT INTO example_table (id, name, description) VALUES (3, 'Third Record', 'This is the description for the third record.');
COMMIT;

同期完了後、Snowflakeで確認します。主キーのないテーブルには、「_FIVETRAN_ID」からなるシステム列が追加されます。

レコードの追加

主キーのないテーブルにレコードを追加します。

-- レコードを追加する
INSERT INTO example_table (id, name, description) VALUES (4, 'Fourth Record', 'This is the description for the fourth record.');
COMMIT;

同期後、Snowflake で確認すると下図の通り主キーありの場合と同様の挙動となります。

レコードの変更

以下の通りレコードを変更してみます。

UPDATE example_table
SET name = 'Updated Name'
WHERE id = 1;
COMMIT;

同期後、Snowflakeで確認すると下図のように、既存のレコードが論理削除され、変更後のレコードが追加される形で変更が反映されています。主キーありテーブルとは異なる挙動となります。

レコードの削除

[id] が 3 のレコードを削除してみます。

DELETE FROM example_table
WHERE id = 3;
COMMIT;

同期後、Snowflakeで確認します。 [_fivetran_id] 列がある場合、この情報を使用し、主キーありテーブルと同様の形式で論理削除が行われます。

列の追加

以下の通り、列を追加します。

ALTER TABLE example_table
ADD status VARCHAR2(100);
-- レコードを追加する
INSERT INTO example_table (id, name, description,status) VALUES (6, 'Sixth Record', 'example', 'Active');
COMMIT;

この場合も、主キーありの場合と同様にレコードを追加したタイミングで変更が反映されました。

列の削除

既存列([description])を削除してみます。

ALTER TABLE example_table
DROP COLUMN description;

この場合、UPDATE 時のように既存レコードは論理削除され、変更後の値(NULL)のレコードが追加される形で変更が反映されます。主キーありテーブルとは異なる挙動です。

変更後のデータは「_fivetran_deleted = FALSE」でフィルタすれば抽出できます。

列の名前を変更

列の名前を変更します。

ALTER TABLE example_table
RENAME COLUMN name TO item_name;

この場合、新規列が追加され、変更前列のレコード値は、NULL に置き換えられます。(既に論理削除済みのレコードは変更されません。)

サポートされていないカラムを含むテーブルを追加

さいごにサポートされていないカラムを含むテーブルを Oracle 側に追加してみます。
Oracle Fivetran Teleport Sync | Fivetran documentation

  • サポートされていないカラムを含むテーブル
        -- テーブルを作成する
        CREATE TABLE unsupport_column_table (
          id NUMBER(3,0) PRIMARY KEY,
          item_name VARCHAR2(255),
          status VARCHAR2(100),
          description CLOB
        );
        
        -- レコードを追加する
        INSERT INTO unsupport_column_table (id, item_name, status, description) VALUES 
        (1, 'First Item', 'Active', 'This is the description for the first item.');
        COMMIT;
        -- データを取得する
        SQL> SELECT * FROM unsupport_column_table;
        
          ID ITEM_NAME            STATUS
        ---- -------------------- ----------
        DESCRIPTION
        --------------------------------------------------
           1 First Item           Active
        This is the description for the first item.

この場合、サポートされているカラムは同期の対象となりますが、サポートされていないカラム(ここでは [DESCRIPTION])は同期対象として選択できません。

Snowflake 側でも同期されません。

  • サポートされていない主キーを含むテーブル

以下のテーブルを作成してみます。このテーブルでは、主キーの [employee_id] で NUMBER 型を使用し、明示的に整数のみであることを指定していません。

CREATE TABLE test.employees (
    employee_id NUMBER PRIMARY KEY,
    first_name VARCHAR2(50),
    last_name VARCHAR2(50),
    hire_date DATE,
    salary NUMBER
);
--レコード追加
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (1, 'John', 'Doe', TO_DATE('2022-04-20', 'YYYY-MM-DD'), 50000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (2, 'Jane', 'Smith', TO_DATE('2022-04-21', 'YYYY-MM-DD'), 60000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (3, 'Michael', 'Johnson', TO_DATE('2022-04-22', 'YYYY-MM-DD'), 70000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (4, 'Emily', 'Brown', TO_DATE('2022-04-23', 'YYYY-MM-DD'), 55000);
INSERT INTO test.employees (employee_id, first_name, last_name, hire_date, salary) VALUES (5, 'David', 'Wilson', TO_DATE('2022-04-24', 'YYYY-MM-DD'), 65000);

この場合、Fivetran 側で下図の表示となり、テーブルそのものを同期することができません。

さいごに

Fivetran Teleport Sync で RDS for Oracle のデータを Snowflake に同期してみました。
Oracle 側で追加の設定は不要ですが、対応するデータ型など一部制限があるので、同期対象のテーブルが条件を満たすか注意が必要です。
こちらの内容が何かの参考になれば幸いです。