S3に保存したAWS WAFログをAthenaで分析してみた

WAFログをAthenaで分析してみたブログです。サンプルクエリもあるよ。
2022.03.09

こんにちは、AWS事業本部コンサルティング部の芦沢です。

今回の記事では、S3に出力したAWS WAFログをAthenaで分析してみたいと思います。

類似のブログは存在しますが、Athenaのマネジメントコンソールのアップデートや、個人的に拘りたい箇所もあったので新しくブログを書いてみました。

事前準備

ログの分析をするためには、分析元になるログが必要です。

本ブログでは、S3に出力したWAFログが準備されている前提で手順を進めていきます。

ログの分析をやりたくても実行環境がないよという方は、以前のブログで、AWS WAFをCloudFormationで構築する記事を書いていますので、以下の記事を参考に環境構築してみましょう。

環境構成

本記事で取り扱うAWS環境は以下の構成です。

背景が紫色の範囲を今回対象としています。

  • Amazon Athena
  • S3バケット
    • WAFアクセスログ出力用
    • Athenaクエリ保存用

やってみた

それでは早速WAFログを分析してみたいと思います。

まずはAWSマネジメントコンソールのAmazon Athenaへアクセスしてください。

ここから始めます。

実行準備

まずは、Athenaの実行に必要なリソースが揃っているか確認しましょう。

  • データソース
  • データベース
  • WAFログ保存済みのSバケット
  • Athenaクエリログ保存用Sバケット

クエリ実行結果保存設定(S3)

Athenaクエリの実行結果を保存するS3バケットの設定をします。

※既に設定済みの方は飛ばしていただいて問題ありません。


Athenaコンソール画面のクエリエディタ-設定から、クエリの結果と暗号化の設定の管理をクリックします。

S3を参照をクリックします。

WAFログを保存済みのS3バケットを選択し、選択をクリックします。

クエリ結果の場所に、選択したS3バケット名が表示されることを確認し、保存をクリックします。

クエリの結果の場所に設定したS3バケット名が表示されていることを確認できたらOKです。


データベース作成

Athenaデータベースを作成します。

※既に設定済みの方は飛ばしていただいて問題ありません。

Athenaコンソール画面のクエリエディタ-エディタを表示します。

クエリ欄に以下コマンドを入力します。

## <データベース名>には、データベース名として設定したい名称を入力
CREATE DATABASE <データベース名>

実行をクリックし、クエリは成功しました。と表示されたらOKです。

テーブル作成

Athenaコンソール画面のクエリエディタ-エディタから

以下のクエリを入力します。

今回クエリに使用している

  • テーブル名:waflogs
  • S3バケット名:aws-waf-logs-s3
CREATE EXTERNAL TABLE `waflogs`(
  `timestamp` bigint,
  `formatversion` int,
  `webaclid` string,
  `terminatingruleid` string,
  `terminatingruletype` string,
  `action` string,
  `terminatingRuleMatchDetails` array < struct <
    conditionType: string,
    location: string,
    matchedData: array < string >
    > >,
  `httpsourcename` string,
  `httpsourceid` string,
  `ruleGroupList` array < struct <
                    ruleGroupId: string,
                    terminatingRule: struct < ruleId: string, action: string >,
                    nonTerminatingMatchingRules: array < struct < action: string, ruleId: string > >,
                    excludedRules: array < struct < exclusionType: string, ruleId: string > >
   > >,
  `ratebasedrulelist` array<
                      struct<
                        ratebasedruleid:string,
                        limitkey:string,
                        maxrateallowed:int
                            >
                           >,
  `nonterminatingmatchingrules` array<
                                struct<
                                  ruleid:string,
                                  action:string
                                      >
                                     >,
  `httprequest` struct<
                      clientip:string,
                      country:string,
                      headers:array<
                              struct<
                                name:string,
                                value:string
                                    >
                                   >,
                      uri:string,
                      args:string,
                      httpversion:string,
                      httpmethod:string,
                      requestid:string
                      > 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://aws-waf-logs-s3/AWSLogs/'

実行をクリックし、完了 クエリは成功しました。と表示されたらOKです。

テーブル欄に、作成したテーブルが表示されていることを確認しましょう。

ログ分析クエリ実行

作成したテーブルを指定して、ログ分析クエリを実行します。

後述のサンプルクエリ一覧を参考に、任意のクエリを入力します。

ここではCountされたアクセスのみを表示させるクエリを入力してみます。

SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST,
       httpsourcename,
       httpsourceid,
       countedrule.ruleid,
       rules.terminatingrule.ruleid AS rule_info,
       httprequest.clientip,
       httprequest.country,
       httprequest.uri,
       httprequest.args,
       httprequest.httpmethod
FROM
       "waflogs",
       UNNEST(nonterminatingmatchingrules) t(countedrule),
       UNNEST(rulegrouplist) t(rules)
WHERE
       countedrule.action = 'COUNT' AND rules.terminatingrule.action = 'BLOCK';

実行をクリックし、完了と表示され、結果配下にログの分析結果が表示されたらOKです。

結果欄から、クエリ実行による分析結果を確認できます。

(参考)サンプルクエリ一覧

以下ブログを参考に、本手順で作成したテーブルを使ったいろいろなクエリパターンを作成してみました。

  • 無加工でログを全件表示
    SELECT *
    FROM
           "waflogs";
  • Blockされたアクセスのみを表示
    SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST,
           httpsourcename,
           httpsourceid,
           countedrule.ruleid,
           rules.terminatingrule.ruleid AS rule_info,
           httprequest.clientip,
           httprequest.country,
           httprequest.uri,
           httprequest.args,
           httprequest.httpmethod
    FROM
           "waflogs",
           UNNEST(nonterminatingmatchingrules) t(countedrule),
           UNNEST(rulegrouplist) t(rules)
    WHERE
           action = 'BLOCK';
  • COUNTされたアクセスのみ表示
    SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST,
           httpsourcename,
           httpsourceid,
           countedrule.ruleid,
           rules.terminatingrule.ruleid AS rule_info,
           httprequest.clientip,
           httprequest.country,
           httprequest.uri,
           httprequest.args,
           httprequest.httpmethod
    FROM
           "waflogs",
           UNNEST(nonterminatingmatchingrules) t(countedrule),
           UNNEST(rulegrouplist) t(rules)
    WHERE
           countedrule.action = 'COUNT' AND rules.terminatingrule.action = 'BLOCK';
  • 国別のアクセス数の集計
    SELECT httprequest.country,
           COUNT(httprequest.country) AS count
    FROM
           "waflogs"
    GROUP BY
           httprequest.country;
  • クライアントIPごとのアクセス数の集計
    SELECT httprequest.clientip,
           COUNT(request.clientip) AS count
    FROM
           "waflogs"
    GROUP BY
           request.clientip;

最後に

今回はAWS WAFのログをAthenaで分析してみました。

サンプルクエリも載せているので、クエリ作成時の参考にしていただければと思います。

以上、AWS事業本部コンサルティング部の芦沢がお送りしました。