Treasure Dataから日付範囲指定したデータを取得する

2016.10.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Treasure Dataを利用していると日々データが蓄積されて行く形となるのですが、定期的にそのデータを取得し、別環境にデータを移動させて行きたいという要望はデータ分析案件に携わっていると多く遭遇します。当エントリではその前段として『送るデータを条件指定して抽出する』ためのクエリの書き方について、通常のRDB(MS)で実施するSQLとは異なるポイントが有りましたのでその辺りをまとめつつ、順を追ってその内容を紹介していきたいと思います。

目次

 

処理フローイメージ図

処理の流れとしてイメージしているのは以下の図です。Webサイト等から適宜データがTreasure Dataに投入されて来ている状況から、日次処理で所定の日付時刻範囲のデータを抽出、その抽出したデータをRedshiftへ送る...というものです。この流れのうち、まずは対象範囲のデータをTreasure Dataにクエリを投げて取得する部分について、どのような形でSQLを実装すれば出来るのかという部分について見て行きたいと思います。

td-to-rs-on-scheduled-query

ちなみにここで想定している抽出クエリは、『日本時間の夜中に、前日分の日付時刻範囲のデータのものを抽出する』というようなものをイメージしています。日次処理で適宜それを動かしてデータ連携をしよう、という感じですね。

 

サンプルデータの用意

まずは『溜まったデータ』を想定した形でTreasure Data環境下にデータを用意したいと思います。Treasure Dataではサンプルデータが既に用意されているものがありますが、日付時刻のデータが古いものとなっていますので、また今回実行したいクエリは『システム日付時刻』を元にした内容となりますので、実際の直近日付時刻を基にしたデータを用意し、Treasure Dataにインポートする形で進めたいと思います。

データの投入にはtd-agent2を用います。以下エントリに倣う形で環境をEC2上に用意します。

Amazon Linux EC2を立ち上げ、ソフトウェアをインストール。

$ ssh -i xxx.pem ec2-user@xxx.xxx.xxx.xxx
$ sudo yum -y update
$ curl -L http://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sudo sh

バージョン及び導入状態を確認します。

$ td --version
0.15.0
$ td
usage: td [options] COMMAND [args]

options:
  -c, --config PATH                path to the configuration file (default: ~/.td/td.conf)
  -k, --apikey KEY                 use this API key instead of reading the config file
  -e, --endpoint API_SERVER        specify the URL for API server to use (default: https://api.treasuredata.com).
                                     The URL must contain a scheme (http:// or https:// prefix) to be valid.
                                     Valid IPv4 addresses are accepted as well in place of the host name.
      --insecure                   Insecure access: disable SSL (enabled by default)
  -v, --verbose                    verbose mode
  -h, --help                       show help
  -r, --retry-post-requests        retry on failed post requests.
                                   Warning: can cause resource duplication, such as duplicated job submissions.
      --version                    show version

Basic commands:

  db             # create/delete/list databases
  table          # create/delete/list/import/export/tail tables
  query          # issue a query
  job            # show/kill/list jobs
  import         # manage bulk import sessions (Java based fast processing)
  bulk_import    # manage bulk import sessions (Old Ruby-based implementation)
  result         # create/delete/list result URLs
  sched          # create/delete/list schedules that run a query periodically
  schema         # create/delete/modify schemas of tables
  connector      # manage connectors
  workflow       # manage workflows

Additional commands:

  status         # show scheds, jobs, tables and results
  apikey         # show/set API key
  server         # show status of the Treasure Data server
  sample         # create a sample log file
  help           # show help messages

Type 'td help COMMAND' for more information on a specific command.
$

Treasure Dataアカウントを用いてログイン。

$ td account
Enter your Treasure Data credentials. For Google SSO user, please see https://docs.treasuredata.com/articles/command-line#google-sso-users
Email: xxxxxxxxx@xxxxxx.xxx
Password (typing will be hidden): 
Authenticated successfully.
Use 'td db:create <db_name>' to create a database.

td table:deleteコマンドを使ってテーブルを作成。DB領域はデフォルトで用意されているsample_db配下としました。applogというテーブルを作成しています。(ここではテーブルの器だけ作成)

$ td table:create sample_db applog
Table 'sample_db.applog' is created.

動作検証用のデータとして、以下の様な形でcsvファイルを作成。都合3日分、『日本時間(タイムゾーン:JST)』で記録されたタイムスタンプのデータ項目(time)を持つデータを用意しました。範囲が分かり易く識別出来るようにseqnoという項目に、時間の昇順で1から採番をしています。

$ vi applog-data.csv 
"time","seqno","user_id","points","description"
"2016-10-27 00:00:00","1","1000",1234,"XXXXX"
"2016-10-27 00:00:01","2","1000",5678,"YYYYY"
"2016-10-27 00:59:59","3","1000",9012,"ZZZZZ"
"2016-10-27 01:00:00","4","1000",3456,"AAAAA"
"2016-10-27 01:00:01","5","1000",7890,"BBBBB"
"2016-10-27 01:59:59","6","1000",8781,"CCCCC"
"2016-10-27 02:00:00","7","1000",9890,"DDDDD"
"2016-10-27 02:00:01","8","1000",10999,"XXXXX"
"2016-10-27 02:59:59","9","1000",12108,"YYYYY"
"2016-10-27 03:00:00","10","1000",13217,"ZZZZZ":
:
:
"2016-10-27 23:00:00","70","1000",79757,"DDDDD"
"2016-10-27 23:00:01","71","1000",80866,"XXXXX"
"2016-10-27 23:59:59","72","1000",81975,"YYYYY"
"2016-10-28 00:00:00","73","1000",83084,"ZZZZZ"
"2016-10-28 00:00:01","74","1000",84193,"AAAAA"
"2016-10-28 00:59:59","75","1000",85302,"BBBBB"
:
:
:
"2016-10-28 23:00:00","142","1000",159605,"YYYYY"
"2016-10-28 23:00:01","143","1000",160714,"ZZZZZ"
"2016-10-28 23:59:59","144","1000",161823,"AAAAA"
"2016-10-29 00:00:00","145","1000",162932,"BBBBB"
"2016-10-29 00:00:01","146","1000",164041,"CCCCC"
"2016-10-29 00:59:59","147","1000",165150,"DDDDD"
:
:
:
"2016-10-29 10:00:00","175","1000",196202,"DDDDD"
"2016-10-29 10:00:01","176","1000",197311,"XXXXX"
"2016-10-29 10:59:59","177","1000",198420,"YYYYY"

td importコマンドを使い、上記作成のCSVデータをテーブルにインポート。autoオプションを使うとデータを良い感じに取り込んでくれます。

$ td import:auto \
>   --format csv --column-header \
>   --time-column date_time \
>   --time-format "%Y-%m-%d %H:%M:%S" \
>   --auto-create sample_db.applog \
>   ./applog-data.csv
Create sample_db_applog_2016_10_28_1477655095088 bulk_import session

Uploading prepared sources
  Session    : sample_db_applog_2016_10_28_1477655095088
  Source     : ./applog-data.csv (8695 bytes)

Converting './applog-data.csv'...
sample row: {"seqno":1,"time":1477526400,"description":"XXXXX","user_id":1000,"points":1234}
Uploading out/applog-data_csv_0.msgpack.gz (1914 bytes)...

Prepare status:
  Elapsed time: 2 sec.
  Source     : ./applog-data.csv
    Status          : SUCCESS
    Read lines      : 178
    Valid rows      : 177
    Invalid rows    : 0
    Converted Files : out/applog-data_csv_0.msgpack.gz (1914 bytes)


Next steps:
  => execute following 'td import:upload' command. if the bulk import session is not created yet, please create it with 'td import:create <session> <database> <table>' command.
     $ td import:upload <session> 'out/applog-data_csv_0.msgpack.gz'


Upload status:
  Elapsed time: 5 sec.
  Source  : out/applog-data_csv_0.msgpack.gz
    Status          : SUCCESS
    Part name       : applog-data_csv_0_msgpack_gz
    Size            : 1914
    Retry count     : 0


Next Steps:
  => execute 'td import:perform sample_db_applog_2016_10_28_1477655095088'.

Freeze bulk import session sample_db_applog_2016_10_28_1477655095088
Perform bulk import session sample_db_applog_2016_10_28_1477655095088
Show status of bulk import session sample_db_applog_2016_10_28_1477655095088
  Performing job ID : 99005881
  Name              : sample_db_applog_2016_10_28_1477655095088
  Status            : performing

Wait sample_db_applog_2016_10_28_1477655095088 bulk import session performing...
Show the result of bulk import session sample_db_applog_2016_10_28_1477655095088
  Performing job ID : 99005881
  Valid parts       : 1
  Error parts       : 0
  Valid records     : 177
  Error records     : 0

Commit sample_db_applog_2016_10_28_1477655095088 bulk import session
Wait sample_db_applog_2016_10_28_1477655095088 bulk import session committing...
Delete bulk_import session sample_db_applog_2016_10_28_1477655095088

ちなみに当初はコンソールから1件ずつ『INSERT INTO テーブル名 VALUES(...)』という感じでデータを投入しようと思いましたがTDにはこの操作を行うIFはどうやら無いようです。

また、データ投入の詳細についてはこのあたりもご参考に。

取り込んだ内容をTreasure DataのWebコンソールで確認してみます。新しいバージョンのコンソールはクエリが終了したらWeb通知もしてくれるので便利ですね。

td-to-rs-query_01

 

日付の範囲を指定してデータを取得するSQLサンプル

という訳で、今回の目的を満たすSQL(Hive SQL)のサンプルが以下の内容となります。幾つかのポイントがありますのでその辺りを以下箇条書きで言及。

SELECT
  TD_TIME_FORMAT(TD_SCHEDULED_TIME(),'yyyy-MM-dd HH:mm:ss','JST') AS current_date_jst, 
  TD_TIME_FORMAT(time,'yyyy-MM-dd HH:mm:ss') AS time_registered, 
  TD_TIME_FORMAT(time,'yyyy-MM-dd') AS date_registered, 
  TD_TIME_FORMAT(TD_TIME_ADD(TD_SCHEDULED_TIME(), "-1d"),'yyyy-MM-dd','JST') AS jst_yesterday, 
  TD_TIME_FORMAT(TD_SCHEDULED_TIME(),'yyyy-MM-dd','JST') AS jst_today, 
  '**',
  seqno,
  user_id,
  points,
  description
FROM
  applog
WHERE
  TD_TIME_RANGE(
    time,
    TD_TIME_FORMAT(TD_TIME_ADD(TD_SCHEDULED_TIME(), "-1d"),'yyyy-MM-dd','JST'),
    TD_TIME_FORMAT(TD_SCHEDULED_TIME(),'yyyy-MM-dd','JST')
  )
ORDER BY
  seqno;

クエリ実行。新しい管理コンソールでは各種実行時のパラメータが指定出来るようです。実行時のタイムゾーンは東京、実行時間は2016/10/28 23:20としました。この内容であれば、2016/10/27 00:00:00〜23:59:59のデータが想定される取得対象のデータとなります。

td-to-rs-query_04

実行結果。実行日2016/10/28、前日が2016/10/27となり、範囲指定で2016/10/27の00:00:00のデータから、

td-to-rs-query_02

2016/10/17 23:59:59のデータが取得出来ている事を確認出来ました。

td-to-rs-query_03

 

まとめ

という訳で、Treasure DataでHive SQLを使ってデータ抽出を行う際の日付範囲指定のSQLの書き方に関するまとめでした。作成したクエリを別途DBに送る・その送る作業をスケジューリングする方法については別のエントリでご紹介したいと思います。こちらからは以上です。