Treasure Dataのクエリ結果を直接Amazon Redshiftに投入する
良く訓練されたWar Boys、しんやです。夏場以降、例の映画 *1にハマってしまいこれまでに映画館で6回鑑賞(V6)してしまいました。しかし世の中には8回(V8)以上の鑑賞を成し遂げている方々も居るようですのでまだまだ精進せねばならぬと思っている今日この頃です。
さて、以前当ブログで Treasure DataからAmazon Redshiftへのデータ投入をtd-agentを使って行う という方法についてご紹介しました。こちらの情報でも充分便利な仕組みとなっているのですが、先日"管理コンソールの設定のみで"直接Treasure DataのデータをAmazon Redshiftに投入する手法について紹介されていましたので、当エントリでその手順を試してみたいと思います。
目次
Amazon Redshiftクラスタの準備
まずは投入先となるAmazon Redshiftクラスタを準備します。こちらは外部から接続可能な手頃なAmazon Redshiftクラスタを一つ用意しておいてもらえれば問題ありません。クラスタを立ち上げ、COPY/INSERT可能なユーザーで接続出来るような状態にしておいてください。投入先のテーブルに関してはこの後の手順で作成する事になるので特にこのタイミングで用意しておく必要はありません。
Treasure Data設定
次いでTreasure Data側の設定を行います。こちらは先のブログエントリにあるように、アカウント作成時点で用意されているサンプルデータを使う事にしてみたいと思います。データベースから『sample_datasets』を選択。
sample_datasetsの中に用意されている『nasdaq』のテーブルの[Query]ボタンをクリック。
TypeについてはHiveのまま、実行文を設定するテキストボックスに以下のクエリを記載します。volumeの降順のソート順で先頭10件を取得するクエリです。設定出来たら[Run]をクリックして実行。
SELECT volume, high, low, close FROM nasdaq ORDER by volume DESC LIMIT 10
所定のクエリを実行し、[SUCCESS]とステータスが表示され、結果も表示される事を確認してクエリの正当性を確認しておきます。確認が出来たら[Edit Query]をクリック。
クエリのLIMIT句指定を外し、全件取得した内容をRedshiftに投入するシナリオで実行してみたいと思います。テキストエリア内クエリのLIMIT句を除去し、[Add]をクリック。
エクスポート先のデータソースに関する設定を行います。接続に必要なAmazon Redshiftクラスタに関する情報をそれぞれ設定します。
スキーマ及びデータベース名、また投入する際の方式についての設定を行います。今回は追記(Append)の形でCOPY文を実行する形を取ってみました。Redshiftの場合、INSERT文はCOPY文に比べるとパフォーマンス面で相当コストが掛かりますので、ここはCOPY一択にしておくのが良いのではないでしょうか。設定が完了したら[Use]をクリック。
実行結果確認
設定情報を改めて確認し、[Run]をクリック。
しばらくすると実行が完了します。
実行ログを確認してみましょう。まずはOutputから。こちらは途中、RedshiftのCREATE TABLE文やCOPY文を実行している事が確認出来ますね。
started at 2015-08-22T02:02:25Z 15/08/22 02:02:30 WARN conf.Configuration: org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@96e32ba:an attempt to override final parameter: mapreduce.input.fileinputformat.split.minsize; Ignoring. Hive history file=/mnt/hive/tmp/5315/hive_job_log_29278ec3-6f85-4b14-b2eb-9940a017eb58_1375404562.txt 15/08/22 02:02:37 INFO physical.Vectorizer: Vectorizing task... ** ** WARNING: time index filtering is not set! ** This query could be very slow as a result. ** If you used 'unix_timestmap' please modify your query to use TD_SCHEDULED_TIME instead ** or rewrite the condition using TD_TIME_RANGE ** Please see http://docs.treasure-data.com/articles/performance-tuning#leveraging-time-based-partitioning ** finished at 2015-08-22T02:06:05Z 15/08/22 02:06:08 INFO result.AbstractJDBCResult: Mode: in-place append 15/08/22 02:06:08 INFO result.RedshiftResult: Method: copy 15/08/22 02:06:08 INFO result.RedshiftResult: Failed to guess region from the host name. Using the default region 15/08/22 02:06:08 INFO result.RedshiftResult: S3 region: us-east-1 15/08/22 02:06:08 INFO result.ResultWorker: Data size: 88,543,429 bytes (compressed) 15/08/22 02:06:08 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 15/08/22 02:06:08 INFO compress.CodecPool: Got brand-new decompressor [.gz] 15/08/22 02:06:08 INFO result.AbstractJDBCResult: Connected to Database: PostgreSQL 8.0.2 15/08/22 02:06:08 INFO jdbc.JDBCUtils: SQL: SET search_path TO "public" 15/08/22 02:06:08 INFO jdbc.JDBCUtils: > 0.01 seconds 15/08/22 02:06:08 INFO result.AbstractJDBCResult: Getting list of tables... 15/08/22 02:06:08 INFO result.AbstractJDBCResult: Table "nasdaq" at "public" does not exist. Creating it using the schema of query results. 15/08/22 02:06:08 INFO jdbc.JDBCUtils: SQL: CREATE TABLE "nasdaq" ("volume" BIGINT, "high" DOUBLE PRECISION, "low" DOUBLE PRECISION, "close" DOUBLE PRECISION) 15/08/22 02:06:08 INFO jdbc.JDBCUtils: > 0.01 seconds 15/08/22 02:06:09 INFO result.AbstractJDBCResult: Connected to Database: PostgreSQL 8.0.2 15/08/22 02:06:09 INFO jdbc.JDBCUtils: SQL: SET search_path TO "public" 15/08/22 02:06:09 INFO jdbc.JDBCUtils: > 0.00 seconds 15/08/22 02:06:09 INFO result.RedshiftResult: Copy SQL: COPY "nasdaq" ("volume", "high", "low", "close") FROM ? GZIP DELIMITER '\t' NULL '\N' ESCAPE TRUNCATECOLUMNS ACCEPTINVCHARS STATUPDATE OFF COMPUPDATE OFF 15/08/22 02:07:05 INFO jdbc.RedshiftCopyBatchInsert: Uploading file id 81fb27ec-3541-4be3-a813-a00a12e1b54f to S3 (68,341,632 bytes 8,807,278 rows) 15/08/22 02:07:08 INFO result.AbstractJDBCResult: Connected to Database: PostgreSQL 8.0.2 15/08/22 02:07:08 INFO jdbc.JDBCUtils: SQL: SET search_path TO "public" 15/08/22 02:07:08 INFO jdbc.JDBCUtils: > 0.00 seconds 15/08/22 02:07:08 INFO jdbc.RedshiftCopyBatchInsert: Running COPY from file id 81fb27ec-3541-4be3-a813-a00a12e1b54f 15/08/22 02:07:23 INFO jdbc.RedshiftCopyBatchInsert: Loaded file id 81fb27ec-3541-4be3-a813-a00a12e1b54f (14.22 seconds for COPY) 15/08/22 02:07:23 INFO jdbc.RedshiftCopyBatchInsert: Loaded 1 files.
次いでDetail。およそ3分半で約880万件のCOPY処理がなされた事を確認出来ます。
Query ID = 5315_20150822020202_7cb7ec1c-f16c-483e-ad3c-1d80f87513f6 Total jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> Starting Job = job_1439526184895_19940, Tracking URL = http://ip-10-95-160-94.ec2.internal:8088/proxy/application_1439526184895_19940/ Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1439526184895_19940 Hadoop job information for Stage-1: number of mappers: 10; number of reducers: 1 2015-08-22 02:02:44,036 Stage-1 map = 0%, reduce = 0% 2015-08-22 02:02:54,340 Stage-1 map = 4%, reduce = 0%, Cumulative CPU 5.66 sec 2015-08-22 02:02:55,373 Stage-1 map = 7%, reduce = 0%, Cumulative CPU 10.85 sec 2015-08-22 02:02:57,438 Stage-1 map = 9%, reduce = 0%, Cumulative CPU 12.52 sec 2015-08-22 02:02:58,469 Stage-1 map = 11%, reduce = 0%, Cumulative CPU 14.22 sec 2015-08-22 02:03:01,558 Stage-1 map = 12%, reduce = 0%, Cumulative CPU 16.25 sec 2015-08-22 02:03:04,644 Stage-1 map = 13%, reduce = 0%, Cumulative CPU 18.29 sec 2015-08-22 02:03:06,704 Stage-1 map = 16%, reduce = 0%, Cumulative CPU 20.52 sec 2015-08-22 02:03:07,734 Stage-1 map = 17%, reduce = 0%, Cumulative CPU 21.1 sec 2015-08-22 02:03:10,832 Stage-1 map = 20%, reduce = 0%, Cumulative CPU 23.18 sec 2015-08-22 02:03:17,002 Stage-1 map = 23%, reduce = 0%, Cumulative CPU 28.5 sec 2015-08-22 02:03:20,096 Stage-1 map = 28%, reduce = 0%, Cumulative CPU 35.53 sec 2015-08-22 02:03:23,184 Stage-1 map = 31%, reduce = 0%, Cumulative CPU 38.08 sec 2015-08-22 02:03:26,269 Stage-1 map = 32%, reduce = 0%, Cumulative CPU 40.2 sec 2015-08-22 02:03:29,356 Stage-1 map = 33%, reduce = 0%, Cumulative CPU 42.0 sec 2015-08-22 02:03:32,440 Stage-1 map = 37%, reduce = 0%, Cumulative CPU 44.89 sec 2015-08-22 02:03:35,525 Stage-1 map = 40%, reduce = 0%, Cumulative CPU 47.27 sec 2015-08-22 02:03:42,716 Stage-1 map = 43%, reduce = 0%, Cumulative CPU 58.45 sec 2015-08-22 02:03:45,806 Stage-1 map = 48%, reduce = 0%, Cumulative CPU 64.91 sec 2015-08-22 02:03:48,896 Stage-1 map = 50%, reduce = 0%, Cumulative CPU 67.53 sec 2015-08-22 02:03:51,985 Stage-1 map = 52%, reduce = 0%, Cumulative CPU 69.43 sec 2015-08-22 02:03:55,062 Stage-1 map = 53%, reduce = 0%, Cumulative CPU 71.67 sec 2015-08-22 02:03:59,165 Stage-1 map = 57%, reduce = 0%, Cumulative CPU 74.82 sec 2015-08-22 02:04:01,222 Stage-1 map = 60%, reduce = 0%, Cumulative CPU 77.21 sec 2015-08-22 02:04:08,393 Stage-1 map = 63%, reduce = 0%, Cumulative CPU 82.43 sec 2015-08-22 02:04:10,445 Stage-1 map = 67%, reduce = 0%, Cumulative CPU 87.65 sec 2015-08-22 02:04:11,472 Stage-1 map = 68%, reduce = 0%, Cumulative CPU 88.97 sec 2015-08-22 02:04:13,520 Stage-1 map = 70%, reduce = 0%, Cumulative CPU 90.67 sec 2015-08-22 02:04:14,544 Stage-1 map = 71%, reduce = 0%, Cumulative CPU 91.96 sec 2015-08-22 02:04:16,592 Stage-1 map = 72%, reduce = 0%, Cumulative CPU 92.84 sec 2015-08-22 02:04:19,663 Stage-1 map = 73%, reduce = 0%, Cumulative CPU 95.24 sec 2015-08-22 02:04:23,767 Stage-1 map = 77%, reduce = 0%, Cumulative CPU 98.66 sec 2015-08-22 02:04:25,816 Stage-1 map = 80%, reduce = 0%, Cumulative CPU 100.67 sec 2015-08-22 02:04:32,983 Stage-1 map = 83%, reduce = 0%, Cumulative CPU 105.77 sec 2015-08-22 02:04:35,030 Stage-1 map = 87%, reduce = 0%, Cumulative CPU 111.15 sec 2015-08-22 02:04:36,053 Stage-1 map = 89%, reduce = 0%, Cumulative CPU 112.55 sec 2015-08-22 02:04:38,098 Stage-1 map = 90%, reduce = 0%, Cumulative CPU 114.18 sec 2015-08-22 02:04:39,126 Stage-1 map = 91%, reduce = 0%, Cumulative CPU 115.09 sec 2015-08-22 02:04:41,172 Stage-1 map = 92%, reduce = 0%, Cumulative CPU 116.04 sec 2015-08-22 02:04:44,241 Stage-1 map = 93%, reduce = 0%, Cumulative CPU 118.38 sec 2015-08-22 02:04:48,335 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 123.33 sec 2015-08-22 02:04:58,575 Stage-1 map = 100%, reduce = 54%, Cumulative CPU 133.13 sec 2015-08-22 02:05:01,651 Stage-1 map = 100%, reduce = 67%, Cumulative CPU 136.25 sec 2015-08-22 02:05:04,724 Stage-1 map = 100%, reduce = 68%, Cumulative CPU 142.03 sec 2015-08-22 02:05:07,800 Stage-1 map = 100%, reduce = 70%, Cumulative CPU 145.62 sec 2015-08-22 02:05:10,879 Stage-1 map = 100%, reduce = 71%, Cumulative CPU 149.2 sec 2015-08-22 02:05:13,956 Stage-1 map = 100%, reduce = 73%, Cumulative CPU 152.76 sec 2015-08-22 02:05:17,032 Stage-1 map = 100%, reduce = 75%, Cumulative CPU 156.3 sec 2015-08-22 02:05:20,102 Stage-1 map = 100%, reduce = 76%, Cumulative CPU 159.73 sec 2015-08-22 02:05:23,175 Stage-1 map = 100%, reduce = 78%, Cumulative CPU 163.05 sec 2015-08-22 02:05:26,248 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 166.11 sec MapReduce Total cumulative CPU time: 2 minutes 46 seconds 110 msec Ended Job = job_1439526184895_19940 MapReduce Jobs Launched: Job 0: Map: 10 Reduce: 1 Cumulative CPU: 166.11 sec HDFS Read: 1207380 HDFS Write: 313612407 SUCCESS Total MapReduce CPU Time Spent: 2 minutes 46 seconds 110 msec OK MapReduce time taken: 170.45 seconds Fetching results... Total CPU Time: 166110 Total Records: 8,807,278 Time taken: 207.325 seconds
実際にRedshiftのデータベースを確認してみましょう。ログインを行い。所定のテーブルが出来ているかを確認。テーブルも新規で作成され、ログで表示されていた件数のデータが格納されている事を確認出来ました。
$ psql -h (ホスト名/Endpoint) -U (ユーザー名) -d (データベース名) -p (接続ポート) Password for user xxxxxxxxxx: (パスワードを入力) psql (9.4.1, server 8.0.2) SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: on) Type "help" for help. # \d public.nasdaq; Table "public.nasdaq" Column | Type | Modifiers --------+------------------+----------- volume | bigint | high | double precision | low | double precision | close | double precision | # SELECT COUNT(*) FROM public.nasdaq; count --------- 8807278 (1 row) # SELECT * FROM public.nasdaq ORDER BY volume DESC LIMIT 10; volume | high | low | close ------------+--------+--------+-------- 7522511800 | 0.0743 | 0.0705 | 0.0715 6935025000 | 0.05 | 0.05 | 0.05 6024738600 | 0.09 | 0.08 | 0.09 5245411600 | 0.0655 | 0.0603 | 0.0604 4890173300 | 0.0754 | 0.0709 | 0.0751 4752863000 | 0.073 | 0.0642 | 0.0727 4374987500 | 0.0685 | 0.0636 | 0.065 4122998500 | 0.04 | 0.04 | 0.04 3799612500 | 0.06 | 0.06 | 0.06 3722479300 | 0.0684 | 0.0656 | 0.0675 (10 rows) #
まとめ
以上、Treasure Dataから管理コンソール上の設定のみでAmazon Redshiftへの投入を行う手順についてのご紹介でした。Treasure DataからAmazon Redshiftへのデータ投入という手順は多くのケースで求められる部分となると思います。そんな手順を管理コンソールの手順のみで設定、管理出来るというのはユーザーにとっては嬉しい部分ですね。こちらからは以上です。