Amazon RDS for MariaDB のデータを Fivetran で Snowflake に同期してみる
はじめに
Fivetran から Amazon RDS for MariaDB のデータを Snowflake にロードしてみましたので、検証手順を本記事でまとめてみます。
RDS for MariaDB との連携
Fivetran で Amazon RDS for MariaDB をデータソースとする際の設定は以下に記載があります。
執筆時点での主な特徴は以下です。
- サポートするバージョン
- 10.1.2 - 11.7
- 接続方法は以下より選択
- 直接接続
- SSH トンネル
- AWS PrivateLink を使用して接続
- 増分同期の方法
- バイナリログ
binlog_formatを ROW に設定する必要がある- バイナリログの保持期間は少なくとも 24 時間の設定が必要
- Fivetran Teleport Sync
- 追加の設定は不要
- 大規模でデータの変更が急激に行われるテーブルにおいて同期に失敗した場合、Fivetran Teleport Syncはそのテーブルの同期を自動的に無効化される
- バイナリログ
前提条件
以下の環境を使用しています。
- 宛先
- Snowflake
- データソース
- Amazon RDS for MariaDB:バージョン 11.8.5
- インスタンスタイプ:db.t4g.micro
- プライベートサブネットに構築し、SSH サーバーからのアクセスを許可
- Amazon RDS for MariaDB:バージョン 11.8.5
- 接続方法
- SSH トンネル
- SSH サーバー(EC2)をパブリックサブネットに構築し、Fivetran IP からの接続を許可する
- SSH サーバー
- OS:Amazon Linux 2023
- ローカルから SSH 接続可能
- 同期方法
- バイナリログ
事前準備
セキュリティグループの設定
SSH トンネル経由で接続するにあたり、以下のインバウンドルールを設定します。
- RDS のセキュリティグループ:SSH サーバー(EC2)からのポート 3306 を許可
- SSH サーバーのセキュリティグループ:Fivetran サーバーの IP アドレスからのポート 22 を許可
Fivetran の IP アドレスは、Fivetran コンソールのコネクション設定画面に表示されます。
MySQL クライアントのインストール
パブリックサブネットの踏み台サーバー(EC2)から RDS に接続できるように、ローカルからログイン後 MySQL クライアントをインストールします。
sudo dnf install -y mariadb105
RDS への接続確認(SSL 使用):
curl -o global-bundle.pem https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem
mysql -h <エンドポイント> -P 3306 -u <ユーザー> -p --ssl-verify-server-cert --ssl-ca=./global-bundle.pem
サンプルデータの作成
以下の手順でサンプルデータとして、データベースとテーブルを2つ作成しました。
-- データベース(スキーマ)作成:mydb
CREATE DATABASE mydb;
USE mydb;
-- サンプルテーブル作成
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- レコードを追加
INSERT INTO users (name, email) VALUES
('山田太郎', 'yamada@example.com'),
('鈴木花子', 'suzuki@example.com'),
('田中一郎', 'tanaka@example.com');
-- サンプルデータ2
CREATE DATABASE shop;
USE shop;
CREATE TABLE products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
category VARCHAR(50),
price INT NOT NULL,
stock INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO products (name, category, price, stock) VALUES
('ノートPC', 'electronics', 98000, 10),
('マウス', 'electronics', 2980, 50),
('デスク', 'furniture', 45000, 5),
('チェア', 'furniture', 32000, 8),
('USBハブ', 'electronics', 1980, 30);
RDS 側の設定
バイナリログによる増分同期(インクリメンタル更新)を行うには、データベースの変更履歴を追跡できるようにRDSの構成を変更する必要があります。以下の設定を順に行います。
binlog_format の変更
RDS コンソールからパラメータグループを作成・編集し、binlog_formatをROWに変更します。あわせてbinlog_row_imageをFULLに設定する必要があります(特に変更していない場合、デフォルトでFULLとなっていました)。

変更後、以下のコマンドで設定を確認します。
> SHOW GLOBAL VARIABLES LIKE 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
> SHOW GLOBAL VARIABLES LIKE 'binlog_row_image';
+------------------+-------+
| Variable_name | Value |
+------------------+-------+
| binlog_row_image | FULL |
+------------------+-------+
> SHOW GLOBAL VARIABLES LIKE 'mysql56_temporal_format';
+-------------------------+-------+
| Variable_name | Value |
+-------------------------+-------+
| mysql56_temporal_format | ON |
+-------------------------+-------+
mysql56_temporal_format は ON である必要があります(デフォルトで ON)。
また、必須ではありませんが、Fivetran は GTID モードの有効化もオプションで可能です。必要に応じてパラメータグループに以下を追加設定します。
gtid_mode = ON
gtid_strict_mode = ON
自動バックアップの確認
RDS コンソールのバックアップ保持期間が 1 日以上に設定されていることを確認します。デフォルトで 1 日に設定されています。
バイナリログ保持期間の設定
Fivetran でバイナリログによる同期を行う際は、データベースがバイナリログを少なくとも24時間保持する必要があります。
Fivetran としては7日間(168 時間)の保持期間を推奨しています。ここでは検証目的のため、以下のように設定(24時間)しました。
CALL mysql.rds_set_configuration('binlog retention hours', 24);
設定内容は以下で確認可能です。
CALL mysql.rds_show_configuration();
+------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| name | value | description |
+------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| binlog retention hours | 24 | binlog retention hours specifies the duration in hours before binary logs are automatically deleted. |
| source delay | 0 | source delay specifies the replication delay in seconds between this instance and its primary (master) DB instance. |
| target delay | 0 | target delay specifies the replication delay in seconds between this instance and any future RDS-managed read replicas created from this instance. Ignored for non-RDS-managed read replicas. |
+------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Fivetran 専用ユーザーの作成と権限付与
プライマリデータベース上に Fivetran 専用のユーザーを作成し、以下の権限を付与します。
- SELECT:同期対象のすべてのテーブル・カラムへの読み取り権限
- REPLICATION CLIENT / REPLICATION SLAVE:増分同期(バイナリログ)に必要なレプリケーション権限
- mysql.rds_heartbeat2 / mysql.rds_configuration への SELECT:RDS 用コネクタの増分同期に必要なシステム領域へのアクセス権限
- EXECUTE ON PROCEDURE mysql.rds_kill(オプションだが推奨):アイドル状態や孤立したセッションを安全に終了させるための権限
-- ユーザー作成
CREATE USER 'fivetran'@'%' IDENTIFIED BY '<パスワード>';
-- 権限付与
-- ここではすべてのデータベースとに対する権限を付与
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'fivetran'@'%';
GRANT SELECT ON mysql.rds_heartbeat2 TO 'fivetran'@'%';
GRANT SELECT ON mysql.rds_configuration TO 'fivetran'@'%';
GRANT EXECUTE ON PROCEDURE mysql.rds_kill TO 'fivetran'@'%';
-- 確認
SHOW GRANTS FOR 'fivetran'@'%';
SSH トンネルのための設定
ここでは接続方法として SSH トンネルを使用したので、SSH サーバーで以下の設定を行います。
Fivetran 専用ユーザーの作成
SSH サーバー(EC2)で以下を実行し、Fivetran 専用のユーザーを作成します。
# グループを作成
sudo groupadd fivetran
# ユーザーを作成
sudo useradd -m -g fivetran fivetran
# ユーザーに切り替え
sudo su - fivetran
# .ssh ディレクトリを作成
mkdir ~/.ssh
# 権限設定
chmod 700 ~/.ssh
# 作成したディレクトリに移動
cd ~/.ssh
# ファイルを作成
touch authorized_keys
# ファイルの権限を設定
chmod 600 authorized_keys
こちらの詳細は以下をご参照ください。
Fivetran 側の設定
Destination
宛先には Snowflake を使用しました。こちらの接続設定については以下をご参照ください。
Connection
RDS for MariaDB をソースとするコネクションを追加します。
はじめに接続方法として「Connect via SSH tunnel」を選択し、SSH サーバーに関する情報を指定します。

また、SSH トンネルの場合「Public Key」欄に公開鍵が生成されるので、この内容を先ほど SSH サーバー側で作成したauthorized_keysに追加します。
vi authorized_keys
続けてソースとなるデータベースに関する情報を指定します。同期方法はバイナリログを指定しました。

各種設定後、接続テストを行い問題なければ下図の表示となります。

次の画面で、ソース側のスキーマ情報が取得され、同期を対象のデータベースを選択できます。

さいごにどの程度のスキーマ変更を許容するか選択します。

以上で設定完了です。「Start Initial Sync」で初期同期を開始します。
初期同期の確認
初期同期完了後、Snowflake 側でレコードを確認します。

宛先設定で同期先に指定したデータベース内に、<コネクション名>_<データベース名>_<テーブル名>の内容で MariaDB のデータベースごとにスキーマが作成されていました。
Usersテーブル:

Productsテーブル:

各テーブルにレコードが同期されていることを確認できました。
増分同期の確認
RDS 側でレコードを変更します。
-- users テーブル
-- レコード追加
INSERT INTO mydb.users (name, email) VALUES
('佐藤次郎', 'sato@example.com'),
('高橋美咲', 'takahashi@example.com');
-- 更新
UPDATE mydb.users SET email = 'yamada_new@example.com' WHERE id = 1;
-- products テーブル
-- レコード追加:
INSERT INTO shop.products (name, category, price, stock) VALUES
('キーボード', 'electronics', 8980, 20),
('モニター', 'electronics', 35000, 7);
-- 更新
UPDATE shop.products SET price = 89000, stock = 8 WHERE id = 1;
再度同期を実行し Snowflake 側を確認します。
Usersテーブル:

Productsテーブル:

内容が更新されていることを確認できました。
さいごに
Fivetran の SSH トンネルを使って Amazon RDS for MariaDB のデータを Snowflake に同期してみました。
こちらの内容がどなたかの参考になれば幸いです。







