AWS WAFのBlockとCountログを抽出するパイプラインをFirehoseとLambdaで組んでみた

WAFログ解析の効率化を図るため、Firehose と Lambda を利用したサーバレス な AWS WAF ログ処理パイプラインを試してみました。
2020.08.26

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWSチームのすずきです。

AWS WAF のアクセスログは、Kinesis Data Firehose 経由でS3などに出力する事が可能です。

今回、Firehose と Lambda を利用して、AWS WAF で BLOCK 、COUNT と判定されたログを サーバレスに抽出するパイプラインを試す機会がありましたので、紹介させていただきます。

構成図

AWS WAFログサンプル

BLOCK、COUNT に該当するログをS3から回収してサンプルとして利用しました。

BLOCK

遮断設定をした WAFルールに該当したログです。

  • 「action」が BLOCK と記録されます。
  • 「terminatingRuleId」と「terminatingRuleType」に 遮断条件に該当したルールが記録されます。
  • ユーザ定義のルールにより遮断された場合「terminatingRuleMatchDetails」に詳細が記録されます。
{
    "timestamp": 1598361187218,
    "formatVersion": 1,
    "webaclId": "arn:aws:wafv2:ap-northeast-1:000000000000:regional/webacl/wafv2-webacl/0000-0000-0000-0000",
    "terminatingRuleId": "AWS-AWSManagedRulesAmazonIpReputationList",
    "terminatingRuleType": "MANAGED_RULE_GROUP",
    "action": "BLOCK",
    "terminatingRuleMatchDetails": [],
    "httpSourceName": "ALB",
    "httpRequest": {
        "clientIp": "0.0.0.0",
        "country": "RO",
        "headers": [
            {
                "name": "Host",
                "value": "dev.classmethod.jp"
            },
            {
                "name": "User-Agent",
                "value": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36"
            }
        ],
        "uri": "/articles/introduction-of-devcleaner/",
        "args": "",
        "httpVersion": "HTTP/1.0",
        "httpMethod": "GET",
        "requestId": null
    }
}

COUNT

カウントのみ、遮断は行わない設定の WAFルールに該当したログです。

  • 「action」は ALLOW と記録されます。
  • 「nonTerminatingMatchingRules」にカウントと判定されたルールが記録されます。
  • 複数のルールがカウントとして記録される場合もあります。
{
    "timestamp": 1598361187218,
    "formatVersion": 1,
    "webaclId": "arn:aws:wafv2:ap-northeast-1:000000000000:regional/webacl/wafv2-webacl/0000-0000-0000-0000",
    "terminatingRuleId": "Default_Action",
    "terminatingRuleType": "REGULAR",
    "action": "ALLOW",
    "terminatingRuleMatchDetails": [],
    "nonTerminatingMatchingRules": [
        {
            "action": "COUNT",
            "ruleId": "AWS-AWSManagedRulesCommonRuleSet"
        },
        {
            "action": "COUNT",
            "ruleId": "AWS-AWSManagedRulesAdminProtectionRuleSet"
        }
    ],
    "httpRequest": {
        "clientIp": "0.0.0.0",
        "country": "RO",
        "headers": [
            {
                "name": "Host",
                "value": "dev.classmethod.jp"
            },
            {
                "name": "User-Agent",
                "value": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36"
            }
        ],
        "uri": "/articles/introduction-of-devcleaner/",
        "args": "",
        "httpVersion": "HTTP/1.0",
        "httpMethod": "GET",
        "requestId": null
    }
}

Lambda関数

ブループリント

Firehose の データ加工用 として提供されている サンプルコードを利用しました。

  • kinesis-firehose-process-record-python
import base64
def lambda_handler(event, context):
    output = []
    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data'])
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload)
        }
        output.append(output_record)
    return {'records': output}

修正箇所

  • boto3、json などを利用するためインポートします。
  • オリジナルデータ を未加工で書き戻すようにしました。
import base64
import json
import boto3
import os

def lambda_handler(event, context):
  output = []
  output_block = []
  output_count = []

  for record in event['records']:
    output_record = {
      'recordId': record['recordId'],
      'result': 'Ok',
      'data': record['data']
    }
    output.append(output_record)
  return {'records': output}

判定

  • デコード、JSONパース処理を追加しました。
  • 「action」が ALLOWに一致しないログレコード を BLOCK と判定しました。
  • 「nonTerminatingMatchingRules」要素が存在するログレコードを COUNT と判定しました。
    # Extract block/count Log
    a = base64.b64decode(record['data'])
    try:
      b = json.loads(a)
    except ValueError as e:
      pass
    else:
      #block
      if b['action'] != 'ALLOW':
        print('BLOCK: ' + record['recordId'])
        output_block.append(b)
      #count
      elif len(b['nonTerminatingMatchingRules']) > 0:
        output_count.append(b)

出力

  • BLOCK、COUNT 判定した ログレコードは、別の Firehose ストリームに登録(put_record_batch)しました。
  if len(output_block) > 0:
    s = os.environ['firehose_block']
    put_record_firehose(s, output_block)

  if len(output_count) > 0:
    s = os.environ['firehose_count']
    put_record_firehose(s, output_count)

def put_record_firehose(s, output2):
  firehose = boto3.client('firehose')
  u = []
  for t in output2:
    u.append({'Data': json.dumps(t) + "\n"})
    if len(u) > 600 or len(str(u)) > 600000:
      r = firehose.put_record_batch(DeliveryStreamName = s, Records = u)
      u = []
  if len(u) > 0:
    r = firehose.put_record_batch(DeliveryStreamName = s, Records = u)

Firehose

作成した Lambda 関数、AWS WAF出力先となる Firehose 配信ストリーム に設定しました。

  • バッファは1MB、実行頻度は60秒。Lambdaのペイロードの上限(6MB)に抵触しない指定としました。

CloudFormation

AWS WAFログ処理を行うFirehose、Lambda関数は以下のテンプレートで展開可能です。

S3出力

datadog 出力

動作確認

Lambda

Firehose と連携後、Lambda関数が 動作します。

Firehose

登録された全ログの容量に対し、約3%が「COUNT」0.004%が「BLOCK」として保存されました。

S3

1時間あたりのファイル数、容量とも削減された事が確認できました。

  • 全ログ

  • BLOCKのみ

まとめ

Firehose と 簡易な実装の Lambda を利用する事で、Athena などを利用したログ解析の効率化が期待できます。

WAFルールの副作用調査などで、異常ログの解析機会が多い場合に有効ですのでお試しください。

  • 利用例(Datadog Log Explorer)