AWS WAFのログをAmazon Data Firehose経由でS3 Tablesに書き込んでみた
お疲れさまです。とーちです。
前回は、VPCフローログをAmazon Data Firehose(以下、Firehose)経由でAmazon S3 Tablesに書き込む方法について解説しました。今回は、AWS WAFのログをS3 Tablesに出力する方法について説明したいと思います。
S3Tablesに出力することのメリットなどは前回のブログを参考にしてください
今回は以下のような流れで実装していきます。
- まずはどんなレコードが出力されるか確認する
- ALBおよびWAFの構築
- テスト用AWS Lambdaの構築
- テスト用のFirehoseストリームの構築
- WAFロギングの設定
- レコードの確認
- S3 Tablesへの出力
- WAFログ格納用のテーブルの作成
- レコード変換用AWS Lambdaの作成
- Firehoseストリームの構築
- WAFロギングの設定
- テスト
WAFからのログレコードのフォーマットは、公式ドキュメントからも確認できますが、ステップバイステップで進めていくほうが設定する中で、どこまでができていて、どこができていないのかを判別しやすいので今回はこの流れで進めます。
それではさっそくやっていきましょう。
前提条件
前回と同様に、S3 TablesのテーブルバケットやFirehoseでS3 Tablesに書き込むためのIAMロールの作成、AWS Lake Formationを使った権限付与などは既に完了しているものとします。もしこれらがまだ実施されていない場合は以下の記事を参考に実施してください。
- [アップデート]Amazon Data Firehose から S3 Tables へ直接データ配信できるようになりました | DevelopersIO
- Amazon Data FirehoseでApache Icebergテーブルに配信する際の「一意のキー設定」と「JSONQuery式(JQ式)」の違いについて | DevelopersIO
まずはどんなレコードが出力されるか確認する
まずはWAFからFirehoseにログ出力するとどんなレコードが出力されるのかを確認してみましょう。
今回は検証用にALBとWAFだけのシンプルな構成を構築します。
ALBおよびWAFについてはAWS マネジメントコンソールからほぼデフォルトのままの設定で作成したので詳細は割愛します。WAFはALBの画面からWAF統合にチェックをつけると楽に構築できます。
続いて、Firehoseからデータ変換用の関数として起動するAWS Lambdaを作成します。この段階では具体的な変換ロジックは作成しません。あくまでもFirehoseがどういうレコードをWAFから受け取るのかを確認するのが目的です。
AWS Lambdaの設定は前回と同様にIAMロールはデフォルトで作成されるものを使用し、タイムアウト設定を1分に変更したくらいであとはデフォルトのままです。ランタイムはPython 3.13で作成しました。
肝心のコードは以下になります。
import json
import base64
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# イベント全体をログに出力
logger.info(f"Received event: {json.dumps(event)}")
# 各レコードを処理
if 'records' in event:
for record in event['records']:
# Base64でエンコードされたデータをデコード
try:
data = base64.b64decode(record['data']).decode('utf-8')
logger.info(f"Record ID: {record['recordId']}, Data: {data}")
except Exception as e:
logger.error(f"Error decoding: {str(e)}")
# Firehoseの場合は元のデータをそのまま返す
if 'records' in event:
return {
'records': [{
'recordId': record['recordId'],
'result': 'Ok',
'data': record['data']
} for record in event['records']]
}
return {
'statusCode': 200,
'body': json.dumps('Processed Firehose records')
}
受け取ったevent
を出力し、records
の中のdata
の値をデコードするだけのシンプルなコードです。
次にこのAWS Lambdaをデータ変換用関数として指定するFirehoseストリームを作成します。
設定は以下の通りです。前回とほぼ同じですが、一点だけ、Firehoseストリームのストリーム名には必ずaws-waf-logs-
というプレフィックスをつける必要がある、という点にだけは注意してください。なお、データの宛先として一意のキー設定でテーブルを指定していますが、この時点では適当なテーブルでOKです(一意のキー設定の場合、テーブル存在チェックが入るので何かしらのテーブルを作っておく必要はあります)。
次にWAFロギングの設定をします。AWS マネジメントコンソールからWAFの画面に行き、以下のようにロギングの設定を行ってください。
次の画面で作成したFirehoseストリームを選びます。ストリーム名にaws-waf-logs-
がついていないと選択肢として出てこないのでご注意ください。
設定を完了するとWAFのログがFirehoseに流れ始めます。ALBのデフォルトDNS名に何回かリクエストを送るとWAFのログが出力されます。
AWS LambdaのログをAmazon CloudWatch Logsから見ると以下のようなレコードをWAFから受け取っていることがわかりました。
{
"invocationId": "abcdef12-3456-7890-abcd-1234567890ab",
"deliveryStreamArn": "arn:aws:firehose:ap-northeast-1:123456789012:deliverystream/aws-waf-logs-example-stream",
"region": "ap-northeast-1",
"records": [
{
"recordId": "shardId-000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"approximateArrivalTimestamp": 1742806129515,
"data": "eyJ0aW1lc3RhbXAiOjE3NDI4MDYxMDk2MjEsImZvcm1hdFZlcnNpb24iOjEsIndlYmFjbElkIjoiYXJuOmF3czp3YWZ2MjphcC1ub3J0aGVhc3QtMToxMjM0NTY3ODkwMTI6cmVnaW9uYWwvd2ViYWNsL0NyZWF0ZWRCeUFMQi13YWZsb2ctZXhhbXBsZS9hYmNkZWYxMi0zNDU2LTc4OTAtYWJjZC0xMjM0NTY3ODkwYWIiLCJ0ZXJtaW5hdGluZ1J1bGVJZCI6IkRlZmF1bHRfQWN0aW9uIiwidGVybWluYXRpbmdSdWxlVHlwZSI6IlJFR1VMQVIiLCJhY3Rpb24iOiJBTExPVyIsInRlcm1pbmF0aW5nUnVsZU1hdGNoRGV0YWlscyI6W10sImh0dHBTb3VyY2VOYW1lIjoiQUxCIiwiaHR0cFNvdXJjZUlkIjoiMTIzNDU2Nzg5MDEyLWFwcC93YWZsb2ctZXhhbXBsZS9hYmNkZWYxMjM0NTYiLCJydWxlR3JvdXBMaXN0IjpbeyJydWxlR3JvdXBJZCI6IkFXUyNBV1NNYW5hZ2VkUnVsZXNBbWF6b25JcFJlcHV0YXRpb25MaXN0IiwidGVybWluYXRpbmdSdWxlIjpudWxsLCJub25UZXJtaW5hdGluZ01hdGNoaW5nUnVsZXMiOltdLCJleGNsdWRlZFJ1bGVzIjpudWxsLCJjdXN0b21lckNvbmZpZyI6bnVsbH0seyJydWxlR3JvdXBJZCI6IkFXUyNBV1NNYW5hZ2VkUnVsZXNDb21tb25SdWxlU2V0IiwidGVybWluYXRpbmdSdWxlIjpudWxsLCJub25UZXJtaW5hdGluZ01hdGNoaW5nUnVsZXMiOltdLCJleGNsdWRlZFJ1bGVzIjpudWxsLCJjdXN0b21lckNvbmZpZyI6bnVsbH0seyJydWxlR3JvdXBJZCI6IkFXUyNBV1NNYW5hZ2VkUnVsZXNLbm93bkJhZElucHV0c1J1bGVTZXQiLCJ0ZXJtaW5hdGluZ1J1bGUiOm51bGwsIm5vblRlcm1pbmF0aW5nTWF0Y2hpbmdSdWxlcyI6W10sImV4Y2x1ZGVkUnVsZXMiOm51bGwsImN1c3RvbWVyQ29uZmlnIjpudWxsfV0sInJhdGVCYXNlZFJ1bGVMaXN0IjpbXSwibm9uVGVybWluYXRpbmdNYXRjaGluZ1J1bGVzIjpbXSwicmVxdWVzdEhlYWRlcnNJbnNlcnRlZCI6bnVsbCwicmVzcG9uc2VDb2RlU2VudCI6bnVsbCwiaHR0cFJlcXVlc3QiOnsiY2xpZW50SXAiOiIxMjMuNDU2Ljc4OS4xMCIsImNvdW50cnkiOiJKUCIsImhlYWRlcnMiOlt7Im5hbWUiOiJIb3N0IiwidmFsdWUiOiJleGFtcGxlLWFwcC5hcC1ub3J0aGVhc3QtMS5lbGIuYW1hem9uYXdzLmNvbSJ9LHsibmFtZSI6IkNvbm5lY3Rpb24iLCJ2YWx1ZSI6ImtlZXAtYWxpdmUifSx7Im5hbWUiOiJQcmFnbWEiLCJ2YWx1ZSI6Im5vLWNhY2hlIn0seyJuYW1lIjoiQ2FjaGUtQ29udHJvbCIsInZhbHVlIjoibm8tY2FjaGUifSx7Im5hbWUiOiJVc2VyLUFnZW50IiwidmFsdWUiOiJNb3ppbGxhLzUuMCAoTWFjaW50b3NoOyBJbnRlbCBNYWMgT1MgWCAxMF8xNV83KSBBcHBsZVdlYktpdC81MzcuMzYgKEtIVE1MLCBsaWtlIEdlY2tvKSBDaHJvbWUvMTM0LjAuMC4wIFNhZmFyaS81MzcuMzYifSx7Im5hbWUiOiJBY2NlcHQiLCJ2YWx1ZSI6ImltYWdlL2F2aWYsaW1hZ2Uvd2VicCxpbWFnZS9hcG5nLGltYWdlL3N2Zyt4bWwsaW1hZ2UvKiwqLyo7cT0wLjgifSx7Im5hbWUiOiJSZWZlcmVyIiwidmFsdWUiOiJodHRwOi8vZXhhbXBsZS1hcHAuYXAtbm9ydGhlYXN0LTEuZWxiLmFtYXpvbmF3cy5jb20vIn0seyJuYW1lIjoiQWNjZXB0LUVuY29kaW5nIiwidmFsdWUiOiJnemlwLCBkZWZsYXRlIn0seyJuYW1lIjoiQWNjZXB0LUxhbmd1YWdlIiwidmFsdWUiOiJqYSxlbi1VUztxPTAuOSxlbjtxPTAuOCJ9XSwidXJpIjoiL2Zhdmljb24uaWNvIiwiYXJncyI6IiIsImh0dHBWZXJzaW9uIjoiSFRUUC8xLjEiLCJodHRwTWV0aG9kIjoiR0VUIiwicmVxdWVzdElkIjoiMS1hYmNkZWYxMi0xMjM0NTY3ODkwYWJjZGVmMTIzNDU2NzgiLCJmcmFnbWVudCI6IiIsInNjaGVtZSI6Imh0dHAiLCJob3N0IjoiZXhhbXBsZS1hcHAuYXAtbm9ydGhlYXN0LTEuZWxiLmFtYXpvbmF3cy5jb20ifX0K"
}
]
}
data部分をbase64デコードすると以下のような内容になっています。
{
"timestamp": 1742806109621,
"formatVersion": 1,
"webaclId": "arn:aws:wafv2:ap-northeast-1:123456789012:regional/webacl/CreatedByALB-waflog-example/abcdef12-3456-7890-abcd-1234567890ab",
"terminatingRuleId": "Default_Action",
"terminatingRuleType": "REGULAR",
"action": "ALLOW",
"terminatingRuleMatchDetails": [],
"httpSourceName": "ALB",
"httpSourceId": "123456789012-app/waflog-example/abcdef123456",
"ruleGroupList": [
{
"ruleGroupId": "AWS#AWSManagedRulesAmazonIpReputationList",
"terminatingRule": null,
"nonTerminatingMatchingRules": [],
"excludedRules": null,
"customerConfig": null
},
{
"ruleGroupId": "AWS#AWSManagedRulesCommonRuleSet",
"terminatingRule": null,
"nonTerminatingMatchingRules": [],
"excludedRules": null,
"customerConfig": null
},
{
"ruleGroupId": "AWS#AWSManagedRulesKnownBadInputsRuleSet",
"terminatingRule": null,
"nonTerminatingMatchingRules": [],
"excludedRules": null,
"customerConfig": null
}
],
"rateBasedRuleList": [],
"nonTerminatingMatchingRules": [],
"requestHeadersInserted": null,
"responseCodeSent": null,
"httpRequest": {
"clientIp": "123.456.789.10",
"country": "JP",
"headers": [
{
"name": "Host",
"value": "example-app.ap-northeast-1.elb.amazonaws.com"
},
{
"name": "Connection",
"value": "keep-alive"
},
{
"name": "Pragma",
"value": "no-cache"
},
{
"name": "Cache-Control",
"value": "no-cache"
},
{
"name": "User-Agent",
"value": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
},
{
"name": "Accept",
"value": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8"
},
{
"name": "Referer",
"value": "http://example-app.ap-northeast-1.elb.amazonaws.com/"
},
{
"name": "Accept-Encoding",
"value": "gzip, deflate"
},
{
"name": "Accept-Language",
"value": "ja,en-US;q=0.9,en;q=0.8"
}
],
"uri": "/favicon.ico",
"args": "",
"httpVersion": "HTTP/1.1",
"httpMethod": "GET",
"requestId": "1-abcdef12-1234567890abcdef12345678",
"fragment": "",
"scheme": "http",
"host": "example-app.ap-northeast-1.elb.amazonaws.com"
}
}
なお、デコードは自身のPCのターミナル上で以下のようなコマンドを実行すると便利です(macOSで実施)。
echo "<base64エンコードされた文字列>" | base64 -d | jq
S3 Tablesへの出力
どんなレコードが出力されるか確認できたところでS3 Tablesに出力するためのリソースを作成していきます。後述しますが、念のため、テーブルやFirehoseストリームは上記で作成したものを使い回さずに作り直したほうが良いです。
まずはレコードを格納するためのS3 Tablesのテーブルを作りましょう。クエリはこちらです。
CREATE TABLE `my_s3_namespace`.`waflogs` (
timestamp bigint,
formatversion int,
webaclid string,
terminatingruleid string,
terminatingruletype string,
action string,
httpsourcename string,
httpsourceid string,
httprequest struct<
clientip: string,
country: string,
uri: string,
args: string,
httpversion: string,
httpmethod: string,
requestid: string,
host: string,
headers: array<struct<name: string, value: string>>,
scheme: string,
fragment: string
>,
rulegrouplist array<struct<
rulegroupid: string,
terminatingrule: string,
nonterminatingmatchingrules: array<string>,
excludedrules: string,
customerconfig: string
>>,
terminatingrulematchdetails array<string>,
ratebasedrulelist array<string>,
nonterminatingmatchingrules array<string>,
requestheadersinserted string,
responsecodesent string,
log_date date
)
PARTITIONED BY (month(`log_date`))
TBLPROPERTIES ('table_type' = 'iceberg')
注意点としてはWAFのログがネストされた形式になっているということです。
Firehoseはネストされたログを以下のように扱います(参考:Consideration and limitations - Amazon Data Firehose)。
- 列名と値については、Firehoseは複数レベルにネストされたJSONの最初のレベルのノードのみを取得します
- Firehoseが(S3 TablesなどのApache Icebergテーブルにデータを)正常に配信するには、ソースデータの列名とデータ型がターゲットテーブルのものと一致する必要があります
- Firehoseは、(ネストされた列の場合)位置フィールドに一致するstructまたはmapデータ型の列がIcebergテーブルにあることを想定しています
なかなか分かりづらい記載ですが、要はこういうことを言っていると思われます。
- Firehoseが受け取ったソースデータ(今回の例でいうとdata部分をbase64デコードした内容)の列名とデータ型はターゲットテーブルのものと一致する必要がある
- 列名またはデータ型が一致しない場合、Firehoseはエラーとなり、その内容をFirehoseストリーム設定の中で指定した
S3バックアップバケット
に配信するが、ネストされた構造については最上位の型(structかmap型になる)のみをチェックして中身まではチェックしない
この点は覚えておいたほうがいいでしょう。
もう一つの注意点として、Amazon Athenaでテーブルを作成した際は列名等の大文字小文字が無視され、すべて小文字で作成されるということです(参考:Name databases, tables, and columns - Amazon Athena)。
Firehoseが受け取ったソースデータ(今回の例でいうとdata部分をbase64デコードした内容)の列名とS3 Tablesのテーブル上の列名は大文字小文字も含め一致させる必要があります。
今回ソースデータの内容としてformatVersion
やwebaclId
など列名にあたるキーの部分に大文字を使われている箇所がいくつかあります。このままテーブルに格納しようとするとこれらは列名の不一致によりデータが格納されません。
これを解消するために、データ変換用AWS Lambdaの処理の中で、すべての列を小文字にするように修正する必要があります。
上記も踏まえて作成したAWS Lambdaのコードが以下となります。
import base64
import json
import logging
from datetime import datetime, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
output = []
logger.info(f"Processing {len(event['records'])} records")
for record in event['records']:
try:
# Base64でエンコードされたデータをデコード
payload = base64.b64decode(record['data']).decode('utf-8').strip()
# JSONとして解析
json_data = json.loads(payload)
# キーを小文字に変換する関数(再帰的に処理)
def convert_keys_to_lowercase(obj):
if isinstance(obj, dict):
return {k.lower(): convert_keys_to_lowercase(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_keys_to_lowercase(item) for item in obj]
else:
return obj
# すべてのキーを小文字に変換
result = convert_keys_to_lowercase(json_data)
# タイムスタンプから日付フィールドを生成
if 'timestamp' in result:
timestamp_ms = result['timestamp']
log_date = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
result['log_date'] = log_date.strftime('%Y-%m-%d')
# デバッグ出力
logger.info(f"Top level keys after transformation: {list(result.keys())}")
# 結果をJSON文字列に変換してBase64エンコード
data_json = json.dumps(result) + '\n'
encoded_data = base64.b64encode(data_json.encode('utf-8')).decode('utf-8')
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': encoded_data
}
except Exception as e:
# エラーが発生した場合はログに記録し、元のデータを返す
logger.error(f"Error processing record: {str(e)}")
output_record = {
'recordId': record['recordId'],
'result': 'ProcessingFailed',
'data': record['data']
}
output.append(output_record)
logger.info(f"Successfully processed {len(output)} records")
return {'records': output}
convert_keys_to_lowercase
という関数を定義して、列名をすべて小文字に変えたうえで、base64エンコードをしています。
AWS Lambda関数が作成できたら、Firehoseストリーム作成とWAFロギングの設定をします。こちらは先程(まずはどんなレコードが出力されるか確認する)と作成方法は同じなので、割愛します。
正常に作成できていると以下のようにWAFのログがS3 Tablesに出力できていることが確認できます。
ネストされた列についてもクエリしてみましょう。以下のようなSELECT文を実行します。
SELECT
timestamp,
httprequest.uri,
h.name,
h.value
FROM my_s3_namespace.waflogs
CROSS JOIN UNNEST(httprequest.headers) AS t(h)
WHERE h.name = 'User-Agent'
AND log_date BETWEEN CURRENT_DATE - INTERVAL '7' DAY AND CURRENT_DATE
LIMIT 100;
すると以下のようにネスト部分を表示することができます。
CROSS JOIN UNNEST(httprequest.headers) AS t(h)
の部分が特に重要で以下のようなことをやっています。
UNNEST
演算子は、配列を行に展開します。httprequest.headers
は下記の通り構造体(struct型)の配列となっています。
httprequest struct<
clientip: string,
country: string,
uri: string,
args: string,
httpversion: string,
httpmethod: string,
requestid: string,
host: string,
headers: array<struct<name: string, value: string>>,
scheme: string,
fragment: string
>
CROSS JOIN UNNEST(httprequest.headers)
は、以下のような処理を配列に対して実行しています。httprequest.headers
配列を取り出す- 配列内の各要素(各ヘッダー)を別々の行に展開する
- 元のデータの他のフィールド(timestamp, uriなど)と組み合わせる
AS t(h)
の部分は、展開された配列要素に別名を付けています。t
はテーブルエイリアス、h
は展開された構造体(ヘッダー情報)の別名です。
またWHERE h.name = 'User-Agent'
で展開されたヘッダー情報の中から、名前が"User-Agent"のものだけを抽出しています。
余談
この検証をしている最中にも遭遇したのですが、Firehoseは一度作成したストリームを変更したり、Firehoseストリームは作り直さずにテーブルの作り直しだけをしたりするとパーミッションエラーが起こるという挙動をすることがあるようです。(具体的な発生条件まで追えていません。)
具体的には以下のようなエラーがS3バックアップバケットに出力されました。(機微と思われる部分をマスクしてます)
IcebergFailedItem(attemptsMade=0, arrivalTimestamp=1742810713000, lastErrorCode=S3.AccessDenied, lastErrorMessage=Access was denied. Ensure that the trust policy for the provided IAM role allows Firehose to assume the role, and the access policy allows access to the S3 bucket., attemptEndingTimestamp=1742810820689, dataFileFormat=PARQUET, deleteFileFormat=, dataFilesPaths=s3://[BUCKET-ID-MASKED]--table-s3/data/sxMagg/log_date_month=2025-03/00008-0-[FILE-ID-MASKED]-00002.parquet,s3://[BUCKET-ID-MASKED]--table-s3/data/c9koKg/log_date_month=2025-03/00003-1-[FILE-ID-MASKED]-00002.parquet, deleteFilesPaths=)
このときは、以下の手順を行うことで解消しました。どこまで必要かまでは検証できていませんが、もし同じようなエラーが出たときは参考になればと思います
- AWS WAFのLogging設定を一度Disableにする
- Firehoseストリームを削除する
- S3Tablesのテーブルを
drop table my_s3_namespace.<テーブル名>;
で削除する - S3Tablesのテーブルを同一の内容で、別の名前で作り直す。
- Firehoseストリームを作成する
- AWS WAFのLogging設定で再作成したFirehoseストリームを指定
まとめ
今回はWAFログをS3 Tablesに格納する方法について解説しました。ポイントをまとめると以下のようになります。
- WAFのログをFirehoseに出力するには、ストリーム名に
aws-waf-logs-
というプレフィックスが必要 - WAFのログはネストされた構造を持つため、S3 Tablesのテーブル定義でもそれに合わせた構造体の定義が必要
- Amazon Athenaでテーブルを作成すると列名は小文字になるため、WAFログのJSONキーも小文字に変換する必要がある
- ネストされた構造のクエリには
UNNEST
演算子が便利
S3 Tablesを使うことで、WAFログのような大量のセキュリティデータを効率的に保存・分析することができます。特に長期間のログ保存と高速なクエリが必要なセキュリティ分析のユースケースでは、S3 Tablesが活きてくるのではないかなと思います。
以上、とーちでした。