CSA JMCで実践!データ連携 〜Amazon Redshift環境でのデータ取り込み(COPY)処理をコード記述無しで行う〜 #データ統合基盤 #CSアナリティクス

2020.12.21

当エントリは『クラスメソッド CSアナリティクス Advent Calendar 2020』21日目のエントリです。

当エントリでは、クラスメソッドが展開しているデータ統合基盤サービス『CSアナリティクス』(以降"CSA")のプロダクト「CSA JMC(Job Management Console)」でAmazon Redshift環境上でデータ連携を行う際の手順について紹介します。

目次

 

Amazon Redshiftにおけるデータ連携の進め方と課題

アドベントカレンダー企画で投稿した前日のエントリでは、Redshift環境において従来行っていた「テーブル作成」の作業を、CSA JMCを活用する事で大幅に簡略化、引いてはリソースの削減や手順の自動化にまで展開させることが出来る旨を紹介していました。当エントリで紹介する「データ連携」においても、同様にCSA JMCを活用する事で大幅に作業工数を削減出来ます。

ここで言う「データ連携」とは、DWH(Redshift)環境内のデータベースにデータを取り込む処理の事を指します。すなわちRedshiftのCOPYコマンドを使って行う部分です。

従来、何も無い状態であれば、Amazon Redshiftにデータを取り込む処理を準備して実行するまでにおおよそ以下のような流れを汲むはずです。ざっと思い付く感じで並べてみましたが、Amazon RedshiftのCOPY処理を手作業で作り上げたことのある方々であれば、こういった作業は実に面倒臭いものであるというのをご理解頂けるかと思います。数が少なければまだ対処のしようがあるかも知れませんが、これ(データ取り込み対象のテーブルや取り込み処理自体)の数が50個、100個...となるととてもじゃないですがうんざりするようなボリューム感となってしまいます。

  1. [テーブル作成] 取り込み対象のファイルに関する定義を、データの内容を元に列毎に確認していく(カラム名、テーブル定義における列毎の属性(データ型やサイズ、NULL許可)
  2. [テーブル作成] 上記内容を元にテーブルの列定義を定める
  3. [テーブル作成] テーブルの属性(主キーやソートキー、分散キー)を定める
  4. [テーブル作成] 上記内容を元にCREATE TABLE文を作成し実行
  5. [データ連携] 作成したテーブルにデータが入るか検証を行うためのサンプルデータをS3にアップロード
  6. [データ連携] Amazon RedshiftのCOPY文を準備(COPY処理の際の詳細オプションなどを、処理内容に応じてカスタマイズする作業も含む)
  7. [データ連携] Amazon RedshiftのCOPY文を実行
  8. [データ連携] 上記実行処理がエラーとなる場合はエラーの内容を確認し、データもしくはテーブル定義の微調整を行う(多分大半は後者=データが入るテーブル定義への変更となると思う)
  9. [データ連携] エラーが無くなるまでCOPY検証を行い、出来上がったCOPY処理を任意のサイクルでスケジュールに組み込む

 

CSA JMCではデータ連携も良い感じにシステム側で対応します

CSA JMCでは、この「データ連携(COPY文の準備と実行)」にまつわる手間や労力を削減するために「データ連携に関する処理の作成・実行スケジュールの指定も、システム側でテーブル作成をよしなに対応してしまう」方式を採用しました。

まず、上記「課題」の項で列挙した「テーブル作成」の作業については、先日公開したエントリで紹介している機能でカバーする事が出来ます。

そして、「データ連携」の作業については、取り込むデータのサイクルを制御・連動しやすくするためにクラウドストレージ(Amazon S3)へのファイル配置のルールを「所定の配置・構成で統一化」させて頂きました。このルールを守って頂くことで、テーブル作成からデータ連携までの一連の設定作業、及びデータ連携のスケジュール実行を円滑に行う流れを実現出来ます。

主なルールは以下です。

  • データ連携対象S3バケット配下に、下記スケジュールや用途に応じたフォルダを予め用意しておくこと
    • init (テーブル作成時に取り込むデータを格納しておくフォルダ)
    • monthly (データ投入サイクルが「月次」のデータを格納するフォルダ)
    • weekly (データ投入サイクルが「週次」のデータを格納するフォルダ)
    • daily (データ投入サイクルが「日次」のデータを格納するフォルダ)
    • hourly (データ投入サイクルが「毎時」のデータを格納するフォルダ)
    • manual (任意の構成で自由に利用可能な形とするフォルダ)
    • program (構成要素「プログラム」を格納するフォルダ)
    • sql (構成要素「SQL」を格納するフォルダ)
  • スケジュールに応じたフォルダ配下にファイルを格納する際は、スケジュールサイクルごとに定められたフォルダ構成を遵守すること
    • 例).日次連携を行う場合
      [(csaの指定バケット名)]
      └─[daily]
          └[(テーブル名フォルダ)]
              ├[2020]
                  ├[11]
                      ├[01]
                          xxxxxxxxxx1.csv.gz
                          xxxxxxxxxx2.csv.gz
                      ├[02]
                      ├[03]
                      :
                      ├[30]
                  ├[12]
                      ├[01]
                      :

 

CSA JMCでのデータ連携実演

ではここから、CSA JMCの機能を用いた「データ連携」の実践解説を行います。

 

(1).データ連携対象テーブルの用意

まずはデータ連携の処理対象となるテーブルの準備から。前述言及した「テーブル作成」のエントリ手順に従い、こんな感じの内容を持つCSVデータファイルを、

csa_sales_sample_20201220_01.csv

order_date,order_store_cd,order_store_name,product_cd,sum_sales
2020/12/20,A10001,ああああああああああ,XXXXX,42104
2020/12/20,A10001,ああああああああああ,YYYYY,1515155
2020/12/20,A10001,ああああああああああ,ZZZZZ,20595
2020/12/20,A10001,ああああああああああ,QQQQQ,540359
2020/12/20,A10002,いいいいいいいい,AAAAA,5235
2020/12/20,A10002,いいいいいいいい,BBBBB,235325
2020/12/20,A10002,いいいいいいいい,CCCCC,329085
2020/12/20,A10002,いいいいいいいい,DDDDD,112853
2020/12/20,A10002,いいいいいいいい,EEEEE,21985
2020/12/20,B20001,うううううううう,FFFFF,532523
2020/12/20,B20001,うううううううう,GGGGG,532985
2020/12/20,B20001,うううううううう,HHHHH,53290508
2020/12/20,B20001,うううううううう,IIIII,82741
2020/12/20,B20002,ええええええええ,JJJJJ,325983
2020/12/20,B20003,おおおおおおおお,KKKKK,72151
2020/12/20,B20004,かかかかかかかかかかか,LLLLL,410985
2020/12/20,B20005,きききききききききき,MMMMM,139484

予めAmazon S3の所定のフォルダに格納しておき、

CSA JMCの「テーブル作成」機能を使って以下のような設定を行い、テーブルを作成しました。

作成後のSQLによる確認内容は以下の通りです。

# \d csademo.csa_sales_sample;
                         Table "csademo.csa_sales_sample"
       Column       |            Type             | Collation | Nullable | Default 
--------------------+-----------------------------+-----------+----------+---------
 order_date         | date                        |           | not null | 
 order_store_cd     | character(6)                |           | not null | 
 order_store_name   | character varying(33)       |           |          | 
 product_cd         | character(5)                |           |          | 
 sum_sales          | integer                     |           |          | 
 csa_dwh_file_path  | character varying(200)      |           |          | 
 csa_dwh_created_at | timestamp without time zone |           |          | 
Indexes:
    "csa_sales_sample_tmp_for_ginga_pkey1" PRIMARY KEY, btree (order_date, order_store_cd)

# 
# SELECT * FROM csademo.csa_sales_sample ORDER BY order_date, order_store_cd;
 order_date | order_store_cd |    order_store_name    | product_cd | sum_sales |                    csa_dwh_file_path                     | csa_dwh_created_at  
------------+----------------+------------------------+------------+-----------+----------------------------------------------------------+---------------------
 2020-12-20 | A10001         | ああああああああああ   | XXXXX      |     42104 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A10001         | ああああああああああ   | YYYYY      |   1515155 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A10001         | ああああああああああ   | ZZZZZ      |     20595 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A10001         | ああああああああああ   | QQQQQ      |    540359 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A10002         | いいいいいいいい       | AAAAA      |      5235 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A10002         | いいいいいいいい       | BBBBB      |    235325 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A10002         | いいいいいいいい       | EEEEE      |     21985 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A10002         | いいいいいいいい       | DDDDD      |    112853 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A10002         | いいいいいいいい       | CCCCC      |    329085 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A30001         | ああああああああああ   | ZZZZZ      |     20595 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A30001         | ああああああああああ   | XXXXX      |     42104 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A30001         | ああああああああああ   | QQQQQ      |    540359 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A30001         | ああああああああああ   | YYYYY      |   1515155 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A30002         | いいいいいいいい       | CCCCC      |    329085 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A30002         | いいいいいいいい       | AAAAA      |      5235 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A30002         | いいいいいいいい       | BBBBB      |    235325 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A30002         | いいいいいいいい       | EEEEE      |     21985 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | A30002         | いいいいいいいい       | DDDDD      |    112853 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B20001         | うううううううう       | FFFFF      |    532523 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B20001         | うううううううう       | GGGGG      |    532985 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B20001         | うううううううう       | HHHHH      |  53290508 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B20001         | うううううううう       | IIIII      |     82741 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B20002         | ええええええええ       | JJJJJ      |    325983 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B20003         | おおおおおおおお       | KKKKK      |     72151 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B20004         | かかかかかかかかかかか | LLLLL      |    410985 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B20005         | きききききききききき   | MMMMM      |    139484 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B40001         | うううううううう       | FFFFF      |    532523 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B40001         | うううううううう       | GGGGG      |    532985 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B40001         | うううううううう       | HHHHH      |  53290508 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B40001         | うううううううう       | IIIII      |     82741 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B40002         | ええええええええ       | JJJJJ      |    325983 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B40003         | おおおおおおおお       | KKKKK      |     72151 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B40004         | かかかかかかかかかかか | LLLLL      |    410985 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
 2020-12-20 | B40005         | きききききききききき   | MMMMM      |    139484 | s3://csa-jmc-v5demo-redshift-copy/init/csa_sales_sample/ | 2020-12-21 xx:xx:xx
(34 rows)

 

(2).データ連携を使って取り込むデータの「取り込みサイクル」を決める

次は、データ連携で取り込むデータの「サイクル決め」と「テスト用のデータ配備」を行います。

今回試す取り込みサイクルは、最もポピュラーであろう「日次」とします。定められた構成を踏まえて、以下のようなフォルダ構成及びファイルを用意しました。この日付フォルダ配下のファイルを対象テーブルに取り込むイメージです。

 

(3).構成要素(データ連携)の作成と取り込みテスト

当エントリの本題である「構成要素(データ連携)」の作成です。

CSA JMCでは、実際に処理を行う1つのまとまりを「ジョブ」と呼び、そのジョブ配下で実際に処理を行っている定義の部分を「構成要素」と呼んでいます。

構成要素の種類には「データ連携」「SQL」「プログラム」の3種類が存在します。今回はこの中の「データ連携」について作成と実践を行う形となります。

データ連携作成を行うには、メニューの[構成要素]→[データ連携]を選択。

一覧上部にある[作成]を押下。

データ連携を行うテーブルの「スキーマ名」と「テーブル名」を選択。この時、作成したテーブルがCSA JMCを利用したものの場合、すぐ下にある[自動設定]を押下することで、

テーブル作成時の設定をそのまま活用する形で一発設定してくれます!必要であれば設定を書き換えて保存する流れになりますが、今回はこの指定のままで問題ないので先に進みます。[テスト&保存]を押下。

保存だけであれば設定画保存されるのみですが、この[テスト]機能を使うことで「実際に意図した形でデータ連携処理が稼働するかどうか」を確認することが出来ます。(※この操作自体では実際に取り込み処理を行うことはありません)

動作検証が済んだらキャンセルまたは[x]押下でウインドウを閉じ、改めて[保存]を押下してください。

構成要素(データ連携)が作成出来ました。

ちなみにテストの際、存在していないフォルダ(を示す日付)を指定すると以下の様にエラーとなります。

 

(4).データ連携処理の実行

処理に組み込む構成要素(データ連携)は作成出来たので、いよいよその構成要素を動かすジョブを作成します。CSA JMCのジョブ一覧画面にて[ジョブの追加]を押下、構成要素を実行するタイプのジョブを作成します。

実行予定時刻に、今回想定しているサイクルである「日次」の種別及び時間指定を行います。その上で[編集]を押下。

ここでは、予め作成済みの構成要素を任意の順番で処理対象とすることが出来ます。今回やりたいのはデータ連携なので、先程作成したデータ連携の[+]部分をクリックし、「現在の構成要素」枠にエントリーします。そして[保存]を押下。

ジョブの構成要素設定、及びスケジュール設定が完了しました。保存のタイミングに合わせてジョブ右上のスケジュールサイクル部分の内容も更新されていることが確認出来ます。

ジョブ一覧に戻り、行左端のスケジュール実行有効チェックをONにしておきます。

日次で設定した時間がやってきました。程なくしてジョブ実行キューにジョブが入り、実行が始まります。

実行完了。ログを確認してみます。

データ取り込みに関する処理が行われていることがログ内容からも確認出来ています。また、取り込み件数の情報についても、データが取り込まれた結果増えていることが確認出来ました。

実際のDB内容をSQLでも確認してみます。ちゃんと反映されていますね!

# SELECT COUNT(*) FROM csademo.csa_sales_sample;
 count 
-------
    85
(1 row)

# SELECT order_date,COUNT(order_date) FROM csademo.csa_sales_sample GROUP BY order_date ORDER BY order_date;
 order_date | count 
------------+-------
 2020-12-20 |    34
 2020-12-21 |    51
(2 rows)

 

まとめ

という訳で、『クラスメソッド CSアナリティクス Advent Calendar 2020』21本目のエントリ、「CSA JMC(Job Management Console)」でAmazon Redshift環境上でデータ連携を行う際の手順に関する内容の紹介でした。

CSA Data Uploaderは1ヶ月間のトライアル利用が可能となっています。興味をお持ち頂いた方は是非無料版ダウンロードページからインストーラを入手頂き、お試し頂けますと幸いです。また、CSA JMCに関しても下記バナーから製品ページにアクセスする事が出来ます。是非御覧ください。

では、明日(22日目)のエントリもお楽しみに!