Logstash に Dead Letter Queue の機能が追加されました

こんにちは、藤本です。

現地時間 7/6 に Elastic Stack 5.5.0 がリリースされました。

いくつか Elastic Stack 5.5.0 の新機能を試していますのでその他記事は下記をご参照ください。

今回は Logstash 5.5.0 で実装された Dead Letter Queue を試してみました。

Logstash と Dead Letter Queue

Logstash は入力されたデータを加工・変換して、色々な出力先にデータを送信することができます。例えば、ログファイルを定期的に読み取って、ログメッセージを意味のあるデータにパースして、アクセス元 IPアドレスから GeoIP データベースを参照してアクセス元情報を抽出して、Elasticsearch にインデキシングできます。他にも、IoT デバイスから HTTPリクエストでデバイスデータを受け取って、日時データの UNIX タイムスタンプフォーマットを ISO 8601 フォーマットに変換して、データベースにレコード登録できます。このように Logstash は様々な ETL をコードレスで実施することができます。これを実現するのが多くのプラグインとなります。プラグインは Input / Filter / Output に分かれており、利用用途に合わせてプラグインを組み合わせることで ETL を実現することができます。

ETL を実装する上で悩みなのが不測のデータです。例えば、想定したメッセージフォーマットと異なっていてパースできない、データベースにレコード登録しようとしたらテーブル定義と違っていて登録に失敗した、Elasticsearch にインデキシングしようとしたら Mapping タイプが異なっていてインデキシングに失敗した。これらの不測のデータはプラグインによってはデータを破棄してしまいます。それが本当に不要なデータであれば問題ないでしょうが、実はそのデータが何か異常を表すデータだったり、実はインプットデータが新しい形式となっていたりする場合、破棄されずに残したいですよね。そこで Dead Letter Queue です。

Logstash の Dead Letter Queue は処理に失敗したデータをファイルに書き出すことでキューイングします。ファイルを確認することで不測のデータを確認することができます。また Input プラグインとしてDead Letter Queue input pluginが実装されています。Dead Letter Queue input plugin によって、キューイングされたデータに任意の処理を行うことができます。例えば、Dead Letter Queue にキューイングされたデータをまとめて分析するために S3 output plugin を使って、S3 に集約したり、Dead Letter Queue にキューイングされたことをすぐに検知するために SNS によってメール通知したりすることができます。下記図は Dead Letter Queue の利用イメージで 公式ドキュメントから抜粋。

なお現在、Dead Letter Queue はelasticsearch-output-plugin にのみ実装されているようです。インデキシングするデータが Elasticsearch のインデックスとマッピングが合わない時に Dead Letter Queue へキューイングしてくれます。Dead Letter Queue にキューイングするのはプラグインの実装となります。今後はその他プラグインでも実装されていくようなのでプラグインのドキュメントページを見て確認しましょう。

試してみた

Dead Letter Queue にキューイングされる様子を確認してみましょう。

シチュエーションとしてはログファイルを Kibana で可視化するために、Logstash を利用してログファイルを読み取り、メッセージをパースして、Elasticsearch にインデキシングしています。想定するログのフォーマットは<ISO8601_TIMESTAMP> <INT>です。想定外のフォーマットのログメッセージがロギングされた時に破棄されずに Dead Letter Queue に保存されることを確認します。

動作環境

  • OS : CentOS 7.3
  • Elasticsearch : 5.5.0
  • Logstash : 5.5.0

Dead Letter Queue の有効化

Dead Letter Queue はデフォルト設定が無効であるため、設定ファイルにて明示的に有効化する必要があります。また Queue として保存するディレクトリパスを指定します。

# echo 'dead_letter_queue.enable: true
path.dead_letter_queue: "/var/lib/logstash/dlq"' > /etc/logstash/logstash.yml

キューファイルが作成されるディレクトリを作成しておきましょう。

# mkdir /var/lib/logstash/dlq
# chown logstash:logstash /var/lib/logstash/dlq

パイプライン設定

次に Logstash のパイプライン設定です。

パイプラインは File input plugin でファイルを読み取って、Grok filter plugin でパースして、Elasticsearch output plugin でインデキシングします。

# echo 'input {
  file {
    path => [ "/tmp/test.log" ]
  }
}

filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:status}" }
  }
}

output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
}' > /etc/logstash/conf.d/main.conf

Elasticsearch のインデックス作成

想定したログフォーマットでインデックステンプレートを作成します。

# curl -XPUT localhost:9200/_template/logstash -d '{
  "template": "logstash-*",
  "mappings": {
    "logs": {
      "properties": {
        "timestamp": {
          "type": "date"
        },
        "status": {
          "type": "long"
        }
      }
    }
  }
}'

Logstash 起動

Logstash を起動します。

# systemctl start logstash

動作確認

Dead Letter Queue にデータが入るか確認してみましょう。

まずは正常なログフォーマットでログ出力して、Elasticsearch にインデキシングされることを確認します。

# echo "$(date --iso-8601=seconds) 0" >> /tmp/test.log

Elasticsearch のインデックスを確認します。

# curl "localhost:9200/logstash-*/_search?pretty"
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash-2017.07.11",
        "_type" : "logs",
        "_id" : "AV0z_ybxfdueHSAdxBVZ",
        "_score" : 1.0,
        "_source" : {
          "path" : "/tmp/test.log",
          "@timestamp" : "2017-07-11T23:32:25.873Z",
          "@version" : "1",
          "host" : "localhost.localdomain",
          "message" : "2017-07-11T23:32:07+0000 0",
          "status" : "0",
          "timestamp" : "2017-07-11T23:32:07+0000"
        }
      }
    ]
  }
}

インデキシングされました。

次に、インデックスのマッピングとは合わないデータを登録してみましょう。

# echo "$(date --iso-8601=seconds) abc" >> /tmp/test.log

Elasticsearch のインデックスを確認します。

# curl "localhost:9200/logstash-*/_search?pretty"
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash-2017.07.11",
        "_type" : "logs",
        "_id" : "AV0z_ybxfdueHSAdxBVZ",
        "_score" : 1.0,
        "_source" : {
          "path" : "/tmp/test.log",
          "@timestamp" : "2017-07-11T23:32:25.873Z",
          "@version" : "1",
          "host" : "localhost.localdomain",
          "message" : "2017-07-11T23:32:07+0000 0",
          "status" : "0",
          "timestamp" : "2017-07-11T23:32:07+0000"
        }
      }
    ]
  }
}

データは増えていません。Dead Letter Queue にキューイングされているはずです。Dead Letter Queue を確認してみます。

# ls -l /var/lib/logstash/dlq/main/
total 40
-rw-r--r--. 1 logstash logstash 1737 Jul 12 00:17 10.log
-rw-r--r--. 1 logstash logstash    1 Jul 11 23:16 1.log
-rw-r--r--. 1 logstash logstash    1 Jul 11 23:16 2.log
-rw-r--r--. 1 logstash logstash    1 Jul 11 23:16 3.log
-rw-r--r--. 1 logstash logstash    1 Jul 11 23:17 4.log
-rw-r--r--. 1 logstash logstash    1 Jul 11 23:17 5.log
-rw-r--r--. 1 logstash logstash    1 Jul 11 23:18 6.log
-rw-r--r--. 1 logstash logstash    1 Jul 11 23:18 7.log
-rw-r--r--. 1 logstash logstash    1 Jul 11 23:18 8.log
-rw-r--r--. 1 logstash logstash    1 Jul 11 23:26 9.log

# cat /var/lib/logstash/dlq/main/10.log
1cW������b�2017-07-11T23:32:40.971Z�dMETA�dpathm/tmp/test.logdhostulocalhost.localdomain�dDATA�dpathm/tmp/test.logj@timestampx2017-07-11T23:32:40.893Zh@versiona1dhostulocalhost.localdomaingmessagex2elasticsearch*713795d1f5f7cdd0f106c00103c0254f104d57c0-3Could not index event to Elasticsearch. status: 400, action: ["index", {:_id=>nil, :_index=>"logstash-2017.07.11", :_type=>"logs", :_routing=>nil}, 2017-07-11T23:32:40.893Z localhost.localdomain 2017-07-11T23:32:40+0000 abc], response: {"index"=>{"_index"=>"logstash-2017.07.11", "_type"=>"logs", "_id"=>"AV0z_2FefdueHSAdxBVa", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [status]", "caused_by"=>{"type"=>"number_format_exception", "reason"=>"For input string: \"abc\""}}}}

若干、バイナリが含まれていますが、キューイングされていました。インデキシングに失敗したデータだけではなく、Elasticsearch から 400 ステータスコードが返ってきたことや、status フィールドの型が一致しない旨のエラー内容なども含まれています。これは助かりますね。

続いて、Dead Letter Queue input plugin を利用して、Dead Letter Queue にキューイングされたデータを S3 に集約することを試そうと思ったのですが、下記エラーが表示されて利用することができませんでした。dead_letter_queueという名前だと認識されない?Dead Letter Queue input plugin は Logstash 5.5.0 ではプラグインとしてデフォルトインストールされてはいるのですが、、、

[2017-07-11T23:16:36,097][ERROR][logstash.agent           ] Cannot create pipeline {:reason=>"Couldn't find any input plugin named 'dead_letter_queue'. Are you sure this is correct? Trying to load the dead_letter_queue input plugin resulted in this error: Problems loading the requested plugin named dead_letter_queue of type input. Error: NameError NameError"}

αバージョンの 6.0.0 alpha 2 でも同じ事象が発生しているようなので同じ原因なのかな。6.0.0 alpha 3 で解消予定とのことなので、Logstash 5.5 も次のマイナーバージョンで解消されるのかな。

https://github.com/elastic/logstash/issues/7314

今回は Dead Letter Queue のファイルを確認するまでとします。

まとめ

いかがでしたでしょうか? ETL において Dead Letter Queue は欠かせない機能です。Logstash を利用している方で待ち望んでいる方も多かったのではないでしょうか。