IICS CDIでNormalizer transformationを使用してカラム名やカラム順を変更してみた

2023.02.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

データアナリティクス事業本部ビッグデータチームのkasamaです。
IICS CDIのNormalizer transformationとは、1つの受信カラムを複数のカラムに変換させたり、その逆に複数のカラムを1つにまとめたりすることができる機能です。 今回は上記用途とはではなく、Normalizerを使用してのカラム名やカラム順の変更、新規カラムの追加、カラムのNULL化を行いたいと思います。

Normalizer transformation
IICS CDI Mapping Designer入門 〜Normalizer(ノーマライザ)編〜

実現したいこと

以下の要望があったケースを想定します。

  • Redshiftに格納されたtableデータをS3に移行したい
  • 移行する際には、実行日単位で、Spectrum tableとして参照できるようにして欲しい
  • 移行先のtableについて、カラム名やカラム順の変更、新規カラムの追加、カラムのNULL化を行いたい

上記の内容を実現するために図の構成で実装していきたいと思います。

  • ① IICS CDIのマッピングで、ソースに移行元のRedshift tableを指定
  • ② IICS CDIのNormalizerで変換処理を行い、ターゲットに移行先のS3パスを日付prefixの形式でparquetファイルとしてInsert
  • ③ 事前に作成したspectrum tableにAlter table add partitionを実行して、spectrum tableで参照できる形に

前提条件

データ準備

Redshift serverlessが事前に構築されていることとします。 まだの方は以下ブログの「Redshift Serverless構築」項目を参考にしてみてください。今回はブログで構築したflightdata tableを元にして、レコード数を30件に絞り、datetime型の検証ためにupdate_at カラムを追加したflightdata_limit_30 tableを作成します。

IICS CDI Amazon Redshift V2 ConnectorでRedshift Serverlessと接続できるか試してみた

CREATE TABLE "dev"."public"."flightdata_limit_30"
AS
SELECT *
FROM   "dev"."public"."flightdata" limit 30
ALTER TABLE "dev"."public"."flightdata_limit_30"
ADD update_at DATETIME NOT NULL DEFAULT (GETDATE());

マッピング設計

移行元の public.flightdata_limit_30 tableと移行先の spectrum tableのマッピングを以下にまとめました。赤字箇所が修正が必要なカラムになります。

No. public.flightdata_limit_30 spectrum.flightdata_limit_30 マッピング
1 flight_date flight_date flight_date
2 flight_number flight_number flight_number
3 origin_city departure_city destination_city
4 origin_state departure_state destination_state
5 destination_city origin_city origin_city
6 destination_state origin_state origin_state
7 departure_delay departure_delay departure_delay
8 arrival_delay arrival_delay arrival_delay
9 cancelled cancelled NULL
10 - cancelled_reservation cancelled
11 diverted diverted diverted
12 air_time air_time air_time
13 distance distance distance
14 distance_group distance_group distance_group
15 carrier_delay carrier_delay carrier_delay
16 weather_delay weather_delay weather_delay
17 security_delay security_delay security_delay
18 late_aircraft_delay late_aircraft_delay late_aircraft_delay
19 origin_airport_name origin_airport_name origin_airport_name
20 cancellation_reason cancellation_reason cancellation_reason
21 destination_airport_name destination_airport_name destination_airport_name
22 carrier_name carrier_name carrier_name
23 update_at update_at update_at

カラム名の変更

以下のカラム名を変更します。

修正前 修正後
destination_city departure_city
destination_state departure_state

カラム順の変更

以下のカラム順を変更します。

No. 修正前 修正後
1 origin_city destination_city
2 origin_state destination_state
3 destination_city origin_city
4 destination_state origin_state

新規カラムの追加

以下のカラムを追加します。

新規カラム
cancelled_reservation

カラムのNULL化

以下のカラムをNULL化します。

NULL化
cancelled

Redshift IAMロールの確認

次にターゲットとなるspectrum tableを以下ページを参考に作成します。今回は、AWS Glueデータカタログを用いてspectrum tableを作成するので、RedshiftのIAMロールに「AWSGlueConsoleFullAccess」、「AmazonS3ReadOnlyAccess」ポリシーを付与します。

Amazon Redshift Spectrum の開始方法

S3 spectrum準備

spectrum tableを作成するために、まずはspectrum schemaを定義する必要があります。Redshiftのクエリエディタ v2で先ほどのIAM Roleを指定した形で、spectrum schemaを定義します。

CREATE EXTERNAL SCHEMA spectrum 
FROM DATA CATALOG 
DATABASE 'kasama'
IAM_ROLE '<IAM Role ARN>'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

spectrum schemaが作成できたら、今回のターゲットとなるspectrum tableをsqlで作成します。

CREATE EXTERNAL TABLE spectrum.flightdata_limit_30 (
    flight_date date ,
    flight_number integer ,
    departure_city character varying(256) ,
    departure_state character varying(256) ,
    origin_city character varying(256) ,
    origin_state character varying(256) ,
    departure_delay numeric(18, 0) ,
    arrival_delay numeric(18, 0) ,
    cancelled numeric(18, 0) ,
    cancelled_reservation numeric(18, 0) ,
    diverted numeric(18, 0) ,
    air_time numeric(18, 0) ,
    distance numeric(18, 0) ,
    distance_group numeric(18, 0) ,
    carrier_delay character varying(256) ,
    weather_delay character varying(256) ,
    security_delay character varying(256) ,
    late_aircraft_delay character varying(256) ,
    origin_airport_name character varying(256) ,
    cancellation_reason character varying(256) ,
    destination_airport_name character varying(256) ,
    carrier_name character varying(256) ,
    update_at TIMESTAMP WITHOUT TIME ZONE
)
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET
LOCATION 's3://<bucket name>/target/spectrum_prefix/'
TABLE PROPERTIES('compression_type'='snappy')

マッピング作成

それではIICS CDIでマッピングを作成していきます。

ソース

ソースにはpublic.flightdata_limit_30 tableを指定します。

一部の値をNULL化するために、左側のタブから「式」を選択し、プロパティの「式」から+ボタンを選択し、exp_null フィールドを作成します。

フィールドタイプは「出力フィールド」、名前は exp_null、タイプは string、制度は 10 と設定します。

式には NULL を返すように指定します。

Normalizer

次に、左側のタブから Normalizer を選択し、マッピングを行います。プロパティの「正規化されたフィールド」から「フィールドの作成 > 受信フィールドから選択」を選択します。

flight_date と flight_number は特に変更がないため、そのまま選択しOKボタンを押下します。

次に departure_city と departure_state ですが、ともにStringの精度が256なので、「フィールドの作成 > 新しいフィールド」から作成します。

destination_city と destination_state を飛ばした origin_city 〜 cancelled までを「フィールドの作成 > 受信フィールドから選択」で選択します。

次に cancelled_reservation カラムを cancelled と同様に、number型の精度が18で「フィールドの作成 > 新しいフィールド」から作成します。

diverted 〜 update_at は修正点は無いので、「フィールドの作成 > 受信フィールドから選択」で選択します。

次にフィールドマッピングを行います。プロパティの「フィールドマッピング」の画面に遷移し、「受信フィールド」から値をドラッグし、「マッピングされたフィールド」へドロップすることで、マッピングができます。今回は、「departure_city」「departure_state」「cancelled」「cancelled_reservation」カラム以外はフィールド名が一致しているので、それぞれドラッグ&ドロップします。

「departure_city」「departure_state」「cancelled」「cancelled_reservation」カラムについては、今回新たにマッピングした値を当てはめます。

ターゲット

次にターゲットを設定します。プロパティから「ターゲット」を選択し、形式を「Parquet」、操作は「Insert」を選択します。

ターゲットオブジェクトは「実行時に新規作成」を選択し、「特殊文字の処理」にチェックを入れ、オブジェクト名は「parquet拡張子の任意のファイル名」、パスには「year=%Y/month=%m/day=%d」と付与することでタイムスタンプ形式を含めます。

次にプロパティの「ターゲットフィールド」を選択します。Normailizerとターゲットがparquetファイルの組み合わせの場合、ターゲットフィールドがNormalizerにより不適合なものになり、Alter table実行後に、spectrum tableを参照するとエラーとなってしまうので、手動で変更する必要があります。変更方法としては、「メタデータの編集」から「ネイティブタイプ」と「タイプ」を修正します。

現状、私が検証した中で、ソースtableで以下の型で定義されている場合は、修正が必要になります。

修正対象の元の型 ネイティブタイプ タイプ ネイティブタイプ タイプ
integer parquet_decimal decimal parquet_int32 integer
boolean parquet_decimal decimal parquet_boolean integer
date parquet_string string parquet_date date/time
datetime parquet_string string parquet_int96 date/time

※ 上記の設定でもエラーとなる場合は、作成したマッピングをコピーして式、Normalizerを除いた形に変更して、ターゲットフィールドを確認してみることをお勧めします。

修正が完了したらマッピングを保存して、マッピングを実行します。

マッピング実行結果

マッピング実行結果は問題なく成功しています。

S3のパスにも問題なく、日付区切りでparquetファイルが格納されていることを確認できました。

Alter table add partition

次にAlter table add partitionをRedshiftのクエリエディタ上で実行し、spectrum tableにpartitionを追加します。

ALTER TABLE spectrum."flightdata_limit_30" ADD IF NOT EXISTS PARTITION(year = 2023, month = 02, day= 26) LOCATION 's3://<bucket name>/target/spectrum_prefix/year=2023/month=02/day=26/'

データの確認

public schemaとspectrum schemaのtableでデータに差分が無いかを確認していきます。 データを確認するために、table毎にRedshiftのクエリエディタからselect文を実行し、出力された結果をCSVファイルとしてExportします。

出力されたファイルをExcelファイルに貼り付けて、加工データのカラム順などを整えます。

public schemaとspectrum schemaのセルをそれぞれ「=」で結びつけて値がTRUEとなるか確認します。FALSEの場合は、セル背景色が赤色となるように設定しておくと見やすいと思います。結果としては、最後の update_at のみFALSEで表示されましたので、別途確認したいと思います。

IICS CDIでターゲット parquetファイルでdatetime型カラムの値が-9時間される

値を確認したところ、parquetファイルの update_at カラムが-9時間されていることを確認しました。2023/02/28現在、現状の原因は調査中の状況です。

table update_at
public.flightdata_limit_30 2023/02/26 0:14
spectrum.flightdata_limit_30 2023/02/25 15:14

暫定対応

暫定対応として、該当のカラムの値を+9時間することで時間を合わせたいと思います。ADD_TO_DATEという関数がありますので、そちらを用いて該当カラムに+9時間したものを作成し、マッピングします。

ADD_TO_DATE

マッピング実行後にAlter table文を実行し、再度確認した結果、publicスキーマと同様のdatetimeとなりました。

最後に

上記の暫定対応については、2023/2/28現在の即席で考えた対応ですので、原因を追求し、恒久対応を確認できましたら、ブログをアップデートしたいと思います。このブログが少しでも困っている方の助けになれば幸いです。