Elastic StackのX-Packを試す(Watcher編)

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

検証時のX-Packはrc1です。
GAリリース時は仕様が変わっている可能性もございますのでご注意ください。

こんにちは、藤本です。
Elastic Stackもrc1までリリースされました。これからはGAリリースに向けてバグフィックスが主なアップデートとなっていくことでしょう。

全6回で X-Pack をおさらいがてら機能を試しつつ、新機能を確認していきます。

今回はWatcherを試してみました。

X-Packを試すシリーズのその他のエントリは以下をご参照ください。

Watcher - Alerting & Notification

WatcherはElasticsearchのインデックスの監視・通知をすることができます。X-PackのMonitoringでも監視という表現を使いましたが、MonitoringはElasticsearchクラスタ、Kibanaインスタンスを監視する機能です。WatcherはElasticsearchにインデクシングされるデータ(ドキュメント)を監視する機能です。ドキュメントに対して柔軟な監視条件、多数の通知方法を提供します。

監視・通知のパラメータとして以下の4種類があります。

  • トリガー
    • 監視プロセスを開始する条件を定義します
  • インプット
    • 監視したいデータを取得するElasticsearchのクエリを定義します
  • 条件
    • クエリの結果がどうなると通知したいかという条件を定義します
  • アクション
    • 条件を満たした結果、何をしたいのかを定義します

ユースケース

ドキュメントに挙げられているユースケースとしては以下のようなものがあります。

ログ監視

Watcherはログメッセージに特定の文字列が出力されたら、アクションが発生するというよくあるログ監視を行うことができます。ログ監視であれば、Zabbix、Datadogなどの一般的な監視ツール、監視サービスでもサポートしています。Elasticsearch + Watcherを利用する特徴としては、Elasticsearchの全文検索エンジンという強力なクエリにあるかと思います。ドキュメントにあるような単純に「error」という文字列が含まれているメッセージを監視したいといったユースケースであれば、監視ツール、監視サービスを利用すればいいでしょう。インプットのクエリにはAggregationも利用可能ですので、クエリの集計結果が一定条件にマッチしたら、メール通知するといったことも可能となります。

Elasticsearchクラスタ、ノード監視

WatcherはElasticsearchクラスタやノードに異常が発生した時に通知することが可能です。Watcherは任意のElasticsearch Web APIの結果を監視することができます。ElasticsearchのWeb APIはインデックスのクエリだけでなく、Elasticsearchクラスタ、ノードのステータスチェックのWeb APIも提供しています。

例:クラスタのステータスチェック
$ curl -u elastic "localhost:9200/_cluster/health?pretty"
Enter host password for user 'elastic':
{
  "cluster_name" : "elasticsearch",
  "status" : "yellow",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 58,
  "active_shards" : 58,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 57,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 50.43478260869565
}

上記APIを利用することで「status」が「yellow」、「red」になることを監視し、異常発生時に通知することができます。クラスタのステータス以外にもノード単位のステータス、JVMの使用状況なども取得可能ですので、Elasticsearchクラスタを運用する上で必要な監視を行うことができるのではないでしょうか。それによりElasticsearch自身でElasticsearchの監視体制を整えることができます。Watcherはマスターノードで動作するため、冗長化している環境であれば、監視環境の可用性も高めることができます。

サイト更新情報の監視

Watcherを利用することでアップデート情報などを監視し、通知するようなクローリングのようなことを簡単に実現することができます。サンプルでは「Meetup.com」の更新情報から特定のキーワードを含むものがあったら、イベントのURL、イベント名、開催地などをメールにて通知しています。「Meetup.com」ではWeb APIで更新情報をJSON形式で取得し続けることができます。取得したJSONをLogstashを使って、Elasticsearchにインデキシングし続けます。インデキシングされたデータ(ドキュメント)をWatcherで監視することで実現しています。

Watcherの強みはLogstashやFluentdの豊富なPluginから取得・加工したデータ、MetricbeatやPacketbeatの今まで監視ツールが対応していなかったデータを利用することで様々なユースケースを展開することができることではないでしょうか。

Watcher設定内容

ここからはWatcherの設定内容を紹介します。それによって何ができるのか見えて来るかと思います。

トリガー

トリガーは監視プロセスを発火する条件を定義します。現在はスケジュールトリガーのみが用意されています。

スケジュールトリガー

スケジュールトリガーは指定した時間、指定したインターバルで発火することが可能です。

指定方法としては大きく以下の3種類があります。

時間指定

hourly、daily、weekly、monthly、yearlyといった単位で指定した時間をスケジュール指定することが可能です。

例えば、毎時0分に実行したい場合、以下のように指定します。

{
  "trigger" : {
    "schedule" : {
      "hourly" : { "minute" : 0 }
    }
  }
}
Cron指定

Linux、Unixでお馴染みのcron形式でスケジュール指定することが可能です。

例えば、毎時0分に実行したい場合、以下のように指定します。

{
  "trigger" : {
    "schedule" : {
      "cron" : "0 * * * * ?"
    }
  }
}
インターバル指定

時間の指定はなく、◯秒毎、◯分毎などいった間隔のみを指定することが可能です。

例えば、5分毎に実行したい場合、以下のように指定します。

{
  "trigger" : {
    "schedule" : {
      "interval" : "5m"
    }
  }
}

インプット

インプットはトリガーにより発火した時に監視・通知に利用したいデータの抽出方法を定義します。現在は固定値(simple)、検索クエリ(search)、HTTPリクエスト(http)が用意されています。またこれらインプットを組み合わせる(chain)こともできます。

Simple Input

あまり使い道が分かっていないのですが、固定値のデータを利用することが可能です。Chain Inputや後続プロセスのConditionActionのパラメータとして利用するのかな。。

Search Input

ElasticsearchのSearch APIを利用することでクエリの結果を利用することが可能です。パラメータにインデックス、タイプ(省略可)、Search APIのリクエストボディを指定します。

例えば、インデックスlogstash-*@messageフィールドにerrorを含むドキュメントを利用したい場合、以下のように指定します。

{
  "input" : {
    "search" : {
      "request" : {
        "indices" : [ "logstash-*" ],
        "body" : {
          "query" : {
            "match" : { "@message" : "error"}
          }
        }
      }
    }
  }
}

HTTP Input

任意のWebリクエストのレスポンスを利用することが可能です。ElasticsearchのWeb APIに関わらず、外部WebサービスへのWebリクエストのレスポンスも利用することが可能です。

例えば、ユースケースに記載したようなElasticsearchクラスタのステータスチェックの結果を利用したい場合、以下のように指定します。

{
  "input" : {
    "http" : {
      "request" : {
       "host" : "localhost",
       "port" : 9200,
       "path" : "/_cluster/health"
      }
    }
  }
}

Chain Input

上記3つのインプットを任意の組み合わせ・順番で順次実行し、最終的な結果を利用することが可能です。インプットの結果を次のインプットのパラメータに利用することもできます。

コンディション

コンディションはインプットの結果がどのようになった時にアクションを実行するのかという判定基準を定義します。

Always Condition

常にアクションを実行します。日時、週次レポートのようなElasticsearchのクエリから取得したデータを利用したい場合に使うことができます。

Never Condition

常にアクションを実行しません。ドキュメントには動作確認用途で利用すると記載されていますが、、、動作確認用途ならLogging Actionとか、Actionを定義しない方がいいような。

Compare Condition / Array Compare Condition

インプットの結果などと何かしらを比較し、判定基準が正の時にアクションを実行します。比較可能な値はインプットの結果、Metadata、実行時間などがあります。比較方法は等しい/等しくない、以上/以下、より大きい/より小さいがあります。

例えば、Search Inputの結果の件数が1件以上あることを判定基準としたい場合、以下のように記述します。

{
  "condition" : {
    "compare" : {
      "ctx.payload.hits.total" : { 
        "gte" : 1
      }
    }
  }
}

Script Condition

任意のスクリプトを記述し、結果がtrueの時にアクションを実行します。Compare Conditionよりももっと柔軟なコンディションを定義したい場合に有効でしょう。Elasticsearchのスクリプトで利用可能な言語を利用できます。X-PackではElasticsearch 5系から追加されたPainless scripting languageを利用することができます。

例えば、Search Inputの結果の件数が1件以上あることを判定基準としたい場合、以下のように記述します。

{
  "condition" : {
    "script" : {
      "lang" : "painless",
      "inline" : "return ctx.payload.hits.total > threshold",
      "params" : {
        "threshold" : 1
      }
    }
  }
}

アクション

アクションはコンディションの判定基準を満たした時に実施する内容を定義します。アクションは7種類あります。名前を見ればなんとなくわかると思うので、説明は省略します。

  • Email Action
  • Webhook Action
  • Index Action
  • Logging Action
  • HipChat Action
  • Slack Action
  • PagerDuty Action

試してみた

以下のような監視・通知の仕組みを作ってみました。

watcher

  • Metricbeatで10秒毎にサーバのリソース使用状況を取得(インデックス名はMetricbeat-YYYY.MM.DD)
  • 監視間隔は1分毎
  • 瞬間的な高負荷は無視したいので、1分間(6回)のサーバのCPU使用率の平均値が80%を超えるとアラート
  • サーバのCPU使用率だけだと何が原因か分からないので、どのプロセスがどのくらいCPU使用率なのかCPU使用率順にTop5を通知
  • 通知先はSlack

上記の設定値単位で確認しましょう。

トリガー

トリガーは1分毎なのでインターバル1分で設定します。

  "trigger" : { "schedule" : { "interval" : "1m" }},

インプット

インプットは少し分解しながら説明します。

今回はMetricbeatによってインデキシングされたデータを利用したいので、Search Inputを利用します。対象のインデックスはMetricbeatの標準設定で作成されるmetricbeat-*を指定します。

  "input" : {
    "search" : {
      "request" : {
        "indices" : [ "metricbeat-*" ],
        "body" : {
:
        }
      }
    }
  },

続いて、Query DSLです。過去1分間のデータを取得、集計したいので、Range Queryで現在時刻 - 1分以上のデータを抽出します。

        "body" : {
          "query": {
            "range": {
              "@timestamp": {
                "gte": "now-1m"
              }
            }
          },
          "aggs": {
:
          }
        }

次に集計したいデータは2種類です。

一つの集計は通知の有無を判断するコンディションに利用するサーバのCPU使用率です。今回はuser timeとcpu timeを合算したものをCPU使用率と扱います。プロセス単位のCPU使用率も集計されるため、Filter Aggragationを利用して、サーバのCPU使用率だけのデータに絞ります。

          "aggs": {
            "system": {
              "filter": { 
                "term": {
                  "metricset.name": "cpu"
                }
              },
              "aggs": {
                "cpuavg": {
                  "avg": {
                    "script": {
                      "inline": "doc['system.cpu.user.pct'].value + doc['system.cpu.user.pct'].value"
                    }
                  }
                }
              }
            }
          }

もう一つの集計は通知の文面に利用するプロセス単位のCPU使用率です。今回はCPU使用率が高い上位5プロセスを表示させたいため、Terms Aggregationを利用してCPU使用率の降順に5つを取得します。

          "aggs": {
            "proc": {
              "terms": {
                "field": "system.process.name",
                "order" : { "cpuusage" : "desc" },
                "size": 5
              },
              "aggs": {
                "cpuusage": {
                  "avg": {
                    "field": "system.process.cpu.total.pct"
                  }
                }
              }
            }
          }

コンディション

コンディションではサーバのCPU使用率が1分間平均で80%以上かの判定基準を定義します。一つ目の集計結果でCPU使用率を取得できているため、Compare Conditionで0.8(80%)と比較します。

  "condition" : {
    "compare" : {
      "ctx.payload.aggregations.system.cpuavg.value" : {
        "gte" : 0.8
      }
    }
  },

アクション

アクションではSlackにアラートメッセージを送信します。

  "actions" : {
    "notify-slack" : {
      "throttle_period" : "10m",
      "slack" : {
        "account" : "monitoring",
        "message" : {
          "from" : "watcher",
          "to" : [ "#x-pack" ],
          "text" : "cpu usage over 80%\nrank:\nNo1. {{ctx.payload.aggregations.proc.buckets.0.key}} : {{ctx.payload.aggregations.proc.buckets.0.cpuusage.value}}\nNo2. {{ctx.payload.aggregations.proc.buckets.1.key}} : {{ctx.payload.aggregations.proc.buckets.1.cpuusage.value}}\nNo3. {{ctx.payload.aggregations.proc.buckets.2.key}} : {{ctx.payload.aggregations.proc.buckets.2.cpuusage.value}}\nNo4. {{ctx.payload.aggregations.proc.buckets.3.key}} : {{ctx.payload.aggregations.proc.buckets.3.cpuusage.value}}\nNo5. {{ctx.payload.aggregations.proc.buckets.4.key}} : {{ctx.payload.aggregations.proc.buckets.4.cpuusage.value}}"
        }
      }
    }
  }

accountはElasticsearchの設定ファイルelasticsearch.ymlに設定したSlackアカウント情報が利用されます。設定内容は公式ドキュメントのページをご参照ください。

今回は送信者、宛先、本文はインデックスの設定で管理しているため、Incoming Webhook URLのみ設定しています。urlは個別の環境出払いだしたURLに置き換えてください。

# cat /etc/elasticsearch/elasticsearch.yml
:
xpack.notification.slack:
  account:
    monitoring:
      url: https://hooks.slack.com/services/xxxxxxxx/xxxxxxxxx/xxxxxxxxxxxxxxxxxxxxxx

elasticsearch.yml設定後は設定値を反映させるにはサービスの再起動が必要となります。

# systemctl restart elasticsearch

動作確認

それでは上記を設定して動作確認してみましょう。

# curl -XPUT -u elastic "http://localhost:9200/_xpack/watcher/watch/cpu-monitor" -d '{
  "trigger" : { ... },
  "input" : { ... },
  "condition" : { ... },
  "actions" : { ... }  
}'
Enter host password for user 'elastic':
{
  "_id": "cpu-monitor",
  "_version": 1,
  "created": true
}

Watcherの実行結果はインデキシングされますので、Search APIで履歴を取得可能です。最新の実行結果を見たい場合、実行時間の降順で取得します。結果が長いので省略しますが、負荷がかかっていない状況だと、CPU使用率が9%と、80%に達していないため、条件に一致していません。mettrueの場合は条件が一致、falseの場合は条件が不一致を表します。条件に一致していないため、実行されたアクション(actions)も空となっています。

curl -XGET -u elastic "http://localhost:9200/.watcher-history*/_search" -d'
{
  "sort": [
    {
      "result.execution_time": "desc"
    }
  ]
}'
Enter host password for user 'elastic':
{
:
            "condition": {
              "type": "compare",
              "status": "success",
              "met": false,
              "compare": {
                "resolved_values": {
                  "ctx.payload.aggregations.system.cpuavg.value": 0.09066666666666667
                }
              }
            },
            "actions": []
:
}

次に負荷をかけてみます。今回は簡単にyesコマンドで負荷をかけます。

少し待つと、Slackに通知されます。

Slack

履歴も見てみます。mettrueとなっていて、actionsにはSlack通知の情報が記載されています。

curl -XGET -u elastic "http://localhost:9200/.watcher-history*/_search" -d'
{
  "sort": [
    {
      "result.execution_time": "desc"
    }
  ]
}'
Enter host password for user 'elastic':
{
:
            "condition": {
              "type": "compare",
              "status": "success",
              "met": true,
              "compare": {
                "resolved_values": {
                  "ctx.payload.aggregations.system.cpuavg.value": 0.8798333333333334
                }
              }
            },
            "actions": [
              {
                "id": "notify-slack",
                "type": "slack",
                "status": "success",
                "slack": {
                  "account": "monitoring",
                  "sent_messages": [
                    {
                      "status": "success",
                      "to": "#x-pack",
                      "message": {
                        "from": "watcher",
                        "text": "cpu usage over 80%\nrank:\nNo1. yes : 0.7807999999999999\nNo2. java : 0.09733333333333334\nNo3. node : 0.008583333333333333\nNo4. metricbeat : 0.005750000000000001\nNo5. kworker/u2:2 : 0.0021999999999999997"
                      }
                    }
                  ]
                }
              }
            ]
:
}

ElasticsearchのSearch APIが分かっていれば、スクリプトを組まなくとも柔軟な監視内容、通知内容を設定することができました。

新機能

Watcher 2系からX-Pack 5系への大きなアップデートはなさそうです。

HTTP Inputの結果にResponseヘッダ、ステータスコードが追加

HTTP Inputの結果にHTTP Responseのヘッダ、ステータスコードが追加されました。それぞれctx.payload._headerctx.payload._status_codeで参照することができます。

# curl "localhost:9200/.watcher-history*/_search" -d '{ "sort": [ { "result.execution_time": { "order": "desc" } } ] }'
Enter host password for user 'elastic':
{
:
  "hits": {
:
    "hits": [ {
:
          "result": {
:
            "input": {
:
              "payload": {
:
                "_headers": { # ⇐ 追加されました
                  "content-length": [ "397" ],
                  "content-type": [ "application/json; charset=UTF-8" ]
                },
                "_status_code": 200, # ⇐ 追加されました
:
              },
            },
:
          },
:
        },
:
      }
    }
  }
}

Delete Watch APIがforceオプションをサポート

こんな感じなのかな(forceが必要な場面に出くわしてない)

# curl -XDELETE -u elastic _xpack/watcher/watch/test -d '{ "force" : true }'
Enter host password for user 'elastic':
{
  "_id": "test",
  "_version": 139,
  "found": true
}

実行中のWatchを変更、削除をサポート

実行中のWatchの変更、削除が可能となりました。実行時間が長くなってしまった設定を変更、削除することができます。

まとめ

いかがでしたでしょうか?

私は今回始めてWatcherを利用してみたのですが、機能的にシンプルで、ドキュメントも簡潔にまとまっているため、簡単に利用することができました。またLogstash、BeatsなどElastic Stackと組み合わさることで多くの監視・通知のユースケースが実装できるのかなと思いました。

ただ機能的には物足りなさを感じました。現在、トリガーがスケジュールしかないので、Percolator APIなどと組み合わさって、特定条件のクエリが来たらトリガーされるようなイベントドリブンなトリガーがあれば面白そうだなと思いました。また今回の設定だけでも80行近くのJSONとなりました。API/インデックスで設定・管理するのはDev Toolsを利用しても面倒に感じました。KibanaのWeb GUIから設定・管理できるようだと嬉しいです。