LogstashのS3 Inputプラグインで取得するオブジェクトを日付でフィルターする
はじめに
こんにちは、藤本です。
先日、CloudFrontのアクセスログをKibanaで可視化するでS3にあるオブジェクトを定期的に取得するS3 Inputプラグインをご紹介しました。エントリの中で初回起動時に設定ファイルで指定したS3バケット、オブジェクトパスのプレフィックスの中から全てのオブジェクトを取得するため、オブジェクトサイズが大きく、オブジェクト数が多いほど、比例して処理時間、コストが必要となります。例えばELBを数年稼働している環境では膨大なログファイル数となり、初期インポートに時間がかかるし、S3バケットからの転送料金も嵩んでしまいますし、Elasticsearchに送るのであれば、ログファイルのボリュームに応じたElasticsearchのディスクサイズを用意しないといけません。
そこで今回は指定した日時以降のオブジェクトに絞って取得する小ネタをご紹介します。
結論
sincedbファイルに取得したい以降の日付をISO 8601の拡張形式(YYYY-MM-DDTHH:mm:ssZ
)で記述してください。
今回のエントリの前提の説明やソースコードを読んで分かったことを本エントリの最後に記載します。興味がある人はそちらも読んでみてください。
やってみた
環境
- OS : Amazon Linux 2016.03
- Logstash : 2.3.2
- Elasticsearch : Amazon Elasticsearch Service
環境準備
Amazon Elasticsearch Serviceのドメイン構築はlogstashでELBのログを2週間分だけAmazon Elasticsearch Serviceに取り込むを参考に、Logstashのインストール、amazon_es outputプラグインインストールは[新機能]Amazon Elasticsearch Serviceがリリースされました! | Developers.IOを参考に環境準備してください。
sincedbファイルがない場合
Logstashのs3 inputプラグインの設定ファイルでsincedbのファイルパスを指定します。
input { s3 { bucket => "<S3バケット名>" region => "<S3バケットリージョン>" sincedb_path => "/var/lib/logstash/sincedb" } }
sincedbファイルは最初ありません。
# ls /var/lib/logstash/sincedb ls: cannot access /var/lib/logstash/sincedb: No such file or directory
作成予定のインデックスはありません。
# curl search-********.ap-northeast-1.es.amazonaws.com/_cat/indices |grep elblog
Logstashを起動します。
/etc/init.d/logstash start logstash started.
しばらく待つと、S3からアクセスログファイルを取得し、ログメッセージ内に含まれる日付に応じたインデックスが作成されます。私の環境のELBが動作検証的にたまに起動するだけなのでログの日付がかなり分散していますが、古いものは2015年11月25日、新しいものは2016年4月27日にアクセスログを出力しています。
# curl search-********.ap-northeast-1.es.amazonaws.com/_cat/indices |grep elblog yellow open elblog-lb-2015.11.25 5 1 125 0 116.1kb 116.1kb yellow open elblog-lb-2015.11.28 5 1 79 0 142.1kb 142.1kb yellow open elblog-lb-2015.11.29 5 1 75 0 92.4kb 92.4kb yellow open elblog-lb-2015.12.01 5 1 5 0 51.2kb 51.2kb yellow open elblog-lb-2015.12.06 5 1 2725 0 1.7mb 1.7mb yellow open elblog-lb-2016.04.25 5 1 32 0 120.9kb 120.9kb yellow open elblog-lb-2016.04.26 5 1 4 0 30.5kb 30.5kb yellow open elblog-lb-2016.04.27 5 1 3 0 32.5kb 32.5kb yellow open elblog-lb1-2015.09.15 5 1 4 0 31.5kb 31.5kb yellow open elblog-lb2-2015.09.15 5 1 41 0 95kb 95kb
sincedbを確認します。最も新しいログファイルのタイムスタンプが書き込まれています。
# cat /var/lib/logstash/sincedb 2016-04-27 07:58:37 UTC
sincedbファイルに日付指定した場合
Logstashを停止し、一度、作成されたインデックスを全て削除します。
# /etc/init.d/logstash stop Killing logstash (pid 22659) with SIGTERM Waiting logstash (pid 22659) to die... Waiting logstash (pid 22659) to die... logstash stopped. # curl "search-********.ap-northeast-1.es.amazonaws.com/elblog-*" -XDELETE # curl search-********.ap-northeast-1.es.amazonaws.com/_cat/indices |grep elblog
sindbファイルに日付を指定します。今回は2016年1月1日を指定します。
# vi /var/lib/logstash/sincedb 2016-01-01 00:00:00 UTC
Logstashを起動します。
# /etc/init.d/logstash start logstash started.
しばらく待つと、インデックスが作成されています。今度は2016年1月1日以降のログファイルのみを取得していることが分かります。
# curl search-********.ap-northeast-1.es.amazonaws.com/_cat/indices |grep elblog yellow open elblog-lb-2016.04.25 5 1 32 0 129.8kb 129.8kb yellow open elblog-lb-2016.04.27 5 1 3 0 32.5kb 32.5kb yellow open elblog-lb-2016.04.26 5 1 4 0 37.1kb 37.1kb
まとめ
いかがでしたでしょうか?
これで必要な日付以降のオブジェクトに絞ってデータを取り込むことができます。
以下にLogstashをあまりご存じない方に向けて補足情報を記載します。
Logstash Inputプラグインのタイプ
LogstashのInputプラグインは大きく分類して、受信をイベントとして動作するプッシュモデル(Lambdaっぽく勝手に命名)、自らIntervalなどによって定期イベントとして動作するプルモデル(勝手にry)があります。プッシュモデルはbeatやtcp、syslogなどがあり、プルモデルはfileやs3、jdbcなどがあります。
プッシュモデルの場合、データを受信したら、受信したデータを処理すればよいので、データの差分、重複を管理する必要がありません。一方、プルモデルは取得先のデータに何が追加されたのか、何が変更されたのか管理しないと、データの重複、欠落が発生していしまいます。
S3 Inputプラグインの差分管理
上記問題を解決するためにプルモデルのinputプラグインでは前回からの差分を判断するために、各inputプラグインの特性に合わせて差分情報を管理しています。
S3 Inputプラグインの場合、差分管理用のファイルで日時を管理します。
管理用ファイルがない場合、設定ファイルで指定したbucket
、prefix
に前方一致する全てのオブジェクトを取得します。処理後にオブジェクトのタイムスタンプを管理用ファイルに書き込みます。
管理用ファイルがある場合、設定ファイルで指定したbucket
、prefix
に前方一致する条件に加えて、管理用ファイルのタイムスタンプより新しいオブジェクトを取得します。処理後、同様にオブジェクトのタイムスタンプを管理用ファイルに書き込みます。
この方式によりS3 Inputプラグインは継続した差分オブジェクトの取得が可能となります。
S3 Inputプラグインにおけるデータロスト、データ重複
ここから少し細かい話となります。
Logstashのプロセス、サーバダウン時にS3 Inputプラグインにおけるデータロスト、データ重複が気になったのでソースコードを読んで、S3 Inputプラグインにおける対策を読み解きました。
S3 Inputプラグインの処理フローはざっくり、対象となるオブジェクトの洗い出し、対象オブジェクトの処理順番をタイムスタンプにソート、対象オブジェクトを処理、対象オブジェクトのタイムスタンプを管理用ファイルに書き込み、という流れとなります。これにより、処理済みのオブジェクトを取得しない、未処理のオブジェクトを取得する対策を取っています。
例えば、タイムスタンプが古い順にオブジェクトA、B、Cがあるとします。Aの処理を完了し、Bの処理途中でプロセスダウンが発生した場合、管理用ファイルにはAのタイムスタンプが書き込まれています。次にLogstashを起動した場合、Aのタイムスタンプより新しいオブジェクトを対象としてオブジェクト洗い出しを行い、B、Cが対象となります。
データロスト、データ重複の可能性は0ではありませんが、最小限に押さえてくれているのは嬉しいですね。