様々なファイルをデータソースにElasticsearchへデータ投入する
はじめに
藤本です。
Elasticsearchにデータ投入する方法を調べる機会がありましたので、今回はいくつかのファイルをデータソースにElasticsearchへデータ投入する方法をご紹介します。
概要
Elasticsearchはリアルタイムデータ分析、ログ解析、全文検索など様々なユースケースで活用することができます。例えば、Excelでデータ蓄積して、グラフ化・集計を行っているのであれば、Elasticsearchにデータ投入して、Kibanaで可視化することができます。ログをSyslogで集約して、grepやawkを駆使してパフォーマンス解析しているのであれば、logstashやfluentdなどでメッセージ解析し、Elasticsearchに集約、Kibanaで可視化することができます。ブログサイトの記事をDBに投入していてアプリケーションによって検索処理ロジックを実装しているのであれば、Elasticsearchの字句解析・トークナイズにより、よりクレバーで高速な検索をElasticsearchに委ねることができ、実装を減らすことが可能です。これらはElasticsearchのよくあるユースケースです。分かってはいるが、正直試さないと本当に想定する利用方法が可能なのか、実装が少なくて済むのか、パフォーマンスは早いのか、判断することができません。
そこで今回はElasticsearch導入の取っ掛かりとなるべく、既にあるデータをElasticsearchに持っていく方法をデータソース別にご紹介し、まずはElasticsearchやKibanaを触っていただければ幸いです。 ちなみに今回は全てLogstashをベースにご紹介していますが、Fluentdでも実装可能なはず(未確認)です。
環境情報
- Elasticsearchサーバ
- IPアドレス:10.255.0.100
- Elasticsearchバージョン:2.1.1
-
Logstash実行環境(OSXローカル)
- Logstashバージョン:2.1.1
Logstashインストール
インストール要件
LogstashはJRubyで実装されています。そのため動作環境にJava実行環境が必要となります。
インストール
公式ページよりインストール対象となるOSに応じたインストーラをダウンロードしてください。 今回はOSX上から実行するためZIPファイルをダウンロードします。
ZIPファイルの場合、解凍するだけで利用可能です。
# wget https://download.elastic.co/logstash/logstash/logstash-2.1.1.zip --2016-01-27 16:33:32-- https://download.elastic.co/logstash/logstash/logstash-2.1.1.zip Resolving download.elastic.co... 184.72.228.136, 23.21.123.221, 174.129.234.135 Connecting to download.elastic.co|184.72.228.136|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 78856636 (75M) [application/zip] Saving to: 'logstash-2.1.1.zip' logstash-2.1.1.zip 100%[=============================================>] 75.20M 3.35MB/s in 20s 2016-01-27 16:33:53 (3.73 MB/s) - 'logstash-2.1.1.zip' saved [78856636/78856636] # unzip logstash-2.1.1.zip ・・・・・・ # logstash-2.1.1/bin/logstash --version logstash 2.1.1
ログファイル(定型のメッセージフォーマット)からElasticsearch
ログ分析のサンプルでよくあるApacheのアクセスログをElasticsearchに取り込みます。アクセスログはhttpd.confのLogFormatの設定によって利用者により任意のフォーマットで出力することができます。Logstashの設定ファイルにメッセージフォーマットを記述することでフィールドに分割してElasticsearchへデータ投入することができます。
利用するLogstash Plugins
logstash-input-file
任意のファイルの内容を永続的に読み取ります。オプションによりファイル読み取り開始位置や読み取る間隔を調整することができます。
logstash-filter-grok
入力を定義したフォーマットによってパースします。パターンのマッチには%{SYNTAX:SEMANTIC}
の形式で抜き出したいメッセージ型とフィールド名を定義します。
代表的なパターンはLogstashで用意されています。Apacheのアクセスログパターンも用意されています。
cat ./logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns/grok-patterns (略) COMMONAPACHELOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} (略)
logstash-filter-date
日付フォーマットを変換します。Elasticsearch(2.1.1)の場合、デフォルトのDate型として認識するformatがstrict_date_optional_time、もしくはepoch_millis(Unixtime(milliseconds))となります。一方、アクセスログのdd/MMM/yyyy:hh:mm:ss Zとなります。Elasticsearch側のDate formatを変更してもよいですが、今回はLogstash側で日付フォーマットを変換します。
補足
その他にもgeoip、useragentと組み合わせることでアクセス元地域やクライアント情報を解析することができます。Elastic 大谷さんのエントリが参考になります。
テストデータ
適当なアクセスログを用意します。
# cat /tmp/access_log ::1 - - [30/Jan/2016:01:33:04 +0000] "GET / HTTP/1.1" 403 4961 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2" ::1 - - [30/Jan/2016:01:33:32 +0000] "GET / HTTP/1.1" 200 37 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
Logstash設定ファイル例
- アクセスログファイルを頭から読み取ります。
- 読み取った一行一行をメッセージパターンに当てはめて、フィールドに分割し、数値はint型に型変換します。
- timestampフィールドの文字列を指定した日付フォーマットでパースし、ISOフォーマットに変換します。
- Elasticsearchホストにデータ投入します。
# cat accesslog.conf input { file { path => "/tmp/access_log" start_position => "beginning" } } filter { grok { match => { "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}' } } date { match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ] } } output { elasticsearch { hosts => ["10.255.0.100"] index => "accesslog-%{+YYYYMMdd}" } }
grok設定補足
手元のCentOS6のhttpd.confのアクセスログフォーマットのデフォルト設定は以下のようになっています。
[root@localhost ~]# grep '^CustomLog' /etc/httpd/conf/httpd.conf CustomLog logs/access_log combined [root@localhost ~]# grep '^LogFormat' /etc/httpd/conf/httpd.conf LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"" combined (略)
grok-patternの%{COMBINEDAPACHELOG}
を利用してもよいですが、数値もString型で渡されます。grokフィルター内で型変換したい場合、%{SYNTAX:SEMANTIC:TYPE}
の型で指定します。
Elasticsearchへの取り込み
設定ファイルを指定し、Logstashを起動します。
# logstash-2.1.1/bin/logstash -f accesslog.conf Settings: Default filter workers: 2 Logstash startup completed
Elasticsearchにindexが作成されたことを確認します。
# curl 10.255.0.100:9200/_cat/indices yellow open .kibana 1 1 1 0 3.1kb 3.1kb yellow open accesslog-20160130 5 1 2 0 14.6kb 14.6kb
Logstash設定ファイルのElasticsearchプラグインで指定したindex名通り、accesslog-本日日付のindexが作成されました。 access_logのデータが登録されたか確認してみます。
{ "took" : 5, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "accesslog-20160130", "_type" : "logs", "_id" : "AVKQtB5ERGPrrn2Y5uaI", "_score" : 1.0, "_source":{"message":"::1 - - [30/Jan/2016:01:33:32 +0000] \"GET / HTTP/1.1\" 200 37 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"","@version":"1","@timestamp":"2016-01-30T01:33:32.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/access_log","clientip":"::1","ident":"-","auth":"-","timestamp":"30/Jan/2016:01:33:32 +0000","verb":"GET","request":"/","httpversion":"1.1","response":200,"bytes":37,"referrer":"\"-\"","agent":"\"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2\""} }, { "_index" : "accesslog-20160130", "_type" : "logs", "_id" : "AVKQtB5ERGPrrn2Y5uaH", "_score" : 1.0, "_source":{"message":"::1 - - [30/Jan/2016:01:33:04 +0000] \"GET / HTTP/1.1\" 403 4961 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"","@version":"1","@timestamp":"2016-01-30T01:33:04.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/access_log","clientip":"::1","ident":"-","auth":"-","timestamp":"30/Jan/2016:01:33:04 +0000","verb":"GET","request":"/","httpversion":"1.1","response":403,"bytes":4961,"referrer":"\"-\"","agent":"\"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2\""} } ] } }
データが入っていますね。_sourceにmessageキーとは別にgrokフィルタで定義した通り、フィールドに分割され、フィールドキーとマッピングされて値が登録されていることが分かります。 indexを確認してみます。
# curl "10.255.0.100:9200/accesslog-`date +%Y%m%d`?pretty" { "accesslog-20160130" : { "aliases" : { }, "mappings" : { "logs" : { "properties" : { "@timestamp" : { "type" : "date", "format" : "strict_date_optional_time||epoch_millis" }, "@version" : { "type" : "string" }, "agent" : { "type" : "string" }, "auth" : { "type" : "string" }, "bytes" : { "type" : "long" }, "clientip" : { "type" : "string" }, "host" : { "type" : "string" }, "httpversion" : { "type" : "string" }, "ident" : { "type" : "string" }, "message" : { "type" : "string" }, "path" : { "type" : "string" }, "referrer" : { "type" : "string" }, "request" : { "type" : "string" }, "response" : { "type" : "long" }, "timestamp" : { "type" : "string" }, "verb" : { "type" : "string" } } } }, "settings" : { "index" : { "creation_date" : "1454126669277", "number_of_shards" : "5", "number_of_replicas" : "1", "uuid" : "76VkNSzlRR6QjV6T6xzytQ", "version" : { "created" : "2010199" } } }, "warmers" : { } } }
grokフィルターで指定した通り、bytesフィールド、responseフィールドがlong型にマッピングされていることを確認できました。
CSVファイルからElasticsearch
CSVファイルの,区切りのカラムをフィールドに分割してElasticsearchへデータ投入します。
利用するLogstash Plugins
logstash-filter-csv
カンマ区切りの入力をフィールド単位に分割します。プラグイン名はcsvですが、separatorを指定することが可能であり、CSVに限らず、タブ区切りのTSV、スペース区切りのSSVなどにも対応できます。
logstash-filter-mutate(必要に応じて)
フィールドの名前変更、削除、置換、型変換などを行います。今回は数値の型変換に利用します。
テストデータ
交通費っぽいデータをテストデータとします。
# cat /tmp/keihi.csv 20160117,001,Train,Akihabara,Tokyo,200 20160118,001,Taxi,Tokyo,Akihabara,730 20160118,002,Train,Oita,Akihabara,20000
Logstash設定ファイル例
- CSVファイルを頭から読み取ります。
- ,で句切られた文字列をフィールドに分割し、指定した文字列をキーに割り当てます。(,はデフォルト設定なので省略可)
- dateフィールドの文字列を指定した日付フォーマットでパースし、ISOフォーマットに変換します。
- priceフィールドの文字列を数値型に変換します。
- Elasticsearchへデータ投入します。
# cat csv.conf input { file { path => "/tmp/keihi.csv" start_position => "beginning" } } filter { csv { columns => ["date","user_id","transfer","source","destination","price"] separator => "," } date { match => [ "date", "YYYYMMdd" ] } mutate { convert => { "price" => "integer"} } } output { elasticsearch { hosts => ["10.255.0.100"] index => "keihi" } }
Elasticsearchへの取り込み
設定ファイルに指定し、Logstashを起動します。
# logstash-2.1.1/bin/logstash -f csv.conf Settings: Default filter workers: 2 Logstash startup completed
投入されたデータを確認します。
# curl "10.255.0.100:9200/keihi/_search?pretty" { "took" : 2, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 3, "max_score" : 1.0, "hits" : [ { "_index" : "keihi", "_type" : "logs", "_id" : "AVKSHv56yJGvu4jlGvdu", "_score" : 1.0, "_source":{"message":"20160117,001,Train,Akihabara,Tokyo,200","@version":"1","@timestamp":"2016-01-16T15:00:00.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.csv","date":"20160117","user_id":"001","transfer":"Train","source":"Akihabara","destination":"Tokyo","price":200} }, { "_index" : "keihi", "_type" : "logs", "_id" : "AVKSHv56yJGvu4jlGvdv", "_score" : 1.0, "_source":{"message":"20160118,001,Taxi,Tokyo,Akihabara,730","@version":"1","@timestamp":"2016-01-17T15:00:00.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.csv","date":"20160118","user_id":"001","transfer":"Taxi","source":"Tokyo","destination":"Akihabara","price":730} }, { "_index" : "keihi", "_type" : "logs", "_id" : "AVKSHv56yJGvu4jlGvdw", "_score" : 1.0, "_source":{"message":"20160118,002,Train,Oita,Akihabara,20000","@version":"1","@timestamp":"2016-01-17T15:00:00.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.csv","date":"20160118","user_id":"002","transfer":"Train","source":"Oita","destination":"Akihabara","price":20000} } ] } }
設定ファイルで指定したindex名でデータが投入されています。columnで指定した文字列を各フィールドのキーにCSVファイルのデータがマッピングされていることが分かります。 indexを見てみます。
curl "10.255.0.100:9200/keihi?pretty" { "keihi" : { "aliases" : { }, "mappings" : { "logs" : { "properties" : { "@timestamp" : { "type" : "date", "format" : "strict_date_optional_time||epoch_millis" }, "@version" : { "type" : "string" }, "date" : { "type" : "string" }, "destination" : { "type" : "string" }, "host" : { "type" : "string" }, "message" : { "type" : "string" }, "path" : { "type" : "string" }, "price" : { "type" : "long" }, "source" : { "type" : "string" }, "transfer" : { "type" : "string" }, "user_id" : { "type" : "string" } } } }, "settings" : { "index" : { "creation_date" : "1454150450695", "number_of_shards" : "5", "number_of_replicas" : "1", "uuid" : "74Hn_VbjSv6-RrwSY0Vcqw", "version" : { "created" : "2010199" } } }, "warmers" : { } } }
mutateフィルタにより、priceがlong型に型変換されていることが分かります。
データが少ないのでありがたみが感じられませんが、Kibanaを使えば、簡単にデータ集計・可視化することができます。
ExcelファイルからElasticsearch
普段の業務でCSVファイルはあまり扱わないでしょう。Excelファイルも同様の処理ができます。
利用するLogstash Plugins
logstash-input-xlsx
LogstashはPluginにより各処理を実行します。多くのPluginが標準PluginとしてLogstashのパッケージに取り込まれています。しかし、それが全てのPluginではありません。Logstashに限らず、Elastic社のプロダクトはPluginを取り込むことで機能拡張することが可能です。Logstash、Elasticsearch、Kibana、Beatsは全てPluginに対応しています。 標準PluginではExcelファイルを読み取ることができません。Excelファイルを読み取るLogstashプラグインがコミュニティプラグインとして提供されています。こちらを使うことでExcelファイルをElasticsearchに取り込むことが可能です。コミュニティプラグインなのでElastic社のサブスクリプションのサポート外になるのかな。そこはご注意ください。
それではPluginをインストールします。
# logstash-2.1.1/bin/plugin install logstash-input-xlsx Validating logstash-input-xlsx Installing logstash-input-xlsx Installation successful
これだけです。 Pluginのページを見ると依存するPluginがあるのでこちらもインストールしておきます。
# logstash-2.1.1/bin/plugin install logstash-codec-excel Validating logstash-codec-excel Installing logstash-codec-excel Installation successful
テストデータ
テストデータは先ほどと同じデータを利用します。
Logstash設定ファイル例
- XLSXファイルを頭から読み取ります。
- logstash-input-xlsxプラグインを利用すると、カラムが;区切りのメッセージになります。先ほど利用したcsvフィルタのseparatorに;を指定して、フィールドに分割します。
- dateフィールドの文字列を指定した日付フォーマットでパースし、ISOフォーマットに変換します。
- priceフィールドの文字列を数値型に変換します。
- Elasticsearchへデータ投入します。
# cat excel.conf input { xlsx { path => [ "/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.xlsx" ] start_position => "beginning" } } filter { csv { columns => ["date","user_id","transfer","source","destination","price"] separator => ";" } date { match => [ "date", "YYYYMMdd" ] } mutate { convert => { "price" => "integer"} } } output { elasticsearch { hosts => ["10.255.0.100"] index => "excel" } }
Elasticsearchへの取り込み
設定ファイルを指定し、Logstashを起動します。
# logstash-2.1.1/bin/logstash -f excel.conf Settings: Default filter workers: 2 Logstash startup completed
データを確認します。
# curl "10.255.0.100:9200/excel/_search?pretty" { "took" : 1, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 3, "max_score" : 1.0, "hits" : [ { "_index" : "excel", "_type" : "logs", "_id" : "AVKSXjawyJGvu4jlGveM", "_score" : 1.0, "_source":{"message":"20160118;002;Train;Oita;Akihabara;20000;","@version":"1","@timestamp":"2016-01-17T15:00:00.000Z","tags":["eof"],"wsname":"Sheet1","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.xlsx","date":"20160118","user_id":"002","transfer":"Train","source":"Oita","destination":"Akihabara","price":20000,"column7":null} }, { "_index" : "excel", "_type" : "logs", "_id" : "AVKSXjawyJGvu4jlGveK", "_score" : 1.0, "_source":{"message":"20160117;001;Train;Akihabara;Tokyo;200;","@version":"1","@timestamp":"2016-01-16T15:00:00.000Z","wsname":"Sheet1","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.xlsx","date":"20160117","user_id":"001","transfer":"Train","source":"Akihabara","destination":"Tokyo","price":200,"column7":null} }, { "_index" : "excel", "_type" : "logs", "_id" : "AVKSXjawyJGvu4jlGveL", "_score" : 1.0, "_source":{"message":"20160118;001;Taxi;Tokyo;Akihabara;730;","@version":"1","@timestamp":"2016-01-17T15:00:00.000Z","wsname":"Sheet1","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.xlsx","date":"20160118","user_id":"001","transfer":"Taxi","source":"Tokyo","destination":"Akihabara","price":730,"column7":null} } ] } }
CSVと同じようなデータが登録されています。 CSVファイルになかったデータとして、追加でwsnameにシート名が入っています。またメッセージがセミコロンで終わっているため、column7ができているようです。csvフィルタのオプションでremove_fieldを利用すればcolumn7を削除することも可能です。
まとめ
いかがでしたでしょうか? Logstashを利用すれば様々なファイルをElasticsearchに取り込むことが可能です。Logstashは数多くのInput Plugins、Filter Pluginsが標準で含まれています。自身が利用したいデータソースが対応しているかどうか一度探してみてください。