[Talend] CDC(Change Data Capture)を使ったデータ連携(3/3)

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

  1. [Talend] CDC(Change Data Capture)を使ったデータ連携(1/3)
  2. [Talend] CDC(Change Data Capture)を使ったデータ連携(2/3)
  3. [Talend] CDC(Change Data Capture)を使ったデータ連携(3/3)

Talendで同期をとるジョブ作成する

前準備が長かったですが、ようやくOracle→MySQLのテーブル同期をとるためのジョブを作成に入ります。

Create JOBでSyncDWHとしてジョブを作成します。
まずはMySQLへ接続できるかを確認するためにtMysqlConnectionコンポーネントを配置します。
Sync_1

DeleteのSubjob用にメタデータのCDCOracleをドラッグ&ドロップでジョブデザイナーエリアに配置します。
※Event to catchは Delete のチェックボックスにチェックを入れていることに注意してください。
Sync_2

メタデータのDWH_MySQLをドラッグ&ドロップでジョブデザイナーエリアに配置します。
※Action on dataは Delete が選択されていることに注意してください。
Sync_3

それぞれに関係線をメインで繋いでラベルをdeleteに変更し、もう一方はmysql_customers2に変更します。
Sync_4

tMapのMap Editorでマッピングを作成します。

delete.TALEND_CDC_TYPE.equals("D")

Sync_5

UpdateのSubjob用にメタデータのCDCOracleをドラッグ&ドロップでジョブデザイナーエリアに配置します。
※Event to catchは Update のチェックボックスにチェックを入れていることに注意してください。
Sync_6

メタデータのDWH_MySQLをドラッグ&ドロップでジョブデザイナーエリアに配置します。
※Action on dataは Delete が選択されていることに注意してください。
Sync_7

それぞれに関係線をメインで繋いでラベルをupdateに変更し、もう一方はmysql_customers3に変更します。
Sync_8

tMapのMap Editorでマッピングを作成します。

delete.TALEND_CDC_TYPE.equals("U")

Sync_9_1

InsertのSubjob用にメタデータのCDCOracleをドラッグ&ドロップでジョブデザイナーエリアに配置します。
※Event to catchは Insert のチェックボックスにチェックを入れていることに注意してください。
Sync_10

メタデータのDWH_MySQLをドラッグ&ドロップでジョブデザイナーエリアに配置します。
※Action on dataは Insert が選択されていることに注意してください。
Sync_11

それぞれに関係線をメインで繋いでラベルをinsertに変更し、もう一方はmysql_customers1に変更します。
Sync_12

tMapのMap Editorでマッピングを作成します。

delete.TALEND_CDC_TYPE.equals("I")

Sync_13_1

tMysqlCommitコンポーネントを配置して各SubjobがOKの場合のトリガでフローを繋げて異種間D/Bでのテーブル同期ジョブは完成です。
Sync_14

ジョブを動作させてみる

  1. 最初にOracle上、MySQL上のcustomersテーブルの内容が同期されているかを確認します。
  2. 変更管理テーブルの内容が空になっているかを確認します。
  3. ModifyTablesOracleのジョブを実行して、Oracle上のcustomersテーブルに対してDelete/Insert/Updateして変更を加えます。
  4. 変更されたテーブル内容(Delete/Insert/Update)がCDC機能で正しくキャプチャーされているかを確認します。
  5. SyncDWHジョブを実行して、MySQLのcustomersテーブルと同期されるかを確認します。
  6. 変更されたテーブル内容(Delete/Insert/Update)がSyncDWHジョブを実行後に空になっているかを確認します。
  7. 空になっていたら正しく動作していることになります。

1.customersテーブルの内容を再確認

Oracle customers Table Data
customers_oracle
MySQL customers Table Data
customers_mysql

2.変更管理テーブルの内容が空になっているかを確認

View All Changesを選択すると一覧が表示されます。
2_changetable_crear

3.ModifyTablesOracleのジョブを実行してOracleテーブルへ変更を加える

ModifyTablesOracleジョブを実行してみます。

modifyjob_exec
特にエラーもなく、ジョブが正常終了しました。

4.CDC機能で正しくキャプチャーされているかを確認

正常に終了しているならば上記ジョブで変更された情報が変更管理テーブルへ書き込まれているはずなので、その内容を確認してます。
View All Changesで内容を見てみると下記のようなデータが登録されていることが確認できます。(変更前は空だった部分に3レコード分の変更情報が登録されています。)
modifyjob_exec_confirm

5.SyncDWHジョブを実行してテーブルの同期をする

SyncDWHジョブを実行してみます。
modifyjob_exec_sync
特にエラーもなく、ジョブが正常終了しました。

6.変更履歴が空になっているかを確認する

MySQLのcustomersテーブルへの同期が正常に終了しているのであれば下記のようにレコードが空になっているはずです。
modifyjob_exec_confirm2
Oracle customersテーブル内容
modifyjob_exec_confirm4
MySQL customersテーブル内容
modifyjob_exec_confirm3

テーブル内容が同期とれていることも確認できました。

まとめ

上記の一連の確認が完了すればあとはcronやバッチスケジューラなどで定期的にSyncDWHジョブを実行することでcustomersテーブルの同期処理が可能になります。
さて、この同期をとるためのジョブが一番重要なのですが、ポイントとしてはOracleのcustomersテーブルに外部アプリなどから変更されたタイミングで即座にMySQLのcustomersテーブルに変更データが反映されるというものではなく、変更履歴テーブルに対して即座に反映されるという点です。
従ってMySQL等へのデータ同期はジョブサーバもしくはバッチサーバなどが定期的にこのSyncDWHジョブを実行して、初めて同期されるものとなり、このジョブの実行間隔を短くすればするだけリアルタイムの同期処理に近づくというものです。

参考サイト

How to use Change Data Capture in Oracle