こんにちは。AWS事業本部トクヤマシュンです。
Amazon Elasticache for Redisの保管時暗号化設定有無を、AWS Config カスタム Lambdaルールを使ってチェックする機会がありましたので、ご紹介します。
Amazon ElastiCache for Redis の保管時暗号化って何?
Amazon ElastiCache for Redisのディスク上に保管されるデータを暗号化する設定です。
これによりデータアクセスを制限し、セキュリティを強化することが可能です。
暗号化に利用する鍵はAWS管理のデフォルトキーとカスタマー管理のCMKから選択できます。
初回構築時に設定する必要があり、後から変更できませんのでご注意ください。
保管時暗号化の詳細については次の公式ドキュメントをご確認ください。
ElastiCache for Redis での保管時の暗号化
本エントリでは保管時暗号化の有効/無効をチェックする方法を紹介します。
AWS Configルールって何?
AWS Configルールを使うことで、事前に定義された内容がリソースに正しく設定されているかチェックできます。
ルールには次のような種類があります。
- マネージドルール
- AWSによって定義されたルール
- カスタマイズも可能
- ルールのチェック対象を制限する、チェックで利用するパラメーターを変更する、など
- カスタムルール
- カスタムポリシールール
AWS CloudFormation Guard
という言語を使って作成した独自ルールを利用してチェックを行います- チェック可能なリソースは AWS Configでサポートされているリソースタイプのみ
- カスタムLambdaルール
- AWS Lambdaにデプロイした独自関数を利用してチェックを行う
- チェック可能なリソースはAWS Configでサポートされていないリソースを含む任意のもの
- カスタムポリシールール
2023年1月31日現在、下記の状況です。
- Amazon Elasticache for Redisの保管時暗号化設定有無をチェックするマネージドルールはありません
- Amazon Elasticache for RedisはAWS Configのリソース記録に対応していません。
そのため、今回はカスタムLambdaルールの利用が必要です。
設定方法
マネジメントコンソールを使った設定方法を順に説明します。
AWS Configレコーダーの有効化
Amazon Elasticache for Redsiのチェックを行うためにはAWS Configレコーダーによる記録が有効化されていることが前提ですので、設定を確認します。
マネジメントコンソールからチェックを行いたい対象のリージョンに切り替え、AWS Config
→設定
にアクセスします。
次のどちらかの場合は変更が必要ですので、編集
ボタンをクリックして編集画面に移動します。
- レコーダー
記録はオフ
- 一般設定
このリージョンでサポートされているすべてのリソースを記録します
以外のリソース記載
なお、AWS Configをはじめて開く場合は次のような画面が開かれますので、この場合は「今すぐ始める」をクリックして編集画面に移動します。
編集画面では下記項目を設定します。
- レコーダー
記録を有効化
にチェック
- 一般設定
このリージョンでサポートされているすべてのリソースを記録します
をチェック
他の設定項目には今回触れませんが、環境に応じて設定してください。
AWS Config デベロッパーガイド 手動セットアップが参考になります。
設定できれば保存
ボタンをクリックします。
設定後、次の画像の赤枠のように表示されていればOKです。
以上でAWS Config レコーダーの設定は完了です。
AWS Lambdaの設定
続いて、AWS Lambdaの設定をマネジメントコンソールから行います。
AWS Lambda
→関数
に移動し、右上の関数の作成
ボタンをクリックします。
下記を設定します。
- 一から作成
- 関数名
- redis-encrypted-at-rest-check-function
- ランタイム
- Python 3.9
- アーキテクチャ
- x86_64
- 実行ロール
- AWSポリシーテンプレートから新しいロールを作成
- ロール名
- redis-encrypted-at-rest-check-role
- ポリシーテンプレート
- AWS Configルールのアクセス権限
選択できたら関数の作成
をクリックします。
関数が作成できたら、コードを書き換えます。
赤枠の部分を次のコードに書き換え、Deploy
ボタンをクリックしてください。
lambda_function.py(クリックで展開)
lambda_function.py
'''
Rule Name:
redis-encrypted-at-rest-check-rule
Description:
Check whether the Amazon ElastiCache Redis clusters have encryption at rest turned on. The rule is NON_COMPLIANT if the encryption at rest is not enabled.
Trigger:
Periodic
Reports on:
AWS::ElastiCache::CacheCluster
Rule Parameters:
None
Scenarios:
Scenario: 1
Given: No Amazon ElastiCache Redis cluster in the AWS Account
Then: Return "NOT_APPLICABLE"
Scenario: 2
Given: Encryption at rest not enabled for Amazon ElastiCache cluster
Then: Return NON_COMPLIANT with Annotation "Encryption at rest not enabled for Amazon ElastiCache cluster: {Cluster_ID}"
Scenario: 3
Given: Encryption at rest enabled for Amazon ElastiCache cluster
Then: Return COMPLAINT
'''
import json
import sys
import datetime
import boto3
import botocore
try:
import liblogging
except ImportError:
pass
##############
# Parameters #
##############
# Define the default resource to report to Config Rules
DEFAULT_RESOURCE_TYPE = 'AWS::ElastiCache::CacheCluster'
# Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account).
ASSUME_ROLE_MODE = False
# Other parameters (no change needed)
CONFIG_ROLE_TIMEOUT_SECONDS = 900
#############
# Main Code #
#############
def get_replication_groups(ec_client):
replication_groups = []
marker = None
replication_groups_result = {}
while True:
if not marker:
replication_groups_result = ec_client.describe_replication_groups(MaxRecords=100)
else:
replication_groups_result = ec_client.describe_replication_groups(Marker=marker, MaxRecords=100)
replication_groups.extend(replication_groups_result['ReplicationGroups'])
if 'Marker' in replication_groups_result:
marker = replication_groups_result['Marker']
else:
return replication_groups
def get_cache_clusters(ec_client):
cache_clusters = []
marker = None
cache_clusters_result = {}
while True:
if not marker:
cache_clusters_result = ec_client.describe_cache_clusters(MaxRecords=100, ShowCacheNodeInfo=False, ShowCacheClustersNotInReplicationGroups=True)
else:
cache_clusters_result = ec_client.describe_cache_clusters(Marker=marker, MaxRecords=100, ShowCacheNodeInfo=False, ShowCacheClustersNotInReplicationGroups=True)
for cluster in cache_clusters_result['CacheClusters']:
if cluster['Engine'] == 'redis':
cache_clusters.append(cluster)
if 'Marker' in cache_clusters_result:
marker = cache_clusters_result['Marker']
else:
return cache_clusters
def generate_evaluations(eval_list, key,event):
evaluations = []
for cluster in eval_list:
if cluster['AtRestEncryptionEnabled'] == False:
evaluations.append(build_evaluation(cluster[key], 'NON_COMPLIANT', event, annotation="Encryption at rest not enabled for Amazon ElastiCache cluster: {}".format(cluster[key])))
else:
evaluations.append(build_evaluation(cluster[key], 'COMPLIANT', event))
return evaluations
def evaluate_compliance(event, configuration_item):
ec_client = get_client('elasticache', event)
cache_clusters = get_cache_clusters(ec_client)
evaluations = []
replication_groups = get_replication_groups(ec_client)
if not cache_clusters and not replication_groups:
return build_evaluation(event['accountId'], "NOT_APPLICABLE", event)
evaluations.extend(generate_evaluations(cache_clusters, 'CacheClusterId', event))
evaluations.extend(generate_evaluations(replication_groups, 'ReplicationGroupId', event))
return evaluations
####################
# Helper Functions #
####################
# This gets the client after assuming the Config service role
# either in the same AWS account or cross-account.
def get_client(service, event):
"""Return the service boto client. It should be used instead of directly calling the client.
Keyword arguments:
service -- the service name used for calling the boto.client()
event -- the event variable given in the lambda handler
"""
if not ASSUME_ROLE_MODE:
return boto3.client(service)
credentials = get_assume_role_credentials(event["executionRoleArn"])
return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
# This generate an evaluation for config
def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None):
"""Form an evaluation as a dictionary. Usually suited to report on scheduled rules.
Keyword arguments:
resource_id -- the unique id of the resource to report
compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE
event -- the event variable given in the lambda handler
resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE)
annotation -- an annotation to be added to the evaluation (default None)
"""
eval_cc = {}
if annotation:
eval_cc['Annotation'] = annotation
eval_cc['ComplianceResourceType'] = resource_type
eval_cc['ComplianceResourceId'] = resource_id
eval_cc['ComplianceType'] = compliance_type
eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime'])
return eval_cc
def build_evaluation_from_config_item(configuration_item, compliance_type, annotation=None):
"""Form an evaluation as a dictionary. Usually suited to report on configuration change rules.
Keyword arguments:
configuration_item -- the configurationItem dictionary in the invokingEvent
compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE
annotation -- an annotation to be added to the evaluation (default None)
"""
eval_ci = {}
if annotation:
eval_ci['Annotation'] = annotation
eval_ci['ComplianceResourceType'] = configuration_item['resourceType']
eval_ci['ComplianceResourceId'] = configuration_item['resourceId']
eval_ci['ComplianceType'] = compliance_type
eval_ci['OrderingTimestamp'] = configuration_item['configurationItemCaptureTime']
return eval_ci
####################
# Boilerplate Code #
####################
# Helper function used to validate input
def check_defined(reference, reference_name):
if not reference:
raise Exception('Error: ', reference_name, 'is not defined')
return reference
# Check whether the message is OversizedConfigurationItemChangeNotification or not
def is_oversized_changed_notification(message_type):
check_defined(message_type, 'messageType')
return message_type == 'OversizedConfigurationItemChangeNotification'
# Check whether the message is a ScheduledNotification or not.
def is_scheduled_notification(message_type):
check_defined(message_type, 'messageType')
return message_type == 'ScheduledNotification'
# Get configurationItem using getResourceConfigHistory API
# in case of OversizedConfigurationItemChangeNotification
def get_configuration(resource_type, resource_id, configuration_capture_time):
result = AWS_CONFIG_CLIENT.get_resource_config_history(
resourceType=resource_type,
resourceId=resource_id,
laterTime=configuration_capture_time,
limit=1)
configuration_item = result['configurationItems'][0]
return convert_api_configuration(configuration_item)
# Convert from the API model to the original invocation model
def convert_api_configuration(configuration_item):
for k, v in configuration_item.items():
if isinstance(v, datetime.datetime):
configuration_item[k] = str(v)
configuration_item['awsAccountId'] = configuration_item['accountId']
configuration_item['ARN'] = configuration_item['arn']
configuration_item['configurationStateMd5Hash'] = configuration_item['configurationItemMD5Hash']
configuration_item['configurationItemVersion'] = configuration_item['version']
configuration_item['configuration'] = json.loads(configuration_item['configuration'])
if 'relationships' in configuration_item:
for i in range(len(configuration_item['relationships'])):
configuration_item['relationships'][i]['name'] = configuration_item['relationships'][i]['relationshipName']
return configuration_item
# Based on the type of message get the configuration item
# either from configurationItem in the invoking event
# or using the getResourceConfigHistiry API in getConfiguration function.
def get_configuration_item(invoking_event):
check_defined(invoking_event, 'invokingEvent')
if is_oversized_changed_notification(invoking_event['messageType']):
configuration_item_summary = check_defined(invoking_event['configuration_item_summary'], 'configurationItemSummary')
return get_configuration(configuration_item_summary['resourceType'], configuration_item_summary['resourceId'], configuration_item_summary['configurationItemCaptureTime'])
if is_scheduled_notification(invoking_event['messageType']):
return None
return check_defined(invoking_event['configurationItem'], 'configurationItem')
# Check whether the resource has been deleted. If it has, then the evaluation is unnecessary.
def is_applicable(configuration_item, event):
try:
check_defined(configuration_item, 'configurationItem')
check_defined(event, 'event')
except:
return True
status = configuration_item['configurationItemStatus']
event_left_scope = event['eventLeftScope']
if status == 'ResourceDeleted':
print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.")
return status in (['OK', 'ResourceDiscovered']) and not event_left_scope
# return (status == 'OK' or status == 'ResourceDiscovered') and not event_left_scope
def get_assume_role_credentials(role_arn):
sts_client = boto3.client('sts')
try:
assume_role_response = sts_client.assume_role(RoleArn=role_arn,
RoleSessionName="configLambdaExecution",
DurationSeconds=CONFIG_ROLE_TIMEOUT_SECONDS)
if 'liblogging' in sys.modules:
liblogging.logSession(role_arn, assume_role_response)
return assume_role_response['Credentials']
except botocore.exceptions.ClientError as ex:
# Scrub error message for any internal account info leaks
print(str(ex))
if 'AccessDenied' in ex.response['Error']['Code']:
ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role."
else:
ex.response['Error']['Message'] = "InternalError"
ex.response['Error']['Code'] = "InternalError"
raise ex
# This removes older evaluation (usually useful for periodic rule not reporting on AWS::::Account).
def clean_up_old_evaluations(latest_evaluations, event):
cleaned_evaluations = []
old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule(
ConfigRuleName=event['configRuleName'],
ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'],
Limit=100)
old_eval_list = []
while True:
for old_result in old_eval['EvaluationResults']:
old_eval_list.append(old_result)
if 'NextToken' in old_eval:
next_token = old_eval['NextToken']
old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule(
ConfigRuleName=event['configRuleName'],
ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'],
Limit=100,
NextToken=next_token)
else:
break
for old_eval in old_eval_list:
old_resource_id = old_eval['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId']
newer_founded = False
for latest_eval in latest_evaluations:
if old_resource_id == latest_eval['ComplianceResourceId']:
newer_founded = True
if not newer_founded:
cleaned_evaluations.append(build_evaluation(old_resource_id, "NOT_APPLICABLE", event))
return cleaned_evaluations + latest_evaluations
def lambda_handler(event, context):
if 'liblogging' in sys.modules:
liblogging.logEvent(event)
global AWS_CONFIG_CLIENT
print(event)
check_defined(event, 'event')
invoking_event = json.loads(event['invokingEvent'])
try:
AWS_CONFIG_CLIENT = get_client('config', event)
if invoking_event['messageType'] in ['ConfigurationItemChangeNotification', 'ScheduledNotification', 'OversizedConfigurationItemChangeNotification']:
configuration_item = get_configuration_item(invoking_event)
if is_applicable(configuration_item, event):
compliance_result = evaluate_compliance(event, configuration_item)
else:
compliance_result = "NOT_APPLICABLE"
else:
return build_internal_error_response('Unexpected message type', str(invoking_event))
except botocore.exceptions.ClientError as ex:
if is_internal_error(ex):
return build_internal_error_response("Unexpected error while completing API request", str(ex))
return build_error_response("Customer error while making API request", str(ex), ex.response['Error']['Code'], ex.response['Error']['Message'])
except ValueError as ex:
return build_internal_error_response(str(ex), str(ex))
evaluations = []
latest_evaluations = []
if not compliance_result:
latest_evaluations.append(build_evaluation(event['accountId'], "NOT_APPLICABLE", event, resource_type='AWS::::Account'))
evaluations = clean_up_old_evaluations(latest_evaluations, event)
elif isinstance(compliance_result, str):
if configuration_item:
evaluations.append(build_evaluation_from_config_item(configuration_item, compliance_result))
else:
evaluations.append(build_evaluation(event['accountId'], compliance_result, event, resource_type=DEFAULT_RESOURCE_TYPE))
elif isinstance(compliance_result, list):
for evaluation in compliance_result:
missing_fields = False
for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'):
if field not in evaluation:
print("Missing " + field + " from custom evaluation.")
missing_fields = True
if not missing_fields:
latest_evaluations.append(evaluation)
evaluations = clean_up_old_evaluations(latest_evaluations, event)
elif isinstance(compliance_result, dict):
missing_fields = False
for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'):
if field not in compliance_result:
print("Missing " + field + " from custom evaluation.")
missing_fields = True
if not missing_fields:
evaluations.append(compliance_result)
else:
evaluations.append(build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE'))
# Put together the request that reports the evaluation status
result_token = event['resultToken']
test_mode = False
if result_token == 'TESTMODE':
# Used solely for RDK test to skip actual put_evaluation API call
test_mode = True
# Invoke the Config API to report the result of the evaluation
evaluation_copy = []
evaluation_copy = evaluations[:]
while evaluation_copy:
AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluation_copy[:100], ResultToken=result_token, TestMode=test_mode)
del evaluation_copy[:100]
# Used solely for RDK test to be able to test Lambda function
return evaluations
def is_internal_error(exception):
return ((not isinstance(exception, botocore.exceptions.ClientError)) or exception.response['Error']['Code'].startswith('5')
or 'InternalError' in exception.response['Error']['Code'] or 'ServiceError' in exception.response['Error']['Code'])
def build_internal_error_response(internal_error_message, internal_error_details=None):
return build_error_response(internal_error_message, internal_error_details, 'InternalError', 'InternalError')
def build_error_response(internal_error_message, internal_error_details=None, customer_error_code=None, customer_error_message=None):
error_response = {
'internalErrorMessage': internal_error_message,
'internalErrorDetails': internal_error_details,
'customerErrorMessage': customer_error_message,
'customerErrorCode': customer_error_code
}
print(error_response)
return error_response
このコードはAWS Config Rules Githubリポジトリで提供されているELASTICACHE_REDIS_CLUSTER_AUTO_BACKUP_CHECK.pyの一部を編集する形で作成しました。
このように対象となるリソースのサンプルコードが提供されていれば、それを編集することで比較的容易に新たなカスタムLambdaルールの作成が可能です。
今回のコードでElasticache for Redisの暗号化をチェックする箇所は主に次の箇所です。
def generate_evaluations(eval_list, key,event):
evaluations = []
for cluster in eval_list:
if cluster['AtRestEncryptionEnabled'] == False:
evaluations.append(build_evaluation(cluster[key], 'NON_COMPLIANT', event, annotation="Encryption at rest not enabled for Amazon ElastiCache cluster: {}".format(cluster[key])))
else:
evaluations.append(build_evaluation(cluster[key], 'COMPLIANT', event))
return evaluations
クラスターのAtRestEncryptionEnabled属性をチェックし、FalseであればNON_COMPLIANT(非準拠)、TrueであればCOMPLIANT(準拠)として評価します。
以上でAWS Lambda設定は完了です。
AWS IAM Roleの設定
先ほどAWS Lambda作成時にIAM Roleを作成しましたが、Elasticacheの接続権限が不足しているので設定します。
AWS IAM
→ロール
と移動し、redis-encrypted-at-rest-check-function-role
を選択します。
許可を追加
→ポリシーをアタッチ
をクリックして許可追加画面に移動します。
AmazonElastiCacheReadOnlyAccess
を検索してチェックし、ポリシーをアタッチ
ボタンをクリックします。
以上でAWS IAM Roleの設定は完了です。
AWS Config ルールの設定
最後にAWS Configルールを設定します。
AWS Config
→ルール
と移動し、ルールを追加
ボタンをクリックします。
ルールタイプの選択でカスタムLambdaルールを作成
にチェックして、次へボタンをクリックします。
下記を設定します。
- 名前
- redis-encrypted-at-rest-check-rule
- 説明
- ルールの説明(必要であれば)
- AWS Lambda関数ARN
- redis-encrypted-at-rest-check-functionのARN
- Evaluation mode
- Turn on detective evaluationのみクリック
- トリガータイプ
- 定期的にチェック
- 変更範囲
- すべての変更
- 頻度
- 24時間
- もっと短い頻度で確認したい場合は適宜変更
- 24時間
設定できれば次へ
ボタンをクリックして確認画面に進みます。
内容を確認して問題なければ、ルールを追加
ボタンをクリックしてルール作成します。
以上ですべての設定は完了です。
動作確認
2つのAmazon Elasticache for Redisクラスターを作成します。
test:保管時の暗号化無効
test2:保管時の暗号化有効
AWS Config
→ルール
に移動し、redis-encrypted-at-rest-check-rule
を選択します。
まだ評価されていない場合は、アクション
→再評価
をクリックして、手動で評価を実行します。
対象範囲内のリソースとしてすべて
を選択します。
暗号化有効のtest2
は準拠
、無効のtest
は非準拠
として評価されていることがわかります。
以上で、想定通りのチェックができていることが確認できました。
最後に
Amazon Elasticache for Redisの保管時暗号化設定有無をAWS Config カスタム Lambdaルールを使ってチェックしました。
マネージドルールが存在しないようなチェックも、カスタム Lambdaルールを利用すれば実装することが可能です。
Lambda関数を一から作るのは大変かと思いますが、サンプルをベースに編集すれば比較的容易に作成可能ですので、お試しください。
以上、トクヤマシュンでした。