LambdaでAWS WAFのsampleログを取得してS3に保存する

2018.01.25

こんにちは、臼田です。

皆さん、Lambdaしていますか?

今日はAWS WAFにて検知・ブロックしたログをLambdaを利用してS3に保存してみます。

動機

なぜこのようなことを行うのかという動機ですが、AWS WAFのログを保存し閲覧・分析したいためです。

AWS WAFでは通常のログ保存機能はありません。現状では3時間以内の一部のログをサンプルとして取得するしか出来ません。これはある程度の規模でより厳格にWAFを運用するという用途ではやや不向きな状態です。

WAFに求められる機能は、アプリケーションに対する防御も然ることながら、加えてログから攻撃者の動向を知ることで、どのアプリケーション、コンテンツが狙われているか、どこから攻撃されているか、どのようなツールが利用されているか等様々な情報を取得し、次の対策に繋げるという役割もあります。

目的

今回のブログでは以下の目的で進めます。

  • WAFのsampleログをLambdaで取得しS3に保存する
  • 保存するデータはAmazon Athenaでクエリできるようにする
  • Lambdaの構造はCloudFormationからwebACLを指定して適用する事を想定
  • Lambdaの実装について詳しく説明する

なお、Athenaでのクエリ及びCloudFormationについては別の回で説明します。

WAFのsampleログとは

Gets detailed information about a specified number of requests--a sample--that AWS WAF randomly selects from among the first 5,000 requests that your AWS resource received during a time range that you choose. You can specify a sample size of up to 500 requests, and you can specify any time range in the previous three hours.

参考: GetSampledRequests - AWS WAF

GetSampleRequestsというAPIで取得可能です。

過去3時間中の任意の期間を指定すると、最初の5000行のログからサンプリングした最大500行のログが取得可能です。

これは完全なログではなく、サンプルであるという点に注意が必要です。あくまで参考程度に利用するように心がけて下さい。

スクリプト

Python3.6にて実装しています。
import os
import json
from datetime import datetime, timedelta
import boto3

WAF_TYPE = os.environ['WAF_TYPE']
REGION = os.environ['REGION']
WebAclId = os.environ['WebAclId']
OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET']
OUTPUT_FILE_NAME_BASE = 'waf-logs.json'
tmpfile = '/tmp/logs.json'
INTERVAL = int(os.environ['INTERVAL'])


s3 = boto3.resource('s3')
# select client type
if WAF_TYPE == 'ALB':
	session = boto3.session.Session(region_name=REGION)
	waf = session.client('waf-regional')
else:
	waf = boto3.client('waf')

# replace datetime to string
def time2str(x):
	x['Timestamp'] = x['Timestamp'].isoformat()
	return x

def lambda_handler(event, context):
	# get rules
	acl = waf.get_web_acl(WebACLId=WebAclId)
	WebACLName = acl['WebACL']['Name']
	marge_logs = []
	for rule in acl['WebACL']['Rules']:
		RuleId = rule['RuleId']
		# get sample requests
		r = waf.get_sampled_requests(
			MaxItems=500,
			WebAclId=WebAclId,
			RuleId=RuleId,
			TimeWindow={
				'EndTime': datetime.utcnow(),
				'StartTime': datetime.utcnow() - timedelta(minutes=INTERVAL)
			}
		)
		# check sample count
		if 'SampledRequests' not in r or len(r['SampledRequests']) == 0:
			continue
		# replace datetime to string
		logs = list(map(time2str, r['SampledRequests']))
		for l in logs:
			# add rule id to log
			l['RuleId'] = RuleId
			marge_logs.append(json.dumps(l))

	# save logs to S3
	if marge_logs != []:
		now = datetime.utcnow().strftime('%Y-%m-%d/%H-%M-%S-%f')
		keyname = '_'.join([now, WebACLName, OUTPUT_FILE_NAME_BASE])
		try:
			with open(tmpfile, 'w') as outfile:
				outfile.write('\n'.join(marge_logs))
			s3.Object(OUTPUT_BUCKET, keyname).upload_file(
				tmpfile,
				ExtraArgs={'ContentType': "application/json"}
			)
		except Exception as e:
			print(e, "Error to write output file")
	return marge_logs

順に説明していきます。

流れ

  • 指定されたwebACLから設定されているRulesを取得
  • Ruleの数だけループ
    • sampleをintervalの間隔で取得
    • 取得したログを加工して溜める
  • 全てのログをS3に保存

環境変数

  • WAF_TYPE: WAF(webACL)がCloudFront/ALBのどちらに適用されているかを識別します。適用先に応じて利用するboto3クライアントが変わります。
  • REGION: WAFがALBに適用されている場合にクライアント生成時にリージョンを指定します
  • WebAclId: ログを取得するwebACLのID。CloudFormationから作成することを想定しているため直接IDを保持します。
  • OUTPUT_BUCKET: 出力先のS3バケット
  • INTERVAL: ログの取得間隔(分)。CloudFormationにて同時に作成するcronと同値にする想定。500行のログが出力される頻度に応じて変更します。

スクリプトのポイント

  • sampleのtimestampのstring化
    • jsonで保存するにあたり、r['SampledRequests'][各リクエスト]['Timestamp']をdatetime型からstring型に変換しています。
    • map(time2str, r['SampledRequests'])にて実行
  • ログにRuleIdを挿入
    • Athenaで各ログ毎に処理を行いたいため、ログ毎にどのRuleで処理されたか分かるようにRuleIdを挿入しています。
    • l['RuleId'] = RuleId
    • sampleログは、RuleIdを指定して取得しているため、元データ個々にRuleIdはありません
  • ログ1行毎に改行して保存
    • Athenaで処理するにあたり、ログファイルは各ログのリストだと1行ごと認識できません。
    • その為、ログ毎に改行して下記のような形式で取り込みます。
    • {"Request": {"ClientIP": "192.0.2.1", "Country": "-", "URI": "/index.html", "Method": "POST", "HTTPVersion": "HTTP/1.1", "Headers": [{"Name": "Host", "Value": "alb.ap-northeast-1.elb.amazonaws.com"}, {"Name": "Content-Length", "Value": "28"}, {"Name": "User-Agent", "Value": "curl/7.54.0"}, {"Name": "Accept", "Value": "*/*"}, {"Name": "Content-Type", "Value": "application/x-www-form-urlencoded"}]}, "Weight": 1, "Timestamp": "2018-01-25T04:40:58.325000+00:00", "Action": "COUNT", "RuleId": "be06c7bc-fc2c-4cd2-b53b-097548df2e6f"}
      {"Request": {"ClientIP": "192.0.2.1", "Country": "-", "URI": "/index.html", "Method": "POST", "HTTPVersion": "HTTP/1.1", "Headers": [{"Name": "Host", "Value": "alb.ap-northeast-1.elb.amazonaws.com"}, {"Name": "Content-Length", "Value": "28"}, {"Name": "User-Agent", "Value": "curl/7.54.0"}, {"Name": "Accept", "Value": "*/*"}, {"Name": "Content-Type", "Value": "application/x-www-form-urlencoded"}]}, "Weight": 1, "Timestamp": "2018-01-25T04:41:03.268000+00:00", "Action": "COUNT", "RuleId": "be06c7bc-fc2c-4cd2-b53b-097548df2e6f"}
      {"Request": {"ClientIP": "192.0.2.1", "Country": "-", "URI": "/index.html", "Method": "POST", "HTTPVersion": "HTTP/1.1", "Headers": [{"Name": "Host", "Value": "alb.ap-northeast-1.elb.amazonaws.com"}, {"Name": "Content-Length", "Value": "28"}, {"Name": "User-Agent", "Value": "curl/7.54.0"}, {"Name": "Accept", "Value": "*/*"}, {"Name": "Content-Type", "Value": "application/x-www-form-urlencoded"}]}, "Weight": 1, "Timestamp": "2018-01-25T04:40:53.267000+00:00", "Action": "COUNT", "RuleId": "be06c7bc-fc2c-4cd2-b53b-097548df2e6f"}
      {"Request": {"ClientIP": "192.0.2.1", "Country": "-", "URI": "/index.html", "Method": "POST", "HTTPVersion": "HTTP/1.1", "Headers": [{"Name": "Host", "Value": "alb.ap-northeast-1.elb.amazonaws.com"}, {"Name": "Content-Length", "Value": "28"}, {"Name": "User-Agent", "Value": "curl/7.54.0"}, {"Name": "Accept", "Value": "*/*"}, {"Name": "Content-Type", "Value": "application/x-www-form-urlencoded"}]}, "Weight": 1, "Timestamp": "2018-01-25T04:40:45.601000+00:00", "Action": "COUNT", "RuleId": "be06c7bc-fc2c-4cd2-b53b-097548df2e6f"}
    • 事前に1行ごとstringにjson.dumps()しておき、最後にoutfile.write('\n'.join(marge_logs))します

IAM Role

必要なIAM Roleは次の3種類です。

  • WAFのRead
  • S3への書き込み
  • LambdaとしてCloudWatch Logsへの書き込み

最初にLambdaを作成する際に一緒に新規IAM Roleを作成すると、CloudWatch Logsへの書き込みは設定されるので、その後WAFのReadとS3への書き込みを追加するとスムーズです。

WAFについてはAWS管理ポリシーであるAWSWAFReadOnlyAccessを利用することが可能です。

S3については新たに管理ポリシーを作成しましょう。現在IAM Roleに直接インラインポリシーを記載することは非推奨です。

最近実装されたIAMポリシービジュアルエディタを利用することにより非常に簡単に管理ポリシーを作成することが可能です。

[新機能]IAMのVisual Editorを使って見た[便利!]

サービスはS3を選択し、アクションはPutObject、リソースに保存したいS3を設定すれば出来上がりです。

作成した管理ポリシーをIAM Roleに紐付けて完了です。

まとめ

今回はAWS WAFのsampleログを取得してみました。この仕組があれば、AWS WAFを利用する際に、良い感じにログを取得して分析できるようになると思います。

ただ、最初にも書きましたが、sampleログであり全てのログが取得できているわけではないことに留意して利用しましょう。

また別でCloudFormationでの適用やAthenaでの分析について書いていきたいと思います。