データ統合基盤CSアナリティクスを使ってS3上のデータをRedshiftに定期連携してみる

2020.07.14

弊社では、短期間かつお手軽にAWS上にデータ分析基盤を導入可能なCSA(Customer Story Analytics)というデータ統合基盤パッケージを提供しています。

JMCでは、ジョブの構成要素として「データ連携」、「プログラム」、「SQL」の3つを定義することが可能で、その構成要素の一つ、あるいは複数を組み合わせる事によってジョブとして構成し、実行しますが、今回は、その中でも「データ連携」を紹介します(参考:JMCの概要説明

なお、JMC含め、CSAの全体像の説明については、こちらのブログでも触れていますので、是非ご一読ください。

目次

構成要素「データ連携」とは?

  • S3に格納されたデータを、Redshift上のテーブルに取り込む機能
  • JMCではコマンドを書かずに、GUI上で、inputとなる対象データを格納するS3上のバケットの任意のフォルダと、output先となるRedshiftの任意のスキーマ及びテーブルを定義することが可能

テーブル作成

Redshift上に対象となるスキーマとテーブルをSQLクライアントツールで作成します。SQLクライアントツールから対象のRedshiftに接続した上で、以下のSQL文を実行します。なお、今回、データはサンプルとしてtableauのcsvサンプルデータ(アメリカで人気の新生児名)を使いますので、そのデータに沿ったSQL文でテーブルを作成します。

CREATE SCHEMA top_babies_schema;

CREATE TABLE IF NOT EXISTS top_babies_schema.top_babies (
    "state" character(2),
    "gender" character(1) encode lzo,
    "year" smallint encode az64,
    "top name" character varying(11) encode zstd,
    "occurences" smallint encode az64
);

※ご参考:SQLクライントツール

S3へのデータ格納

対象バケット配下のフォルダ構造は下記のとおりとなります。

CSAは日次連携で登録するデータは「daily」、週次なら「weekly」、月次なら「monthly」、毎時なら「hourly」、スケジュール未設定なら「manual」配下にデータを格納します。任意で作成したpythonファイル(構成要素「プログラム」の中身)は「program」配下に、任意で作成したSQLファイル(構成要素「SQL」の中身)は「sql」配下に格納します。 今回は、日次連携でやってみたいと思いますので、「daily」配下にCSVデータを格納します。

***-***-***-bucket(バケット名)
├─monthly
│   └─bbbbb(テーブル名)
│    └─YYYY
│        └─MM
│          xxxxxxxxxxxxxxx.csv.gz
├─weekly
│   └─bbbbb(テーブル名)
│    └─YYYY
│        └─MM
│            └─DD
│              xxxxxxxxxxxxxxx.csv.gz
├─daily
│   └─bbbbb(テーブル名)
│    └─YYYY
│        └─MM
│            └─DD
│              xxxxxxxxxxxxxxx.csv.gz
├─hourly
│   └─bbbbb(テーブル名)
│    └─YYYY
│        └─MM
│            └─DD
│              └─HH
│                xxxxxxxxxxxxxxx.csv.gz
├─manual
│   └─bbbbb(テーブル名)
│    └─YYYY
│        └─MM
│            └─DD
│                xxxxxxxxxxxxxxx.csv.gz
├─program
│   └python
│ 
├─sql

daily配下のフォルダに上記のように対象となる年、月、日を表すフォルダを作成し、対象のデータを「日」を表すフォルダの中に格納します。 なお、今回の説明では割愛しますが、フォルダの作成は、JMCの構成要素の一つである「プログラム」でジョブ実行時に自動生成することが可能です。

「データ連携」作成

JMCにログインして、ヘッダーメニューの「構成要素」から「データ連携」を選択します。

「データ連携」の画面で「作成」をクリックします。

作成画面で以下のとおり、入力し、最下部の「保存」あるいは「テスト&保存」を押します。

S3のファイルパスは、末端の「日」を表すフォルダまで指定しません。今回の場合は日次のジョブですが、JMCが現在日時から、取得先のフォルダを自動的に判断(2020年7月7日の場合、「2020」→「07」→「07」を見に行き、翌日のジョブ実行時には、「2020」→「07」→「08」を見に行きます。)して、S3からデータを取得しますので、ここでは、「年」を表すフォルダを格納している「top_babies」フォルダを指定します。

上記、入力項目各々の説明は以下のとおりです。

項目 説明
スキーマ名 対象となるファイルを取り込むテーブルを配備するスキーマ名を選択。(※選択した値はプルダウン横の「クリップボード」機能でコピー可能です)
テーブル名 対象となるファイルを取り込むテーブルの名前を選択。(※選択した値はプルダウン横の「クリップボード」機能でコピー可能です)
説明 備考欄として利用可能な項目です。任意の文字列を設定する事が出来ます。
[自動設定]ボタン 押下すると、テーブル作成時の情報を転記します。
バケット名 取り込みファイルを格納しているフォルダのS3バケットを指定します。[サイト設定]→[データ連携設定]画面より対応するバケットを追加する事が可能です。
S3ファイルパスを固定表示にする ファイルのアップロードパスを固定させたい場合に利用します。
S3ファイルパス 取り込みファイルを格納しているS3ファイルパスのうち、「テーブル名」に相当する値を末尾にスラッシュ(/)を付与する形で設定してください。S3ファイルパスは、取り込みサイクルに応じた形で構成されます。(daily/monthly/などのスケジュールサイクル系フォルダが指定可能です)
取り込み方式 対象ファイルで実現したい「取り込み方式」を「全件洗替(DELETE ALL & COPY)」、「 」、「差分更新(UPSERT)」の中から選択。
キーカラム名 判定条件としたいカラムを指定。※取り込み方式が「増分追記」もしくは「差分更新」の場合に表示・選択可能
ファイルの種類 取り込み対象のファイル種別を「CSV/TSV」、「JSON」、「Parquet」の3種類の中から選択。
ファイル囲み文字 ファイルで利用されている囲み文字の情報を指定します。
区切り文字 ファイルで利用されている区切り文字の情報を指定します。
GZip圧縮有無 取り込み対象にファイルにおける「GZip圧縮」の状況を選択。
文字エンコーディング 取り込み対象ファイルの文字エンコーディングを選択。
データの日付形式 取り込み対象のファイルで扱っている日付(DATE)の書式を選択。(※未指定の場合、'YYYY-MM-DD'で処理を行います)
データの日時書式 取り込み対象のファイルで扱っている日時(TIMESTAMP)の書式を選択。(※未指定の場合、'YYYY-MM-DD HH:MI:SS'で処理を行います)
リトライ間隔(分) 取り込み対象ファイルが存在しなかった場合に処理を再実行(リトライ)する際の時間間隔を入力。※「S3にファイルが存在しない場合、リトライを行う」をOnにした場合に利用可能
リトライ回数 取り込み対象ファイルが存在しなかった場合に処理を再実行(リトライ)する際の回数を入力。※「S3にファイルが存在しない場合、リトライを行う」をOnにした場合に利用可能
S3ファイルが存在しない場合の扱い 該当S3パスにファイルが存在しない場合の処理の扱いを選択可能です。

「ジョブ」作成

ヘッダーメニューから「ジョブ」→「ジョブ一覧」を選択します。

ジョブ一覧の画面で「ジョブの追加」を押下。

下記のようなモーダルウィンドウが表示されますので、ジョブ名に任意の名前、また、ジョブ実行種別を選択します。ジョブ実行種別は、「構成要素を実行」、「ジョブを実行」の2種類から選択します。今回は、前者を選択しますが、例えば、すでに作成したジョブを、組み合わせて別のジョブを作成する際には、後者を選択します。

追加を押すと、ジョブの編集画面が以下のとおり表示されます。今回は日次のジョブとなりますので、「日次」を選択の上、ジョブの実行時間を入力した後、「編集」を押します。

「下記のモーダルウィンドウが表示されます。左側の「設定可能な構成要素」にさきほど作成した構成要素の「データ連携」がありますので、これを、ドラッグアンドドロップ、あるいは、「+」ボタンを押すことで、右側の「現在の構成要素」に移動させ、ジョブの構成要素に含めます。右下の「保存」を押します。

ジョブの構成要素を表示する場所に以下のとおり、今登録した「データ連携」の項目が反映されていることを確認して、左下の「保存」を押します。

ジョブの一覧画面に戻ると、登録した「top_babies_job」というジョブがジョブの一覧に反映されていることが確認できます。このジョブの左側にある、ジョブのON/OFFをONに変更し、これで毎日、23:30に開始されるジョブの実行準備は完了です。

ジョブ実行・結果確認

23:30に、ジョブが開始され、成功したことが確認できます。(インジケーターは左から「成功」、「実行中」、「失敗」、「待機中」となります。)

SQLクライアントツールから、Redshift上のテーブルを確認すると、データが連携されていることが確認できました。

まとめ

JMCは直感的なGUIでのジョブの設定、実行を可能にするツールで、お客様のニーズに合わせ、新たなジョブの作成、既存ジョブの修正等をお客様自身で行って頂けるツールです。

今回は、構成要素の「データ連携」について説明しましたが、S3のデータをRedshiftに反映した上で、そのデータを元にしたデータマートの作成は、さらに構成要素「SQL」を使ったジョブを実行することで実現可能です。構成要素「SQL」については、今後、別のブログで説明を設けられればと思います。

今後もCSAの機能について定期的に発信していきます。