AWS WAFの検知結果をSlackに通知する仕組みを構築してみた
はじめに
AWS WAFのマネージドルール(Managed Rule)、たとえばコアルールセット(Core Rule Set)はルール数が多く、開発時にすべてを検証するのはなかなか大変です。「とりあえずカウント(Count)に設定しておこう」と考え、結果的に放置し、検知に気付かず本番リリース後に障害を発生させてしまった、ということがありました。
ログを出力するだけでは不十分で、通知する仕組みも一緒に用意すべきです。毎回構築するのも面倒なので、Lambda関数とCloudFormationのテンプレートを作成してみました。これを使うことで、WAFの検知結果をSlackに通知する仕組みを簡単に構築できます。
実行結果
以下はSlack通知の例です。
前提条件
- AWS WAFのログをCloudWatch Logsに出力していること
- Lambda関数をアップロードするためのS3バケットを用意していること
前提条件ではないですが、WAFはCloudFrontにアタッチしていることを想定しています。そのため、今回作成するLambda関数などのリソースもバージニア北部リージョン(us-east-1)を明示的に指定しています。必要に応じて、リージョンを変更してください。
Lambda関数
アラームからAWS Chatbotで直接通知する方法もありますが、今回はLambda関数を使って通知内容をカスタマイズしてみます。
ここでは、Python 3.13で実装したLambda関数を紹介します。CloudWatch Logs Insightsを使ってWAFログを集計し、BlockまたはCountされたリクエスト数をSlackに通知します。
import os
import json
import boto3
import requests
import logging
from datetime import datetime, timedelta, timezone
from urllib.parse import quote
# Set up logging
logging.basicConfig(level=logging.INFO)
# デフォルト値
# CloudWatch Logs Insightsで利用するクエリ(BLOCKまたはCOUNTされたログを抽出)
DEFAULT_QUERY = 'fields @timestamp, @message | filter action="BLOCK" or action="COUNT"'
# Slack通知のメッセージテンプレート
DEFAULT_MESSAGE = (
"直近{hours}時間でWAFにより {count} 件のリクエストがBlockまたはCountされました。\n"
"ルールごとの件数:\n{rule_text}\n\n"
"詳細はCloudWatch Logs Insightsでご確認ください: "
"<{insights_url}|CloudWatch Logs Insightsで確認>"
)
DEFAULT_HOURS = 1
# デフォルトのAWSリージョン(環境変数がなければus-east-1)
DEFAULT_REGION = os.environ.get("AWS_REGION", "us-east-1")
def get_env_or_event(key, event, default=None):
"""
event引数、環境変数、デフォルト値の順で設定値を取得する
"""
if key in event:
return event[key]
if key in os.environ:
return os.environ[key]
return default
def fetch_waf_logs_by_rule(log_group, query, hours):
"""
CloudWatch Logs InsightsでWAFログを取得し、ルールごとの検知件数を集計する
- BLOCKはterminatingRuleId、COUNTはnonTerminatingMatchingRules[].ruleIdでカウント
"""
logs = boto3.client("logs")
now = datetime.now(timezone.utc)
end_time = int(now.timestamp() * 1000)
start_time = int((now - timedelta(hours=hours)).timestamp() * 1000)
# Insightsクエリを実行
start_query_response = logs.start_query(
logGroupName=log_group,
startTime=start_time,
endTime=end_time,
queryString=query,
limit=1000,
)
query_id = start_query_response["queryId"]
response = None
# クエリ完了まで最大10回ポーリング
for _ in range(10):
result = logs.get_query_results(queryId=query_id)
if result["status"] == "Complete":
response = result
break
rule_counts = {}
if not response:
return rule_counts
for row in response["results"]:
msg = None
for field in row:
if field["field"] == "@message":
msg = field["value"]
break
if not msg:
continue
try:
log = json.loads(msg)
except json.JSONDecodeError as e:
logging.warning(f"JSON decode error: {e}. msg={msg}")
continue
# BLOCKの場合
if log.get("action") == "BLOCK" and log.get("terminatingRuleId"):
rule = log["terminatingRuleId"]
rule_counts[rule] = rule_counts.get(rule, 0) + 1
# COUNTの場合
for rule in log.get("nonTerminatingMatchingRules", []):
rule_id = rule.get("ruleId")
if rule_id:
rule_counts[rule_id] = rule_counts.get(rule_id, 0) + 1
return rule_counts
def generate_insights_url(region, log_group, start, end, query):
"""
CloudWatch Logs Insightsのクエリ画面へのダイレクトリンクを生成
- region: AWSリージョン
- log_group: ロググループ名
- start, end: ISO8601形式のUTC時刻文字列
- query: クエリ内容
"""
base = f"https://console.aws.amazon.com/cloudwatch/home?region={region}#logsV2:logs-insights"
editor_string = quote(query)
log_group_enc = quote(log_group)
url = (
f"{base}$3FqueryDetail$3D~(end~'{end}~start~'{start}~timeType~'ABSOLUTE~unit~'seconds"
f"~editorString~'{editor_string}~isLiveTail~false~source~(~'{log_group_enc}))"
)
return url
def notify_slack(webhook_url, message):
"""
指定したSlack Webhook URLにメッセージを送信する
"""
try:
response = requests.post(
webhook_url,
data=json.dumps({"text": message}),
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
logging.info("Slack通知に成功しました。")
except requests.RequestException as e:
logging.error(f"Slack通知失敗: {e}")
def lambda_handler(event, context):
"""
Lambdaエントリポイント
- WAFログをCloudWatch Logs Insightsで集計し、ルールごとの件数とクエリ画面リンクをSlack通知
- event/環境変数から各種パラメータを取得
"""
slack_webhook_url = get_env_or_event("SLACK_WEBHOOK_URL", event)
if not slack_webhook_url:
logging.error("SLACK_WEBHOOK_URLが指定されていません。")
return {"status": "error", "reason": "Missing SLACK_WEBHOOK_URL."}
log_group = get_env_or_event("LOG_GROUP", event)
if not log_group:
logging.error("LOG_GROUPが指定されていません。")
return {"status": "error", "reason": "Missing LOG_GROUP."}
query = get_env_or_event("QUERY", event, DEFAULT_QUERY)
message_template = get_env_or_event("MESSAGE", event, DEFAULT_MESSAGE)
hours = get_env_or_event("HOURS", event, DEFAULT_HOURS)
try:
hours = int(hours)
except Exception:
logging.warning(f"HOURSの値が不正です: {hours}。デフォルト値({DEFAULT_HOURS})を使用します。")
hours = DEFAULT_HOURS
region = get_env_or_event("AWS_REGION", event, DEFAULT_REGION)
# クエリ期間(直近hours時間)
now = datetime.now(timezone.utc)
end_time = now.strftime("%Y-%m-%dT%H:%M:%S.000Z")
start_time = (now - timedelta(hours=hours)).strftime("%Y-%m-%dT%H:%M:%S.000Z")
# ルールごとの件数集計
rule_counts = fetch_waf_logs_by_rule(log_group, query, hours)
total = sum(rule_counts.values())
if rule_counts:
rule_text = "\n".join([f"{k}: {v}件" for k, v in rule_counts.items()])
else:
rule_text = "なし"
# Insightsクエリ画面へのリンク生成
insights_url = generate_insights_url(region, log_group, start_time, end_time, query)
# Slack通知文生成
text = message_template.format(
hours=hours, count=total, rule_text=rule_text, insights_url=insights_url
)
notify_slack(slack_webhook_url, text)
return {
"status": "ok",
"count": total,
"rule_counts": rule_counts,
"insights_url": insights_url,
}
CloudFormationテンプレート
CloudFormationテンプレートでLambda関数、CloudWatch Logs Metric Filter、CloudWatch Alarmを作成します。
AWSTemplateFormatVersion: '2010-09-09'
Description: Notification system for WAF detection results to Slack
Parameters:
SlackWebhookUrl:
Type: String
Description: Slack Webhook URL
LogGroup:
Type: String
Description: CloudWatch Log Group name for WAF logs
LambdaS3Bucket:
Type: String
Description: S3 bucket name where Lambda code is stored
LambdaS3Key:
Type: String
Description: S3 key for Lambda code (zip)
LambdaLayerArn:
Type: String
Description: ARN of the Lambda layer to use
Resources:
# IAM role for Lambda execution
WafNotifyFunctionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: LambdaWafNotifyLogsInsights
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- logs:StartQuery
- logs:GetQueryResults
Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:${LogGroup}:*
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
# Lambda function
WafNotifyFunction:
Type: AWS::Lambda::Function
Properties:
Handler: app.lambda_handler
Role: !GetAtt WafNotifyFunctionRole.Arn
Runtime: python3.13
Timeout: 60
Code:
S3Bucket: !Ref LambdaS3Bucket
S3Key: !Ref LambdaS3Key
Environment:
Variables:
SLACK_WEBHOOK_URL: !Ref SlackWebhookUrl
LOG_GROUP: !Ref LogGroup
Layers:
- !Ref LambdaLayerArn
# Permission for CloudWatch Alarm to invoke Lambda function
WafNotifyFunctionInvokePermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt WafNotifyFunction.Arn
Action: lambda:InvokeFunction
Principal: lambda.alarms.cloudwatch.amazonaws.com
# CloudWatch Logs Metric Filter (detect BLOCK/COUNT)
WafBlockedMetricFilter:
Type: AWS::Logs::MetricFilter
Properties:
LogGroupName: !Ref LogGroup
FilterPattern: '{ $.action = "BLOCK" || $.action = "COUNT" }'
MetricTransformations:
- MetricValue: "1"
MetricNamespace: "WAF"
MetricName: "BlockedOrCountedRequests"
# CloudWatch Alarm (detects BLOCK/COUNT per hour, invokes Lambda directly)
WafBlockedAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: "WAF-BlockedOrCounted-Alarm"
AlarmDescription: "Detect requests BLOCK or COUNT by WAF"
Namespace: "WAF"
MetricName: "BlockedOrCountedRequests"
Statistic: Sum
Period: 3600 # 1 hour
EvaluationPeriods: 1
Threshold: 1
ComparisonOperator: GreaterThanOrEqualToThreshold
TreatMissingData: notBreaching
AlarmActions:
- !GetAtt WafNotifyFunction.Arn
デプロイ
デプロイ用のスクリプトを用意しました。以下のスクリプトを実行することで、Lambdaレイヤーの作成、Lambda関数のデプロイ、CloudFormationスタックの作成を一括で行います。変数は適宜変更してください。
また、実行にはLambdaやS3やCloudFormationの操作権限が必要です。適切なIAMを用意してください。
#!/bin/bash
set -e
# 変数定義
AWS_PROFILE="<YOUR_AWS_PROFILE>"
REGION="us-east-1"
S3_BUCKET="<YOUR_LAMBDA_S3_BUCKET>"
S3_KEY="lambda/lambda_function.zip"
SLACK_WEBHOOK_URL="<YOUR_SLACK_WEBHOOK_URL>"
LOG_GROUP_NAME="<YOUR_LOG_GROUP_NAME>"
echo "Lambdaレイヤーのデプロイを開始します..."
rm -rf python lambda_layer.zip
mkdir -p python
pip install -t python requests
echo "Lambdaレイヤー用のzipファイルを作成します..."
zip -r9 lambda_layer.zip python
echo "LambdaレイヤーをAWSにデプロイします..."
LAYER_VERSION_ARN=$(aws lambda publish-layer-version \
--layer-name waf-notify-layer \
--description "for waf-notify" \
--zip-file fileb://lambda_layer.zip \
--compatible-runtimes python3.13 \
--region $REGION \
--profile $AWS_PROFILE \
--query 'LayerVersionArn' --output text)
echo "レイヤーのデプロイ完了: $LAYER_VERSION_ARN"
echo "Lambda関数コードをzip化し、S3にアップロードします..."
zip -j lambda_function.zip app.py
aws s3 cp lambda_function.zip s3://$S3_BUCKET/$S3_KEY --region $REGION --profile $AWS_PROFILE
echo "Lambdaコードのアップロード完了: s3://$S3_BUCKET/$S3_KEY"
echo "CloudFormationスタックのデプロイを開始します..."
aws cloudformation deploy \
--template-file template.yml \
--stack-name waf-notify-stack \
--capabilities CAPABILITY_NAMED_IAM \
--parameter-overrides \
SlackWebhookUrl="$SLACK_WEBHOOK_URL" \
LogGroup="$LOG_GROUP_NAME" \
LambdaS3Bucket="$S3_BUCKET" \
LambdaS3Key="$S3_KEY" \
LambdaLayerArn="$LAYER_VERSION_ARN" \
--region $REGION \
--profile $AWS_PROFILE
echo "デプロイ完了"
まとめ
今回のようなWAFに限らず、ログを出力して終わり、ではありません。しっかりと検知し、障害を未然に防ぐことが大切です。
今回ご紹介したCloudFormationテンプレートを使えば、簡単にWAF監視とSlack通知の仕組みを導入できます。ぜひ活用してみてください。