Control Tower環境でConfigルールによってVPCフローログの有効化を強制させる
はじめに
こんにちは。クラウド事業本部コンサルティング部に所属している和田響です。
この記事ではAWS Control Tower環境で全てのVPCに対して、VPCフローログの有効化を強制させる方法について記載します。
インプット
この記事は以下のAWSブログの手順に従って、実際にソリューションを展開して検証していきます。
全体構成は以下の通りです。
AWS ConfigルールでVPCのフローログが有効かどうかを評価し、SSM自動化ドキュメントによってVPCフローログが無効になっているVPCを自動で修正します。
やってみる
S3バケットの作成
まずはControl Tower環境内のVPCフローログを集約するためのS3バケットを作成します。
この記事ではControl Towerセットアップ時に作成されるLog Archiveアカウントに作成していきます。
Amazon S3のコンソールから バケットを作成 をクリックします。
バケット名には任意の文字列を入力し、
その他はデフォルトのままで バケットを作成 をクリックします。
続いてバケットポリシーを編集します。
作成したS3バケットのバケットポリシーの 編集 をクリックします。
以下のポリシーを貼り付けます。
<s3-bucket-name> をバケット名に変更します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "delivery.logs.amazonaws.com"
},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::<s3-bucket-name>/*",
"Condition": {
"StringEquals": {
"s3:x-amz-acl": "bucket-owner-full-control"
}
}
},
{
"Effect": "Allow",
"Principal": {
"Service": "delivery.logs.amazonaws.com"
},
"Action": "s3:GetBucketAcl",
"Resource": "arn:aws:s3:::<s3-bucket-name>"
}
]
}
変更の保存 をクリックします。
CloudFormation StackSetsのデプロイ
はじめに以下のYAMLファイル(aws-config-remediate-vpc-flow-logs.yam)をダウンロードしておきます。
AWSTemplateFormatVersion: '2010-09-09'
Description: 'This template will deploy an AWS Config rule to automatically remediate VPC Flow Logs enablement'
Parameters:
CustomConfigRuleName:
Description: Name that you want to give to the AWS Config Rule.
Type: String
Default: ConfigRuleForEnableVpcFlowLogs
TrafficType:
Type: String
AllowedValues:
- ACCEPT
- REJECT
- ALL
Description: The value for the VPC Flow Logs traffic type.
Default: ALL
MaxExecutionFrequency:
Type: String
AllowedValues:
- One_Hour
- Three_Hours
- Six_Hours
- Twelve_Hours
- TwentyFour_Hours
Description: The maximum frequency with which AWS Config runs evaluations for a rule.
Default: One_Hour
CentralizedS3LoggingBucket:
Description: Name of the S3 bucket in the logging account to send VPC Flow Logs.
Type: String
Resources:
ConfigRemediationRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Principal:
Service:
- 'ssm.amazonaws.com'
Action:
- 'sts:AssumeRole'
Path: '/'
Policies:
- PolicyName: aws-config-remediate-vpc-flow-logs-policy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: 'Allow'
Action:
- logs:CreateLogDelivery
- logs:DeleteLogDelivery
- logs:CreateLogGroup
- logs:PutResourcePolicy
- logs:DescribeResourcePolicies
- logs:DescribeLogGroups
Resource: "*"
- Effect: 'Allow'
Action:
- ec2:CreateFlowLogs
- ec2:DescribeFlowLogs
Resource: '*'
- Effect: 'Allow'
Action:
- iam:CreateServiceLinkedRole
Resource: "arn:aws:iam::*:role/aws-service-role/delivery.logs.amazonaws.com/*"
- Effect: 'Allow'
Action:
- s3:PutObject
- s3:GetBucketAcl
Resource:
- !Sub 'arn:aws:s3:::${CentralizedS3LoggingBucket}'
- !Sub 'arn:aws:s3:::${CentralizedS3LoggingBucket}/*'
ConfigRuleForEnableVpcFlowLogs:
Type: AWS::Config::ConfigRule
Properties:
ConfigRuleName: !Ref CustomConfigRuleName
Description: ConfigPermissionToInvokeAnAutomaticRemediation
InputParameters:
trafficType: !Ref TrafficType
MaximumExecutionFrequency: !Ref MaxExecutionFrequency
Scope:
ComplianceResourceTypes:
- AWS::EC2::VPC
Source:
Owner: AWS
SourceIdentifier: VPC_FLOW_LOGS_ENABLED
VpcFlowLogsRemediationConfiguration:
DependsOn: ConfigRuleForEnableVpcFlowLogs
Type: AWS::Config::RemediationConfiguration
Properties:
ConfigRuleName: !Ref CustomConfigRuleName
Automatic: true
MaximumAutomaticAttempts: 5 #minutes
RetryAttemptSeconds: 50 #seconds
ResourceType: AWS::EC2::VPC
Parameters:
VPCIds:
ResourceValue:
Value: 'RESOURCE_ID'
LogDestinationType:
StaticValue:
Values:
- s3
LogDestinationArn:
StaticValue:
Values:
- !Sub 'arn:aws:s3:::${CentralizedS3LoggingBucket}'
TrafficType:
StaticValue:
Values:
- !Ref TrafficType
AutomationAssumeRole:
StaticValue:
Values:
- !GetAtt ConfigRemediationRole.Arn
TargetId: AWS-EnableVPCFlowLogs
TargetType: SSM_DOCUMENT
TargetVersion: 1
Outputs:
ConfigRuleForEnableVpcFlowLogsArn:
Description: Arn of the AWS Config Rule to enable VPC Flow Logs
Value: !GetAtt ConfigRuleForEnableVpcFlowLogs.Arn
ConfigRemediationRoleArn:
Description: Arn of the IAM Role to perform auto-emediation
Value: !GetAtt ConfigRemediationRole.Arn
Control Tower 管理アカウントで、Control Tower がデプロイされているAWSリージョンを選択します。
(この記事では東京リージョンを選択)
CloudFormationのコンソールから、StackSetsを開きます。
StackSet を作成 をクリックします。
サービスマネージドアクセス許可 を選択し
テンプレートファイルのアップロード よりダウンロードしておいたYAMLファイル(aws-config-remediate-vpc-flow-logs.yam)をアップロードします。
以下の値を入力して 次へ をクリックします。
StackSet 名:任意の値
CentralizedS3LoggingBucket:アーカイブ用に作成したS3バケット名
CustomConfigRuleName:任意の値(デフォルトは ConfigRuleForEnableVpcFlowLogs )
MaxExecutionFrequency:AWS Configルールの評価頻度(デフォルトは One_Hour )
TrafficType: VPC フローログがキャプチャするトラフィックのタイプ(デフォルトは ALL )
チェックを入れて 次へ をクリックします。
デプロイターゲットは 組織単位 (OU) へのデプロイ を選択し、AWS OU IDにはデプロイしたいOUのIDを入力します。
リージョンの指定で、対象のリージョンを指定します。
選択した内容を確認し 送信 をクリックします。
CloudFormation スタックのデプロイ
以下のYAMLファイル (ct-lifecycle-event.yaml)をダウンロードしておきます。
AWSTemplateFormatVersion: 2010-09-09
Description: FlowLog - Infrastructure at the Control tower Management account for setting up Control tower life cycle event
Parameters:
StackSetArn:
Type: String
Description: ARN of the StackSet deployed from Control Tower Management account
Resources:
CTLifeCycleTrigger:
Type: AWS::Lambda::Function
Properties:
Description: CT Lifecycle Trigger Lambda function
Handler: lambda_handler
Runtime: python3.8
Role: !GetAtt 'FlowLogLifeCycleRole.Arn'
Timeout: 600
Environment:
Variables:
ct_stack_set_arn: !Ref StackSetArn
Code:
ZipFile: |
import json
import boto3
import logging
import os
from botocore.exceptions import ClientError
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
logging.getLogger('boto3').setLevel(logging.CRITICAL)
logging.getLogger('botocore').setLevel(logging.CRITICAL)
session = boto3.Session()
def list_stack_instance_by_account(target_session, stack_set_name, account_id):
'''
List all stack instances based on the StackSet name and Account Id
'''
try:
cfn_client = target_session.client('cloudformation')
cfn_paginator = cfn_client.get_paginator('list_stack_instances')
operation_parameters = {
'StackSetName': stack_set_name,
'StackInstanceAccount': account_id
}
stackset_result = cfn_paginator.paginate(**operation_parameters)
stackset_list = []
for page in stackset_result:
if 'Summaries' in page:
stackset_list.extend(page['Summaries'])
return stackset_list
except Exception as e:
LOGGER.error("List Stack Instance error: %s" % e)
return []
def list_stack_instance_region(target_session, stack_set_name):
'''
List all stack instances based on the StackSet name
'''
try:
cfn_client = target_session.client('cloudformation')
cfn_paginator = cfn_client.get_paginator('list_stack_instances')
operation_parameters = {
'StackSetName': stack_set_name
}
stackset_result = cfn_paginator.paginate(**operation_parameters)
stackset_list_region = []
for page in stackset_result:
for instance in page['Summaries']:
stackset_list_region.append(instance['Region'])
stackset_list_region=list(set(stackset_list_region))
return stackset_list_region
except Exception as e:
LOGGER.error("List Stack Instance error: %s" % e)
return []
def create_stack_instance(target_session, stackset_name, account, regions):
'''
Create stackset in particular account + region
'''
try:
cfn_client = target_session.client('cloudformation')
response = cfn_client.create_stack_instances(
StackSetName=stackset_name,
Accounts=account,
Regions=regions
)
LOGGER.debug(response)
LOGGER.info("Launched stackset instance {} for account {} in regions: {} with Operation id: {}".format(stackset_name, account, regions, response["OperationId"]))
return True
except Exception as e:
LOGGER.error("Could not create stackset instance : {}".format(e))
return False
def get_accounts_by_ou(target_session, ou_id):
'''
List all active accounts by the OU id
'''
try:
org_client = target_session.client('organizations')
org_paginator = org_client.get_paginator('list_accounts_for_parent')
operation_parameters = {
'ParentId': ou_id
}
accounts_response = org_paginator.paginate(**operation_parameters)
accounts_list = []
active_accounts_list = []
for page in accounts_response:
if 'Accounts' in page:
accounts_list.extend(page['Accounts'])
for account in accounts_list:
if account['Status'] == 'ACTIVE':
active_accounts_list.append(account['Id'])
return active_accounts_list
except ClientError as e:
LOGGER.error("Organization get accounts by OU error : {}".format(e))
return []
def lambda_handler(event, context):
LOGGER.info('Lambda Handler - Start')
LOGGER.info('REQUEST RECEIVED: {}'.format(json.dumps(event, default=str)))
# Check if lifecycle even matches
if 'detail' in event and event['detail']['eventName'] == 'CreateManagedAccount':
if event['detail']['serviceEventDetails']['createManagedAccountStatus']['state'] == 'SUCCEEDED':
account_id = event['detail']['serviceEventDetails']['createManagedAccountStatus']['account']['accountId']
#find if existing stackset instance for this account already exist
stackset_name = (str(os.environ["ct_stack_set_arn"]).split(":")[5]).split("/")[1]
stackset_instances = list_stack_instance_by_account(session, stackset_name, account_id)
stackset_instances_regions = list_stack_instance_region(session, stackset_name)
#stackset instance does not exist, create a new one
if len(stackset_instances) == 0:
create_stack_instance(session, stackset_name, [account_id], stackset_instances_regions)
#stackset instance already exist, check for missing region
elif len(stackset_instances) > 0:
stackset_region = []
for instance in stackset_instances:
stackset_region.append(instance['Region'])
next_region = list(set(stackset_instances_regions) - set(stackset_region))
if len(next_region) > 0:
create_stack_instance(session, stackset_name, [account_id], next_region)
else:
LOGGER.info("Stackset instance already exist : {}".format(stackset_instances))
else:
LOGGER.error("Invalid event state, expected: SUCCEEDED : {}".format(event))
else:
LOGGER.error("Invalid event received : {}".format(event))
LOGGER.info('Lambda Handler - End')
FlowLogLifeCycleRole:
Type: AWS::IAM::Role
Properties:
Description: FlowLog - Role used by lambda for life cycle / new account creation
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "lambda.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
Policies:
- PolicyName: StackSetPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- cloudformation:ListStackInstances
- cloudformation:CreateStackInstances
Resource:
- !Ref StackSetArn
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource:
- !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*'
CreateAccountLifeCycleRule:
Type: AWS::Events::Rule
Properties:
Description: FlowLog - CT Life Cycle for CreateManageAccount
EventPattern:
{
"source": [
"aws.controltower"
],
"detail-type": [
"AWS Service Event via CloudTrail"
],
"detail": {
"eventSource": [
"controltower.amazonaws.com"
],
"eventName": [
"CreateManagedAccount"
]
}
}
State: ENABLED
Targets:
- Arn: !GetAtt CTLifeCycleTrigger.Arn
Id: "OrganizationalUnitLifeCycle"
CreateAccountLifeCycleRulePermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt CTLifeCycleTrigger.Arn
Action: "lambda:InvokeFunction"
Principal: "events.amazonaws.com"
SourceArn: !GetAtt CreateAccountLifeCycleRule.Arn
このスタックでは、新しいアカウント作成時にフローログ設定を自動化するためのライフサイクル用のLambda,EventBridge,IAMのリソースがデプロイされます。
CloudFormation コンソールから スタックの作成 をクリックします。
テンプレートファイルのアップロードから、先ほどダウンロードしておいたYAMLファイル (ct-lifecycle-event.yaml)をアップロードします。
以下の値を入力して 次へ をクリックします。
StackSet 名:任意の値
StackSetArn: CloudFormation StackSetsのデプロイ で作成したStackSet Arn
チェックを入れて 次へ をクリックします。
選択した内容を確認し 送信 をクリックします。
検証してみる
VPCの作成
ソリューションのデプロイができたので、VPCを作成して検証してみます。
作成直後はVPCフローログは有効になってません。
StackSetsのデプロイ時に以下の設定をしているので1時間待ってみます。
MaxExecutionFrequency:AWS Configルールの評価頻度(デフォルトは One_Hour )
1時間後に確認したら、VPCなんでフローログが有効になっていました。
AWS Config リソースのタイムラインを確認すると、 18:02:08 にConfigルールの評価が行われ、先ほど作成したVPCが非準拠リソースとして検出されていることがわかります。
AWS Systems ManagerのAutomationを確認すると、Configルールの評価の直後(2025 09:04:35 GMT)に実行履歴が残っています。
VPCフローログの削除
続いてVPCフローログを削除してます。
先ほどVPCフローログが有効になっていることを確認したVPCを選択し、VPCフローログの削除を行います。
削除できました。
1時間後に確認すると、再度有効化されています。
注意点
以降はこのソリューションを検証してみて感じた、注意点をまとめます。
評価頻度
このソリューションではVPCフローログの有効/無効をリアルタイムで監視しているわけではありません。
Configルールの評価頻度で設定したタイミングで有効/無効を判断し、無効であれば修復を行います。
厳格なセキュリティ要件がある場合は注意するポイントかと思います。
コスト
このソリューションではVPCフローログの取得、S3のストレージ、データ転送などにコストがかかりますが、一番注意すべきはConfigの料金だと思います。
Configはリソースの記録に加えて、Configルールの評価によってもコストが発生します。
Configルールの評価頻度が短いほどコストも上がっていくので、リソースの数はセキュリティ要件を考慮した設計が重要になります。
最後に
この記事ではVPCフローログの取得を強制させるためのソリューションを試してみました。
いつの日か宣言型ポリシー等の機能によって、ワンクリックで実装できる日が来ると嬉しいです。