[Talend] CDC(Change Data Capture)を使ったデータ連携(1/3)

talend

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

  1. [Talend] CDC(Change Data Capture)を使ったデータ連携(1/3)
  2. [Talend] CDC(Change Data Capture)を使ったデータ連携(2/3)
  3. [Talend] CDC(Change Data Capture)を使ったデータ連携(3/3)

CDC(Change Data Capture)とは

CDC(Change Data Capture)を使えば、変更があったデータのみを瞬時に識別して処理し、その変更データをさらに再利用することができるものです。

下図は簡単な Talend+CDCの組み合わせによる処理の流れ
Talend_CDC

検証環境の準備

  • 連携元D/B(CDCを設定するデータベース):Oracle 12c
  • 連携先D/B(変更データ反映先データベース):MySQL 5.6(今回はRDSではありません。)
  • ETLツール:Talend Data Integration 6.0.1

各DBへの前準備

Oracleへユーザを作成し、権限を付与します。

DATAユーザを作成

create user data identified by (任意のパスワード);

CDCユーザを作成

create user cdc identified by (任意のパスワード);

DATAユーザへ権限の付与

grant connect to data;

CDCユーザへ権限の付与

GRANT CREATE ANY VIEW to CDC;
GRANT CREATE ANY TABLE to CDC;

※もしうまくCDC機能が動かない場合は好き勝手にできるテスト環境という前提で

Grant all privileges to data;

で全権限を付与するのもありかもです。
次にMySQLへユーザを作成し、権限を付与します(権限設定は環境により任意に設定してください。)

CREATE USER dwh IDENTIFIED BY '任意のパスワード';
GRANT ALL PRIVILEGES ON `dwh`.customers TO 'dwh'@'localhost';

Talend上でデータベース接続の定義

接続名をDataOracleで作成します。
oracle_con1
oracle_con2
同じように 接続名をCDCOracleで作成します。
oracle_con3 oracle_con4
次は 接続名をDWH_MySQLで作成します。
mysql_dwh_con11 mysql_dwh_con22

Oracle及びMySQLにテーブル作成・データを投入

tRowGeneratorコンポーネントを使ってcustomersテーブル作成とサンプルデータの生成をしてもいいのですが、ここではDDLを事前に作って流すだけにします。(Oracle側のcustomersテーブルへ登録)

CREATE TABLE "DATA"."CUSTOMERS"
  (    "ID" int,
       "FIRSTNAME" VARCHAR2(10),
       "LASTNAME" VARCHAR2(10),
       "AGE" int,
        PRIMARY KEY ("ID")
        );
        
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (1,'太郎','山田',23);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (2,'一郎','鈴木',42);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (3,'三郎','田中',15);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (4,'花子','山田',21);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (5,'薔薇子','赤井',51);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (6,'すみれ','紫',35);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (7,'向日葵','黄色',10);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (8,'半島','三浦',62);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (9,'五郎','山田',58);
insert into customers (ID,FIRSTNAME,LASTNAME,AGE) values (10,'落葉','池ノ上',77);

MySQL側のcustomersテーブルへ登録

USE DWH;
DROP TABLE IF EXISTS `customers`;
CREATE TABLE `customers` (
  `ID` int(11) NOT NULL,
  `FIRSTNAME` varchar(10) DEFAULT NULL,
  `LASTNAME` varchar(10) DEFAULT NULL,
  `AGE` int(11) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `customers` VALUES (1,'太郎','山田',23);
INSERT INTO `customers` VALUES (2,'一郎','鈴木',42);
INSERT INTO `customers` VALUES (3,'三郎','田中',15);
INSERT INTO `customers` VALUES (4,'花子','山田',21);
INSERT INTO `customers` VALUES (5,'薔薇子','赤井',51);
INSERT INTO `customers` VALUES (6,'すみれ','紫',35);
INSERT INTO `customers` VALUES (7,'向日葵','黄色',10);
INSERT INTO `customers` VALUES (8,'半島','三浦',62);
INSERT INTO `customers` VALUES (9,'五郎','山田',58);
INSERT INTO `customers` VALUES (10,'落葉','池ノ上',77);

連携元、連携先のcustomersテーブルのデータが同期取れている状態にしておきます。

各データソースに対してTable Schemaを定義する

DataOracleに対してTable Schemaを定義します。(Retrieve Schemaを選択します。) oracle_con5
下記画面が表示されたら Next をクリック。 oracle_con6
下記画面が表示されたらCUSTOMERSテーブルを選択します。
選択後、Creation Statusがsuccessになるのを確認し、Nextをクリックします。
oracle_con7
自動判別するので、異なる場合はカラム情報を適切に変更して定義します。
oracle_con8
MySQL_DHWも同様にTable Schema作成します。
mysql_dwh1 mysql_dwh2 mysql_dwh3
次は実際にCDCの定義を行います。

AWS Cloud Roadshow 2017 福岡