この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
- [Talend] CDC(Change Data Capture)を使ったデータ連携(1/3)
- [Talend] CDC(Change Data Capture)を使ったデータ連携(2/3)
- [Talend] CDC(Change Data Capture)を使ったデータ連携(3/3)
CDC(Change Data Capture)とは
CDC(Change Data Capture)を使えば、変更があったデータのみを瞬時に識別して処理し、その変更データをさらに再利用することができるものです。
下図は簡単な Talend+CDCの組み合わせによる処理の流れ
検証環境の準備
- 連携元D/B(CDCを設定するデータベース):Oracle 12c
- 連携先D/B(変更データ反映先データベース):MySQL 5.6(今回はRDSではありません。)
- ETLツール:Talend Data Integration 6.0.1
各DBへの前準備
Oracleへユーザを作成し、権限を付与します。 DATAユーザを作成
create user data identified by (任意のパスワード);
CDCユーザを作成
create user cdc identified by (任意のパスワード);
DATAユーザへ権限の付与
grant connect to data;
CDCユーザへ権限の付与
GRANT CREATE ANY VIEW to CDC;
GRANT CREATE ANY TABLE to CDC;
※もしうまくCDC機能が動かない場合は好き勝手にできるテスト環境という前提で
Grant all privileges to data;
で全権限を付与するのもありかもです。 次にMySQLへユーザを作成し、権限を付与します(権限設定は環境により任意に設定してください。)
CREATE USER dwh IDENTIFIED BY '任意のパスワード';
GRANT ALL PRIVILEGES ON `dwh`.customers TO 'dwh'@'localhost';
Talend上でデータベース接続の定義
接続名をDataOracleで作成します。 同じように 接続名をCDCOracleで作成します。 次は 接続名をDWH_MySQLで作成します。
Oracle及びMySQLにテーブル作成・データを投入
tRowGeneratorコンポーネントを使ってcustomersテーブル作成とサンプルデータの生成をしてもいいのですが、ここではDDLを事前に作って流すだけにします。(Oracle側のcustomersテーブルへ登録)
CREATE TABLE "DATA"."CUSTOMERS"
( "ID" int,
"FIRSTNAME" VARCHAR2(10),
"LASTNAME" VARCHAR2(10),
"AGE" int,
PRIMARY KEY ("ID")
);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (1,'太郎','山田',23);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (2,'一郎','鈴木',42);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (3,'三郎','田中',15);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (4,'花子','山田',21);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (5,'薔薇子','赤井',51);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (6,'すみれ','紫',35);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (7,'向日葵','黄色',10);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (8,'半島','三浦',62);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (9,'五郎','山田',58);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (10,'落葉','池ノ上',77);
MySQL側のcustomersテーブルへ登録
USE DWH;
DROP TABLE IF EXISTS `customers`;
CREATE TABLE `customers` (
`ID` int(11) NOT NULL,
`FIRSTNAME` varchar(10) DEFAULT NULL,
`LASTNAME` varchar(10) DEFAULT NULL,
`AGE` int(11) DEFAULT NULL,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `customers` VALUES (1,'太郎','山田',23);
INSERT INTO `customers` VALUES (2,'一郎','鈴木',42);
INSERT INTO `customers` VALUES (3,'三郎','田中',15);
INSERT INTO `customers` VALUES (4,'花子','山田',21);
INSERT INTO `customers` VALUES (5,'薔薇子','赤井',51);
INSERT INTO `customers` VALUES (6,'すみれ','紫',35);
INSERT INTO `customers` VALUES (7,'向日葵','黄色',10);
INSERT INTO `customers` VALUES (8,'半島','三浦',62);
INSERT INTO `customers` VALUES (9,'五郎','山田',58);
INSERT INTO `customers` VALUES (10,'落葉','池ノ上',77);
連携元、連携先のcustomersテーブルのデータが同期取れている状態にしておきます。
各データソースに対してTable Schemaを定義する
DataOracleに対してTable Schemaを定義します。(Retrieve Schemaを選択します。) 下記画面が表示されたら Next をクリック。 下記画面が表示されたらCUSTOMERSテーブルを選択します。 選択後、Creation Statusがsuccessになるのを確認し、Nextをクリックします。 自動判別するので、異なる場合はカラム情報を適切に変更して定義します。 MySQL_DHWも同様にTable Schema作成します。 次は実際にCDCの定義を行います。