IICS CDIでNormalizer transformationを使用してカラム名やカラム順を変更してみた
はじめに
データアナリティクス事業本部ビッグデータチームのyosh-kです。
IICS CDIのNormalizer transformationとは、1つの受信カラムを複数のカラムに変換させたり、その逆に複数のカラムを1つにまとめたりすることができる機能です。
今回は上記用途とはではなく、Normalizerを使用してのカラム名やカラム順の変更、新規カラムの追加、カラムのNULL化を行いたいと思います。
Normalizer transformation
IICS CDI Mapping Designer入門 〜Normalizer(ノーマライザ)編〜
実現したいこと
以下の要望があったケースを想定します。
- Redshiftに格納されたtableデータをS3に移行したい
- 移行する際には、実行日単位で、Spectrum tableとして参照できるようにして欲しい
- 移行先のtableについて、カラム名やカラム順の変更、新規カラムの追加、カラムのNULL化を行いたい
上記の内容を実現するために図の構成で実装していきたいと思います。
- ① IICS CDIのマッピングで、ソースに移行元のRedshift tableを指定
- ② IICS CDIのNormalizerで変換処理を行い、ターゲットに移行先のS3パスを日付prefixの形式でparquetファイルとしてInsert
- ③ 事前に作成したspectrum tableにAlter table add partitionを実行して、spectrum tableで参照できる形に
前提条件
データ準備
Redshift serverlessが事前に構築されていることとします。 まだの方は以下ブログの「Redshift Serverless構築」項目を参考にしてみてください。今回はブログで構築したflightdata tableを元にして、レコード数を30件に絞り、datetime型の検証ためにupdate_at カラムを追加したflightdata_limit_30 tableを作成します。
IICS CDI Amazon Redshift V2 ConnectorでRedshift Serverlessと接続できるか試してみた
CREATE TABLE "dev"."public"."flightdata_limit_30" AS SELECT * FROM "dev"."public"."flightdata" limit 30
ALTER TABLE "dev"."public"."flightdata_limit_30" ADD update_at DATETIME NOT NULL DEFAULT (GETDATE());
マッピング設計
移行元の public.flightdata_limit_30 tableと移行先の spectrum tableのマッピングを以下にまとめました。赤字箇所が修正が必要なカラムになります。
No. | public.flightdata_limit_30 | → | spectrum.flightdata_limit_30 | マッピング |
---|---|---|---|---|
1 | flight_date | flight_date | flight_date | |
2 | flight_number | flight_number | flight_number | |
3 | origin_city | departure_city | destination_city | |
4 | origin_state | departure_state | destination_state | |
5 | destination_city | origin_city | origin_city | |
6 | destination_state | origin_state | origin_state | |
7 | departure_delay | departure_delay | departure_delay | |
8 | arrival_delay | arrival_delay | arrival_delay | |
9 | cancelled | cancelled | NULL | |
10 | - | cancelled_reservation | cancelled | |
11 | diverted | diverted | diverted | |
12 | air_time | air_time | air_time | |
13 | distance | distance | distance | |
14 | distance_group | distance_group | distance_group | |
15 | carrier_delay | carrier_delay | carrier_delay | |
16 | weather_delay | weather_delay | weather_delay | |
17 | security_delay | security_delay | security_delay | |
18 | late_aircraft_delay | late_aircraft_delay | late_aircraft_delay | |
19 | origin_airport_name | origin_airport_name | origin_airport_name | |
20 | cancellation_reason | cancellation_reason | cancellation_reason | |
21 | destination_airport_name | destination_airport_name | destination_airport_name | |
22 | carrier_name | carrier_name | carrier_name | |
23 | update_at | update_at | update_at |
カラム名の変更
以下のカラム名を変更します。
修正前 | → | 修正後 |
---|---|---|
destination_city | departure_city | |
destination_state | departure_state |
カラム順の変更
以下のカラム順を変更します。
No. | 修正前 | → | 修正後 |
---|---|---|---|
1 | origin_city | destination_city | |
2 | origin_state | destination_state | |
3 | destination_city | origin_city | |
4 | destination_state | origin_state |
新規カラムの追加
以下のカラムを追加します。
新規カラム |
---|
cancelled_reservation |
カラムのNULL化
以下のカラムをNULL化します。
NULL化 |
---|
cancelled |
Redshift IAMロールの確認
次にターゲットとなるspectrum tableを以下ページを参考に作成します。今回は、AWS Glueデータカタログを用いてspectrum tableを作成するので、RedshiftのIAMロールに「AWSGlueConsoleFullAccess」、「AmazonS3ReadOnlyAccess」ポリシーを付与します。
Amazon Redshift Spectrum の開始方法
S3 spectrum準備
spectrum tableを作成するために、まずはspectrum schemaを定義する必要があります。Redshiftのクエリエディタ v2で先ほどのIAM Roleを指定した形で、spectrum schemaを定義します。
CREATE EXTERNAL SCHEMA spectrum FROM DATA CATALOG DATABASE 'kasama' IAM_ROLE '<IAM Role ARN>' CREATE EXTERNAL DATABASE IF NOT EXISTS;
spectrum schemaが作成できたら、今回のターゲットとなるspectrum tableをsqlで作成します。
CREATE EXTERNAL TABLE spectrum.flightdata_limit_30 ( flight_date date , flight_number integer , departure_city character varying(256) , departure_state character varying(256) , origin_city character varying(256) , origin_state character varying(256) , departure_delay numeric(18, 0) , arrival_delay numeric(18, 0) , cancelled numeric(18, 0) , cancelled_reservation numeric(18, 0) , diverted numeric(18, 0) , air_time numeric(18, 0) , distance numeric(18, 0) , distance_group numeric(18, 0) , carrier_delay character varying(256) , weather_delay character varying(256) , security_delay character varying(256) , late_aircraft_delay character varying(256) , origin_airport_name character varying(256) , cancellation_reason character varying(256) , destination_airport_name character varying(256) , carrier_name character varying(256) , update_at TIMESTAMP WITHOUT TIME ZONE ) PARTITIONED BY (year int, month int, day int) STORED AS PARQUET LOCATION 's3://<bucket name>/target/spectrum_prefix/' TABLE PROPERTIES('compression_type'='snappy')
マッピング作成
それではIICS CDIでマッピングを作成していきます。
ソース
ソースにはpublic.flightdata_limit_30 tableを指定します。
式
一部の値をNULL化するために、左側のタブから「式」を選択し、プロパティの「式」から+ボタンを選択し、exp_null フィールドを作成します。
フィールドタイプは「出力フィールド」、名前は exp_null、タイプは string、制度は 10 と設定します。
式には NULL を返すように指定します。
Normalizer
次に、左側のタブから Normalizer を選択し、マッピングを行います。プロパティの「正規化されたフィールド」から「フィールドの作成 > 受信フィールドから選択」を選択します。
flight_date と flight_number は特に変更がないため、そのまま選択しOKボタンを押下します。
次に departure_city と departure_state ですが、ともにStringの精度が256なので、「フィールドの作成 > 新しいフィールド」から作成します。
destination_city と destination_state を飛ばした origin_city 〜 cancelled までを「フィールドの作成 > 受信フィールドから選択」で選択します。
次に cancelled_reservation カラムを cancelled と同様に、number型の精度が18で「フィールドの作成 > 新しいフィールド」から作成します。
diverted 〜 update_at は修正点は無いので、「フィールドの作成 > 受信フィールドから選択」で選択します。
次にフィールドマッピングを行います。プロパティの「フィールドマッピング」の画面に遷移し、「受信フィールド」から値をドラッグし、「マッピングされたフィールド」へドロップすることで、マッピングができます。今回は、「departure_city」「departure_state」「cancelled」「cancelled_reservation」カラム以外はフィールド名が一致しているので、それぞれドラッグ&ドロップします。
「departure_city」「departure_state」「cancelled」「cancelled_reservation」カラムについては、今回新たにマッピングした値を当てはめます。
ターゲット
次にターゲットを設定します。プロパティから「ターゲット」を選択し、形式を「Parquet」、操作は「Insert」を選択します。
ターゲットオブジェクトは「実行時に新規作成」を選択し、「特殊文字の処理」にチェックを入れ、オブジェクト名は「parquet拡張子の任意のファイル名」、パスには「year=%Y/month=%m/day=%d」と付与することでタイムスタンプ形式を含めます。
次にプロパティの「ターゲットフィールド」を選択します。Normailizerとターゲットがparquetファイルの組み合わせの場合、ターゲットフィールドがNormalizerにより不適合なものになり、Alter table実行後に、spectrum tableを参照するとエラーとなってしまうので、手動で変更する必要があります。変更方法としては、「メタデータの編集」から「ネイティブタイプ」と「タイプ」を修正します。
現状、私が検証した中で、ソースtableで以下の型で定義されている場合は、修正が必要になります。
修正対象の元の型 | ネイティブタイプ | タイプ | → | ネイティブタイプ | タイプ |
---|---|---|---|---|---|
integer | parquet_decimal | decimal | parquet_int32 | integer | |
boolean | parquet_decimal | decimal | parquet_boolean | integer | |
date | parquet_string | string | parquet_date | date/time | |
datetime | parquet_string | string | parquet_int96 | date/time |
※ 上記の設定でもエラーとなる場合は、作成したマッピングをコピーして式、Normalizerを除いた形に変更して、ターゲットフィールドを確認してみることをお勧めします。
修正が完了したらマッピングを保存して、マッピングを実行します。
マッピング実行結果
マッピング実行結果は問題なく成功しています。
S3のパスにも問題なく、日付区切りでparquetファイルが格納されていることを確認できました。
Alter table add partition
次にAlter table add partitionをRedshiftのクエリエディタ上で実行し、spectrum tableにpartitionを追加します。
ALTER TABLE spectrum."flightdata_limit_30" ADD IF NOT EXISTS PARTITION(year = 2023, month = 02, day= 26) LOCATION 's3://<bucket name>/target/spectrum_prefix/year=2023/month=02/day=26/'
データの確認
public schemaとspectrum schemaのtableでデータに差分が無いかを確認していきます。 データを確認するために、table毎にRedshiftのクエリエディタからselect文を実行し、出力された結果をCSVファイルとしてExportします。
出力されたファイルをExcelファイルに貼り付けて、加工データのカラム順などを整えます。
public schemaとspectrum schemaのセルをそれぞれ「=」で結びつけて値がTRUEとなるか確認します。FALSEの場合は、セル背景色が赤色となるように設定しておくと見やすいと思います。結果としては、最後の update_at のみFALSEで表示されましたので、別途確認したいと思います。
IICS CDIでターゲット parquetファイルでdatetime型カラムの値が-9時間される
値を確認したところ、parquetファイルの update_at カラムが-9時間されていることを確認しました。2023/02/28現在、現状の原因は調査中の状況です。
table | update_at |
---|---|
public.flightdata_limit_30 | 2023/02/26 0:14 |
spectrum.flightdata_limit_30 | 2023/02/25 15:14 |
暫定対応
暫定対応として、該当のカラムの値を+9時間することで時間を合わせたいと思います。ADD_TO_DATEという関数がありますので、そちらを用いて該当カラムに+9時間したものを作成し、マッピングします。
マッピング実行後にAlter table文を実行し、再度確認した結果、publicスキーマと同様のdatetimeとなりました。
最後に
上記の暫定対応については、2023/2/28現在の即席で考えた対応ですので、原因を追求し、恒久対応を確認できましたら、ブログをアップデートしたいと思います。このブログが少しでも困っている方の助けになれば幸いです。