AWS Config カスタム LambdaルールでAmazon Elasticache for Redisの保管時暗号化設定をチェックしてみた

マネージドルールが用意されていない項目もチェックできます。そう、カスタムLambdaルールならね。

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。AWS事業本部トクヤマシュンです。

Amazon Elasticache for Redisの保管時暗号化設定有無を、AWS Config カスタム Lambdaルールを使ってチェックする機会がありましたので、ご紹介します。

Amazon ElastiCache for Redis の保管時暗号化って何?

Amazon ElastiCache for Redisのディスク上に保管されるデータを暗号化する設定です。
これによりデータアクセスを制限し、セキュリティを強化することが可能です。
暗号化に利用する鍵はAWS管理のデフォルトキーとカスタマー管理のCMKから選択できます。
初回構築時に設定する必要があり、後から変更できませんのでご注意ください。

保管時暗号化の詳細については次の公式ドキュメントをご確認ください。

ElastiCache for Redis での保管時の暗号化

本エントリでは保管時暗号化の有効/無効をチェックする方法を紹介します。

AWS Configルールって何?

AWS Configルールを使うことで、事前に定義された内容がリソースに正しく設定されているかチェックできます。
ルールには次のような種類があります。

2023年1月31日現在、下記の状況です。

  • Amazon Elasticache for Redisの保管時暗号化設定有無をチェックするマネージドルールはありません
  • Amazon Elasticache for RedisはAWS Configのリソース記録に対応していません。

そのため、今回はカスタムLambdaルールの利用が必要です。

設定方法

マネジメントコンソールを使った設定方法を順に説明します。

AWS Configレコーダーの有効化

Amazon Elasticache for Redsiのチェックを行うためにはAWS Configレコーダーによる記録が有効化されていることが前提ですので、設定を確認します。

マネジメントコンソールからチェックを行いたい対象のリージョンに切り替え、AWS Config設定にアクセスします。
次のどちらかの場合は変更が必要ですので、編集ボタンをクリックして編集画面に移動します。

  • レコーダー
    • 記録はオフ
  • 一般設定
    • このリージョンでサポートされているすべてのリソースを記録します以外のリソース記載

なお、AWS Configをはじめて開く場合は次のような画面が開かれますので、この場合は「今すぐ始める」をクリックして編集画面に移動します。

編集画面では下記項目を設定します。

  • レコーダー
    • 記録を有効化にチェック
  • 一般設定
    • このリージョンでサポートされているすべてのリソースを記録しますをチェック

他の設定項目には今回触れませんが、環境に応じて設定してください。
AWS Config デベロッパーガイド 手動セットアップが参考になります。

設定できれば保存ボタンをクリックします。

設定後、次の画像の赤枠のように表示されていればOKです。

以上でAWS Config レコーダーの設定は完了です。

AWS Lambdaの設定

続いて、AWS Lambdaの設定をマネジメントコンソールから行います。

AWS Lambda関数に移動し、右上の関数の作成ボタンをクリックします。

下記を設定します。

  • 一から作成
  • 関数名
    • redis-encrypted-at-rest-check-function
  • ランタイム
    • Python 3.9
  • アーキテクチャ
    • x86_64
  • 実行ロール
    • AWSポリシーテンプレートから新しいロールを作成
  • ロール名
    • redis-encrypted-at-rest-check-role
  • ポリシーテンプレート
    • AWS Configルールのアクセス権限

選択できたら関数の作成をクリックします。

関数が作成できたら、コードを書き換えます。

赤枠の部分を次のコードに書き換え、Deployボタンをクリックしてください。

lambda_function.py(クリックで展開)

lambda_function.py

'''
Rule Name:
  redis-encrypted-at-rest-check-rule

Description:
  Check whether the Amazon ElastiCache Redis clusters have encryption at rest turned on. The rule is NON_COMPLIANT if the encryption at rest is not enabled.
Trigger:
  Periodic
Reports on:
  AWS::ElastiCache::CacheCluster
Rule Parameters:
  None
Scenarios:
  Scenario: 1
     Given: No Amazon ElastiCache Redis cluster in the AWS Account
      Then: Return "NOT_APPLICABLE"
  Scenario: 2
     Given: Encryption at rest not enabled for Amazon ElastiCache cluster
      Then: Return NON_COMPLIANT with Annotation "Encryption at rest not enabled for Amazon ElastiCache cluster: {Cluster_ID}"
  Scenario: 3
     Given: Encryption at rest enabled for Amazon ElastiCache cluster
      Then: Return COMPLAINT
'''
import json
import sys
import datetime
import boto3
import botocore

try:
    import liblogging
except ImportError:
    pass

##############
# Parameters #
##############

# Define the default resource to report to Config Rules
DEFAULT_RESOURCE_TYPE = 'AWS::ElastiCache::CacheCluster'

# Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account).
ASSUME_ROLE_MODE = False

# Other parameters (no change needed)
CONFIG_ROLE_TIMEOUT_SECONDS = 900

#############
# Main Code #
#############

def get_replication_groups(ec_client):
    replication_groups = []
    marker = None
    replication_groups_result = {}
    while True:
        if not marker:
            replication_groups_result = ec_client.describe_replication_groups(MaxRecords=100)
        else:
            replication_groups_result = ec_client.describe_replication_groups(Marker=marker, MaxRecords=100)
        replication_groups.extend(replication_groups_result['ReplicationGroups'])
        if 'Marker' in replication_groups_result:
            marker = replication_groups_result['Marker']
        else:
            return replication_groups
def get_cache_clusters(ec_client):
    cache_clusters = []
    marker = None
    cache_clusters_result = {}
    while True:
        if not marker:
            cache_clusters_result = ec_client.describe_cache_clusters(MaxRecords=100, ShowCacheNodeInfo=False, ShowCacheClustersNotInReplicationGroups=True)
        else:
            cache_clusters_result = ec_client.describe_cache_clusters(Marker=marker, MaxRecords=100, ShowCacheNodeInfo=False, ShowCacheClustersNotInReplicationGroups=True)
        for cluster in cache_clusters_result['CacheClusters']:
            if cluster['Engine'] == 'redis':
                cache_clusters.append(cluster)
        if 'Marker' in cache_clusters_result:
            marker = cache_clusters_result['Marker']
        else:
            return cache_clusters

def generate_evaluations(eval_list, key,event):
    evaluations = []
    for cluster in eval_list:
        if cluster['AtRestEncryptionEnabled'] == False:
            evaluations.append(build_evaluation(cluster[key], 'NON_COMPLIANT', event, annotation="Encryption at rest not enabled for Amazon ElastiCache cluster: {}".format(cluster[key])))
        else:
            evaluations.append(build_evaluation(cluster[key], 'COMPLIANT', event))
    return evaluations

def evaluate_compliance(event, configuration_item):
    ec_client = get_client('elasticache', event)
    cache_clusters = get_cache_clusters(ec_client)
    evaluations = []
    replication_groups = get_replication_groups(ec_client)
    if not cache_clusters and not replication_groups:
        return build_evaluation(event['accountId'], "NOT_APPLICABLE", event)
    evaluations.extend(generate_evaluations(cache_clusters, 'CacheClusterId', event))
    evaluations.extend(generate_evaluations(replication_groups, 'ReplicationGroupId', event))
    return evaluations

####################
# Helper Functions #
####################

# This gets the client after assuming the Config service role
# either in the same AWS account or cross-account.
def get_client(service, event):
    """Return the service boto client. It should be used instead of directly calling the client.
    Keyword arguments:
    service -- the service name used for calling the boto.client()
    event -- the event variable given in the lambda handler
    """
    if not ASSUME_ROLE_MODE:
        return boto3.client(service)
    credentials = get_assume_role_credentials(event["executionRoleArn"])
    return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'],
                        aws_secret_access_key=credentials['SecretAccessKey'],
                        aws_session_token=credentials['SessionToken']
                       )

# This generate an evaluation for config
def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None):
    """Form an evaluation as a dictionary. Usually suited to report on scheduled rules.
    Keyword arguments:
    resource_id -- the unique id of the resource to report
    compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE
    event -- the event variable given in the lambda handler
    resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE)
    annotation -- an annotation to be added to the evaluation (default None)
    """
    eval_cc = {}
    if annotation:
        eval_cc['Annotation'] = annotation
    eval_cc['ComplianceResourceType'] = resource_type
    eval_cc['ComplianceResourceId'] = resource_id
    eval_cc['ComplianceType'] = compliance_type
    eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime'])
    return eval_cc

def build_evaluation_from_config_item(configuration_item, compliance_type, annotation=None):
    """Form an evaluation as a dictionary. Usually suited to report on configuration change rules.
    Keyword arguments:
    configuration_item -- the configurationItem dictionary in the invokingEvent
    compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE
    annotation -- an annotation to be added to the evaluation (default None)
    """
    eval_ci = {}
    if annotation:
        eval_ci['Annotation'] = annotation
    eval_ci['ComplianceResourceType'] = configuration_item['resourceType']
    eval_ci['ComplianceResourceId'] = configuration_item['resourceId']
    eval_ci['ComplianceType'] = compliance_type
    eval_ci['OrderingTimestamp'] = configuration_item['configurationItemCaptureTime']
    return eval_ci

####################
# Boilerplate Code #
####################

# Helper function used to validate input
def check_defined(reference, reference_name):
    if not reference:
        raise Exception('Error: ', reference_name, 'is not defined')
    return reference

# Check whether the message is OversizedConfigurationItemChangeNotification or not
def is_oversized_changed_notification(message_type):
    check_defined(message_type, 'messageType')
    return message_type == 'OversizedConfigurationItemChangeNotification'

# Check whether the message is a ScheduledNotification or not.
def is_scheduled_notification(message_type):
    check_defined(message_type, 'messageType')
    return message_type == 'ScheduledNotification'

# Get configurationItem using getResourceConfigHistory API
# in case of OversizedConfigurationItemChangeNotification
def get_configuration(resource_type, resource_id, configuration_capture_time):
    result = AWS_CONFIG_CLIENT.get_resource_config_history(
        resourceType=resource_type,
        resourceId=resource_id,
        laterTime=configuration_capture_time,
        limit=1)
    configuration_item = result['configurationItems'][0]
    return convert_api_configuration(configuration_item)

# Convert from the API model to the original invocation model
def convert_api_configuration(configuration_item):
    for k, v in configuration_item.items():
        if isinstance(v, datetime.datetime):
            configuration_item[k] = str(v)
    configuration_item['awsAccountId'] = configuration_item['accountId']
    configuration_item['ARN'] = configuration_item['arn']
    configuration_item['configurationStateMd5Hash'] = configuration_item['configurationItemMD5Hash']
    configuration_item['configurationItemVersion'] = configuration_item['version']
    configuration_item['configuration'] = json.loads(configuration_item['configuration'])
    if 'relationships' in configuration_item:
        for i in range(len(configuration_item['relationships'])):
            configuration_item['relationships'][i]['name'] = configuration_item['relationships'][i]['relationshipName']
    return configuration_item

# Based on the type of message get the configuration item
# either from configurationItem in the invoking event
# or using the getResourceConfigHistiry API in getConfiguration function.
def get_configuration_item(invoking_event):
    check_defined(invoking_event, 'invokingEvent')
    if is_oversized_changed_notification(invoking_event['messageType']):
        configuration_item_summary = check_defined(invoking_event['configuration_item_summary'], 'configurationItemSummary')
        return get_configuration(configuration_item_summary['resourceType'], configuration_item_summary['resourceId'], configuration_item_summary['configurationItemCaptureTime'])
    if is_scheduled_notification(invoking_event['messageType']):
        return None
    return check_defined(invoking_event['configurationItem'], 'configurationItem')

# Check whether the resource has been deleted. If it has, then the evaluation is unnecessary.
def is_applicable(configuration_item, event):
    try:
        check_defined(configuration_item, 'configurationItem')
        check_defined(event, 'event')
    except:
        return True
    status = configuration_item['configurationItemStatus']
    event_left_scope = event['eventLeftScope']
    if status == 'ResourceDeleted':
        print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.")
    return status in (['OK', 'ResourceDiscovered']) and not event_left_scope
    # return (status == 'OK' or status == 'ResourceDiscovered') and not event_left_scope

def get_assume_role_credentials(role_arn):
    sts_client = boto3.client('sts')
    try:
        assume_role_response = sts_client.assume_role(RoleArn=role_arn,
                                                      RoleSessionName="configLambdaExecution",
                                                      DurationSeconds=CONFIG_ROLE_TIMEOUT_SECONDS)
        if 'liblogging' in sys.modules:
            liblogging.logSession(role_arn, assume_role_response)
        return assume_role_response['Credentials']
    except botocore.exceptions.ClientError as ex:
        # Scrub error message for any internal account info leaks
        print(str(ex))
        if 'AccessDenied' in ex.response['Error']['Code']:
            ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role."
        else:
            ex.response['Error']['Message'] = "InternalError"
            ex.response['Error']['Code'] = "InternalError"
        raise ex

# This removes older evaluation (usually useful for periodic rule not reporting on AWS::::Account).
def clean_up_old_evaluations(latest_evaluations, event):

    cleaned_evaluations = []

    old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule(
        ConfigRuleName=event['configRuleName'],
        ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'],
        Limit=100)

    old_eval_list = []

    while True:
        for old_result in old_eval['EvaluationResults']:
            old_eval_list.append(old_result)
        if 'NextToken' in old_eval:
            next_token = old_eval['NextToken']
            old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule(
                ConfigRuleName=event['configRuleName'],
                ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'],
                Limit=100,
                NextToken=next_token)
        else:
            break

    for old_eval in old_eval_list:
        old_resource_id = old_eval['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId']
        newer_founded = False
        for latest_eval in latest_evaluations:
            if old_resource_id == latest_eval['ComplianceResourceId']:
                newer_founded = True
        if not newer_founded:
            cleaned_evaluations.append(build_evaluation(old_resource_id, "NOT_APPLICABLE", event))

    return cleaned_evaluations + latest_evaluations

def lambda_handler(event, context):
    if 'liblogging' in sys.modules:
        liblogging.logEvent(event)

    global AWS_CONFIG_CLIENT

    print(event)
    check_defined(event, 'event')
    invoking_event = json.loads(event['invokingEvent'])

    try:
        AWS_CONFIG_CLIENT = get_client('config', event)
        if invoking_event['messageType'] in ['ConfigurationItemChangeNotification', 'ScheduledNotification', 'OversizedConfigurationItemChangeNotification']:
            configuration_item = get_configuration_item(invoking_event)
            if is_applicable(configuration_item, event):
                compliance_result = evaluate_compliance(event, configuration_item)
            else:
                compliance_result = "NOT_APPLICABLE"
        else:
            return build_internal_error_response('Unexpected message type', str(invoking_event))
    except botocore.exceptions.ClientError as ex:
        if is_internal_error(ex):
            return build_internal_error_response("Unexpected error while completing API request", str(ex))
        return build_error_response("Customer error while making API request", str(ex), ex.response['Error']['Code'], ex.response['Error']['Message'])
    except ValueError as ex:
        return build_internal_error_response(str(ex), str(ex))

    evaluations = []
    latest_evaluations = []

    if not compliance_result:
        latest_evaluations.append(build_evaluation(event['accountId'], "NOT_APPLICABLE", event, resource_type='AWS::::Account'))
        evaluations = clean_up_old_evaluations(latest_evaluations, event)
    elif isinstance(compliance_result, str):
        if configuration_item:
            evaluations.append(build_evaluation_from_config_item(configuration_item, compliance_result))
        else:
            evaluations.append(build_evaluation(event['accountId'], compliance_result, event, resource_type=DEFAULT_RESOURCE_TYPE))
    elif isinstance(compliance_result, list):
        for evaluation in compliance_result:
            missing_fields = False
            for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'):
                if field not in evaluation:
                    print("Missing " + field + " from custom evaluation.")
                    missing_fields = True

            if not missing_fields:
                latest_evaluations.append(evaluation)
        evaluations = clean_up_old_evaluations(latest_evaluations, event)
    elif isinstance(compliance_result, dict):
        missing_fields = False
        for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'):
            if field not in compliance_result:
                print("Missing " + field + " from custom evaluation.")
                missing_fields = True
        if not missing_fields:
            evaluations.append(compliance_result)
    else:
        evaluations.append(build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE'))

    # Put together the request that reports the evaluation status
    result_token = event['resultToken']
    test_mode = False
    if result_token == 'TESTMODE':
        # Used solely for RDK test to skip actual put_evaluation API call
        test_mode = True

    # Invoke the Config API to report the result of the evaluation
    evaluation_copy = []
    evaluation_copy = evaluations[:]
    while evaluation_copy:
        AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluation_copy[:100], ResultToken=result_token, TestMode=test_mode)
        del evaluation_copy[:100]

    # Used solely for RDK test to be able to test Lambda function
    return evaluations

def is_internal_error(exception):
    return ((not isinstance(exception, botocore.exceptions.ClientError)) or exception.response['Error']['Code'].startswith('5')
            or 'InternalError' in exception.response['Error']['Code'] or 'ServiceError' in exception.response['Error']['Code'])

def build_internal_error_response(internal_error_message, internal_error_details=None):
    return build_error_response(internal_error_message, internal_error_details, 'InternalError', 'InternalError')

def build_error_response(internal_error_message, internal_error_details=None, customer_error_code=None, customer_error_message=None):
    error_response = {
        'internalErrorMessage': internal_error_message,
        'internalErrorDetails': internal_error_details,
        'customerErrorMessage': customer_error_message,
        'customerErrorCode': customer_error_code
    }
    print(error_response)
    return error_response

このコードはAWS Config Rules Githubリポジトリで提供されているELASTICACHE_REDIS_CLUSTER_AUTO_BACKUP_CHECK.pyの一部を編集する形で作成しました。
このように対象となるリソースのサンプルコードが提供されていれば、それを編集することで比較的容易に新たなカスタムLambdaルールの作成が可能です。

今回のコードでElasticache for Redisの暗号化をチェックする箇所は主に次の箇所です。

def generate_evaluations(eval_list, key,event):
    evaluations = []
    for cluster in eval_list:
        if cluster['AtRestEncryptionEnabled'] == False:
            evaluations.append(build_evaluation(cluster[key], 'NON_COMPLIANT', event, annotation="Encryption at rest not enabled for Amazon ElastiCache cluster: {}".format(cluster[key])))
        else:
            evaluations.append(build_evaluation(cluster[key], 'COMPLIANT', event))
    return evaluations

クラスターのAtRestEncryptionEnabled属性をチェックし、FalseであればNON_COMPLIANT(非準拠)、TrueであればCOMPLIANT(準拠)として評価します。

以上でAWS Lambda設定は完了です。

AWS IAM Roleの設定

先ほどAWS Lambda作成時にIAM Roleを作成しましたが、Elasticacheの接続権限が不足しているので設定します。

AWS IAMロールと移動し、redis-encrypted-at-rest-check-function-roleを選択します。
許可を追加ポリシーをアタッチをクリックして許可追加画面に移動します。

AmazonElastiCacheReadOnlyAccessを検索してチェックし、ポリシーをアタッチボタンをクリックします。

以上でAWS IAM Roleの設定は完了です。

AWS Config ルールの設定

最後にAWS Configルールを設定します。 AWS Configルールと移動し、ルールを追加ボタンをクリックします。

ルールタイプの選択でカスタムLambdaルールを作成にチェックして、次へボタンをクリックします。

下記を設定します。

  • 名前
    • redis-encrypted-at-rest-check-rule
  • 説明
    • ルールの説明(必要であれば)
  • AWS Lambda関数ARN
    • redis-encrypted-at-rest-check-functionのARN
  • Evaluation mode
    • Turn on detective evaluationのみクリック
  • トリガータイプ
    • 定期的にチェック
  • 変更範囲
    • すべての変更
  • 頻度
    • 24時間
      • もっと短い頻度で確認したい場合は適宜変更

設定できれば次へボタンをクリックして確認画面に進みます。

内容を確認して問題なければ、ルールを追加ボタンをクリックしてルール作成します。

以上ですべての設定は完了です。

動作確認

2つのAmazon Elasticache for Redisクラスターを作成します。

test:保管時の暗号化無効

test2:保管時の暗号化有効

AWS Configルールに移動し、redis-encrypted-at-rest-check-ruleを選択します。
まだ評価されていない場合は、アクション再評価をクリックして、手動で評価を実行します。

対象範囲内のリソースとしてすべてを選択します。
暗号化有効のtest2準拠、無効のtest非準拠として評価されていることがわかります。

以上で、想定通りのチェックができていることが確認できました。

最後に

Amazon Elasticache for Redisの保管時暗号化設定有無をAWS Config カスタム Lambdaルールを使ってチェックしました。
マネージドルールが存在しないようなチェックも、カスタム Lambdaルールを利用すれば実装することが可能です。
Lambda関数を一から作るのは大変かと思いますが、サンプルをベースに編集すれば比較的容易に作成可能ですので、お試しください。 以上、トクヤマシュンでした。