この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Treasure Dataを利用していると日々データが蓄積されて行く形となるのですが、定期的にそのデータを取得し、別環境にデータを移動させて行きたいという要望はデータ分析案件に携わっていると多く遭遇します。当エントリではその前段として『送るデータを条件指定して抽出する』ためのクエリの書き方について、通常のRDB(MS)で実施するSQLとは異なるポイントが有りましたのでその辺りをまとめつつ、順を追ってその内容を紹介していきたいと思います。
目次
処理フローイメージ図
処理の流れとしてイメージしているのは以下の図です。Webサイト等から適宜データがTreasure Dataに投入されて来ている状況から、日次処理で所定の日付時刻範囲のデータを抽出、その抽出したデータをRedshiftへ送る...というものです。この流れのうち、まずは対象範囲のデータをTreasure Dataにクエリを投げて取得する部分について、どのような形でSQLを実装すれば出来るのかという部分について見て行きたいと思います。
ちなみにここで想定している抽出クエリは、『日本時間の夜中に、前日分の日付時刻範囲のデータのものを抽出する』というようなものをイメージしています。日次処理で適宜それを動かしてデータ連携をしよう、という感じですね。
サンプルデータの用意
まずは『溜まったデータ』を想定した形でTreasure Data環境下にデータを用意したいと思います。Treasure Dataではサンプルデータが既に用意されているものがありますが、日付時刻のデータが古いものとなっていますので、また今回実行したいクエリは『システム日付時刻』を元にした内容となりますので、実際の直近日付時刻を基にしたデータを用意し、Treasure Dataにインポートする形で進めたいと思います。
データの投入にはtd-agent2を用います。以下エントリに倣う形で環境をEC2上に用意します。
Amazon Linux EC2を立ち上げ、ソフトウェアをインストール。
$ ssh -i xxx.pem ec2-user@xxx.xxx.xxx.xxx
$ sudo yum -y update
$ curl -L http://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sudo sh
バージョン及び導入状態を確認します。
$ td --version
0.15.0
$ td
usage: td [options] COMMAND [args]
options:
-c, --config PATH path to the configuration file (default: ~/.td/td.conf)
-k, --apikey KEY use this API key instead of reading the config file
-e, --endpoint API_SERVER specify the URL for API server to use (default: https://api.treasuredata.com).
The URL must contain a scheme (http:// or https:// prefix) to be valid.
Valid IPv4 addresses are accepted as well in place of the host name.
--insecure Insecure access: disable SSL (enabled by default)
-v, --verbose verbose mode
-h, --help show help
-r, --retry-post-requests retry on failed post requests.
Warning: can cause resource duplication, such as duplicated job submissions.
--version show version
Basic commands:
db # create/delete/list databases
table # create/delete/list/import/export/tail tables
query # issue a query
job # show/kill/list jobs
import # manage bulk import sessions (Java based fast processing)
bulk_import # manage bulk import sessions (Old Ruby-based implementation)
result # create/delete/list result URLs
sched # create/delete/list schedules that run a query periodically
schema # create/delete/modify schemas of tables
connector # manage connectors
workflow # manage workflows
Additional commands:
status # show scheds, jobs, tables and results
apikey # show/set API key
server # show status of the Treasure Data server
sample # create a sample log file
help # show help messages
Type 'td help COMMAND' for more information on a specific command.
$
Treasure Dataアカウントを用いてログイン。
$ td account
Enter your Treasure Data credentials. For Google SSO user, please see https://docs.treasuredata.com/articles/command-line#google-sso-users
Email: xxxxxxxxx@xxxxxx.xxx
Password (typing will be hidden):
Authenticated successfully.
Use 'td db:create <db_name>' to create a database.
td table:deleteコマンドを使ってテーブルを作成。DB領域はデフォルトで用意されているsample_db配下としました。applogというテーブルを作成しています。(ここではテーブルの器だけ作成)
$ td table:create sample_db applog
Table 'sample_db.applog' is created.
動作検証用のデータとして、以下の様な形でcsvファイルを作成。都合3日分、『日本時間(タイムゾーン:JST)』で記録されたタイムスタンプのデータ項目(time)を持つデータを用意しました。範囲が分かり易く識別出来るようにseqnoという項目に、時間の昇順で1から採番をしています。
$ vi applog-data.csv
"time","seqno","user_id","points","description"
"2016-10-27 00:00:00","1","1000",1234,"XXXXX"
"2016-10-27 00:00:01","2","1000",5678,"YYYYY"
"2016-10-27 00:59:59","3","1000",9012,"ZZZZZ"
"2016-10-27 01:00:00","4","1000",3456,"AAAAA"
"2016-10-27 01:00:01","5","1000",7890,"BBBBB"
"2016-10-27 01:59:59","6","1000",8781,"CCCCC"
"2016-10-27 02:00:00","7","1000",9890,"DDDDD"
"2016-10-27 02:00:01","8","1000",10999,"XXXXX"
"2016-10-27 02:59:59","9","1000",12108,"YYYYY"
"2016-10-27 03:00:00","10","1000",13217,"ZZZZZ":
:
:
"2016-10-27 23:00:00","70","1000",79757,"DDDDD"
"2016-10-27 23:00:01","71","1000",80866,"XXXXX"
"2016-10-27 23:59:59","72","1000",81975,"YYYYY"
"2016-10-28 00:00:00","73","1000",83084,"ZZZZZ"
"2016-10-28 00:00:01","74","1000",84193,"AAAAA"
"2016-10-28 00:59:59","75","1000",85302,"BBBBB"
:
:
:
"2016-10-28 23:00:00","142","1000",159605,"YYYYY"
"2016-10-28 23:00:01","143","1000",160714,"ZZZZZ"
"2016-10-28 23:59:59","144","1000",161823,"AAAAA"
"2016-10-29 00:00:00","145","1000",162932,"BBBBB"
"2016-10-29 00:00:01","146","1000",164041,"CCCCC"
"2016-10-29 00:59:59","147","1000",165150,"DDDDD"
:
:
:
"2016-10-29 10:00:00","175","1000",196202,"DDDDD"
"2016-10-29 10:00:01","176","1000",197311,"XXXXX"
"2016-10-29 10:59:59","177","1000",198420,"YYYYY"
td importコマンドを使い、上記作成のCSVデータをテーブルにインポート。autoオプションを使うとデータを良い感じに取り込んでくれます。
$ td import:auto \
> --format csv --column-header \
> --time-column date_time \
> --time-format "%Y-%m-%d %H:%M:%S" \
> --auto-create sample_db.applog \
> ./applog-data.csv
Create sample_db_applog_2016_10_28_1477655095088 bulk_import session
Uploading prepared sources
Session : sample_db_applog_2016_10_28_1477655095088
Source : ./applog-data.csv (8695 bytes)
Converting './applog-data.csv'...
sample row: {"seqno":1,"time":1477526400,"description":"XXXXX","user_id":1000,"points":1234}
Uploading out/applog-data_csv_0.msgpack.gz (1914 bytes)...
Prepare status:
Elapsed time: 2 sec.
Source : ./applog-data.csv
Status : SUCCESS
Read lines : 178
Valid rows : 177
Invalid rows : 0
Converted Files : out/applog-data_csv_0.msgpack.gz (1914 bytes)
Next steps:
=> execute following 'td import:upload' command. if the bulk import session is not created yet, please create it with 'td import:create <session> <database> <table>' command.
$ td import:upload <session> 'out/applog-data_csv_0.msgpack.gz'
Upload status:
Elapsed time: 5 sec.
Source : out/applog-data_csv_0.msgpack.gz
Status : SUCCESS
Part name : applog-data_csv_0_msgpack_gz
Size : 1914
Retry count : 0
Next Steps:
=> execute 'td import:perform sample_db_applog_2016_10_28_1477655095088'.
Freeze bulk import session sample_db_applog_2016_10_28_1477655095088
Perform bulk import session sample_db_applog_2016_10_28_1477655095088
Show status of bulk import session sample_db_applog_2016_10_28_1477655095088
Performing job ID : 99005881
Name : sample_db_applog_2016_10_28_1477655095088
Status : performing
Wait sample_db_applog_2016_10_28_1477655095088 bulk import session performing...
Show the result of bulk import session sample_db_applog_2016_10_28_1477655095088
Performing job ID : 99005881
Valid parts : 1
Error parts : 0
Valid records : 177
Error records : 0
Commit sample_db_applog_2016_10_28_1477655095088 bulk import session
Wait sample_db_applog_2016_10_28_1477655095088 bulk import session committing...
Delete bulk_import session sample_db_applog_2016_10_28_1477655095088
ちなみに当初はコンソールから1件ずつ『INSERT INTO テーブル名 VALUES(...)』という感じでデータを投入しようと思いましたがTDにはこの操作を行うIFはどうやら無いようです。
また、データ投入の詳細についてはこのあたりもご参考に。
取り込んだ内容をTreasure DataのWebコンソールで確認してみます。新しいバージョンのコンソールはクエリが終了したらWeb通知もしてくれるので便利ですね。
日付の範囲を指定してデータを取得するSQLサンプル
という訳で、今回の目的を満たすSQL(Hive SQL)のサンプルが以下の内容となります。幾つかのポイントがありますのでその辺りを以下箇条書きで言及。
- Treasure Dataでは『日付型』というものが無い。仕様では用意されているデータ型はint/long/double/float/string/array
の計6つのみ。値はint型のunix timestamp値として登録されており、日付やタイムスタンプのデータとして扱うには専用の関数を用いる必要がある。 - Treasure Dataに於ける日付時刻周りの関数についてはこのあたりを参照。
- Supported time formats in TD_TIME_FORMAT UDF | Treasure Data
- TreasureDataでHiveクエリを書くとき - TD_TIME_FORMAT,TD_TIME_PARSE,TD_DATE_TRUNC - Qiita
- Supported Hive UDFs (User Defined Functions) | Treasure Data
- トレジャーデータUDFで効率の良いクエリを実現する - Date Function 編 - - トレジャーデータ(Treasure Data)公式ブログ
- Prestoでの日付の扱い方 - データ解析備忘録
- 今回扱うデータは日本(タイムゾーン:JST)、処理の時刻も日本(タイムゾーン:JST)という事で全ての時制をタイムゾーン:JSTに統一。timeの値は日本時間のものなのでフォーマット関数:TD_TIME_FORMATの実行時にはタイムゾーンのオプション無し、それ以外のものについてはデフォルトUTC扱いなのでタイムゾーン指定を行って値を調整。(※参照:TD_TIME_FORMAT)
- 日付範囲の絞り込み条件(下記SQLの14〜19行目)については関数:TD_TIME_RANGEを利用。こちらはHive Performance Tuningにも記載されている、推奨の利用方法となる様です。今回は"前日"のデータという事で、FROMに前日、TOにも前日の日付入れれば適用されると思いきや対象件数ゼロ。FROM:前日、TO:当日で意図した結果を得る事が出来ました。という事なので、TOについてはTOの日付(条件)を含まない、という仕様と理解。(※参照:TD_TIME_RANGE)
- 『前日』を求める際には関数:TD_TIME_ADDを利用し、引数にマイナス値を入れる事で過去日を指定。(※参照:TD_TIME_ADD)
SELECT
TD_TIME_FORMAT(TD_SCHEDULED_TIME(),'yyyy-MM-dd HH:mm:ss','JST') AS current_date_jst,
TD_TIME_FORMAT(time,'yyyy-MM-dd HH:mm:ss') AS time_registered,
TD_TIME_FORMAT(time,'yyyy-MM-dd') AS date_registered,
TD_TIME_FORMAT(TD_TIME_ADD(TD_SCHEDULED_TIME(), "-1d"),'yyyy-MM-dd','JST') AS jst_yesterday,
TD_TIME_FORMAT(TD_SCHEDULED_TIME(),'yyyy-MM-dd','JST') AS jst_today,
'**',
seqno,
user_id,
points,
description
FROM
applog
WHERE
TD_TIME_RANGE(
time,
TD_TIME_FORMAT(TD_TIME_ADD(TD_SCHEDULED_TIME(), "-1d"),'yyyy-MM-dd','JST'),
TD_TIME_FORMAT(TD_SCHEDULED_TIME(),'yyyy-MM-dd','JST')
)
ORDER BY
seqno;
クエリ実行。新しい管理コンソールでは各種実行時のパラメータが指定出来るようです。実行時のタイムゾーンは東京、実行時間は2016/10/28 23:20としました。この内容であれば、2016/10/27 00:00:00〜23:59:59のデータが想定される取得対象のデータとなります。
実行結果。実行日2016/10/28、前日が2016/10/27となり、範囲指定で2016/10/27の00:00:00のデータから、
2016/10/17 23:59:59のデータが取得出来ている事を確認出来ました。
まとめ
という訳で、Treasure DataでHive SQLを使ってデータ抽出を行う際の日付範囲指定のSQLの書き方に関するまとめでした。作成したクエリを別途DBに送る・その送る作業をスケジューリングする方法については別のエントリでご紹介したいと思います。こちらからは以上です。