Treasure Dataのクエリ結果を直接Amazon Redshiftに投入する

2015.08.24

良く訓練されたWar Boys、しんやです。夏場以降、例の映画 *1にハマってしまいこれまでに映画館で6回鑑賞(V6)してしまいました。しかし世の中には8回(V8)以上の鑑賞を成し遂げている方々も居るようですのでまだまだ精進せねばならぬと思っている今日この頃です。

さて、以前当ブログで Treasure DataからAmazon Redshiftへのデータ投入をtd-agentを使って行う という方法についてご紹介しました。こちらの情報でも充分便利な仕組みとなっているのですが、先日"管理コンソールの設定のみで"直接Treasure DataのデータをAmazon Redshiftに投入する手法について紹介されていましたので、当エントリでその手順を試してみたいと思います。

目次

Amazon Redshiftクラスタの準備

まずは投入先となるAmazon Redshiftクラスタを準備します。こちらは外部から接続可能な手頃なAmazon Redshiftクラスタを一つ用意しておいてもらえれば問題ありません。クラスタを立ち上げ、COPY/INSERT可能なユーザーで接続出来るような状態にしておいてください。投入先のテーブルに関してはこの後の手順で作成する事になるので特にこのタイミングで用意しておく必要はありません。

td-from-redshift_02

Treasure Data設定

次いでTreasure Data側の設定を行います。こちらは先のブログエントリにあるように、アカウント作成時点で用意されているサンプルデータを使う事にしてみたいと思います。データベースから『sample_datasets』を選択。

td-from-redshift_03

sample_datasetsの中に用意されている『nasdaq』のテーブルの[Query]ボタンをクリック。

td-from-redshift_04

TypeについてはHiveのまま、実行文を設定するテキストボックスに以下のクエリを記載します。volumeの降順のソート順で先頭10件を取得するクエリです。設定出来たら[Run]をクリックして実行。

SELECT
    volume,
    high,
    low,
    close
FROM
    nasdaq
ORDER by
    volume DESC
LIMIT 10

td-from-redshift_05

所定のクエリを実行し、[SUCCESS]とステータスが表示され、結果も表示される事を確認してクエリの正当性を確認しておきます。確認が出来たら[Edit Query]をクリック。

td-from-redshift_061

クエリのLIMIT句指定を外し、全件取得した内容をRedshiftに投入するシナリオで実行してみたいと思います。テキストエリア内クエリのLIMIT句を除去し、[Add]をクリック。

td-from-redshift_07

エクスポート先のデータソースに関する設定を行います。接続に必要なAmazon Redshiftクラスタに関する情報をそれぞれ設定します。

td-from-redshift_08

スキーマ及びデータベース名、また投入する際の方式についての設定を行います。今回は追記(Append)の形でCOPY文を実行する形を取ってみました。Redshiftの場合、INSERT文はCOPY文に比べるとパフォーマンス面で相当コストが掛かりますので、ここはCOPY一択にしておくのが良いのではないでしょうか。設定が完了したら[Use]をクリック。

td-from-redshift_09

実行結果確認

設定情報を改めて確認し、[Run]をクリック。

td-from-redshift_10

しばらくすると実行が完了します。

td-from-redshift_11

実行ログを確認してみましょう。まずはOutputから。こちらは途中、RedshiftのCREATE TABLE文やCOPY文を実行している事が確認出来ますね。

Output

started at 2015-08-22T02:02:25Z
15/08/22 02:02:30 WARN conf.Configuration: org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@96e32ba:an attempt to override final parameter: mapreduce.input.fileinputformat.split.minsize;  Ignoring.
Hive history file=/mnt/hive/tmp/5315/hive_job_log_29278ec3-6f85-4b14-b2eb-9940a017eb58_1375404562.txt
15/08/22 02:02:37 INFO physical.Vectorizer: Vectorizing task...
**
** WARNING: time index filtering is not set!
** This query could be very slow as a result.
** If you used 'unix_timestmap' please modify your query to use TD_SCHEDULED_TIME instead
**   or rewrite the condition using TD_TIME_RANGE
** Please see http://docs.treasure-data.com/articles/performance-tuning#leveraging-time-based-partitioning
**
finished at 2015-08-22T02:06:05Z
15/08/22 02:06:08 INFO result.AbstractJDBCResult: Mode: in-place append
15/08/22 02:06:08 INFO result.RedshiftResult: Method: copy
15/08/22 02:06:08 INFO result.RedshiftResult: Failed to guess region from the host name. Using the default region
15/08/22 02:06:08 INFO result.RedshiftResult: S3 region: us-east-1
15/08/22 02:06:08 INFO result.ResultWorker: Data size: 88,543,429 bytes (compressed)
15/08/22 02:06:08 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
15/08/22 02:06:08 INFO compress.CodecPool: Got brand-new decompressor [.gz]
15/08/22 02:06:08 INFO result.AbstractJDBCResult: Connected to Database: PostgreSQL 8.0.2
15/08/22 02:06:08 INFO jdbc.JDBCUtils: SQL: SET search_path TO "public"
15/08/22 02:06:08 INFO jdbc.JDBCUtils: > 0.01 seconds
15/08/22 02:06:08 INFO result.AbstractJDBCResult: Getting list of tables...
15/08/22 02:06:08 INFO result.AbstractJDBCResult: Table "nasdaq" at "public" does not exist. Creating it using the schema of query results.
15/08/22 02:06:08 INFO jdbc.JDBCUtils: SQL: CREATE TABLE "nasdaq" ("volume" BIGINT, "high" DOUBLE PRECISION, "low" DOUBLE PRECISION, "close" DOUBLE PRECISION)
15/08/22 02:06:08 INFO jdbc.JDBCUtils: > 0.01 seconds
15/08/22 02:06:09 INFO result.AbstractJDBCResult: Connected to Database: PostgreSQL 8.0.2
15/08/22 02:06:09 INFO jdbc.JDBCUtils: SQL: SET search_path TO "public"
15/08/22 02:06:09 INFO jdbc.JDBCUtils: > 0.00 seconds
15/08/22 02:06:09 INFO result.RedshiftResult: Copy SQL: COPY "nasdaq" ("volume", "high", "low", "close") FROM ? GZIP DELIMITER '\t' NULL '\N' ESCAPE TRUNCATECOLUMNS ACCEPTINVCHARS STATUPDATE OFF COMPUPDATE OFF
15/08/22 02:07:05 INFO jdbc.RedshiftCopyBatchInsert: Uploading file id 81fb27ec-3541-4be3-a813-a00a12e1b54f to S3 (68,341,632 bytes 8,807,278 rows)
15/08/22 02:07:08 INFO result.AbstractJDBCResult: Connected to Database: PostgreSQL 8.0.2
15/08/22 02:07:08 INFO jdbc.JDBCUtils: SQL: SET search_path TO "public"
15/08/22 02:07:08 INFO jdbc.JDBCUtils: > 0.00 seconds
15/08/22 02:07:08 INFO jdbc.RedshiftCopyBatchInsert: Running COPY from file id 81fb27ec-3541-4be3-a813-a00a12e1b54f
15/08/22 02:07:23 INFO jdbc.RedshiftCopyBatchInsert: Loaded file id 81fb27ec-3541-4be3-a813-a00a12e1b54f (14.22 seconds for COPY)
15/08/22 02:07:23 INFO jdbc.RedshiftCopyBatchInsert: Loaded 1 files.

次いでDetail。およそ3分半で約880万件のCOPY処理がなされた事を確認出来ます。

Details

Query ID = 5315_20150822020202_7cb7ec1c-f16c-483e-ad3c-1d80f87513f6
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1439526184895_19940, Tracking URL = http://ip-10-95-160-94.ec2.internal:8088/proxy/application_1439526184895_19940/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1439526184895_19940
Hadoop job information for Stage-1: number of mappers: 10; number of reducers: 1
2015-08-22 02:02:44,036 Stage-1 map = 0%,  reduce = 0%
2015-08-22 02:02:54,340 Stage-1 map = 4%,  reduce = 0%, Cumulative CPU 5.66 sec
2015-08-22 02:02:55,373 Stage-1 map = 7%,  reduce = 0%, Cumulative CPU 10.85 sec
2015-08-22 02:02:57,438 Stage-1 map = 9%,  reduce = 0%, Cumulative CPU 12.52 sec
2015-08-22 02:02:58,469 Stage-1 map = 11%,  reduce = 0%, Cumulative CPU 14.22 sec
2015-08-22 02:03:01,558 Stage-1 map = 12%,  reduce = 0%, Cumulative CPU 16.25 sec
2015-08-22 02:03:04,644 Stage-1 map = 13%,  reduce = 0%, Cumulative CPU 18.29 sec
2015-08-22 02:03:06,704 Stage-1 map = 16%,  reduce = 0%, Cumulative CPU 20.52 sec
2015-08-22 02:03:07,734 Stage-1 map = 17%,  reduce = 0%, Cumulative CPU 21.1 sec
2015-08-22 02:03:10,832 Stage-1 map = 20%,  reduce = 0%, Cumulative CPU 23.18 sec
2015-08-22 02:03:17,002 Stage-1 map = 23%,  reduce = 0%, Cumulative CPU 28.5 sec
2015-08-22 02:03:20,096 Stage-1 map = 28%,  reduce = 0%, Cumulative CPU 35.53 sec
2015-08-22 02:03:23,184 Stage-1 map = 31%,  reduce = 0%, Cumulative CPU 38.08 sec
2015-08-22 02:03:26,269 Stage-1 map = 32%,  reduce = 0%, Cumulative CPU 40.2 sec
2015-08-22 02:03:29,356 Stage-1 map = 33%,  reduce = 0%, Cumulative CPU 42.0 sec
2015-08-22 02:03:32,440 Stage-1 map = 37%,  reduce = 0%, Cumulative CPU 44.89 sec
2015-08-22 02:03:35,525 Stage-1 map = 40%,  reduce = 0%, Cumulative CPU 47.27 sec
2015-08-22 02:03:42,716 Stage-1 map = 43%,  reduce = 0%, Cumulative CPU 58.45 sec
2015-08-22 02:03:45,806 Stage-1 map = 48%,  reduce = 0%, Cumulative CPU 64.91 sec
2015-08-22 02:03:48,896 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 67.53 sec
2015-08-22 02:03:51,985 Stage-1 map = 52%,  reduce = 0%, Cumulative CPU 69.43 sec
2015-08-22 02:03:55,062 Stage-1 map = 53%,  reduce = 0%, Cumulative CPU 71.67 sec
2015-08-22 02:03:59,165 Stage-1 map = 57%,  reduce = 0%, Cumulative CPU 74.82 sec
2015-08-22 02:04:01,222 Stage-1 map = 60%,  reduce = 0%, Cumulative CPU 77.21 sec
2015-08-22 02:04:08,393 Stage-1 map = 63%,  reduce = 0%, Cumulative CPU 82.43 sec
2015-08-22 02:04:10,445 Stage-1 map = 67%,  reduce = 0%, Cumulative CPU 87.65 sec
2015-08-22 02:04:11,472 Stage-1 map = 68%,  reduce = 0%, Cumulative CPU 88.97 sec
2015-08-22 02:04:13,520 Stage-1 map = 70%,  reduce = 0%, Cumulative CPU 90.67 sec
2015-08-22 02:04:14,544 Stage-1 map = 71%,  reduce = 0%, Cumulative CPU 91.96 sec
2015-08-22 02:04:16,592 Stage-1 map = 72%,  reduce = 0%, Cumulative CPU 92.84 sec
2015-08-22 02:04:19,663 Stage-1 map = 73%,  reduce = 0%, Cumulative CPU 95.24 sec
2015-08-22 02:04:23,767 Stage-1 map = 77%,  reduce = 0%, Cumulative CPU 98.66 sec
2015-08-22 02:04:25,816 Stage-1 map = 80%,  reduce = 0%, Cumulative CPU 100.67 sec
2015-08-22 02:04:32,983 Stage-1 map = 83%,  reduce = 0%, Cumulative CPU 105.77 sec
2015-08-22 02:04:35,030 Stage-1 map = 87%,  reduce = 0%, Cumulative CPU 111.15 sec
2015-08-22 02:04:36,053 Stage-1 map = 89%,  reduce = 0%, Cumulative CPU 112.55 sec
2015-08-22 02:04:38,098 Stage-1 map = 90%,  reduce = 0%, Cumulative CPU 114.18 sec
2015-08-22 02:04:39,126 Stage-1 map = 91%,  reduce = 0%, Cumulative CPU 115.09 sec
2015-08-22 02:04:41,172 Stage-1 map = 92%,  reduce = 0%, Cumulative CPU 116.04 sec
2015-08-22 02:04:44,241 Stage-1 map = 93%,  reduce = 0%, Cumulative CPU 118.38 sec
2015-08-22 02:04:48,335 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 123.33 sec
2015-08-22 02:04:58,575 Stage-1 map = 100%,  reduce = 54%, Cumulative CPU 133.13 sec
2015-08-22 02:05:01,651 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU 136.25 sec
2015-08-22 02:05:04,724 Stage-1 map = 100%,  reduce = 68%, Cumulative CPU 142.03 sec
2015-08-22 02:05:07,800 Stage-1 map = 100%,  reduce = 70%, Cumulative CPU 145.62 sec
2015-08-22 02:05:10,879 Stage-1 map = 100%,  reduce = 71%, Cumulative CPU 149.2 sec
2015-08-22 02:05:13,956 Stage-1 map = 100%,  reduce = 73%, Cumulative CPU 152.76 sec
2015-08-22 02:05:17,032 Stage-1 map = 100%,  reduce = 75%, Cumulative CPU 156.3 sec
2015-08-22 02:05:20,102 Stage-1 map = 100%,  reduce = 76%, Cumulative CPU 159.73 sec
2015-08-22 02:05:23,175 Stage-1 map = 100%,  reduce = 78%, Cumulative CPU 163.05 sec
2015-08-22 02:05:26,248 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 166.11 sec
MapReduce Total cumulative CPU time: 2 minutes 46 seconds 110 msec
Ended Job = job_1439526184895_19940
MapReduce Jobs Launched: 
Job 0: Map: 10  Reduce: 1   Cumulative CPU: 166.11 sec   HDFS Read: 1207380 HDFS Write: 313612407 SUCCESS
Total MapReduce CPU Time Spent: 2 minutes 46 seconds 110 msec
OK
MapReduce time taken: 170.45 seconds
Fetching results...
Total CPU Time: 166110
Total Records: 8,807,278
Time taken: 207.325 seconds

実際にRedshiftのデータベースを確認してみましょう。ログインを行い。所定のテーブルが出来ているかを確認。テーブルも新規で作成され、ログで表示されていた件数のデータが格納されている事を確認出来ました。

$ psql -h (ホスト名/Endpoint) -U (ユーザー名) -d (データベース名) -p (接続ポート)
Password for user xxxxxxxxxx: (パスワードを入力)
psql (9.4.1, server 8.0.2)
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: on)
Type "help" for help.

# \d public.nasdaq;
         Table "public.nasdaq"
 Column |       Type       | Modifiers 
--------+------------------+-----------
 volume | bigint           | 
 high   | double precision | 
 low    | double precision | 
 close  | double precision | 

# SELECT COUNT(*) FROM public.nasdaq;
  count  
---------
 8807278
(1 row)

# SELECT * FROM public.nasdaq ORDER BY volume DESC LIMIT 10;
   volume   |  high  |  low   | close  
------------+--------+--------+--------
 7522511800 | 0.0743 | 0.0705 | 0.0715
 6935025000 |   0.05 |   0.05 |   0.05
 6024738600 |   0.09 |   0.08 |   0.09
 5245411600 | 0.0655 | 0.0603 | 0.0604
 4890173300 | 0.0754 | 0.0709 | 0.0751
 4752863000 |  0.073 | 0.0642 | 0.0727
 4374987500 | 0.0685 | 0.0636 |  0.065
 4122998500 |   0.04 |   0.04 |   0.04
 3799612500 |   0.06 |   0.06 |   0.06
 3722479300 | 0.0684 | 0.0656 | 0.0675
(10 rows)

# 

まとめ

以上、Treasure Dataから管理コンソール上の設定のみでAmazon Redshiftへの投入を行う手順についてのご紹介でした。Treasure DataからAmazon Redshiftへのデータ投入という手順は多くのケースで求められる部分となると思います。そんな手順を管理コンソールの手順のみで設定、管理出来るというのはユーザーにとっては嬉しい部分ですね。こちらからは以上です。

脚注