【CDI】Informatica Cloud Data Integrationでハンズオン課題を考えてみた

2023.10.19

はじめに

データアナリティクス事業本部ビッグデータチームのyosh-kです。
チームに新規参入の方が増えたので、その際のCDI ハンズオン課題をブログに残しておきたいと思います。

前提条件

  • CDIを操作できる環境を構築済みであることを前提とします。
  • Redshift ServerlessとS3 Bucketへの相互のロード処理を行いますので、事前にRedshift Serverless, S3 Bucket, IICSコネクタの設定をしていることとします。未設定の場合は、以下Redshift Connectorの接続ブログをご確認だください。
  • 今回使用するデータとddl、csv, jsonはすべて以下Github上に格納しています。AWS公式ハンズオンのサンプルデータをダウンロードし、csvファイル形式としたものになります。ddlに関しては事前に実行してtableを作成しておいてください。
  • CDIには様々な実装方法があり、必ずしも今回のハンズオン実装が正解とは限りませんのであらかじめご承知おきください。

  • 参考:

課題

構成図に沿って作成していきます。

  • 課題1)DWH tableへのInsert
  • 課題2)DataMart tableへのUpsert

課題1

  • sales table用タスクフロー作成:ユーザーはDWHsalestableに格納するcsvファイルを手動で任意の格納先S3に配置し、それをトリガーとして起動するタスクフローを作成します。
    • salesマッピング :salestableにInsertするマッピング処理を実装します。Insertする際はupdate_atカラムに更新日付も登録します。
  • event table用タスクフロー作成:ユーザーはDWHeventtableに格納するcsvファイルを手動で任意の格納先S3に配置し、それをトリガーとして起動するタスクフローを作成します。
    • eventマッピング :eventtableにInsertするマッピング処理を実装します。Insertする際は、update_atカラムに更新日付も登録します。
1回目のデータ 2回目のデータ
event_data.csv upsert_event_data.csv
sales_data.csv upsert_sales_data.csv

課題2

  • event_sales_summarytable用タスクフロー作成:DWHへInsertする処理後にスケジュール起動するタスクフローを作成します。
    • select_sales_last_update_time マッピング: sales tableの前回実行以降のデータを差分抽出するために、last_updated_timetableからsaleslast_updatedを抽出するマッピングを作成し、取得した値を入出力パラメータに格納します。
    • select_event_last_update_time マッピング: event tableの前回実行以降のデータを差分抽出するために、last_updated_timetableからeventlast_updatedを抽出するマッピングを作成し、取得した値を入出力パラメータに格納します。
    • event_sales_summary マッピング:以下のsql条件で、先程の入出力パラメータの値を用いて前回実行以降のデータを差分抽出し、DataMartのevent_sales_summary tableへUpsertするマッピングを作成します。
    • upsert_sales_last_update_time マッピング: DataMart処理実行後にlast_updated_time tableのtable_nameカラムがsaleslast_updatedカラムをUpsertします。
    • upsert_event_last_update_time マッピング: DataMart処理実行後にlast_updated_time tableのtable_nameカラムがeventlast_updatedカラムをUpsertします。
-- DataMart SQL
WITH last_updated_sales AS (
  SELECT last_updated 
  FROM last_updated_time 
  WHERE table_name = 'sales'
), 
last_updated_event AS (
  SELECT last_updated 
  FROM last_updated_time 
  WHERE table_name = 'event'
)
SELECT
  e.eventname,
  s.sellerid,
  SUM(s.pricepaid * s.qtysold) AS total_sales
FROM
  sales s
  JOIN event e ON s.eventid = e.eventid
  JOIN last_updated_sales lus ON s.update_at > lus.last_updated
  JOIN last_updated_event lue ON e.update_at > lue.last_updated
WHERE
  e.catid = '9'
GROUP BY
  e.eventname,
  s.sellerid
ORDER BY
  total_sales DESC;

注意点

今回の構成は、CDIでの実装ハンズオンのため、CDI上でSQL コンポーネントを使用しないように実装してみてください。以降は、実際のハンズオンとなりますので、一度時間をかけて実装してみた後に自身の実装と照らし合わせていただければと思います。

課題1のハンズオン

m_sw_sales

それでは以下のマッピングから作成していきます。

  • salesマッピング :salestableにInsertするマッピング処理を実装します。Insertする際はupdate_atカラムに更新日付も登録します。

事前にsales_header.csvsales_data .csv配置し、不要なファイルはdoneフォルダに格納するようにS3 Bucket上で操作します。

まずはソースに先ほど格納したsales_header.csvを選択します。 次に詳細からSource TypeをDirectoryを選択し、フォルダーパスに先ほどsales_header.csvファイルを格納したS3 Pathを指定します。この設定により、オブジェクトで指定したheaderのcsvファイルではなく、Folder Path直下のファイルをすべて参照することになります。これによりsales_data_yyyymmdd.csvのような可変なcsvファイル名で来た場合でも取り込めるようになります。sales_header.csvの役割としては、フィールド値を指定する必要があるため作成しています。なので、headerファイルの中身はカラム名のみとなっています。

次に形式オプションを開き、スキーマファイルにGithubに格納されているshm_ m_sw_sales.jsonを設定します。スキーマファイルを設定することでフィールドの精度をそれぞれのカラムに合わせてカスタマイズすることができます。修飾子モードは修飾子として"を使用したいため、ALLを選択し、それ以外はデフォルトのままで設定しておきます。

次に式Transformationを設定します。

プラスボタンの新しいフィールドからupdate_atカラムの値のためにnew_updt_atフィールドを作成し、システム変数のSYSDATEを使用します。

saletimeカラムにdatetime型として値を登録するためにnew_saletimeを以下の式の値で設定します。

IIF( IsNull(saletime), NULL, To_Date(saletime,'YYYY-MM-DD HH24:MI:SS' ))

次にターゲットを設定します。ターゲットにはdwhsalestableを指定し、Upsertで更新カラムはsalesidを指定します。 Redshiftへデータを取り込む際に一時的利用するS3 Bucketには任意のBucket名を記載します。

フィールドマッピングではフィールドマップオプションを手動に設定し、自動マップからスマートマップを実行、実行後にsaletimeはマップ解除し、new_の値をそれぞれ、左からドラッグ&ドロップします。

mt_sw_sales

次にマッピングタスクですが、先ほど作成したマッピングを指定し、詳細セッションプロパティで以下の値を入力し、保存したら完了となります。

セッションプロパティ名 セッションプロパティ値 備考
日時形式文字列 YYYY-MM-DD HH24:MI:SS datetime型の形式を指定
エラー時に停止 1 エラー時に1であると停止する設定

tf_sw_sales

次にタスクフローを作成します。左側の新規ボタンからタスクフローを選択します。 データタスクで先ほど作成したマッピングタスクを指定します。 エラー処理ではエラー発生時カスタムエラー処理にチェックを入れます。 エラー時に通知タスクを紐づけます。任意のメールと件名を設定します。 本文はHTML形式を選択し、任意のエラー文言と該当のデータタスクフィールドを左から選択してエラー内容を表示します。 エラー時にスローコンポーネントを紐づけて、データタスクの失敗時のコード、詳細、理由をそれぞれ選択します。

fl_tf_sw_sales

次にファイルリスナを作成します。

新規のコンポーネントからファイルリスナを選択します。 それぞれ以下の設定とします。表の設定項目以外は、デフォルトのまま使用します。

項目 設定
ファイルリスナの名前 fl_<タスクフロー名>
ソースタイプ コネクタ
接続タイプ Amazon S3 v2
ソースタイプ コネクタ
接続 事前に作成したs3 Connector
フォルダパス ファイルをリスニングするフォルダパス

次にタスクフローにファイルリスナを紐づけるために、タスクフローを一度publish解除します。 解除した後に、開始プロパティからバインディングにイベントを選択し、イベントリソース名に先ほど作成したファイルリスナ名を選択し、再びタスクフローpublishしたら完了です。

課題1のsales tableについて、それぞれ作成しました。event tableについては、全く同様の手順で作成できるため、割愛させていただきます。

課題2のハンズオン

次に課題2のハンズオンになります。以下の順番で実装していきたいと思います。

  • select_sales_last_update_time マッピング: sales tableの前回実行以降のデータを差分抽出するために、last_updated_timetableからsaleslast_updatedを抽出するマッピングを作成し、取得した値を入出力パラメータに格納します。
  • event_sales_summary マッピング:以下のsql条件で、先程の入出力パラメータの値を用いて前回実行以降のデータを差分抽出し、DataMartのevent_sales_summary tableへUpsertするマッピングを作成します。
  • upsert_sales_last_update_time マッピング: DataMart処理実行後にlast_updated_time tableのtable_nameカラムがsaleslast_updatedカラムをUpsertします。
  • event_sales_summarytable用タスクフロー作成:DWHへInsertする処理後にスケジュール起動するタスクフローを作成します。

m_w_select_sales_last_update_time

それでは、last_update_timetableからsalestableの最終更新日付を取得するマッピングを作成します。 まずはソースにlast_update_timetableを選択し、クエリオプションのフィルタでsalesで絞り込むように設定します。

次にevent_sales_summary マッピングで最終更新日付を使用できるように入出力パラメータを作成します。

入出力パラメータについては以下記事を参考にしました。

【CDI】Informatica Cloud Data Integrationでターゲットファイルを作成せずに入出力パラメータに値を代入する方法

次に式をの中で新たにevent_last_updatedフィールドをdatetime型で作成します。 フィールドの式としては以下のように設定することで、先ほど作成したパラメータに最終更新日付をセットすることができます。

SetVariable($$param_sales_last_updated,last_updated)

ターゲットでは何も出力したくないので、フラットファイルコネクタでdev/nullを指定することでファイルを出力しないようにします。

マッピングタスクは課題1と同様の設定で作成すれば良いので割愛させていただきます。m_w_select_event_last_update_timeマッピングについても同様の手順となるので、割愛します。

m_wm_event_sales_summary

次にevent_sales_summarytableへUpsertするマッピングを作成します。 まず、先ほど渡した入出力パラメータを使用するために、本マッピングでも入出力パラメータを設定します。

ソースにsalestableを選択します。 クエリオプションのフィルタの設定で以下の条件とすることで、最終更新日付以降のデータを取得します。

sales.update_at  > '$$param_sales_last_updated'

eventtableのソースも同様に設定します。

次にjoinerの設定をします。まず、受信フィールドですが一部カラム名が重複しているので一括でそれぞれのプレフィックスを追加する機能を使用してカラム名を一意にします。 salestableの場合はs_eventtableの場合は、e_と指定しました。

結合条件としては、eventidで結合するようにします。Masterにサイズの小さいデータソースを割り当てることが推奨されていますが、今回はどちらも同じデータサイズのため、salesをマスタに設定します。

IICS CDI Mapping Designer入門 〜Joiner(結合)編〜

次にsqlでいうwhere句の処理にあたるfilerを実装します。条件としてe_catid9で絞り込みます。

次にaggregatorで集計関数を設定します。グループ化でGrop Byで指定するe_eventnames_selleridを指定します。 集計で新たにtotal_salesフィールドを定義し、Sum関数で二つの値をかけた値を格納します。

Sum(s_pricepaid * s_qtysold)

Sorterでは、SQLでいうOrder byを表現するため、total_salesフィールドの降順を設定します。

ターゲットではevent_sales_summarytableを設定し、eventnameカラムをUpsert Keyとします。ステージング用のS3 bucketも設定します。 フィールドマッピングを以下のように設定し、本マッピングの作成は完了です。マッピングタスクについては課題1と同様の作成方法のため割愛します。

m_w_upsert_sales_last_update_time

event_sales_summarytableへのUpsert処理後に最終更新日付を更新するマッピングを作成します。

ソースにlast_updated_timetableを指定し、フィルタの条件でsalestableに絞り込みます。 式ではnew_last_updatedフィールドを作成し、SYSDATEを設定します。 ターゲットでは、last_updated_timetableを指定し、table_nameをUpsert Keyに設定します。 フィールドマッピングに先ほど作成したフィールドを設定し、本マッピングの作成は完了です。マッピングタスクについては課題1と同様の作成方法のため割愛します。

tf_wm_event_sales_summary

タスクフローを作成します。まず、並列パスを設定し、mt_w_select_event_last_update_timemt_w_select_sales_last_update_timeを設定するデータタスクを作成します。

次にmt_wm_event_sales_summaryを設定するデータタスクを作成します。本データタスクの入力フィールドを設定し、先ほどのデータタスクの入出力パラメータを値として設定するようにします。 並列パスを設定し、mt_w_upsert_event_last_update_timemt_w_upsert_sales_last_update_timeを設定するデータタスクを作成します。

sc_tf_wm_event_sales_summary

最後にスケジュール設定を行います。

作成したタスクフローを右クリックし、スケジュールを選択します。 ここではジョブ名として、job_<タスクフロー>と入力し、新しいスケジュールを選択します。 名前はsc_<タスクフロー>と入力し、スケジュール時間を人二の値に設定します。 最後にスケジュールの割り当てを押下したら完了となります。

初回実行

それでは初回実行となりますが、初回実行の前に、last_updated_time tableに初期値が必要ですので、以下のInsert文を実行します。

INSERT INTO last_updated_time (table_name, last_updated) 
VALUES 
    ('event', '2000-01-01 00:00:00'),
    ('sales', '2000-01-01 00:00:00');

それでは、ファイルリスナが起動した状態に設定し、S3 Bucketに対象ファイルを格納します。

ファイル転送ログからファイルリスナがそれぞれのファイルを検知していることを確認できました。

ファイルリスナ起動で実行された実行結果になります。共に正常に終了されていることを確認できました。

Redshift上で確認しても100件取り込めていることがわかります。

この状態で、MartにInsertするSQLを実行し、IICSでの処理後にMartがどの様なレコードが入るかを確認します。 全5レコードとなります。

次にMart処理を実行します。Martはスケジュール起動のため、任意の時間に設定します。

タスクフローが問題なく成功していることを確認しました。

Redshiftのevent_sales_summary tableを確認しましたが、件数もレコードの値も先ほどの内容と一致したので、想定通りの結果となりました。

last_updated_time tableのlast_updatedカラムも更新されていることが確認できました。

差分実行

続いて差分処理を確認していきます。 まずは先ほどと同様に差分用のcsvファイルを格納しDWH処理を実行します。

タスクフローが成功しました。

Redshiftにも3件登録されていることがわかります。

先ほどと同様に、MartにInsertするSQLを実行し、IICSでの処理後にMartがどの様なレコードが入るかを確認します。 実行結果は以下2レコードとなりました。

スケジュールでタスクフローを実行したところ、正常に終了しました。

SQLと同様に追加の2件がInsertされていることを確認できたので、問題なさそうですね。

最後に

どうしても画面操作が多く、画像での説明が多くなってしまいましたが、お付き合いいただきありがとうございました。 その他にもIICSのハンズオンや参考記事として、以下も確認いただければより理解が深まると思います。 お疲れ様でした!!