ちょっと話題の記事

Amazon LinuxからTreasure Dataを使ってみる

Fluentdが良さげなのでTreasure Dataも使ってみる

td-000

Treasure Dataは、Big Data as a Serviceを提供しています。あらゆるログを転送する仕組みがFluentdでしたが、Treasure Dataは生成され転送されるログを溜め込んで価値ある情報として加工して提供しましょうと感じです。ある程度までは無料で使えるようですので早速使ってみましょう。

アカウントを作成する

クラウド型のサービスを利用しますのでアカウントを取得しましょう。

td-001

セットアップする

td-002

いつもの流れということで、まずはAmazon Linuxを立ち上げてください。そして以下のコマンドへと続きます。

td-003

コマンド1発でインストールが終わりそうな感じですが、Amazon Linuxに足りないものを入れておきましょう。

$ sudo yum update -y
$ sudo yum install rubygems ruby-devel gcc make -y
$ sudo gem install td iconv json

さくっと終わりましたね!

コマンド

どんなコマンドが使えるかヘルプで見てみます。

$ td
usage: td [options] COMMAND [args]

options:
  -c, --config PATH                path to config file (~/.td/td.conf)
  -k, --apikey KEY                 use this API key instead of reading the config file
  -v, --verbose                    verbose mode
  -h, --help                       show help

Basic commands:

  db             # create/delete/list databases
  table          # create/delete/list/import/export/tail tables
  query          # issue a query
  job            # show/kill/list jobs
  bulk_import    # manage bulk import sessions
  result         # create/delete/list result URLs

Additional commands:

  sched          # create/delete/list schedules that run a query periodically
  schema         # create/delete/modify schemas of tables
  status         # show scheds, jobs, tables and results
  apikey         # show/set API key
  server         # show status of the Treasure Data server
  help           # show help messages

Type 'td help COMMAND' for more information on a specific command.

さらに詳細なのはこちら

$ td help:all

  db:list                                    # Show list of tables in a database
  db:show <db>                               # Describe information of a database
  db:create <db>                             # Create a database
  db:delete <db>                             # Delete a database

  table:list [db]                            # Show list of tables
  table:show <db> <table>                    # Describe information of a table
  table:create <db> <table>                  # Create a table
  table:delete <db> <table>                  # Delete a table
  table:import <db> <table> <files...>       # Parse and import files to a table
  table:export <db> <table>                  # Dump logs in a table to the specified storage
  table:swap <db> <table1> <table2>          # Swap names of two tables
  table:tail <db> <table>                    # Get recently imported logs
  table:partial_delete <db> <table>          # Delete logs from the table within the specified time range

  bulk_import:list                           # List bulk import sessions
  bulk_import:show <name>                    # Show list of uploaded parts
  bulk_import:create <name> <db> <table>     # Create a new bulk import session to the the table
  bulk_import:prepare_parts <files...>       # Convert files into part file format
  bulk_import:prepare_parts2 <files...>      # Convert files into part file format
  bulk_import:upload_part <name> <id> <path.msgpack.gz>   # Upload or re-upload a file into a bulk import session
  bulk_import:upload_parts <name> <files...>   # Upload or re-upload files into a bulk import session
  bulk_import:delete_part <name> <id>        # Delete a uploaded file from a bulk import session
  bulk_import:delete_parts <name> <ids...>   # Delete uploaded files from a bulk import session
  bulk_import:perform <name>                 # Start to validate and convert uploaded files
  bulk_import:error_records <name>           # Show records which did not pass validations
  bulk_import:commit <name>                  # Start to commit a performed bulk import session
  bulk_import:delete <name>                  # Delete a bulk import session
  bulk_import:freeze <name>                  # Reject succeeding uploadings to a bulk import session
  bulk_import:unfreeze <name>                # Unfreeze a frozen bulk import session

  result:list                                # Show list of result URLs
  result:show <name>                         # Describe information of a result URL
  result:create <name> <URL>                 # Create a result URL
  result:delete <name>                       # Delete a result URL

  status                                     # Show schedules, jobs, tables and results

  schema:show <db> <table>                   # Show schema of a table
  schema:set <db> <table> [columns...]       # Set new schema on a table
  schema:add <db> <table> <columns...>       # Add new columns to a table
  schema:remove <db> <table> <columns...>    # Remove columns from a table

  sched:list                                 # Show list of schedules
  sched:create <name> <cron> <sql>           # Create a schedule
  sched:delete <name>                        # Delete a schedule
  sched:update <name>                        # Modify a schedule
  sched:history <name> [max]                 # Show history of scheduled queries
  sched:run <name> <time>                    # Run scheduled queries for the specified time

  query <sql>                                # Issue a query

  job:show <job_id>                          # Show status and result of a job
  job:list [max]                             # Show list of jobs
  job:kill <job_id>                          # Kill or cancel a job

  password:change                            # Change password

  apikey:show                                # Show Treasure Data API key
  apikey:set <apikey>                        # Set Treasure Data API key

  user:list                                  # Show list of users
  user:show <name>                           # Show an user
  user:create <name>                         # Create an user
  user:delete <name>                         # Delete an user
  user:apikey:list <name>                    # Show API keys
  user:password:change <name>                # Change password

  role:list                                  # Show list of roles
  role:show <name>                           # Show a role
  role:create <name>                         # Create a role
  role:delete <name>                         # Delete a role
  role:grant <name> <user>                   # Grant role to an user
  role:revoke <name> <user>                  # Revoke role from an user

  org:list                                   # Show list of organizations
  org:show <name>                            # Show an organizations
  org:create <name>                          # Create an organizations
  org:delete <name>                          # Delete an organizations

  acl:list                                   # Show list of access controls
  acl:grant <subject> <action> <scope>       # Grant an access control
  acl:revoke <subject> <action> <scope>      # Revoke an access control

  server:status                              # Show status of the Treasure Data server

  sample:apache <path.json>                  # Create a sample log file

Type 'td help COMMAND' for more information on a specific command.

データベースの作成

td-004

続いて接続確認とデータベースの作成です。次の操作を教えてくれるのは心遣いですね。

$ td account -f
Enter your Treasure Data credentials.
Email: hoge.hoge.hoge@classmethod.jp
Password (typing will be hidden): 
Authenticated successfully.
Use 'td db:create <db_name>' to create a database.

データベースの作成コマンドです。

td-005

$ td db:create testdb
Database 'testdb' is created.
Use 'td table:create testdb <table_name>' to create a table.

テーブルの作成

ナビゲーションに沿ってテーブルを作成します。

$ td table:create testdb www_access
Table 'testdb.www_access' is created.

サンプルデータの生成と挿入

テーブルを作成したのでFluentdからデータを転送しても良いのですが取り急ぎ動作確認するためにサンプルデータを入れています。

$ td sample:apache apache.json
Create apache.json with 5000 records whose time is
from Sun Jan 20 23:50:16 +0000 2013 to Mon Jan 21 17:42:46 +0000 2013.
Use 'td table:import <db> <table> --json apache.json' to import this file.
$ td table:import testdb www_access --json apache.json
importing apache.json...
  uploading 117247 bytes...
  imported 5000 entries from apache.json.
done.

テーブル一覧の表示

作成したテーブルを見てみましょう。

$ td table:list
+----------+------------+------+-------+--------+--------------------------------+--------+
| Database | Table      | Type | Count | Size   | Last import                    | Schema |
+----------+------------+------+-------+--------+--------------------------------+--------+
| testdb   | www_access | log  | 5000  | 0.0 GB | Mon Jan 21 17:43:03 +0000 2013 |        |
+----------+------------+------+-------+--------+--------------------------------+--------+
1 row in set

クエリーを発行する

今回のサンプルはレコード数が少ないですが、実際のビジネスでは数億とかいう規模のデータをドカーンと入れて、Hadoop使って解析してくれるわけですね。んで、この問合せ方法としてSQLライクに表記できるHiveを使うことができるようです。最新の技術に対して使いなれた構文でアクセスできるのは嬉しいですね。

件数の取得

まずはレコードの件数を出してみましょう。

$ td query -w -d testdb "SELECT count(*) FROM www_access"
Job 1568184 is queued.
Use 'td job:show 1568184' to show the status.
queued...
  started at 2013-01-22T16:51:19Z
  Hive history file=/tmp/1461/hive_job_log__91678880.txt
  Total MapReduce jobs = 1
  Launching Job 1 out of 1
  Number of reduce tasks determined at compile time: 1
  In order to change the average load for a reducer (in bytes):
    set hive.exec.reducers.bytes.per.reducer=<number>
  In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=<number>
  In order to set a constant number of reducers:
    set mapred.reduce.tasks=<number>
  Starting Job = job_201301150013_26879, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26879
  Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26879
  2013-01-22 16:51:32,501 Stage-1 map = 0%,  reduce = 0%
  2013-01-22 16:51:36,561 Stage-1 map = 100%,  reduce = 0%
  finished at 2013-01-22T16:51:48Z
  2013-01-22 16:51:45,650 Stage-1 map = 100%,  reduce = 100%
  Ended Job = job_201301150013_26879
  OK
  MapReduce time taken: 22.455 seconds
  Time taken: 22.598 seconds
Status     : success
Result     :
+------+
| _c0  |
+------+
| 5000 |
+------+
1 row in set

条件を入れて検索したいところですが、どんなカラムがあるか分からないので1件だけデータを取得してみます。

$ td query -w -d testdb "SELECT * FROM www_access limit 1"
Job 1568185 is queued.
Use 'td job:show 1568185' to show the status.
queued...
  started at 2013-01-22T16:53:54Z
  Hive history file=/tmp/1461/hive_job_log__1446690009.txt
  finished at 2013-01-22T16:54:03Z
  OK
  MapReduce time taken: 0.189 seconds
  Time taken: 1.841 seconds
Status     : success
Result     :
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
| v                                                                                                                                                                                                                                                                                                                                                                       | time       |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
| {"path":"/category/electronics","host":"176.72.215.97","method":"GET","user":"-","agent":"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; YTB730; GTB7.2; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET4.0C; .NET4.0E; Media Center PC 6.0)","referer":"/item/books/919","code":"200","time":"1358787594","size":"76"} | 1358787594 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
1 row in set

条件を付けて検索

カラム名が分かったので条件を付けてみたいと思います。agentにMacintoshが含まれるレコードの件数です。全文検索っすね!

$ td query -w -d testdb "SELECT count(*) FROM www_access where v['agent'] like '%Macintosh%'"
Job 1568190 is queued.
Use 'td job:show 1568190' to show the status.
queued...
  started at 2013-01-22T16:58:02Z
  Hive history file=/tmp/1461/hive_job_log__1636264800.txt
  Total MapReduce jobs = 1
  Launching Job 1 out of 1
  Number of reduce tasks determined at compile time: 1
  In order to change the average load for a reducer (in bytes):
    set hive.exec.reducers.bytes.per.reducer=<number>
  In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=<number>
  In order to set a constant number of reducers:
    set mapred.reduce.tasks=<number>
  Starting Job = job_201301150013_26886, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26886
  Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26886
  2013-01-22 16:58:15,145 Stage-1 map = 0%,  reduce = 0%
  2013-01-22 16:58:20,202 Stage-1 map = 100%,  reduce = 0%
  2013-01-22 16:58:29,299 Stage-1 map = 100%,  reduce = 100%
  Ended Job = job_201301150013_26886
  OK
  MapReduce time taken: 22.395 seconds
  Time taken: 22.539 seconds
  finished at 2013-01-22T16:58:32Z
Status     : success
Result     :
+-----+
| _c0 |
+-----+
| 445 |
+-----+
1 row in set

GROUP BYとORDER BYとLIMIT

SQLといえばグループ化と順序付けでしょう。ということで使ってみます。agentでグループ化してTOP3を表示しています。

$ td query -w -d testdb "SELECT v['agent'] as agent , count(*) as count FROM www_access group by v['agent'] order by count desc limit 3"
Job 1568253 is queued.
Use 'td job:show 1568253' to show the status.
queued...
  started at 2013-01-22T17:01:12Z
  Hive history file=/tmp/1461/hive_job_log__1562578541.txt
  Total MapReduce jobs = 2
  Launching Job 1 out of 2
  Number of reduce tasks not specified. Defaulting to jobconf value of: 12
  In order to change the average load for a reducer (in bytes):
    set hive.exec.reducers.bytes.per.reducer=<number>
  In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=<number>
  In order to set a constant number of reducers:
    set mapred.reduce.tasks=<number>
  Starting Job = job_201301150013_26899, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26899
  Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26899
  2013-01-22 17:01:29,672 Stage-1 map = 0%,  reduce = 0%
  2013-01-22 17:01:35,742 Stage-1 map = 100%,  reduce = 0%
  2013-01-22 17:01:45,845 Stage-1 map = 100%,  reduce = 8%
  2013-01-22 17:01:53,920 Stage-1 map = 100%,  reduce = 17%
  2013-01-22 17:02:02,025 Stage-1 map = 100%,  reduce = 25%
  2013-01-22 17:02:10,120 Stage-1 map = 100%,  reduce = 33%
  2013-01-22 17:02:18,195 Stage-1 map = 100%,  reduce = 42%
  2013-01-22 17:02:25,260 Stage-1 map = 100%,  reduce = 50%
  2013-01-22 17:02:33,375 Stage-1 map = 100%,  reduce = 58%
  2013-01-22 17:02:41,450 Stage-1 map = 100%,  reduce = 67%
  2013-01-22 17:02:49,526 Stage-1 map = 100%,  reduce = 75%
  2013-01-22 17:02:57,601 Stage-1 map = 100%,  reduce = 83%
  2013-01-22 17:03:04,667 Stage-1 map = 100%,  reduce = 92%
  2013-01-22 17:03:12,742 Stage-1 map = 100%,  reduce = 100%
  Ended Job = job_201301150013_26899
  Launching Job 2 out of 2
  Number of reduce tasks determined at compile time: 1
  In order to change the average load for a reducer (in bytes):
    set hive.exec.reducers.bytes.per.reducer=<number>
  In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=<number>
  In order to set a constant number of reducers:
    set mapred.reduce.tasks=<number>
  Starting Job = job_201301150013_26904, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26904
  Kill Command = /usr/lib/hadoop/bin/hadoop job  -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26904
  2013-01-22 17:03:20,258 Stage-2 map = 0%,  reduce = 0%
  2013-01-22 17:03:23,285 Stage-2 map = 100%,  reduce = 0%
  2013-01-22 17:03:32,429 Stage-2 map = 100%,  reduce = 100%
  Ended Job = job_201301150013_26904
  finished at 2013-01-22T17:03:35Z
  OK
  MapReduce time taken: 133.066 seconds
  Time taken: 133.249 seconds
Status     : success
Result     :
+-------------------------------------------------------------------------------------+-------+
| agent                                                                               | count |
+-------------------------------------------------------------------------------------+-------+
| Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1               | 630   |
| Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)              | 497   |
| Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1 | 445   |
+-------------------------------------------------------------------------------------+-------+
3 rows in set

FluentdからTreasure Dataにデータを送る

基本的な使い方が分かりましたので、Fluentdからデータを送り込んでみましょう!Fluentdのセットアップは前回やりましたので割愛しますね。FluentdからTreasure Dataにデータを送るためにはAPIKEYが必要になりますので表示してみます。

$ td apikey
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

次にプラグインをインストールします。

$ sudo yum install make gcc -y
$ sudo /usr/lib64/fluent/ruby/bin/fluent-gem install fluent-plugin-td

プラグインに会わせてtd-agentの設定ファイルを記述したいと思います。

$ cd /etc/td-agent
$ sudo vi td-agent.conf

発生したApacheのログをS3とMongoとTreasure Dataに転送しています。

<source>
  type forward
  port 22222
</source>

<source>
  type tail
  path /var/log/httpd/access_log
  format apache
  tag td.testdb.www_access
</source>

<match *.**>
  type copy
  <store>
    type tdlog
    apikey XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    auto_create_table
    buffer_type file
    buffer_path /var/log/td-agent/buffer/td
    use_ssl true
    flush_interval 10s
  </store>
  <store>
    type mongo
    database httpd
    collection accesslog
    host localhost
    port 27017
    flush_interval 10s
  </store>
  <store>
    type forest
    subtype s3
    <template>
      s3_bucket akari-ec2-log
      s3_endpoint s3-ap-northeast-1.amazonaws.com
      path ${tag}/
      buffer_path /var/log/td-agent/buffer/${tag}
      time_slice_format %Y/%m/%d/ec2-%Y-%m-%d-%H
      flush_interval 1m
    </template>
  </store>
</match>

tdコマンドでデータが登録されているか見てみましょう。

$ td table:list
+----------+------------+------+-------+--------+--------------------------------+--------+
| Database | Table      | Type | Count | Size   | Last import                    | Schema |
+----------+------------+------+-------+--------+--------------------------------+--------+
| fluent   | info       | log  | 11    | 0.0 GB | Tue Jan 22 18:25:01 +0000 2013 |        |
| testdb   | www_access | log  | 5042  | 0.0 GB | Tue Jan 22 18:25:04 +0000 2013 |        |
+----------+------------+------+-------+--------+--------------------------------+--------+
2 rows in set

確かにカウントが増えていますね。td-agentからTreasureDataにデータを送る際に注意する点ですが、ログに付けているタグ名からデータベース名やテーブル名を判別しています。td.データベース名.テーブル名になるようにタグ付けしてくださいね。リクエストとしてはタグ付けでインスタンスIDやIPアドレスを入れたいこともあるので、データベース名やテーブル名はタグ以外で指定できるほうがうれしいかな。ログがどのインスタンスで発生したのか何かしら知りたい時ってないかなぁ。仮想サーバーは使い捨てと思えば不要かなぁ。

まとめ

自前でHadoop環境作ってデータ解析するのは正直面倒だなぁと思っている方は、FluentdとTreasureDataを組み合わせてお気楽ビックデータ解析をしてみましょう。今回の収穫は、Hiveを使ってビックデータを簡単に問合せ・集計できたことでした。特定カラムの部分一致をカウントしてランキングする等、実務でも十分活用できそうです!今日から君もビックデータの魔法使いになろう!

参考資料

Treasure Data

Continuous Data Import from Fluentd

Analyzing Apache Logs on the Cloud