Treasure Dataでのクエリ実行結果をスケジュール機能を使ってRedshiftに定期的にCOPYする

2016.10.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

先日投稿した下記のエントリではTreasure Dataで任意の日付範囲のデータを抽出するクエリを作成してみましたが、当エントリではそのクエリを定期的に実行を行うクエリとしてスケジューリングし、結果をAmazon Redshiftに投入するフローを形成してみたいと思います。

目次

 

利用するクエリ

先日投稿のエントリで利用したSQLを整理し、以下の様な形で前日分のデータを取得するようにしてみます。

SELECT
  seqno,
  TD_TIME_FORMAT(time,'yyyy-MM-dd HH:mm:ss') as registered_time,
  user_id,
  points,
  description
FROM
  applog
WHERE
  TD_TIME_RANGE(
    time,
    TD_TIME_FORMAT(TD_TIME_ADD(TD_SCHEDULED_TIME(), "-1d"),'yyyy-MM-dd','JST'),
    TD_TIME_FORMAT(TD_SCHEDULED_TIME(),'yyyy-MM-dd','JST')
  )
ORDER BY
  time;

 

クエリの作成

上記SQLクエリを用いて、スケジュール実行するための諸々の設定を進めます。

 

クエリの設定

クエリの新規作成を行い、実行するSQLクエリを貼り付けた後、[Output results]のチェックボックスを選択。

td-query-move-to-redshift_01

 

接続先情報の設定

[Create Connection]ウインドウが起動。Typeに[Redshift]を選択し、各種必要な接続情報を設定、[SAVE AND CONTINUE]を押下。

td-query-move-to-redshift_02

[Configuration]については、接続DB名、スキーマ名、テーブル名を選択。また、接続モードについては[append](追記)を選択します。選択可能な接続モードは以下の通り。設定後は[DONE]を押下。

  • Append(追記)
  • Replace(置換え)
  • Truncate(全件削除→追記)
  • Update(更新)

Redshiftに於ける接続設定の作成詳細については以下をご参照ください。

td-query-move-to-redshift_03

 

スケジュール設定

次にスケジュールの設定追加を行います。初回の場合は[Schedule]の横リンクが[None]となっているのでそのリンクを押下。

td-query-move-to-redshift_04

スケジュールは幾つかタイプが選べます。プルダウンを選択すると、

td-query-move-to-redshift_05

以下の様に候補が出てきますのでここでは[Custom cron...]を選択。スケジュールに関する詳細は以下をご参照ください。

td-query-move-to-redshift_06

ここでは、夜中の1:10に実行するスケジュールを組んでみたいと思います。cron形式の設定を入力し、タイムゾーンは日本時間で動かすので[Japan]を設定します。

td-query-move-to-redshift_07

一通りの設定が終わったら[SAVE AND SCHEDULE]を押下。

td-query-move-to-redshift_08

クエリ自体を保存しておきます。任意の名前と説明を入力し[SAVE]押下。

td-query-move-to-redshift_09

以下の様な形でクエリが保存されます。

td-query-move-to-redshift_10

 

スケジューリングされたクエリの実行確認

所定の時間が来ました。クエリ詳細の[Run History]タブにはこれまでの実行記録が履歴として表示されています。

td-query-move-to-redshift_11

正常に終了した事を確認した後、対象Redshiftクラスタの内容を確認してみます。一応データとしては登録されているようですが、タイムスタンプ型として登録したかったregistered_timeが文字列型として登録されており、またこのregistered_timeとdescriptionの2つの文字列型項目の文字列長が65535バイトと、VARCHAR型の最大桁数で定義されてしまっています。Redshiftのお作法としては『必要最低限の桁数が望ましい』のでこれは宜しくないですね...

# \d public.applog;
                 Table "public.applog"
     Column      |           Type           | Modifiers 
-----------------+--------------------------+-----------
 seqno           | bigint                   | 
 registered_time | character varying(65535) | 
 user_id         | bigint                   | 
 points          | bigint                   | 
 description     | character varying(65535) | 

# SELECT * FROM public.applog ORDER BY registered_time;
 seqno |   registered_time   | user_id | points | description 
-------+---------------------+---------+--------+-------------
    73 | 2016-10-28 00:00:00 |    1000 |  83084 | ZZZZZ
    74 | 2016-10-28 00:00:01 |    1000 |  84193 | AAAAA
    75 | 2016-10-28 00:59:59 |    1000 |  85302 | BBBBB
    :
    :
   142 | 2016-10-28 23:00:00 |    1000 | 159605 | YYYYY
   143 | 2016-10-28 23:00:01 |    1000 | 160714 | ZZZZZ
   144 | 2016-10-28 23:59:59 |    1000 | 161823 | AAAAA
(72 rows)

という訳で、改めてテーブルを適切な型で再定義・再作成します。registered_timeはTIMESTAMP型に、descriptionについては作成データの最大桁数は5バイトですが、少し余裕を持って10バイトとしてみました。

# DROP TABLE IF EXISTS public.applog;
CREATE TABLE IF NOT EXISTS public.applog (
  seqno INT encode NOT NULL,
  registered_time TIMESTAMP NOT NULL,
  user_id INT encode lzo NOT NULL,
  points INT encode mostly16,
  description VARCHAR(10) encode lzo,
  PRIMARY KEY(seqno)
)
SORTKEY(registered_time)
;

# \d public.applog;
                   Table "public.applog"
     Column      |            Type             | Modifiers 
-----------------+-----------------------------+-----------
 seqno           | integer                     | not null
 registered_time | timestamp without time zone | not null
 user_id         | integer                     | not null
 points          | integer                     | 
 description     | character varying(10)       | 
Indexes:
    "applog_pkey" PRIMARY KEY, btree (seqno)

形としてはTreasure Data側にテーブルを作らせず、Redshift側で用意した形となりますが、Treasure Data側の設定としては上記のままで特に問題ありません。スケジュールの時間設定を少し変更して改めてスケジュール起動からのデータが上記作成のテーブルに対して投入される事が確認出来てました。

また、データの追記についても特に問題無し。上記72件(SQLの抽出条件に合致する対象データの件数)が入っている状態で改めてスケジュール実行してみたところ、(値の内容は重複した形となりますが)正常に追記出来ている事が確認出来ました。

# SELECT * FROM public.applog ORDER BY seqno;
 seqno |   registered_time   | user_id | points | description 
-------+---------------------+---------+--------+-------------
    73 | 2016-10-28 00:00:00 |    1000 |  83084 | ZZZZZ
    73 | 2016-10-28 00:00:00 |    1000 |  83084 | ZZZZZ
    74 | 2016-10-28 00:00:01 |    1000 |  84193 | AAAAA
    74 | 2016-10-28 00:00:01 |    1000 |  84193 | AAAAA
    75 | 2016-10-28 00:59:59 |    1000 |  85302 | BBBBB
    75 | 2016-10-28 00:59:59 |    1000 |  85302 | BBBBB
    76 | 2016-10-28 01:00:00 |    1000 |  86411 | CCCCC
    :
    :
   140 | 2016-10-28 22:00:01 |    1000 | 157387 | DDDDD
   141 | 2016-10-28 22:59:59 |    1000 | 158496 | XXXXX
   141 | 2016-10-28 22:59:59 |    1000 | 158496 | XXXXX
   142 | 2016-10-28 23:00:00 |    1000 | 159605 | YYYYY
   142 | 2016-10-28 23:00:00 |    1000 | 159605 | YYYYY
   143 | 2016-10-28 23:00:01 |    1000 | 160714 | ZZZZZ
   143 | 2016-10-28 23:00:01 |    1000 | 160714 | ZZZZZ
   144 | 2016-10-28 23:59:59 |    1000 | 161823 | AAAAA
   144 | 2016-10-28 23:59:59 |    1000 | 161823 | AAAAA
(144 rows)

 

まとめ

計2本に渡って『Treasure Dataに蓄積されたデータを抽出し、Redshiftに継続して転送する』という処理の構築方法についてご紹介してきました。適宜設定は必要ですが、一度出来てしまえば後はTreasure Dataに全てお任せでRedshiftにデータを転送してくれる様になります。上手く活用してデータ分析を捗らせて行きたいところですね。こちらからは以上です。