マルチアカウント環境における全アカウント・全リージョンのSecurity Hubセキュリティ基準を一括集計して確認してみた
はじめに
マルチアカウント環境において、全アカウント・全リージョンのSecurity Hubセキュリティ基準の有効化状況を一括で集計し、確認する方法をご紹介します。
マルチアカウント環境では、アカウントごとに有効化するセキュリティ基準が異なる場合、各アカウントで意図したセキュリティ基準が正しく有効化されているかを確認したり、
Security Hubの中央設定を利用していない場合、全アカウントで特定のセキュリティ基準が有効化されているかを確認したいケースがあります。
このような場合、効率的に全アカウントのセキュリティ基準の有効化状況を確認するには、管理アカウントから一括で情報を収集し、集計する方法が有効です。本記事では、その具体的な手順を解説します。
実施手順の概要は以下のとおりです。
- 管理アカウントにAWS Lambdaを作成し、メンバーアカウントのIAMロール
SecurityHubManagementRole
を引き受ける権限を付与します。 - 各メンバーアカウントに
SecurityHubManagementRole
というIAMロールを作成し、Security Hubの確認権限を付与します。この作業は、管理アカウントからCloudFormation StackSetsを使用して、各メンバーアカウントに一括展開します。 - 管理アカウントからLambdaを実行し、組織内の全アカウントのアカウントIDを取得します。その後、各アカウントのIAMロールでAssumeRoleし、全リージョンにおけるSecurity Hubセキュリティ基準の有効化状況を確認・集計して結果を出力します。
- 管理アカウントは、LambdaのIAMロールの権限を利用してSecurity Hubセキュリティ基準の有効化状況を確認します。
管理アカウントにLambda作成
以下の設定でLambdaを作成します。
- ランタイム:Python 3.13
- タイムアウト:1分
- メモリ:2048MB
- IAMロールに付与するポリシー
- AWSLambdaBasicExecutionRole
- 以下のIAMポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "OrganizationsAccess",
"Effect": "Allow",
"Action": [
"organizations:ListAccountsForParent",
"organizations:ListChildren"
],
"Resource": "*"
},
{
"Sid": "STSAssumeRole",
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Resource": "arn:aws:iam::*:role/SecurityHubManagementRole"
},
{
"Sid": "SecurityHubAccess",
"Effect": "Allow",
"Action": [
"securityhub:DescribeHub",
"securityhub:GetEnabledStandards"
],
"Resource": "*"
},
{
"Sid": "EC2DescribeRegions",
"Effect": "Allow",
"Action": "ec2:DescribeRegions",
"Resource": "*"
}
]
}
コードは以下のとおりです。OU_ID
は、管理アカウント以外のアカウントを対象とするOU IDを指定します。これにより、OU配下の全アカウントを取得します。
import boto3
import json
from boto3.session import Session
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed # 並列処理用
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.getLogger('boto3').setLevel(logging.WARNING)
logging.getLogger('botocore').setLevel(logging.WARNING)
OU_ID = "ou-xxxx-xxxxxx"
ROLE_NAME = "SecurityHubManagementRole"
def get_all_account_ids(org_client, ou_id):
accounts = []
queue = [ou_id]
while queue:
current_ou = queue.pop(0)
try:
response = org_client.list_accounts_for_parent(ParentId=current_ou)
accounts.extend([account['Id'] for account in response['Accounts']])
child_ous = org_client.list_children(ParentId=current_ou, ChildType='ORGANIZATIONAL_UNIT')
queue.extend([child['Id'] for child in child_ous['Children']])
except Exception as e:
print(f"Error retrieving accounts for OU {current_ou}: {e}")
return accounts
def sts_assume_role(account_id, role_name, region):
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
session_name = "SecurityHubCheckSession"
client = boto3.client('sts')
try:
response = client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
session = Session(
aws_access_key_id=response['Credentials']['AccessKeyId'],
aws_secret_access_key=response['Credentials']['SecretAccessKey'],
aws_session_token=response['Credentials']['SessionToken'],
region_name=region
)
return session
except client.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'AccessDenied':
return None
else:
logger.error(f"Error assuming role for account {account_id}: {e}")
return None
# Security Hubが有効かどうかを確認
def is_security_hub_enabled(session, region):
try:
securityhub_client = session.client('securityhub', region_name=region)
securityhub_client.describe_hub()
return True
except securityhub_client.exceptions.InvalidAccessException:
return False
except Exception as e:
# SCPでリージョン制限が設定されている場合、エラーが発生します。この場合、ログ出力は行いません。
# logger.error(f"Error checking Security Hub status in region {region}: {e}")
return False
# 有効なSecurity Hubのセキュリティ基準を取得
def get_enabled_standards(session, region, account_id):
try:
securityhub_client = session.client('securityhub', region_name=region)
response = securityhub_client.get_enabled_standards()
enabled_standards = response.get('StandardsSubscriptions', [])
results = []
for standard in enabled_standards:
if standard['StandardsStatus'] == 'READY':
formatted_arn = standard['StandardsArn'].replace(f"arn:aws:securityhub:{region}::", "")
results.append({
'AccountId': account_id,
'Region': region,
'StandardsArn': formatted_arn
})
return results
except Exception as e:
logger.error(f"Error checking standards for account {account_id} in region {region}: {e}")
return []
# 各アカウントのSecurity Hubチェックを実行
def process_account(account_id, regions, role_name):
results = []
with ThreadPoolExecutor(max_workers=5) as executor: # リージョンごとに並列処理
futures = {executor.submit(process_region, account_id, region, role_name): region for region in regions}
for future in as_completed(futures):
results.extend(future.result())
return results
def process_region(account_id, region, role_name):
session = sts_assume_role(account_id, role_name, region)
if session and is_security_hub_enabled(session, region):
return get_enabled_standards(session, region, account_id)
return []
def lambda_handler(event, context):
sts_client = boto3.client('sts')
management_account_id = sts_client.get_caller_identity()['Account']
ec2_client = boto3.client('ec2')
regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']]
org_client = boto3.client('organizations')
member_accounts = get_all_account_ids(org_client, OU_ID)
logger.info(f"Retrieved member accounts: {member_accounts}")
results = []
# 並列処理でメンバーアカウントのSecurity Hubをチェック
with ThreadPoolExecutor(max_workers=10) as executor: # 最大10スレッドで並列処理
futures = {executor.submit(process_account, account_id, regions, ROLE_NAME): account_id for account_id in member_accounts}
for future in as_completed(futures):
account_id = futures[future]
try:
account_results = future.result()
results.extend(account_results)
except Exception as e:
logger.error(f"Error processing account {account_id}: {e}")
# 管理アカウントのSecurity Hubをチェック(並列化)
def process_management_account_region(region):
try:
session = boto3.Session(region_name=region)
if is_security_hub_enabled(session, region):
return get_enabled_standards(session, region, management_account_id)
except Exception as e:
logger.error(f"Error processing management account {management_account_id} in region {region}: {e}")
return [] # エラー時は空リストを返す
with ThreadPoolExecutor(max_workers=5) as executor: # 最大5スレッドで並列処理
futures = {executor.submit(process_management_account_region, region): region for region in regions}
for future in as_completed(futures):
region = futures[future]
try:
standards = future.result()
results.extend(standards)
except Exception as e:
logger.error(f"Error processing management account in region {region}: {e}")
if results:
log_output = []
log_output.append("")
log_output.append("")
log_output.append("Account ID Region Enabled Standard")
log_output.append("-------------------- --------------- --------------------------------------------------")
for result in results:
log_output.append(f"{result['AccountId']:20} {result['Region']:15} {result['StandardsArn']}")
logger.info("\n".join(log_output))
else:
logger.info("No Security Hub standards found for any account or region.")
return {}
実際にLambdaの処理を以下に示します。
このLambda関数は、組織内の全アカウントを対象に、Security Hubのセキュリティ基準の有効化状況を確認し、結果を集計します。
- 組織内の全アカウントIDを取得します。
- 各アカウントの
SecurityHubManagementRole
をAssumeRoleし、対象アカウントにアクセスするための一時的なセッションを作成します。- 管理アカウントは、LambdaのIAMロールの権限を利用します。
- 各アカウントの全リージョンに対して、Security Hubが有効化されているかを確認します。
- Security Hubが有効な場合、有効化されているセキュリティ基準を取得します。
- 取得したデータを集計し、ログに出力します。これにより、どのアカウント・リージョンでどのセキュリティ基準が有効化されているかを確認できます。
メンバーアカウントにIAMロールを作成
Lambdaが各アカウントのIAMロールでAssumeRoleできるように、CloudFormation StackSetsを使用して各アカウントにIAMロールを作成します。
以下のCloudFormationテンプレートを使用してください。
AWSTemplateFormatVersion: '2010-09-09'
Description: Create IAM Role for Security Hub management
Parameters:
LambdaRoleArn:
Type: String
Description: The ARN of the Lambda function's IAM role
Resources:
SecurityHubManagementRole:
Type: AWS::IAM::Role
Properties:
RoleName: SecurityHubManagementRole
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
AWS: !Ref LambdaRoleArn
Action: sts:AssumeRole
Policies:
- PolicyName: SecurityHubPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- securityhub:DescribeHub
- securityhub:GetEnabledStandards
Resource: "*"
CloudFormation StackSetsの設定は以下のとおりです。他の設定はデフォルトのままです。
- アクセス許可モデル:サービスマネージドアクセス許可(SERVICE_MANAGED)
- テンプレート:SecurityHubManagementRole.yaml
- StackSet名:SecurityHubManagementRole
- デプロイターゲット:組織へのデプロイ
- 自動デプロイオプション
- 自動デプロイ:非アクティブ化済み
- リージョンの指定:東京リージョンのみ
パラメータは以下のとおりです。
- LambdaRoleArn:LambdaのIAMロールのARN
これで管理アカウントを除く、各アカウントにIAMロールが作成されました。
試してみた
それでは、Lambdaを実行してみましょう。
Lambdaに渡すイベントJSONは空で問題ありません。
Lambdaの実行結果は以下の通りです。
START RequestId: 192f24c0-2803-4bc4-854b-8ee25b199ace Version: $LATEST
[INFO] 2024-12-03T06:25:26.834Z 192f24c0-2803-4bc4-854b-8ee25b199ace Retrieved member accounts: ['xxxxxxxxxxxx', 'yyyyyyyyyyyy']
[INFO] 2024-12-03T06:25:41.692Z 192f24c0-2803-4bc4-854b-8ee25b199ace
Account ID Region Enabled Standard
-------------------- --------------- --------------------------------------------------
xxxxxxxxxxxx ap-northeast-1 standards/nist-800-53/v/5.0.0
xxxxxxxxxxxx ap-northeast-1 standards/pci-dss/v/3.2.1
yyyyyyyyyyyy ap-northeast-1 arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0
yyyyyyyyyyyy ap-northeast-1 standards/aws-foundational-security-best-practices/v/1.0.0
yyyyyyyyyyyy ap-northeast-1 standards/aws-resource-tagging-standard/v/1.0.0
yyyyyyyyyyyy us-east-1 standards/aws-foundational-security-best-practices/v/1.0.0
zzzzzzzzzzzz ap-northeast-1 standards/aws-foundational-security-best-practices/v/1.0.0
zzzzzzzzzzzz ap-northeast-1 standards/cis-aws-foundations-benchmark/v/3.0.0
zzzzzzzzzzzz ap-southeast-1 standards/aws-foundational-security-best-practices/v/1.0.0
zzzzzzzzzzzz ap-southeast-2 standards/aws-foundational-security-best-practices/v/1.0.0
zzzzzzzzzzzz us-east-1 standards/aws-foundational-security-best-practices/v/1.0.0
END RequestId: 192f24c0-2803-4bc4-854b-8ee25b199ace
REPORT RequestId: 192f24c0-2803-4bc4-854b-8ee25b199ace Duration: 20115.20 ms Billed Duration: 20116 ms Memory Size: 2048 MB Max Memory Used: 425 MB Init Duration: 331.20 ms
各アカウントごとにセキュリティ基準の出力を確認できました。
なお、実行時間は、管理アカウントと2つのメンバーアカウントを確認した場合、Lambdaのメモリを2048MBに設定して約20秒でした。
参考