AWS WAFの検知結果をSlackに通知する仕組みを構築してみた

AWS WAFの検知結果をSlackに通知する仕組みを構築してみた

Clock Icon2025.05.28

はじめに

AWS WAFのマネージドルール(Managed Rule)、たとえばコアルールセット(Core Rule Set)はルール数が多く、開発時にすべてを検証するのはなかなか大変です。「とりあえずカウント(Count)に設定しておこう」と考え、結果的に放置し、検知に気付かず本番リリース後に障害を発生させてしまった、ということがありました。

ログを出力するだけでは不十分で、通知する仕組みも一緒に用意すべきです。毎回構築するのも面倒なので、Lambda関数とCloudFormationのテンプレートを作成してみました。これを使うことで、WAFの検知結果をSlackに通知する仕組みを簡単に構築できます。

実行結果

以下はSlack通知の例です。

Slack通知

前提条件

  • AWS WAFのログをCloudWatch Logsに出力していること
  • Lambda関数をアップロードするためのS3バケットを用意していること

前提条件ではないですが、WAFはCloudFrontにアタッチしていることを想定しています。そのため、今回作成するLambda関数などのリソースもバージニア北部リージョン(us-east-1)を明示的に指定しています。必要に応じて、リージョンを変更してください。

Lambda関数

アラームからAWS Chatbotで直接通知する方法もありますが、今回はLambda関数を使って通知内容をカスタマイズしてみます。

ここでは、Python 3.13で実装したLambda関数を紹介します。CloudWatch Logs Insightsを使ってWAFログを集計し、BlockまたはCountされたリクエスト数をSlackに通知します。

import os
import json
import boto3
import requests
import logging
from datetime import datetime, timedelta, timezone
from urllib.parse import quote

# Set up logging
logging.basicConfig(level=logging.INFO)

# デフォルト値
# CloudWatch Logs Insightsで利用するクエリ(BLOCKまたはCOUNTされたログを抽出)
DEFAULT_QUERY = 'fields @timestamp, @message | filter action="BLOCK" or action="COUNT"'
# Slack通知のメッセージテンプレート
DEFAULT_MESSAGE = (
    "直近{hours}時間でWAFにより {count} 件のリクエストがBlockまたはCountされました。\n"
    "ルールごとの件数:\n{rule_text}\n\n"
    "詳細はCloudWatch Logs Insightsでご確認ください: "
    "<{insights_url}|CloudWatch Logs Insightsで確認>"
)
DEFAULT_HOURS = 1
# デフォルトのAWSリージョン(環境変数がなければus-east-1)
DEFAULT_REGION = os.environ.get("AWS_REGION", "us-east-1")


def get_env_or_event(key, event, default=None):
    """
    event引数、環境変数、デフォルト値の順で設定値を取得する
    """
    if key in event:
        return event[key]
    if key in os.environ:
        return os.environ[key]
    return default


def fetch_waf_logs_by_rule(log_group, query, hours):
    """
    CloudWatch Logs InsightsでWAFログを取得し、ルールごとの検知件数を集計する
    - BLOCKはterminatingRuleId、COUNTはnonTerminatingMatchingRules[].ruleIdでカウント
    """
    logs = boto3.client("logs")
    now = datetime.now(timezone.utc)
    end_time = int(now.timestamp() * 1000)
    start_time = int((now - timedelta(hours=hours)).timestamp() * 1000)
    # Insightsクエリを実行
    start_query_response = logs.start_query(
        logGroupName=log_group,
        startTime=start_time,
        endTime=end_time,
        queryString=query,
        limit=1000,
    )
    query_id = start_query_response["queryId"]
    response = None
    # クエリ完了まで最大10回ポーリング
    for _ in range(10):
        result = logs.get_query_results(queryId=query_id)
        if result["status"] == "Complete":
            response = result
            break
    rule_counts = {}
    if not response:
        return rule_counts

    for row in response["results"]:
        msg = None
        for field in row:
            if field["field"] == "@message":
                msg = field["value"]
                break
        if not msg:
            continue
        try:
            log = json.loads(msg)
        except json.JSONDecodeError as e:
            logging.warning(f"JSON decode error: {e}. msg={msg}")
            continue

        # BLOCKの場合
        if log.get("action") == "BLOCK" and log.get("terminatingRuleId"):
            rule = log["terminatingRuleId"]
            rule_counts[rule] = rule_counts.get(rule, 0) + 1

        # COUNTの場合
        for rule in log.get("nonTerminatingMatchingRules", []):
            rule_id = rule.get("ruleId")
            if rule_id:
                rule_counts[rule_id] = rule_counts.get(rule_id, 0) + 1

    return rule_counts


def generate_insights_url(region, log_group, start, end, query):
    """
    CloudWatch Logs Insightsのクエリ画面へのダイレクトリンクを生成
    - region: AWSリージョン
    - log_group: ロググループ名
    - start, end: ISO8601形式のUTC時刻文字列
    - query: クエリ内容
    """
    base = f"https://console.aws.amazon.com/cloudwatch/home?region={region}#logsV2:logs-insights"
    editor_string = quote(query)
    log_group_enc = quote(log_group)
    url = (
        f"{base}$3FqueryDetail$3D~(end~'{end}~start~'{start}~timeType~'ABSOLUTE~unit~'seconds"
        f"~editorString~'{editor_string}~isLiveTail~false~source~(~'{log_group_enc}))"
    )
    return url


def notify_slack(webhook_url, message):
    """
    指定したSlack Webhook URLにメッセージを送信する
    """
    try:
        response = requests.post(
            webhook_url,
            data=json.dumps({"text": message}),
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
        logging.info("Slack通知に成功しました。")
    except requests.RequestException as e:
        logging.error(f"Slack通知失敗: {e}")


def lambda_handler(event, context):
    """
    Lambdaエントリポイント
    - WAFログをCloudWatch Logs Insightsで集計し、ルールごとの件数とクエリ画面リンクをSlack通知
    - event/環境変数から各種パラメータを取得
    """
    slack_webhook_url = get_env_or_event("SLACK_WEBHOOK_URL", event)
    if not slack_webhook_url:
        logging.error("SLACK_WEBHOOK_URLが指定されていません。")
        return {"status": "error", "reason": "Missing SLACK_WEBHOOK_URL."}

    log_group = get_env_or_event("LOG_GROUP", event)
    if not log_group:
        logging.error("LOG_GROUPが指定されていません。")
        return {"status": "error", "reason": "Missing LOG_GROUP."}

    query = get_env_or_event("QUERY", event, DEFAULT_QUERY)
    message_template = get_env_or_event("MESSAGE", event, DEFAULT_MESSAGE)
    hours = get_env_or_event("HOURS", event, DEFAULT_HOURS)
    try:
        hours = int(hours)
    except Exception:
        logging.warning(f"HOURSの値が不正です: {hours}。デフォルト値({DEFAULT_HOURS})を使用します。")
        hours = DEFAULT_HOURS
    region = get_env_or_event("AWS_REGION", event, DEFAULT_REGION)

    # クエリ期間(直近hours時間)
    now = datetime.now(timezone.utc)
    end_time = now.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    start_time = (now - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%S.000Z")

    # ルールごとの件数集計
    rule_counts = fetch_waf_logs_by_rule(log_group, query, hours)
    total = sum(rule_counts.values())
    if rule_counts:
        rule_text = "\n".join([f"{k}: {v}件" for k, v in rule_counts.items()])
    else:
        rule_text = "なし"

    # Insightsクエリ画面へのリンク生成
    insights_url = generate_insights_url(region, log_group, start_time, end_time, query)

    # Slack通知文生成
    text = message_template.format(
        hours=hours, count=total, rule_text=rule_text, insights_url=insights_url
    )
    notify_slack(slack_webhook_url, text)
    return {
        "status": "ok",
        "count": total,
        "rule_counts": rule_counts,
        "insights_url": insights_url,
    }

CloudFormationテンプレート

CloudFormationテンプレートでLambda関数、CloudWatch Logs Metric Filter、CloudWatch Alarmを作成します。

AWSTemplateFormatVersion: '2010-09-09'
Description: Notification system for WAF detection results to Slack

Parameters:
  SlackWebhookUrl:
    Type: String
    Description: Slack Webhook URL
  LogGroup:
    Type: String
    Description: CloudWatch Log Group name for WAF logs
  LambdaS3Bucket:
    Type: String
    Description: S3 bucket name where Lambda code is stored
  LambdaS3Key:
    Type: String
    Description: S3 key for Lambda code (zip)
  LambdaLayerArn:
    Type: String
    Description: ARN of the Lambda layer to use

Resources:
  # IAM role for Lambda execution
  WafNotifyFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: LambdaWafNotifyLogsInsights
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:StartQuery
                  - logs:GetQueryResults
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:${LogGroup}:*
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  # Lambda function
  WafNotifyFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: app.lambda_handler
      Role: !GetAtt WafNotifyFunctionRole.Arn
      Runtime: python3.13
      Timeout: 60
      Code:
        S3Bucket: !Ref LambdaS3Bucket
        S3Key: !Ref LambdaS3Key
      Environment:
        Variables:
          SLACK_WEBHOOK_URL: !Ref SlackWebhookUrl
          LOG_GROUP: !Ref LogGroup
      Layers:
        - !Ref LambdaLayerArn

  # Permission for CloudWatch Alarm to invoke Lambda function
  WafNotifyFunctionInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt WafNotifyFunction.Arn
      Action: lambda:InvokeFunction
      Principal: lambda.alarms.cloudwatch.amazonaws.com

  # CloudWatch Logs Metric Filter (detect BLOCK/COUNT)
  WafBlockedMetricFilter:
    Type: AWS::Logs::MetricFilter
    Properties:
      LogGroupName: !Ref LogGroup
      FilterPattern: '{ $.action = "BLOCK" || $.action = "COUNT" }'
      MetricTransformations:
        - MetricValue: "1"
          MetricNamespace: "WAF"
          MetricName: "BlockedOrCountedRequests"

  # CloudWatch Alarm (detects BLOCK/COUNT per hour, invokes Lambda directly)
  WafBlockedAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: "WAF-BlockedOrCounted-Alarm"
      AlarmDescription: "Detect requests BLOCK or COUNT by WAF"
      Namespace: "WAF"
      MetricName: "BlockedOrCountedRequests"
      Statistic: Sum
      Period: 3600 # 1 hour
      EvaluationPeriods: 1
      Threshold: 1
      ComparisonOperator: GreaterThanOrEqualToThreshold
      TreatMissingData: notBreaching
      AlarmActions:
        - !GetAtt WafNotifyFunction.Arn

デプロイ

デプロイ用のスクリプトを用意しました。以下のスクリプトを実行することで、Lambdaレイヤーの作成、Lambda関数のデプロイ、CloudFormationスタックの作成を一括で行います。変数は適宜変更してください。

また、実行にはLambdaやS3やCloudFormationの操作権限が必要です。適切なIAMを用意してください。

#!/bin/bash
set -e

# 変数定義
AWS_PROFILE="<YOUR_AWS_PROFILE>"
REGION="us-east-1"
S3_BUCKET="<YOUR_LAMBDA_S3_BUCKET>"
S3_KEY="lambda/lambda_function.zip"
SLACK_WEBHOOK_URL="<YOUR_SLACK_WEBHOOK_URL>"
LOG_GROUP_NAME="<YOUR_LOG_GROUP_NAME>"

echo "Lambdaレイヤーのデプロイを開始します..."

rm -rf python lambda_layer.zip
mkdir -p python
pip install -t python requests

echo "Lambdaレイヤー用のzipファイルを作成します..."

zip -r9 lambda_layer.zip python

echo "LambdaレイヤーをAWSにデプロイします..."

LAYER_VERSION_ARN=$(aws lambda publish-layer-version \
  --layer-name waf-notify-layer \
  --description "for waf-notify" \
  --zip-file fileb://lambda_layer.zip \
  --compatible-runtimes python3.13 \
  --region $REGION \
  --profile $AWS_PROFILE \
  --query 'LayerVersionArn' --output text)

echo "レイヤーのデプロイ完了: $LAYER_VERSION_ARN"

echo "Lambda関数コードをzip化し、S3にアップロードします..."

zip -j lambda_function.zip app.py
aws s3 cp lambda_function.zip s3://$S3_BUCKET/$S3_KEY --region $REGION --profile $AWS_PROFILE

echo "Lambdaコードのアップロード完了: s3://$S3_BUCKET/$S3_KEY"

echo "CloudFormationスタックのデプロイを開始します..."

aws cloudformation deploy \
  --template-file template.yml \
  --stack-name waf-notify-stack \
  --capabilities CAPABILITY_NAMED_IAM \
  --parameter-overrides \
      SlackWebhookUrl="$SLACK_WEBHOOK_URL" \
      LogGroup="$LOG_GROUP_NAME" \
      LambdaS3Bucket="$S3_BUCKET" \
      LambdaS3Key="$S3_KEY" \
      LambdaLayerArn="$LAYER_VERSION_ARN" \
  --region $REGION \
  --profile $AWS_PROFILE

echo "デプロイ完了"

まとめ

今回のようなWAFに限らず、ログを出力して終わり、ではありません。しっかりと検知し、障害を未然に防ぐことが大切です。

今回ご紹介したCloudFormationテンプレートを使えば、簡単にWAF監視とSlack通知の仕組みを導入できます。ぜひ活用してみてください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.