LogstashのS3 Inputプラグインで取得するオブジェクトを日付でフィルターする

LogstashのS3 Inputプラグインで取得するオブジェクトを日付でフィルターする

Clock Icon2016.05.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

こんにちは、藤本です。

先日、CloudFrontのアクセスログをKibanaで可視化するでS3にあるオブジェクトを定期的に取得するS3 Inputプラグインをご紹介しました。エントリの中で初回起動時に設定ファイルで指定したS3バケット、オブジェクトパスのプレフィックスの中から全てのオブジェクトを取得するため、オブジェクトサイズが大きく、オブジェクト数が多いほど、比例して処理時間、コストが必要となります。例えばELBを数年稼働している環境では膨大なログファイル数となり、初期インポートに時間がかかるし、S3バケットからの転送料金も嵩んでしまいますし、Elasticsearchに送るのであれば、ログファイルのボリュームに応じたElasticsearchのディスクサイズを用意しないといけません。

そこで今回は指定した日時以降のオブジェクトに絞って取得する小ネタをご紹介します。

結論

sincedbファイルに取得したい以降の日付をISO 8601の拡張形式(YYYY-MM-DDTHH:mm:ssZ)で記述してください。

今回のエントリの前提の説明やソースコードを読んで分かったことを本エントリの最後に記載します。興味がある人はそちらも読んでみてください。

やってみた

環境

  • OS : Amazon Linux 2016.03
  • Logstash : 2.3.2
  • Elasticsearch : Amazon Elasticsearch Service

環境準備

Amazon Elasticsearch Serviceのドメイン構築はlogstashでELBのログを2週間分だけAmazon Elasticsearch Serviceに取り込むを参考に、Logstashのインストール、amazon_es outputプラグインインストールは[新機能]Amazon Elasticsearch Serviceがリリースされました! | Developers.IOを参考に環境準備してください。

sincedbファイルがない場合

Logstashのs3 inputプラグインの設定ファイルでsincedbのファイルパスを指定します。

input {
  s3 {
    bucket => "<S3バケット名>"
    region => "<S3バケットリージョン>"
    sincedb_path => "/var/lib/logstash/sincedb"
  }
}

sincedbファイルは最初ありません。

# ls /var/lib/logstash/sincedb
ls: cannot access /var/lib/logstash/sincedb: No such file or directory

作成予定のインデックスはありません。

# curl search-********.ap-northeast-1.es.amazonaws.com/_cat/indices |grep elblog

Logstashを起動します。

/etc/init.d/logstash start
logstash started.

しばらく待つと、S3からアクセスログファイルを取得し、ログメッセージ内に含まれる日付に応じたインデックスが作成されます。私の環境のELBが動作検証的にたまに起動するだけなのでログの日付がかなり分散していますが、古いものは2015年11月25日、新しいものは2016年4月27日にアクセスログを出力しています。

# curl search-********.ap-northeast-1.es.amazonaws.com/_cat/indices |grep elblog
yellow open elblog-lb-2015.11.25       5 1    125 0 116.1kb 116.1kb
yellow open elblog-lb-2015.11.28       5 1     79 0 142.1kb 142.1kb
yellow open elblog-lb-2015.11.29       5 1     75 0  92.4kb  92.4kb
yellow open elblog-lb-2015.12.01       5 1      5 0  51.2kb  51.2kb
yellow open elblog-lb-2015.12.06       5 1   2725 0   1.7mb   1.7mb
yellow open elblog-lb-2016.04.25       5 1     32 0 120.9kb 120.9kb
yellow open elblog-lb-2016.04.26       5 1      4 0  30.5kb  30.5kb
yellow open elblog-lb-2016.04.27       5 1      3 0  32.5kb  32.5kb
yellow open elblog-lb1-2015.09.15      5 1      4 0  31.5kb  31.5kb
yellow open elblog-lb2-2015.09.15      5 1     41 0    95kb    95kb

sincedbを確認します。最も新しいログファイルのタイムスタンプが書き込まれています。

# cat /var/lib/logstash/sincedb
2016-04-27 07:58:37 UTC

sincedbファイルに日付指定した場合

Logstashを停止し、一度、作成されたインデックスを全て削除します。

# /etc/init.d/logstash stop
Killing logstash (pid 22659) with SIGTERM
Waiting logstash (pid 22659) to die...
Waiting logstash (pid 22659) to die...
logstash stopped.

# curl "search-********.ap-northeast-1.es.amazonaws.com/elblog-*" -XDELETE

# curl search-********.ap-northeast-1.es.amazonaws.com/_cat/indices |grep elblog

sindbファイルに日付を指定します。今回は2016年1月1日を指定します。

# vi /var/lib/logstash/sincedb
2016-01-01 00:00:00 UTC

Logstashを起動します。

# /etc/init.d/logstash start
logstash started.

しばらく待つと、インデックスが作成されています。今度は2016年1月1日以降のログファイルのみを取得していることが分かります。

# curl search-********.ap-northeast-1.es.amazonaws.com/_cat/indices |grep elblog
yellow open elblog-lb-2016.04.25       5 1     32 0 129.8kb 129.8kb
yellow open elblog-lb-2016.04.27       5 1      3 0  32.5kb  32.5kb
yellow open elblog-lb-2016.04.26       5 1      4 0  37.1kb  37.1kb

まとめ

いかがでしたでしょうか?
これで必要な日付以降のオブジェクトに絞ってデータを取り込むことができます。

以下にLogstashをあまりご存じない方に向けて補足情報を記載します。

Logstash Inputプラグインのタイプ

LogstashのInputプラグインは大きく分類して、受信をイベントとして動作するプッシュモデル(Lambdaっぽく勝手に命名)、自らIntervalなどによって定期イベントとして動作するプルモデル(勝手にry)があります。プッシュモデルはbeattcpsyslogなどがあり、プルモデルはfiles3jdbcなどがあります。

プッシュモデルの場合、データを受信したら、受信したデータを処理すればよいので、データの差分、重複を管理する必要がありません。一方、プルモデルは取得先のデータに何が追加されたのか、何が変更されたのか管理しないと、データの重複、欠落が発生していしまいます。

S3 Inputプラグインの差分管理

上記問題を解決するためにプルモデルのinputプラグインでは前回からの差分を判断するために、各inputプラグインの特性に合わせて差分情報を管理しています。

S3 Inputプラグインの場合、差分管理用のファイルで日時を管理します。
管理用ファイルがない場合、設定ファイルで指定したbucketprefixに前方一致する全てのオブジェクトを取得します。処理後にオブジェクトのタイムスタンプを管理用ファイルに書き込みます。
管理用ファイルがある場合、設定ファイルで指定したbucketprefixに前方一致する条件に加えて、管理用ファイルのタイムスタンプより新しいオブジェクトを取得します。処理後、同様にオブジェクトのタイムスタンプを管理用ファイルに書き込みます。

この方式によりS3 Inputプラグインは継続した差分オブジェクトの取得が可能となります。

S3 Inputプラグインにおけるデータロスト、データ重複

ここから少し細かい話となります。

Logstashのプロセス、サーバダウン時にS3 Inputプラグインにおけるデータロスト、データ重複が気になったのでソースコードを読んで、S3 Inputプラグインにおける対策を読み解きました。

S3 Inputプラグインの処理フローはざっくり、対象となるオブジェクトの洗い出し、対象オブジェクトの処理順番をタイムスタンプにソート、対象オブジェクトを処理、対象オブジェクトのタイムスタンプを管理用ファイルに書き込み、という流れとなります。これにより、処理済みのオブジェクトを取得しない、未処理のオブジェクトを取得する対策を取っています。

例えば、タイムスタンプが古い順にオブジェクトA、B、Cがあるとします。Aの処理を完了し、Bの処理途中でプロセスダウンが発生した場合、管理用ファイルにはAのタイムスタンプが書き込まれています。次にLogstashを起動した場合、Aのタイムスタンプより新しいオブジェクトを対象としてオブジェクト洗い出しを行い、B、Cが対象となります。

データロスト、データ重複の可能性は0ではありませんが、最小限に押さえてくれているのは嬉しいですね。

参考資料

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.