様々なファイルをデータソースにElasticsearchへデータ投入する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

藤本です。

Elasticsearchにデータ投入する方法を調べる機会がありましたので、今回はいくつかのファイルをデータソースにElasticsearchへデータ投入する方法をご紹介します。

概要

Elasticsearchはリアルタイムデータ分析、ログ解析、全文検索など様々なユースケースで活用することができます。例えば、Excelでデータ蓄積して、グラフ化・集計を行っているのであれば、Elasticsearchにデータ投入して、Kibanaで可視化することができます。ログをSyslogで集約して、grepやawkを駆使してパフォーマンス解析しているのであれば、logstashやfluentdなどでメッセージ解析し、Elasticsearchに集約、Kibanaで可視化することができます。ブログサイトの記事をDBに投入していてアプリケーションによって検索処理ロジックを実装しているのであれば、Elasticsearchの字句解析・トークナイズにより、よりクレバーで高速な検索をElasticsearchに委ねることができ、実装を減らすことが可能です。これらはElasticsearchのよくあるユースケースです。分かってはいるが、正直試さないと本当に想定する利用方法が可能なのか、実装が少なくて済むのか、パフォーマンスは早いのか、判断することができません。

そこで今回はElasticsearch導入の取っ掛かりとなるべく、既にあるデータをElasticsearchに持っていく方法をデータソース別にご紹介し、まずはElasticsearchやKibanaを触っていただければ幸いです。
ちなみに今回は全てLogstashをベースにご紹介していますが、Fluentdでも実装可能なはず(未確認)です。

環境情報

  • Elasticsearchサーバ
    • IPアドレス:10.255.0.100
    • Elasticsearchバージョン:2.1.1  
  • Logstash実行環境(OSXローカル)
    • Logstashバージョン:2.1.1

Logstashインストール

インストール要件

LogstashはJRubyで実装されています。そのため動作環境にJava実行環境が必要となります。

インストール

公式ページよりインストール対象となるOSに応じたインストーラをダウンロードしてください。 今回はOSX上から実行するためZIPファイルをダウンロードします。

ZIPファイルの場合、解凍するだけで利用可能です。

# wget https://download.elastic.co/logstash/logstash/logstash-2.1.1.zip
--2016-01-27 16:33:32--  https://download.elastic.co/logstash/logstash/logstash-2.1.1.zip
Resolving download.elastic.co... 184.72.228.136, 23.21.123.221, 174.129.234.135
Connecting to download.elastic.co|184.72.228.136|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 78856636 (75M) [application/zip]
Saving to: 'logstash-2.1.1.zip'

logstash-2.1.1.zip          100%[=============================================>]  75.20M  3.35MB/s   in 20s

2016-01-27 16:33:53 (3.73 MB/s) - 'logstash-2.1.1.zip' saved [78856636/78856636]

# unzip logstash-2.1.1.zip
・・・・・・

# logstash-2.1.1/bin/logstash --version
logstash 2.1.1

ログファイル(定型のメッセージフォーマット)からElasticsearch

ログ分析のサンプルでよくあるApacheのアクセスログをElasticsearchに取り込みます。アクセスログはhttpd.confのLogFormatの設定によって利用者により任意のフォーマットで出力することができます。Logstashの設定ファイルにメッセージフォーマットを記述することでフィールドに分割してElasticsearchへデータ投入することができます。

利用するLogstash Plugins

logstash-input-file

任意のファイルの内容を永続的に読み取ります。オプションによりファイル読み取り開始位置や読み取る間隔を調整することができます。

logstash-filter-grok

入力を定義したフォーマットによってパースします。パターンのマッチには%{SYNTAX:SEMANTIC}の形式で抜き出したいメッセージ型とフィールド名を定義します。 代表的なパターンはLogstashで用意されています。Apacheのアクセスログパターンも用意されています。

cat ./logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns/grok-patterns
(略)
COMMONAPACHELOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
(略)

logstash-filter-date

日付フォーマットを変換します。Elasticsearch(2.1.1)の場合、デフォルトのDate型として認識するformatがstrict_date_optional_time、もしくはepoch_millis(Unixtime(milliseconds))となります。一方、アクセスログのdd/MMM/yyyy:hh:mm:ss Zとなります。Elasticsearch側のDate formatを変更してもよいですが、今回はLogstash側で日付フォーマットを変換します。

補足

その他にもgeoip、useragentと組み合わせることでアクセス元地域やクライアント情報を解析することができます。Elastic 大谷さんのエントリが参考になります。

テストデータ

適当なアクセスログを用意します。

# cat /tmp/access_log
::1 - - [30/Jan/2016:01:33:04 +0000] "GET / HTTP/1.1" 403 4961 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
::1 - - [30/Jan/2016:01:33:32 +0000] "GET / HTTP/1.1" 200 37 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2"

Logstash設定ファイル例

  1. アクセスログファイルを頭から読み取ります。
  2. 読み取った一行一行をメッセージパターンに当てはめて、フィールドに分割し、数値はint型に型変換します。
  3. timestampフィールドの文字列を指定した日付フォーマットでパースし、ISOフォーマットに変換します。
  4. Elasticsearchホストにデータ投入します。
# cat accesslog.conf
input {
  file {
    path => "/tmp/access_log"
    start_position => "beginning"
  }
}
filter {
  grok {
    match => {
      "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
    }
  }
  date {
    match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
  }
}
output {
  elasticsearch {
    hosts => ["10.255.0.100"]
    index => "accesslog-%{+YYYYMMdd}"
  }
}

grok設定補足

手元のCentOS6のhttpd.confのアクセスログフォーマットのデフォルト設定は以下のようになっています。

[root@localhost ~]# grep '^CustomLog' /etc/httpd/conf/httpd.conf
CustomLog logs/access_log combined
[root@localhost ~]# grep '^LogFormat' /etc/httpd/conf/httpd.conf
LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"" combined
(略)

grok-patternの%{COMBINEDAPACHELOG}を利用してもよいですが、数値もString型で渡されます。grokフィルター内で型変換したい場合、%{SYNTAX:SEMANTIC:TYPE}の型で指定します。

Elasticsearchへの取り込み

設定ファイルを指定し、Logstashを起動します。

# logstash-2.1.1/bin/logstash -f accesslog.conf
Settings: Default filter workers: 2
Logstash startup completed

Elasticsearchにindexが作成されたことを確認します。

# curl 10.255.0.100:9200/_cat/indices
yellow open .kibana            1 1 1 0  3.1kb  3.1kb
yellow open accesslog-20160130 5 1 2 0 14.6kb 14.6kb

Logstash設定ファイルのElasticsearchプラグインで指定したindex名通り、accesslog-本日日付のindexが作成されました。
access_logのデータが登録されたか確認してみます。

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "accesslog-20160130",
      "_type" : "logs",
      "_id" : "AVKQtB5ERGPrrn2Y5uaI",
      "_score" : 1.0,
      "_source":{"message":"::1 - - [30/Jan/2016:01:33:32 +0000] \"GET / HTTP/1.1\" 200 37 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"","@version":"1","@timestamp":"2016-01-30T01:33:32.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/access_log","clientip":"::1","ident":"-","auth":"-","timestamp":"30/Jan/2016:01:33:32 +0000","verb":"GET","request":"/","httpversion":"1.1","response":200,"bytes":37,"referrer":"\"-\"","agent":"\"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2\""}
    }, {
      "_index" : "accesslog-20160130",
      "_type" : "logs",
      "_id" : "AVKQtB5ERGPrrn2Y5uaH",
      "_score" : 1.0,
      "_source":{"message":"::1 - - [30/Jan/2016:01:33:04 +0000] \"GET / HTTP/1.1\" 403 4961 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"","@version":"1","@timestamp":"2016-01-30T01:33:04.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/access_log","clientip":"::1","ident":"-","auth":"-","timestamp":"30/Jan/2016:01:33:04 +0000","verb":"GET","request":"/","httpversion":"1.1","response":403,"bytes":4961,"referrer":"\"-\"","agent":"\"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.19.1 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2\""}
    } ]
  }
}

データが入っていますね。_sourceにmessageキーとは別にgrokフィルタで定義した通り、フィールドに分割され、フィールドキーとマッピングされて値が登録されていることが分かります。
indexを確認してみます。

# curl "10.255.0.100:9200/accesslog-`date +%Y%m%d`?pretty"
{
  "accesslog-20160130" : {
    "aliases" : { },
    "mappings" : {
      "logs" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          },
          "@version" : {
            "type" : "string"
          },
          "agent" : {
            "type" : "string"
          },
          "auth" : {
            "type" : "string"
          },
          "bytes" : {
            "type" : "long"
          },
          "clientip" : {
            "type" : "string"
          },
          "host" : {
            "type" : "string"
          },
          "httpversion" : {
            "type" : "string"
          },
          "ident" : {
            "type" : "string"
          },
          "message" : {
            "type" : "string"
          },
          "path" : {
            "type" : "string"
          },
          "referrer" : {
            "type" : "string"
          },
          "request" : {
            "type" : "string"
          },
          "response" : {
            "type" : "long"
          },
          "timestamp" : {
            "type" : "string"
          },
          "verb" : {
            "type" : "string"
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1454126669277",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "76VkNSzlRR6QjV6T6xzytQ",
        "version" : {
          "created" : "2010199"
        }
      }
    },
    "warmers" : { }
  }
}

grokフィルターで指定した通り、bytesフィールド、responseフィールドがlong型にマッピングされていることを確認できました。

CSVファイルからElasticsearch

CSVファイルの,区切りのカラムをフィールドに分割してElasticsearchへデータ投入します。

利用するLogstash Plugins

logstash-filter-csv

カンマ区切りの入力をフィールド単位に分割します。プラグイン名はcsvですが、separatorを指定することが可能であり、CSVに限らず、タブ区切りのTSV、スペース区切りのSSVなどにも対応できます。

logstash-filter-mutate(必要に応じて)

フィールドの名前変更、削除、置換、型変換などを行います。今回は数値の型変換に利用します。

テストデータ

交通費っぽいデータをテストデータとします。

# cat /tmp/keihi.csv
20160117,001,Train,Akihabara,Tokyo,200
20160118,001,Taxi,Tokyo,Akihabara,730
20160118,002,Train,Oita,Akihabara,20000

Logstash設定ファイル例

  1. CSVファイルを頭から読み取ります。
  2. ,で句切られた文字列をフィールドに分割し、指定した文字列をキーに割り当てます。(,はデフォルト設定なので省略可)
  3. dateフィールドの文字列を指定した日付フォーマットでパースし、ISOフォーマットに変換します。
  4. priceフィールドの文字列を数値型に変換します。
  5. Elasticsearchへデータ投入します。
# cat csv.conf
input {
  file {
    path => "/tmp/keihi.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["date","user_id","transfer","source","destination","price"]
    separator => ","
  }
  date {
    match => [ "date", "YYYYMMdd" ]
  }
  mutate {
    convert => { "price" => "integer"}
  }
}
output {
  elasticsearch {
    hosts => ["10.255.0.100"]
    index => "keihi"
  }
}

Elasticsearchへの取り込み

設定ファイルに指定し、Logstashを起動します。

# logstash-2.1.1/bin/logstash -f csv.conf
Settings: Default filter workers: 2
Logstash startup completed

投入されたデータを確認します。

# curl "10.255.0.100:9200/keihi/_search?pretty"
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "keihi",
      "_type" : "logs",
      "_id" : "AVKSHv56yJGvu4jlGvdu",
      "_score" : 1.0,
      "_source":{"message":"20160117,001,Train,Akihabara,Tokyo,200","@version":"1","@timestamp":"2016-01-16T15:00:00.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.csv","date":"20160117","user_id":"001","transfer":"Train","source":"Akihabara","destination":"Tokyo","price":200}
    }, {
      "_index" : "keihi",
      "_type" : "logs",
      "_id" : "AVKSHv56yJGvu4jlGvdv",
      "_score" : 1.0,
      "_source":{"message":"20160118,001,Taxi,Tokyo,Akihabara,730","@version":"1","@timestamp":"2016-01-17T15:00:00.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.csv","date":"20160118","user_id":"001","transfer":"Taxi","source":"Tokyo","destination":"Akihabara","price":730}
    }, {
      "_index" : "keihi",
      "_type" : "logs",
      "_id" : "AVKSHv56yJGvu4jlGvdw",
      "_score" : 1.0,
      "_source":{"message":"20160118,002,Train,Oita,Akihabara,20000","@version":"1","@timestamp":"2016-01-17T15:00:00.000Z","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.csv","date":"20160118","user_id":"002","transfer":"Train","source":"Oita","destination":"Akihabara","price":20000}
    } ]
  }
}

設定ファイルで指定したindex名でデータが投入されています。columnで指定した文字列を各フィールドのキーにCSVファイルのデータがマッピングされていることが分かります。
indexを見てみます。

curl "10.255.0.100:9200/keihi?pretty"
{
  "keihi" : {
    "aliases" : { },
    "mappings" : {
      "logs" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          },
          "@version" : {
            "type" : "string"
          },
          "date" : {
            "type" : "string"
          },
          "destination" : {
            "type" : "string"
          },
          "host" : {
            "type" : "string"
          },
          "message" : {
            "type" : "string"
          },
          "path" : {
            "type" : "string"
          },
          "price" : {
            "type" : "long"
          },
          "source" : {
            "type" : "string"
          },
          "transfer" : {
            "type" : "string"
          },
          "user_id" : {
            "type" : "string"
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1454150450695",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "74Hn_VbjSv6-RrwSY0Vcqw",
        "version" : {
          "created" : "2010199"
        }
      }
    },
    "warmers" : { }
  }
}

mutateフィルタにより、priceがlong型に型変換されていることが分かります。

データが少ないのでありがたみが感じられませんが、Kibanaを使えば、簡単にデータ集計・可視化することができます。

screenshot_2016-01-30_19_51_48_png

ExcelファイルからElasticsearch

普段の業務でCSVファイルはあまり扱わないでしょう。Excelファイルも同様の処理ができます。

利用するLogstash Plugins

logstash-input-xlsx

LogstashはPluginにより各処理を実行します。多くのPluginが標準PluginとしてLogstashのパッケージに取り込まれています。しかし、それが全てのPluginではありません。Logstashに限らず、Elastic社のプロダクトはPluginを取り込むことで機能拡張することが可能です。Logstash、Elasticsearch、Kibana、Beatsは全てPluginに対応しています。
標準PluginではExcelファイルを読み取ることができません。Excelファイルを読み取るLogstashプラグインがコミュニティプラグインとして提供されています。こちらを使うことでExcelファイルをElasticsearchに取り込むことが可能です。コミュニティプラグインなのでElastic社のサブスクリプションのサポート外になるのかな。そこはご注意ください。

それではPluginをインストールします。

# logstash-2.1.1/bin/plugin install logstash-input-xlsx
Validating logstash-input-xlsx
Installing logstash-input-xlsx
Installation successful

これだけです。
Pluginのページを見ると依存するPluginがあるのでこちらもインストールしておきます。

# logstash-2.1.1/bin/plugin install logstash-codec-excel
Validating logstash-codec-excel
Installing logstash-codec-excel
Installation successful

テストデータ

テストデータは先ほどと同じデータを利用します。

Windows_8_1_x64

Logstash設定ファイル例

  1. XLSXファイルを頭から読み取ります。
  2. logstash-input-xlsxプラグインを利用すると、カラムが;区切りのメッセージになります。先ほど利用したcsvフィルタのseparatorに;を指定して、フィールドに分割します。
  3. dateフィールドの文字列を指定した日付フォーマットでパースし、ISOフォーマットに変換します。
  4. priceフィールドの文字列を数値型に変換します。
  5. Elasticsearchへデータ投入します。
# cat excel.conf
input {
  xlsx {
    path => [ "/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.xlsx" ]
    start_position => "beginning"
  }
}
filter {
  csv {
    columns => ["date","user_id","transfer","source","destination","price"]
    separator => ";"
  }
  date {
    match => [ "date", "YYYYMMdd" ]
  }
  mutate {
    convert => { "price" => "integer"}
  }
}
output {
  elasticsearch {
    hosts => ["10.255.0.100"]
    index => "excel"
  }
}

Elasticsearchへの取り込み

設定ファイルを指定し、Logstashを起動します。

# logstash-2.1.1/bin/logstash -f excel.conf
Settings: Default filter workers: 2
Logstash startup completed

データを確認します。

# curl "10.255.0.100:9200/excel/_search?pretty"
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "excel",
      "_type" : "logs",
      "_id" : "AVKSXjawyJGvu4jlGveM",
      "_score" : 1.0,
      "_source":{"message":"20160118;002;Train;Oita;Akihabara;20000;","@version":"1","@timestamp":"2016-01-17T15:00:00.000Z","tags":["eof"],"wsname":"Sheet1","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.xlsx","date":"20160118","user_id":"002","transfer":"Train","source":"Oita","destination":"Akihabara","price":20000,"column7":null}
    }, {
      "_index" : "excel",
      "_type" : "logs",
      "_id" : "AVKSXjawyJGvu4jlGveK",
      "_score" : 1.0,
      "_source":{"message":"20160117;001;Train;Akihabara;Tokyo;200;","@version":"1","@timestamp":"2016-01-16T15:00:00.000Z","wsname":"Sheet1","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.xlsx","date":"20160117","user_id":"001","transfer":"Train","source":"Akihabara","destination":"Tokyo","price":200,"column7":null}
    }, {
      "_index" : "excel",
      "_type" : "logs",
      "_id" : "AVKSXjawyJGvu4jlGveL",
      "_score" : 1.0,
      "_source":{"message":"20160118;001;Taxi;Tokyo;Akihabara;730;","@version":"1","@timestamp":"2016-01-17T15:00:00.000Z","wsname":"Sheet1","host":"HL00120.local","path":"/Users/fujimoto.shinji/Techs/logstash-datasource/keihi.xlsx","date":"20160118","user_id":"001","transfer":"Taxi","source":"Tokyo","destination":"Akihabara","price":730,"column7":null}
    } ]
  }
}

CSVと同じようなデータが登録されています。
CSVファイルになかったデータとして、追加でwsnameにシート名が入っています。またメッセージがセミコロンで終わっているため、column7ができているようです。csvフィルタのオプションでremove_fieldを利用すればcolumn7を削除することも可能です。

まとめ

いかがでしたでしょうか?
Logstashを利用すれば様々なファイルをElasticsearchに取り込むことが可能です。Logstashは数多くのInput PluginsFilter Pluginsが標準で含まれています。自身が利用したいデータソースが対応しているかどうか一度探してみてください。