Ingest Node を使って、より簡単に ELB のアクセスログを AWS Lambda で Elasticsearch に取り込む

Elasticsearch Service

こんにちは、藤本です。

先日、Amazon Elasticsearch Service で Elasticsearch 5.1 が利用可能となり、Elasticsearch 5 系の新機能である Ingest Node を利用できるようになりました。

そこで以前エントリした ELB のアクセスログを Amazon Elasticsearch Service へ Lambda で取り込む実装を Ingest Node を利用して、Lambda の実装を減らしたいと思います。

Elasticsearch Ingest Node

Elasticsearch はスキーマレスで JSON 形式でデータを取得、登録します。ログ解析のユースケースでも有名な Elasticsearch(Kibana) ですが、ログメッセージをそのまま送ると、いい感じに可視化できるわけではありません。可視化する目的に合わせてログメッセージを JSON 形式に変換する必要があります。Logstashや、Fluentd といった ETL の機能を持つツールを利用することで簡単に実装することができます。ただし、Lambda など独自実装を利用する場合、正規表現などで変換処理を実装する必要がありました。

それが Elasticsearch 5 系で追加された Ingest Node により、データを受け取った Elasticsearch 側で変換処理ができるようになりました。イメージとしては Logstash のいくつかのフィルター機能が Elasticsearch に実装された感じです。

Ingest Node に関して下記エントリをご参照ください。

やってみた

ELB のアクセスログを Elasticsearch に取り込む手順は以下となります。

  1. Amazon Elasticsearch Service の起動
  2. Ingest Pipeline の設定
  3. AWS Lambda ファンクションの作成
  4. 動作確認

1. Amazon Elasticsearch Service の起動

Amazon Elasticsearch Service の起動は下記エントリをご参照ください。

2. Ingest Pipeline の設定

今回利用する変換機能(Processor)は GrokRemoveUserAgent の 3つです。

Grok

Grok は文字列のパターン(正規表現)によって、各要素をキーバリューの JSON 形式に変換します。それにより、タイムスタンプ、アクセス元、URLのパスなど意味のある単位で取り出すことができます。今回は ELB のログメッセージパターンに合わせて、設定します。ALB のログのパターンは下記をご参照ください。

Remove

フィールドを削除することができます。今回は元のメッセージ、ユーザーエージェントを削除しています。無駄なデータはサイズ肥大化、パフォーマンス劣化を引き起こします。

UserAgent

UserAgent ヘッダの値からアクセス元の OS名、OSバージョン、ブラウザなどを解析します。

これら変換機能を組み合わせた設定が以下となります。

$ curl -XPUT 'https://search-xxxxxxxxxxx.ap-northeast-1.es.amazonaws.com/_ingest/pipeline/elblog' -d '{
  "processors": [{
    "grok": {
      "field": "message",
      "patterns":[ "%{NOTSPACE:type} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport:int} (?:(%{IP:targetip}:?:%{INT:targetport:int})|-) %{NUMBER:request_processing_time:float} %{NUMBER:target_processing_time:float} %{NUMBER:response_processing_time:float} %{INT:elb_status_code:int} %{INT:target_status_code:int} %{INT:received_bytes:int} %{INT:sent_bytes:int} \"(?:%{WORD:verb} %{URIPROTO:proto}://?(?:%{URIHOST:urihost})?(?:%{URIPATH:path}(?:%{URIPARAM:params})?)?(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" \"%{DATA:agent}\"" ],
      "ignore_missing": true
    }
  },{
    "remove":{
      "field": "message"
    }
  }, {
    "user_agent": {
      "field": "agent",
      "target_field": "user_agent",
      "ignore_failure": true
    }
  }, {
    "remove": {
      "field": "agent",
      "ignore_failure": true
    }
  }]
}'

3. AWS Lambda ファンクションの作成

AWS Lambda ファンクションは S3 へのプットイベント(5分毎のアクセスログの生成)をトリガーにログファイルをダウンロードして、Elasticsearch へ送信するだけです。

3.1. IAM Role の作成

Lambda ファンクション用 IAM Role を作成します。S3 、および Amazon ES への POST 権限を付与します。

3.2. Lambda ファンクションの作成

Lambda ファンクションを作成します。トリガーには ALB の出力先の S3 バケットへの PUT を指定します。Runtime は前回同様、Python 2.7 を指定します。ソースコードはブログエントリの最後に添付します。

4. 動作確認

ALB にアクセスし、ログを出力します。ALB は 5分毎でログ出力していますので少し待ちます。

Kibana へアクセスし、データを確認します。送信元IPアドレスや URLパス、受信データ量などがフィールドに変換されています。

Kibana

まとめ

いかがでしたでしょうか? Ingest Node により簡単に実装を減らすことができました。Ingest Pipeline の設定を変更するだけで同じ Lambda ファンクションで CloudFront のアクセスログも可視化することができます。

ソースコード