CDKでAthenaを使用したWAFログ分析環境を実装する方法
こんにちは、リテールアプリ共創部の戸田駿太です。
この記事では、AWS CDK を使用して、Athena を使用した WAF ログ分析環境を構築する方法を解説します。
はじめに
前回の記事では、CDK で WAF のアクセスログを S3 に保存する方法について解説しました。
今回は、その続きとして、Amazon Athena を使用して S3 に保存された WAF ログを効率的に分析する環境を CDK で構築する方法を紹介します。Athena を活用することで、標準的な SQL クエリを使って大量の WAF ログを簡単に分析できるようになります。
実装するアーキテクチャ
今回実装するアーキテクチャは以下の通りです:
- S3 に保存された WAF ログに対するスキーマ定義
- Glue データカタログを使用してログのスキーマを定義
- Athena でログを分析するためのワークグループを設定
- クエリ結果を保存するための S3 バケットを設定
ソースコード
S3 バケットの構造
Athena のクエリ結果は指定した S3 バケットに保存されます.
AthenaResultBucket/
└── {クエリ結果ファイル}
CDK による Athena 分析環境の実装
スタックの作成
まず、Athena 分析環境のためのリソースを含む CDK スタックを作成します。このコードでは、既存の WAF ログバケットを参照し、Athena のクエリ結果を保存するための新しい S3 バケットを作成しています。
export class AthenaWafAnalysisCdkStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// 既存のWAFログバケットを参照
const wafLogBucket = s3.Bucket.fromBucketName(this, "WafLogBucket", "既存のWAFログバケット名をここに入力");
// Athenaのクエリ結果を保存するS3バケットを作成
const athenaResultBucket = new s3.Bucket(this, "AthenaResultBucket", {
removalPolicy: cdk.RemovalPolicy.DESTROY,
autoDeleteObjects: true,
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
encryption: s3.BucketEncryption.S3_MANAGED,
});
// 以下、Athena関連リソースの実装...
}
}
Glue データカタログの設定
WAF ログを Athena で分析するためのデータカタログを設定します。このコードでは、S3 に保存された WAF ログのスキーマを定義して、Athena でクエリできるようにします。
// AWS Glueデータベースを作成
const glueDatabase = new glue.CfnDatabase(this, "WafLogDatabase", {
catalogId: this.account,
databaseInput: {
name: "waf_logs_db",
description: "Database for WAF logs",
},
});
// AWS Glueテーブルを作成
const glueTable = new glue.CfnTable(this, "WafLogTable", {
catalogId: this.account,
databaseName: glueDatabase.ref,
tableInput: {
name: "waf_logs",
tableType: "EXTERNAL_TABLE",
parameters: {
classification: "json",
typeOfData: "file",
},
storageDescriptor: {
location: `s3://${wafLogBucket.bucketName}/waf-logs/`,
inputFormat: "org.apache.hadoop.mapred.TextInputFormat",
outputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
serdeInfo: {
serializationLibrary: "org.openx.data.jsonserde.JsonSerDe",
parameters: {
paths:
"action,httpRequest,nonTerminatingMatchingRules,rateBasedRuleList,ruleGroupList,terminatingRuleId,terminatingRuleType",
},
},
columns: [
{ name: "timestamp", type: "timestamp" },
{ name: "formatversion", type: "int" },
{ name: "webaclid", type: "string" },
{ name: "terminatingruleid", type: "string" },
{ name: "terminatingruletype", type: "string" },
{ name: "action", type: "string" },
{ name: "httpsourcename", type: "string" },
{ name: "httpsourceid", type: "string" },
{ name: "rulegrouplist", type: "array<string>" },
{ name: "ratebasedrulelist", type: "array<string>" },
{ name: "nonterminatingmatchingrules", type: "array<string>" },
{
name: "httprequest",
type: "struct<clientip:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpversion:string,httpmethod:string,requestid:string>",
},
],
},
},
});
この Glue テーブル定義では、WAF ログの JSON フォーマットに合わせて各フィールドの型を指定しています。
AI を活用した Glue テーブル構造の作成
WAF ログのような複雑な構造を持つ JSON データに対する Glue テーブルを作成する際、スキーマ定義が複雑になることがあります。特に以下のような入れ子構造を持つフィールド定義は手動で作成するのがめんどうです。
type: "struct<clientip:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpversion:string,httpmethod:string,requestid:string>";
このような複雑なスキーマ定義を簡単に作成する際には生成 AI を活用すると便利です。実際の WAF ログのサンプルを 生成 AI に入力し、「この JSON ログデータに対応する Glue テーブルのスキーマ定義を作成してください」と指示するだけで、適切な型定義を含むスキーマを自動生成できます。
手順は以下の通りです:
- 実際の WAF ログの JSON サンプルを AI に入力
- 「このログデータに適した Glue テーブルの columns 定義を生成してください」と依頼
- AI が自動的に適切な型やネストされた構造を解析し、CDK で使用可能なスキーマ定義を生成
これにより、複雑なネスト構造や配列定義を正確に把握し、手作業での型定義ミスを防ぐことができます。特に WAF ログのような複雑な構造を持つデータでは、この方法によって開発効率が大幅に向上します。
Athena ワークグループの設定
Athena クエリを実行するためのワークグループとロールを設定します。このセクションでは、クエリの実行環境と必要な権限を設定しています。
// Athena Workgroupを作成
const athenaWorkgroup = new athena.CfnWorkGroup(this, "WafAnalysisWorkGroup", {
name: "waf-analysis-workgroup",
recursiveDeleteOption: true,
state: "ENABLED",
workGroupConfiguration: {
resultConfiguration: {
outputLocation: `s3://${athenaResultBucket.bucketName}/`,
},
publishCloudWatchMetricsEnabled: true,
enforceWorkGroupConfiguration: true,
},
});
// Athenaクエリを実行するためのIAMロールを作成
const athenaQueryRole = new iam.Role(this, "AthenaQueryRole", {
assumedBy: new iam.ServicePrincipal("athena.amazonaws.com"),
});
// 権限を追加
athenaQueryRole.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonAthenaFullAccess"));
wafLogBucket.grantRead(athenaQueryRole);
athenaResultBucket.grantReadWrite(athenaQueryRole);
ワークグループを使用することで、特定の分析チーム向けに独立した環境を提供でき、クエリ結果の保存場所や設定を一元管理できます。IAM ロールには Athena の機能を使用するための権限と、S3 バケットに対する適切なアクセス権限が付与されています。
リソース情報の出力
最後に、作成したリソースの情報を出力として定義します。これにより、コンソールで各リソースに簡単にアクセスできるようになります。
// リソース情報を出力
new cdk.CfnOutput(this, "AthenaWorkgroupName", {
value: athenaWorkgroup.name,
description: "The name of the Athena workgroup for querying WAF logs",
});
new cdk.CfnOutput(this, "GlueDatabaseName", {
value: glueDatabase.ref,
description: "The name of the Glue database for WAF logs",
});
new cdk.CfnOutput(this, "GlueTableName", {
value: glueTable.ref,
description: "The name of the Glue table for WAF logs",
});
new cdk.CfnOutput(this, "AthenaResultBucketName", {
value: athenaResultBucket.bucketName,
description: "The name of the S3 bucket storing Athena query results",
});
これらの出力は、スタックのデプロイ後に Athena コンソールでクエリを実行する際に参照すると便利です。
WAF ログの分析
Athena でのクエリ例
Athena を使うことで、WAF ログに対して SQL クエリを実行して様々な分析を行うことができます。以下に、よく使われるクエリ例を紹介します。
ブロックされたリクエストの集計
以下のクエリは、日ごとのブロックされたリクエスト数を集計します。
SELECT
DATE(from_unixtime(timestamp/1000)) as day,
count(*) as request_count
FROM
waf_logs_db.waf_logs
WHERE
action = 'BLOCK'
GROUP BY
DATE(from_unixtime(timestamp/1000))
ORDER BY
day DESC;
このクエリはタイムスタンプを日付に変換し、ブロックされたリクエストのみを集計して日ごとの傾向を把握できます。
攻撃元の国別集計
以下のクエリは、ブロックされたリクエストの攻撃元を国別に集計します。
SELECT
httprequest.country,
count(*) as blocked_requests
FROM
waf_logs_db.waf_logs
WHERE
action = 'BLOCK'
GROUP BY
httprequest.country
ORDER BY
blocked_requests DESC
LIMIT 10;
このクエリを実行することで、どの国からの攻撃が多いかを特定し、地域ベースのブロック戦略の検討材料にすることができます。
終了ルール別のブロック数
以下のクエリは、どの WAF ルールが最も多くのリクエストをブロックしたかを集計します。
SELECT
terminatingruleid,
count(*) as rule_blocks
FROM
waf_logs_db.waf_logs
WHERE
action = 'BLOCK'
GROUP BY
terminatingruleid
ORDER BY
rule_blocks DESC;
このクエリの結果から、最も効果的な WAF ルールを特定したり、あまり効果のないルールを見直したりする判断材料が得られます。
IP アドレス別のリクエスト数
以下のクエリは、IP アドレス別のリクエスト数を集計します。
SELECT
httprequest.clientip,
count(*) as request_count
FROM
waf_logs_db.waf_logs
GROUP BY
httprequest.clientip
ORDER BY
request_count DESC
LIMIT 20;
このクエリにより、特定の IP アドレスからの異常なアクセス数を検出し、潜在的な攻撃者を特定することができます。
時間帯別のブロックされたリクエスト数
以下のクエリは、時間帯別にブロックされたリクエスト数を集計します。
SELECT
HOUR(from_unixtime(timestamp/1000)) as hour_of_day,
count(*) as blocked_count
FROM
waf_logs_db.waf_logs
WHERE
action = 'BLOCK'
GROUP BY
HOUR(from_unixtime(timestamp/1000))
ORDER BY
hour_of_day;
このクエリの結果から、攻撃が多い時間帯を特定し、監視を強化するタイミングを決定することができます。
URI パターン別のアクセス分析
以下のクエリは、アクセスされた URI ごとのリクエスト数を集計します。
SELECT
httprequest.uri,
count(*) as access_count
FROM
waf_logs_db.waf_logs
GROUP BY
httprequest.uri
ORDER BY
access_count DESC
LIMIT 20;
パーティショニングを設定したテーブルに対しては、新しいログデータが追加されるたびにパーティションを更新する必要があります。これには AWS Glue クローラを使用することができます。
ユースケース
このアーキテクチャを活用することで、以下のようなユースケースに対応できます:
- セキュリティモニタリング: WAF によってブロックされた攻撃を分析し、攻撃パターンを把握
- ルール最適化: ブロックされたリクエストの分析から、WAF ルールの有効性を評価し改善
- 地理的アクセス分析: 国別、地域別のアクセスパターンとブロックされた攻撃の相関関係を分析
- API 利用状況の把握: API Gateway へのアクセスパターンを分析し、リソース最適化に活用
- 異常検知: 通常のアクセスパターンと異なる動きを検出して、未知の攻撃を特定
まとめ
AWS CDK を使用して Athena による WAF ログ分析環境を構築することで、以下のメリットが得られます。
- 効率的な分析: SQL を使用して大量の WAF ログを簡単に分析できる
- インフラのコード化: 環境構築をコードで管理し、再現性を確保
- コスト最適化: 必要なときだけクエリを実行し、ストレージコストを最小限に抑える
- セキュリティ強化: WAF のパターンを分析して、ルールを最適化
実際の運用では、定期的な分析レポートの自動化やアラートの設定なども設定できます。
AWS Lambda と Amazon EventBridge を組み合わせることで、特定の攻撃パターンを検出した場合に自動的に対応するシステムを構築することも可能です。
ぜひ参考にしてみてください。