[Talend] CDC(Change Data Capture)を使ったデータ連携(1/3)
- [Talend] CDC(Change Data Capture)を使ったデータ連携(1/3)
- [Talend] CDC(Change Data Capture)を使ったデータ連携(2/3)
- [Talend] CDC(Change Data Capture)を使ったデータ連携(3/3)
CDC(Change Data Capture)とは
CDC(Change Data Capture)を使えば、変更があったデータのみを瞬時に識別して処理し、その変更データをさらに再利用することができるものです。
下図は簡単な Talend+CDCの組み合わせによる処理の流れ
検証環境の準備
- 連携元D/B(CDCを設定するデータベース):Oracle 12c
- 連携先D/B(変更データ反映先データベース):MySQL 5.6(今回はRDSではありません。)
- ETLツール:Talend Data Integration 6.0.1
各DBへの前準備
Oracleへユーザを作成し、権限を付与します。 DATAユーザを作成
create user data identified by (任意のパスワード);
CDCユーザを作成
create user cdc identified by (任意のパスワード);
DATAユーザへ権限の付与
grant connect to data;
CDCユーザへ権限の付与
GRANT CREATE ANY VIEW to CDC; GRANT CREATE ANY TABLE to CDC;
※もしうまくCDC機能が動かない場合は好き勝手にできるテスト環境という前提で
Grant all privileges to data;
で全権限を付与するのもありかもです。 次にMySQLへユーザを作成し、権限を付与します(権限設定は環境により任意に設定してください。)
CREATE USER dwh IDENTIFIED BY '任意のパスワード'; GRANT ALL PRIVILEGES ON `dwh`.customers TO 'dwh'@'localhost';
Talend上でデータベース接続の定義
接続名をDataOracleで作成します。 同じように 接続名をCDCOracleで作成します。 次は 接続名をDWH_MySQLで作成します。
Oracle及びMySQLにテーブル作成・データを投入
tRowGeneratorコンポーネントを使ってcustomersテーブル作成とサンプルデータの生成をしてもいいのですが、ここではDDLを事前に作って流すだけにします。(Oracle側のcustomersテーブルへ登録)
CREATE TABLE "DATA"."CUSTOMERS" ( "ID" int, "FIRSTNAME" VARCHAR2(10), "LASTNAME" VARCHAR2(10), "AGE" int, PRIMARY KEY ("ID") ); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (1,'太郎','山田',23); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (2,'一郎','鈴木',42); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (3,'三郎','田中',15); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (4,'花子','山田',21); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (5,'薔薇子','赤井',51); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (6,'すみれ','紫',35); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (7,'向日葵','黄色',10); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (8,'半島','三浦',62); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (9,'五郎','山田',58); insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (10,'落葉','池ノ上',77);
MySQL側のcustomersテーブルへ登録
USE DWH; DROP TABLE IF EXISTS `customers`; CREATE TABLE `customers` ( `ID` int(11) NOT NULL, `FIRSTNAME` varchar(10) DEFAULT NULL, `LASTNAME` varchar(10) DEFAULT NULL, `AGE` int(11) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO `customers` VALUES (1,'太郎','山田',23); INSERT INTO `customers` VALUES (2,'一郎','鈴木',42); INSERT INTO `customers` VALUES (3,'三郎','田中',15); INSERT INTO `customers` VALUES (4,'花子','山田',21); INSERT INTO `customers` VALUES (5,'薔薇子','赤井',51); INSERT INTO `customers` VALUES (6,'すみれ','紫',35); INSERT INTO `customers` VALUES (7,'向日葵','黄色',10); INSERT INTO `customers` VALUES (8,'半島','三浦',62); INSERT INTO `customers` VALUES (9,'五郎','山田',58); INSERT INTO `customers` VALUES (10,'落葉','池ノ上',77);
連携元、連携先のcustomersテーブルのデータが同期取れている状態にしておきます。
各データソースに対してTable Schemaを定義する
DataOracleに対してTable Schemaを定義します。(Retrieve Schemaを選択します。) 下記画面が表示されたら Next をクリック。 下記画面が表示されたらCUSTOMERSテーブルを選択します。 選択後、Creation Statusがsuccessになるのを確認し、Nextをクリックします。 自動判別するので、異なる場合はカラム情報を適切に変更して定義します。 MySQL_DHWも同様にTable Schema作成します。 次は実際にCDCの定義を行います。