CDC で RDS for SQL Server のデータを Snowflake に同期してみた #Fivetran

2024.04.05

Fivetran で Amazon RDS for SQL Server のデータを Change data capture(以下、CDC) で Snowflake に同期してみましたので、その際の手順や挙動について記事にしました。

RDS for SQL Server との連携

Fivetran で RDS SQL Server をデータソースとする場合、以下のポイントに注意します。なお、こちらは 2024/4/5 時点の情報のためご注意ください。

前提条件

  • SQL Server 2012 - 2022

接続方法

RDS SQL Server への代表的な接続方法として以下があります。

  • 直接接続
    • データベースに対して直接 Fivetran の IP CIDR を許可する
    • SSL/TLS を使用した DB インスタンスへの接続の暗号化が有効であること
  • SSH トンネルによる接続
    • インスタンスに直接接続できない場合のオプション
    • パブリックサブネットの踏み台サーバーを使用
  • AWS PrivateLink
    • インスタンスに直接接続できない場合のオプション
    • Fivetran は、ビジネスクリティカル以上のエディションであることが必要
    • サポートするリージョンにリソースがあること
      • 国内であれば、ap-northeast-1 (Tokyo)

同期方法

RDS SQL Server では、3種類の同期方法が提供されています。主な特徴は以下です。

  • Change tracking
    • SQL Server の変更の追跡(Change Tracking:CT)機能を使用
      • 同期対象の個々のテーブルでも CT を有効にする必要があります
    • 同期対象のテーブルは主キーを持つ必要がある
  • Change data capture
    • SQL Server の変更データ キャプチャ(Change data capture:CDC)機能を使用
      • 同期対象の個々のテーブルでも CDC を有効にする必要があります
    • 主キーのないテーブルでも同期可能
    • 列名の変更、列追加、新規テーブルの追加、などのスキーマ変更はキャプチャできません
      • データ型の変更のみ自動的に反映
    • 単一の CDC キャプチャ インスタンスを持つテーブルのみを同期可能
      • 新しいテーブルまたは既存のテーブルに列を追加する場合は、それらを含む新しい CDC インスタンスを作成し、古いインスタンスがある場合、無効にします
  • Fivetran Teleport Sync
    • 追加の設定は不要
    • 主キーのないテーブルでも同期可能
    • 以下の主キーを持つテーブルでは同期できません
      • BIT
      • DATETIMEOFFSET
      • GEOMETRY
      • GEOGRAPHY
    • 最大 4GB のサイズの主キーを持つテーブルの同期をサポート
      • この制限を超えるテーブルは同期できません

Fivetran Teleport Sync による同期については、以下の記事をご参照ください。

上記より、Fivetran Teleport Sync を使用すると設定の手間はないのですが、テーブルサイズの制限があるので、この場合は CDC や CT による同期を使用することとなります。

検証環境

以下の環境を使用しています。

  • 宛先
    • Snowflake
  • データソース
    • Amazon RDS
      • SQL Server Standard Edition
      • 2019 15.00.43455.v1
    • インスタンスタイプ:db.m5.large
  • 接続方法
    • 直接接続
  • 同期方法
    • Change data capture(CDC)

事前準備

手元のクライアント(sqlcmd)からアクセス可能な検証用の RDS SQL Server を構築し、以下の通り Snowfalke に連携するデータを用意します。

データベースの作成

CREATE DATABASE testdb;
GO

テーブルの作成とデータの追加

CREATE TABLE Employees (
    EmployeeID INT PRIMARY KEY IDENTITY(1,1),
    FirstName NVARCHAR(50),
    LastName NVARCHAR(50),
    Position NVARCHAR(50)
);
GO

データを追加

INSERT INTO Employees (FirstName, LastName, Position) VALUES
('John', 'Doe', 'Software Developer'),
('Jane', 'Doe', 'Project Manager'),
('Jim', 'Beam', 'Analyst');
GO

Fivetran 用の読み取り専用ユーザーを作成

USE testdb;
CREATE LOGIN fivetran_user WITH PASSWORD = '<パスワード>';
CREATE USER fivetran_user FOR LOGIN fivetran_user;

権限を付与

GRANT SELECT on DATABASE::testdb to fivetran_user;
GO

CDC の有効化

USE testdb;
EXEC msdb.dbo.rds_cdc_enable_db testdb;
GO

テーブルを追加

1> EXEC sys.sp_cdc_enable_table
2> @source_schema = 'dbo',
3> @source_name   = 'Employees',
4> @role_name     = 'cdc_role';
5> GO
Job 'cdc.testdb_capture' started successfully.
Job 'cdc.testdb_cleanup' started successfully.

CDC ロールにユーザーを追加

USE testdb;
EXEC sp_addrolemember 'cdc_role', 'fivetran_user';
GO

この状態で、fivetran_user で以下のコマンドを実行するとレコードが表示されます。

EXEC sys.sp_cdc_help_change_data_capture

Fivetran の設定

Destination

データの連携先には Snowflake を使用しました。Snowflake を Destination に設定する方法は、以下の記事をご参照ください。

Connector

コネクタに RDS SQL Server を追加します。

  • 接続情報
  • 接続方法
    • ここでは「Connect directly」を指定しました。
  • 同期方法
    • Read changes using CT or CDC mechanism を指定しました。

下図の通り、Fivetran の IP アドレスのリストが表示されるので、セキュリティグループでインバウンド通信を許可しておきます。

Fivetran IP Addresses | Fivetran

この後、接続テストを行います。

もし、Connector で設定したデータベースユーザーに CDC インスタンスに対する読み取り権限がない場合、下図のような表示となります。この場合、CDC で利用するロールにユーザーが割り当てられているかご確認ください。

問題なければ下図の表示となります。

どの程度まで変更をキャプチャするかの確認画面が表示されるので、任意のオプションを選択します。ここでは「Allow all」を選択し [Continue] をクリックします。

以上で設定は完了です。[Start initial Sync] をクリックし、初期同期を開始します。

Destination である Snowflake 側を確認すると、指定のデータベース内に、スキーマ・テーブルが追加されています。

USE SCHEMA FIVETRAN_DATABASE.SQL_SERVER_RDS_DBO;
SELECT * FROM EMPLOYEES ORDER BY 1;

スキーマの自動移行の検証

Fivetran では、対応するデータソースや同期方法など条件を満たせば、データソース側のテーブルスキーマが変更された場合でも、自動的にその変更を宛先側に反映します。

RDS SQL Server に対する CDC での同期時に、どのようにこの変更が反映されるかも確認しておきます。

レコードの追加

はじめに、データソース側で以下の通り、レコードを追加してみます。

INSERT INTO Employees (FirstName, LastName, Position) VALUES ('Alice', 'Johnson', 'Software Engineer');
GO

Fivetran の同期が完了し、Snowflakeで確認すると、下図のようにレコードが追加されます。

レコードの変更

[EmployeeID] が 1 のレコードについて、Position 列を変更してみます。

UPDATE Employees SET Position = 'Senior Software Engineer' WHERE EmployeeID = 1;
GO
1> SELECT * FROM Employees;
2> GO
EmployeeID  FirstName                                          LastName                                           Position
----------- -------------------------------------------------- -------------------------------------------------- --------------------------------------------------
          1 John                                               Doe                                                Senior Software Engineer
          2 Jane                                               Doe                                                Project Manager
          3 Jim                                                Beam                                               Analyst
          4 Alice                                              Johnson                                            Software Engineer

(4 行処理されました)

同期完了後、Snowflakeで確認すると、変更がキャプチャされていることが確認できます。

レコードを削除

[EmployeeID] が 2 のレコードを削除してみます。

DELETE FROM Employees WHERE EmployeeID = 2;
GO
1> SELECT * FROM Employees;
2> GO
EmployeeID  FirstName                                          LastName                                           Position
----------- -------------------------------------------------- -------------------------------------------------- --------------------------------------------------
          1 John                                               Doe                                                Senior Software Engineer
          3 Jim                                                Beam                                               Analyst
          4 Alice                                              Johnson                                            Software Engineer

(3 行処理されました)

同期完了後、Snowflake で確認すると「_FIVETRAN_DELETED」が True に変更され論理削除が実施されていることを確認できます。

列の追加

テーブル構造の変化を伴う列の追加を行います。

ALTER TABLE Employees ADD birthday DATE;
GO
1> SELECT * FROM Employees;
2> GO
EmployeeID  FirstName                                          LastName                                           Position                                           birthday
----------- -------------------------------------------------- -------------------------------------------------- -------------------------------------------------- ----------------   
          1 John                                               Doe                                                Senior Software Engineer                                       NULL   
          3 Jim                                                Beam                                               Analyst                                                        NULL   
          4 Alice                                              Johnson                                            Software Engineer                                              NULL   

(3 行処理されました)

コネクタの「Schema」タブからは、列の追加を確認できますが、下図の通り変更はキャプチャされません。Destination 側にも反映されません。

以下のドキュメントにあるように、CDC による同期では、列の追加、名称変更、新しいテーブルの追加などの DDL 関連の変更はキャプチャされません。

CDC インスタンスの変更

カラムやテーブルの追加など、DDL による変更を反映したい場合は、以下に手順があるように、新しい CDC インスタンスを作成し、既存のインスタンスを無効にします。

テーブルを追加する場合は、CDC の対象にそのテーブルも追加する必要がある点に注意します。

CREATE TABLE products (
  product_id INT IDENTITY(1,1) PRIMARY KEY,
  product_name VARCHAR(255),
  price DECIMAL(10, 2),
  category VARCHAR(100)
);
INSERT INTO products (product_name, price, category) VALUES 
('Laptop', 1200.00, 'Electronics'),
('Smartphone', 800.00, 'Electronics'),
('Coffee Maker', 150.00, 'Kitchen Appliances'),
('Desk Lamp', 45.99, 'Furniture'),
('Ergonomic Chair', 249.99, 'Office Supplies');
GO

CDC の対象にテーブル追加していない場合、下図の表示となります。

CDC の対象にテーブル追加します。

EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name   = 'products',
@role_name     = 'cdc_role';
GO

同期が完了すると、Snowflake 側にも追加されます。

さいごに

CDC で RDS SQL Server のデータを同期してみました。
SQL Server 側で CDC インスタンスの設定が必要かつ、DDL による変更があった際は、CDC インスタンスの置き換えが必要です。Fivetran Teleport Sync では、追加設定が不要ですが、同期可能なテーブルサイズに制限があるため、ソース側のテーブルサイズによってはこちらの同期方法を指定する必要があります。
こちらの内容が何かの参考になれば幸いです。