Fluentd looks good, so let's try Treasure Data too
Treasure Data provides Big Data as a Service. Fluentd is the mechanism for forwarding all kinds of logs; Treasure Data is the service that accumulates those forwarded logs and turns them into valuable, processed information. It appears to be free up to a certain point, so let's try it right away.
Creating an account
Since this is a cloud-hosted service, first sign up for an account.
Setup
Following the usual routine, launch an Amazon Linux instance first, then continue with the commands below.
It looks like a single command could finish the installation, but let's first add the packages Amazon Linux is missing.
$ sudo yum update -y
$ sudo yum install rubygems ruby-devel gcc make -y
$ sudo gem install td iconv json
Done in no time!
Commands
Let's check the help to see which commands are available.
$ td
usage: td [options] COMMAND [args]
options:
-c, --config PATH path to config file (~/.td/td.conf)
-k, --apikey KEY use this API key instead of reading the config file
-v, --verbose verbose mode
-h, --help show help
Basic commands:
db # create/delete/list databases
table # create/delete/list/import/export/tail tables
query # issue a query
job # show/kill/list jobs
bulk_import # manage bulk import sessions
result # create/delete/list result URLs
Additional commands:
sched # create/delete/list schedules that run a query periodically
schema # create/delete/modify schemas of tables
status # show scheds, jobs, tables and results
apikey # show/set API key
server # show status of the Treasure Data server
help # show help messages
Type 'td help COMMAND' for more information on a specific command.
For the full details, run:
$ td help:all
db:list # Show list of tables in a database
db:show <db> # Describe information of a database
db:create <db> # Create a database
db:delete <db> # Delete a database
table:list [db] # Show list of tables
table:show <db> <table> # Describe information of a table
table:create <db> <table> # Create a table
table:delete <db> <table> # Delete a table
table:import <db> <table> <files...> # Parse and import files to a table
table:export <db> <table> # Dump logs in a table to the specified storage
table:swap <db> <table1> <table2> # Swap names of two tables
table:tail <db> <table> # Get recently imported logs
table:partial_delete <db> <table> # Delete logs from the table within the specified time range
bulk_import:list # List bulk import sessions
bulk_import:show <name> # Show list of uploaded parts
bulk_import:create <name> <db> <table> # Create a new bulk import session to the the table
bulk_import:prepare_parts <files...> # Convert files into part file format
bulk_import:prepare_parts2 <files...> # Convert files into part file format
bulk_import:upload_part <name> <id> <path.msgpack.gz> # Upload or re-upload a file into a bulk import session
bulk_import:upload_parts <name> <files...> # Upload or re-upload files into a bulk import session
bulk_import:delete_part <name> <id> # Delete a uploaded file from a bulk import session
bulk_import:delete_parts <name> <ids...> # Delete uploaded files from a bulk import session
bulk_import:perform <name> # Start to validate and convert uploaded files
bulk_import:error_records <name> # Show records which did not pass validations
bulk_import:commit <name> # Start to commit a performed bulk import session
bulk_import:delete <name> # Delete a bulk import session
bulk_import:freeze <name> # Reject succeeding uploadings to a bulk import session
bulk_import:unfreeze <name> # Unfreeze a frozen bulk import session
result:list # Show list of result URLs
result:show <name> # Describe information of a result URL
result:create <name> <URL> # Create a result URL
result:delete <name> # Delete a result URL
status # Show schedules, jobs, tables and results
schema:show <db> <table> # Show schema of a table
schema:set <db> <table> [columns...] # Set new schema on a table
schema:add <db> <table> <columns...> # Add new columns to a table
schema:remove <db> <table> <columns...> # Remove columns from a table
sched:list # Show list of schedules
sched:create <name> <cron> <sql> # Create a schedule
sched:delete <name> # Delete a schedule
sched:update <name> # Modify a schedule
sched:history <name> [max] # Show history of scheduled queries
sched:run <name> <time> # Run scheduled queries for the specified time
query <sql> # Issue a query
job:show <job_id> # Show status and result of a job
job:list [max] # Show list of jobs
job:kill <job_id> # Kill or cancel a job
password:change # Change password
apikey:show # Show Treasure Data API key
apikey:set <apikey> # Set Treasure Data API key
user:list # Show list of users
user:show <name> # Show an user
user:create <name> # Create an user
user:delete <name> # Delete an user
user:apikey:list <name> # Show API keys
user:password:change <name> # Change password
role:list # Show list of roles
role:show <name> # Show a role
role:create <name> # Create a role
role:delete <name> # Delete a role
role:grant <name> <user> # Grant role to an user
role:revoke <name> <user> # Revoke role from an user
org:list # Show list of organizations
org:show <name> # Show an organizations
org:create <name> # Create an organizations
org:delete <name> # Delete an organizations
acl:list # Show list of access controls
acl:grant <subject> <action> <scope> # Grant an access control
acl:revoke <subject> <action> <scope> # Revoke an access control
server:status # Show status of the Treasure Data server
sample:apache <path.json> # Create a sample log file
Type 'td help COMMAND' for more information on a specific command.
Creating a database
Next, verify the connection and create a database. Having the CLI tell you the next step is a thoughtful touch.
$ td account -f
Enter your Treasure Data credentials.
Email: hoge.hoge.hoge@classmethod.jp
Password (typing will be hidden):
Authenticated successfully.
Use 'td db:create <db_name>' to create a database.
Here is the command to create the database.
$ td db:create testdb
Database 'testdb' is created.
Use 'td table:create testdb <table_name>' to create a table.
Creating a table
Follow the hint and create a table.
$ td table:create testdb www_access
Table 'testdb.www_access' is created.
Generating and importing sample data
With the table created we could start forwarding data from Fluentd, but to verify things quickly, let's import some sample data first.
$ td sample:apache apache.json
Create apache.json with 5000 records whose time is
from Sun Jan 20 23:50:16 +0000 2013 to Mon Jan 21 17:42:46 +0000 2013.
Use 'td table:import <db> <table> --json apache.json' to import this file.
$ td table:import testdb www_access --json apache.json
importing apache.json...
uploading 117247 bytes...
imported 5000 entries from apache.json.
done.
Listing tables
Let's take a look at the table we created.
$ td table:list
+----------+------------+------+-------+--------+--------------------------------+--------+
| Database | Table | Type | Count | Size | Last import | Schema |
+----------+------------+------+-------+--------+--------------------------------+--------+
| testdb | www_access | log | 5000 | 0.0 GB | Mon Jan 21 17:43:03 +0000 2013 | |
+----------+------------+------+-------+--------+--------------------------------+--------+
1 row in set
Issuing queries
This sample has only a handful of records, but in real business use you would load data at the scale of hundreds of millions of rows and let Hadoop crunch it for you. For querying, Treasure Data lets you use Hive, which accepts SQL-like syntax. Being able to work with cutting-edge technology through familiar syntax is a real plus.
Counting records
First, let's get the record count.
$ td query -w -d testdb "SELECT count(*) FROM www_access"
Job 1568184 is queued.
Use 'td job:show 1568184' to show the status.
queued...
started at 2013-01-22T16:51:19Z
Hive history file=/tmp/1461/hive_job_log__91678880.txt
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201301150013_26879, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26879
Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26879
2013-01-22 16:51:32,501 Stage-1 map = 0%, reduce = 0%
2013-01-22 16:51:36,561 Stage-1 map = 100%, reduce = 0%
finished at 2013-01-22T16:51:48Z
2013-01-22 16:51:45,650 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201301150013_26879
OK
MapReduce time taken: 22.455 seconds
Time taken: 22.598 seconds
Status : success
Result :
+------+
| _c0 |
+------+
| 5000 |
+------+
1 row in set
We'd like to search with conditions next, but since we don't know what columns exist, let's fetch just one record first.
$ td query -w -d testdb "SELECT * FROM www_access limit 1"
Job 1568185 is queued.
Use 'td job:show 1568185' to show the status.
queued...
started at 2013-01-22T16:53:54Z
Hive history file=/tmp/1461/hive_job_log__1446690009.txt
finished at 2013-01-22T16:54:03Z
OK
MapReduce time taken: 0.189 seconds
Time taken: 1.841 seconds
Status : success
Result :
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
| v | time |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
| {"path":"/category/electronics","host":"176.72.215.97","method":"GET","user":"-","agent":"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; YTB730; GTB7.2; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET4.0C; .NET4.0E; Media Center PC 6.0)","referer":"/item/books/919","code":"200","time":"1358787594","size":"76"} | 1358787594 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
1 row in set
Searching with a condition
Now that we know the column names, let's add a condition: counting records whose agent contains 'Macintosh'. A full-table substring search!
$ td query -w -d testdb "SELECT count(*) FROM www_access where v['agent'] like '%Macintosh%'"
Job 1568190 is queued.
Use 'td job:show 1568190' to show the status.
queued...
started at 2013-01-22T16:58:02Z
Hive history file=/tmp/1461/hive_job_log__1636264800.txt
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201301150013_26886, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26886
Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26886
2013-01-22 16:58:15,145 Stage-1 map = 0%, reduce = 0%
2013-01-22 16:58:20,202 Stage-1 map = 100%, reduce = 0%
2013-01-22 16:58:29,299 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201301150013_26886
OK
MapReduce time taken: 22.395 seconds
Time taken: 22.539 seconds
finished at 2013-01-22T16:58:32Z
Status : success
Result :
+-----+
| _c0 |
+-----+
| 445 |
+-----+
1 row in set
GROUP BY, ORDER BY, and LIMIT
SQL wouldn't be SQL without grouping and ordering, so let's try them: the query below groups by agent and shows the top 3.
$ td query -w -d testdb "SELECT v['agent'] as agent , count(*) as count FROM www_access group by v['agent'] order by count desc limit 3"
Job 1568253 is queued.
Use 'td job:show 1568253' to show the status.
queued...
started at 2013-01-22T17:01:12Z
Hive history file=/tmp/1461/hive_job_log__1562578541.txt
Total MapReduce jobs = 2
Launching Job 1 out of 2
Number of reduce tasks not specified. Defaulting to jobconf value of: 12
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201301150013_26899, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26899
Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26899
2013-01-22 17:01:29,672 Stage-1 map = 0%, reduce = 0%
2013-01-22 17:01:35,742 Stage-1 map = 100%, reduce = 0%
2013-01-22 17:01:45,845 Stage-1 map = 100%, reduce = 8%
2013-01-22 17:01:53,920 Stage-1 map = 100%, reduce = 17%
2013-01-22 17:02:02,025 Stage-1 map = 100%, reduce = 25%
2013-01-22 17:02:10,120 Stage-1 map = 100%, reduce = 33%
2013-01-22 17:02:18,195 Stage-1 map = 100%, reduce = 42%
2013-01-22 17:02:25,260 Stage-1 map = 100%, reduce = 50%
2013-01-22 17:02:33,375 Stage-1 map = 100%, reduce = 58%
2013-01-22 17:02:41,450 Stage-1 map = 100%, reduce = 67%
2013-01-22 17:02:49,526 Stage-1 map = 100%, reduce = 75%
2013-01-22 17:02:57,601 Stage-1 map = 100%, reduce = 83%
2013-01-22 17:03:04,667 Stage-1 map = 100%, reduce = 92%
2013-01-22 17:03:12,742 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201301150013_26899
Launching Job 2 out of 2
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201301150013_26904, Tracking URL = http://ip-10-8-189-47.ec2.internal:50030/jobdetails.jsp?jobid=job_201301150013_26904
Kill Command = /usr/lib/hadoop/bin/hadoop job -Dmapred.job.tracker=10.8.189.47:8021 -kill job_201301150013_26904
2013-01-22 17:03:20,258 Stage-2 map = 0%, reduce = 0%
2013-01-22 17:03:23,285 Stage-2 map = 100%, reduce = 0%
2013-01-22 17:03:32,429 Stage-2 map = 100%, reduce = 100%
Ended Job = job_201301150013_26904
finished at 2013-01-22T17:03:35Z
OK
MapReduce time taken: 133.066 seconds
Time taken: 133.249 seconds
Status : success
Result :
+-------------------------------------------------------------------------------------+-------+
| agent | count |
+-------------------------------------------------------------------------------------+-------+
| Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1 | 630 |
| Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0) | 497 |
| Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1 | 445 |
+-------------------------------------------------------------------------------------+-------+
3 rows in set
Sending data from Fluentd to Treasure Data
Now that we've covered the basics, let's push data in from Fluentd! I'll skip the Fluentd setup since we went through it last time. Sending data from Fluentd to Treasure Data requires an API key, so let's display ours.
$ td apikey
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
Next, install the plugin.
$ sudo yum install make gcc -y
$ sudo /usr/lib64/fluent/ruby/bin/fluent-gem install fluent-plugin-td
Now let's write the td-agent configuration file to go with the plugin.
$ cd /etc/td-agent
$ sudo vi td-agent.conf
The following configuration forwards the Apache access log to S3, MongoDB, and Treasure Data.
<source>
type forward
port 22222
</source>
<source>
type tail
path /var/log/httpd/access_log
format apache
tag td.testdb.www_access
</source>
<match *.**>
type copy
<store>
type tdlog
apikey XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
auto_create_table
buffer_type file
buffer_path /var/log/td-agent/buffer/td
use_ssl true
flush_interval 10s
</store>
<store>
type mongo
database httpd
collection accesslog
host localhost
port 27017
flush_interval 10s
</store>
<store>
type forest
subtype s3
<template>
s3_bucket akari-ec2-log
s3_endpoint s3-ap-northeast-1.amazonaws.com
path ${tag}/
buffer_path /var/log/td-agent/buffer/${tag}
time_slice_format %Y/%m/%d/ec2-%Y-%m-%d-%H
flush_interval 1m
</template>
</store>
</match>
Let's use the td command to check whether the data arrived.
$ td table:list
+----------+------------+------+-------+--------+--------------------------------+--------+
| Database | Table | Type | Count | Size | Last import | Schema |
+----------+------------+------+-------+--------+--------------------------------+--------+
| fluent | info | log | 11 | 0.0 GB | Tue Jan 22 18:25:01 +0000 2013 | |
| testdb | www_access | log | 5042 | 0.0 GB | Tue Jan 22 18:25:04 +0000 2013 | |
+----------+------------+------+-------+--------+--------------------------------+--------+
2 rows in set
The count has indeed gone up. One caveat when sending data from td-agent to Treasure Data: the database and table are determined from the tag attached to each log, so make sure your tags follow the pattern td.<database>.<table>. As a feature request, since you sometimes want to embed an instance ID or IP address in the tag, it would be nice if the database and table could be specified separately from the tag. Don't you sometimes want to know which instance a log came from? Then again, if virtual servers are disposable, maybe it doesn't matter.
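The tag convention can be illustrated with a quick sketch: given a tag like td.testdb.www_access, tag-based routing takes the second dot-separated component as the database and the third as the table. A minimal shell illustration of that split:

```shell
# Illustrative only: split a Fluentd tag the way the
# td.<database>.<table> routing convention described above works.
tag="td.testdb.www_access"
db=$(echo "$tag" | cut -d '.' -f 2)      # second dot-separated field
table=$(echo "$tag" | cut -d '.' -f 3)   # third dot-separated field
echo "database=$db table=$table"         # → database=testdb table=www_access
```

Note that this only demonstrates the naming convention; whether the tdlog output also accepts explicit database/table parameters in the <store> section is something to verify against the fluent-plugin-td documentation rather than assume.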
Summary
If building your own Hadoop environment for data analysis honestly sounds like a hassle, combine Fluentd and Treasure Data for carefree big data analysis. The takeaway this time was how easily Hive let us query and aggregate big data. Things like counting and ranking partial matches on a specific column look useful enough for real work. Become a big data wizard today!
References
Continuous Data Import from Fluentd