ハンズオンに従ってCloudFormation Hooksのカスタムフックを作成してみた

こんにちは。枡川です。
先日CloudFormation Hooksが使えるようになりました。

こちらはCloudFormationスタックの作成、更新、削除の前にカスタムロジックを実施できる機能になります。
設定内容が基準に従っているかどうかをCloudFormation Hooksで確認し、定めたルールに準拠していない場合は警告を発生させたり、プロビジョニングを中止させたりすることが可能です。
Security Hubなどのサービスは発見的統制を実現しますが、こちらは予防的統制を実現する仕組みとなります。
cfn-guardをCI/CDパイプラインに組み込むことで似たようなことを実現できます。

Terraform用のtfsecも有名ですね。

CloudFormation HooksはCloudFormationとネイティブに統合された機能であり、パイプラインを構築しない場合でも統制を実施することが可能です。
実際に使用するにはまだ知見が溜まっていないかもしれませんが、AWSでのIaCセキュリティを考えた際に今後積極的に使っていきたい機能だと思います。
この機能を使用するにはCloudFormation Registryにフックが登録されている必要があります。
実際にCloudFormation Registryを確認すると、下記3種類からフックを選択可能です。

  • AWS
  • アクティブ化済みのサードパーティー
  • 非公開登録

現時点でアクティブ化済みのサードパーティー製フックとしてAWSが提供しているサンプルのフックが公開されており、それらを使用することが可能です。

これらはAWSマネージドという扱いでは無いが、AWSがサービス開始時にサンプルとして公開しているものになります。

  • AWSSamples::EFSEncrypt::Hook
    • EFSが暗号化されているか
  • AWSSamples::Ec2ImageIdCheckSsm::Hook
    • EC2インスタンスのAMIがSSMパラメータストアに保持しているイメージIDと一致するか
  • AWSSamples::Ec2SsmSmOnly::Hook
    • EC2がSSMで管理されているか
  • AWSSamples::Ec2VolumeEncryptionKms::Hook
    • EBSボリュームが指定したKMSで暗号化されているか
  • AWSSamples::EksClusterLogging::Hook
    • EKSの監査ログと認証システムログを有効化しているか
  • AWSSamples::EksClusterPublicApi::Hook
    • EKSがパブリックアクセス可能なエンドポイントを持っていないか
  • AWSSamples::FSxWindowsOnly::Hook
    • FSxWindowsのファイルシステムタイプがWindowsになっているか
  • AWSSamples::IAMPrincipalBoundary::Hook
    • IAMプリンシパルに対して特定のPermissionsBoundaryが設定されているか
  • AWSSamples::IamPoliciesRequireMfa::Hook
    • IAMポリシー、ロールに対してMFAを強制しているか
  • AWSSamples::IamPolicyDoesNotGiveAdmin::Hook
    • 全リソースに対して全アクションをワイルドカードで許可するポリシーになっていないか
  • AWSSamples::IamUsersHavePolicy::Hook
    • IAMユーザが最低1つポリシーを持っているか
  • AWSSamples::NoIP6::Hook
    • VPCを作成する際にIPv6を有効化していないか
  • AWSSamples::RdsEncrypt::Hook
    • RDSの暗号化が有効になっているか
  • AWSSamples::S3BlockPublicAccess::Hook
    • S3バケットがパブリックアクセスをブロックしているか
  • AWSSamples::S3BucketEncrypt::Hook
    • S3バケットに対してKMSによる暗号化が有効化されているか
  • AWSSamples::S3BucketLoggingCompliance::Hook
    • S3バケットに対してログ記録が有効になっているか
  • AWSSamples::S3BucketLoggingEnabled::Hook
    • S3バケットに対してログ記録が有効になっているか(ログ出力先に関して、バケットに加えてプレフィクスも指定、S3BucketLoggingComplianceとの差はJava実装かPython実装かという所が大きいように思える)
  • AWSSamples::SQSNonPublic::Hook
    • SQSがパブリックにアクセスできる状態でないか
  • AWSSamples::SQSPolicyEncryption::Hook
    • SQSがHTTPS経由の暗号化された接続のみを許可しているか
  • AWSSamples::SecurityGroupOpenIngress::Hook
    • セキュリティグループのインバウンドルールが0.0.0.0/0からのアクセスを許可していないか

AWSが提供しているものがサードパーティー枠であることに少し違和感を覚えましたが、あくまで使い方のサンプルを公式に示しているというものになるようです。
AWSに分類されるフックはまだ存在しませんが、今後AWSマネージドのフックが登録されるのでしょうか。
これ以外のチェックはできないのかというとそうでは無く、カスタムフックを生成して登録することで様々なチェックを実現することが可能です。
カスタムフックを作成するためには、JavaもしくはPythonでコーディングする必要があります。
手間は必要ですが、自由度はかなり高そうです。
カスタムフックの作成については、公式ドキュメントの下記ページに手順が記載されています。

ほぼ同じ内容の作業がハンズオンとしても提供されており、こちらの方が説明が丁寧なため、今回はこちらをやってみます。
PythonとJavaの2通りの手順が記載されていますが、今回はPythonで作成してみます。

やってみた

Pythonによるカスタムフックの作成は下記6ステップに別れています。

  • Step-1 Initiating Your Project
    • 必要なモジュールのインストール、プロジェクトの作成
  • Step-2 Modeling Your Hook
    • カスタムフックの作成
  • Step-3 Testing Your hook
    • 作成したカスタムフックのローカルでのテスト
  • Step-4 Registering Your Hook
    • 作成したカスタムフックのRegistryへの登録
  • Step-5 Test Your Hook
    • カスタムフックの動作確認
  • Step-6 Managing Your Hook
    • カスタムフックの登録後の管理について(バージョン管理と登録解除)

以下すべてでは無いですが、流れを記載します。
まず、必要なモジュールをインストールします。
カスタムフックをPythonで開発するためにcloudformation-clicloudformation-cli-python-pluginをインストールします。

pip3 install cloudformation-cli cloudformation-cli-python-plugin
pip3 install --upgrade cloudformation-cli cloudformation-cli-python-plugin

カスタムフックプロジェクトを作成するためにcfn initコマンドを実行します。
複数の質問に答えると、設定ファイルやソースコードが生成されます。

$ cfn init
Initializing new project
Do you want to develop a new resource(r) or a module(m) or a hook(h)?.
>> Please enter a value matching resource(r) or a module(m) or a hook(h)
Do you want to develop a new resource(r) or a module(m) or a hook(h)?.
>> h
What's the name of your hook type?
(Organization::Service::Hook)
>> MyCompany::Testing::Hook
Select a language for code generation:
[1] python36
[2] python37
(enter an integer): 
>> 2
Use docker for platform-independent packaging (Y/n)?
This is highly recommended unless you are experienced 
with cross-platform Python packaging.
>> Y
Initialized a new project in /home/ec2-user/environment/CloudFormationCLI/myhook

cfn init後、下記のような構造となります。

$ tree .
.
├── hook-role.yaml
├── mycompany-testing-hook.json
├── README.md
├── requirements.txt
├── rpdk.log
├── src
│   └── mycompany_testing_hook
│       ├── handlers.py
│       ├── __init__.py
│       └── models.py
└── template.yml

ハンズオンではプロジェクト名をMyCompany::Testing::Hookとしましたが、この場合はスキーマを定義するファイルとしてsrc/mycompany-testing-hook.jsonが生成されるので最初に修正します。
はじめは下記のようなファイルが生成されます。

{
    "typeName": "MyCompany::Testing::Hook",
    "description": "Example resource SSE (Server Side Encryption) verification hook",
    "sourceUrl": "https://github.com/aws-cloudformation/example-sse-hook",
    "documentationUrl": "https://github.com/aws-cloudformation/example-sse-hook/blob/master/README.md",
    "typeConfiguration": {
        "properties": {
            "EncryptionAlgorithm": {
                "description": "Encryption algorithm for SSE",
                "default": "AES256",
                "type": "string"
            }
        },
        "additionalProperties": false
    },
    "required": [],
    "handlers": {
        "preCreate": {
            "targetNames": [
                "My::Example::Resource"
            ],
            "permissions": []
        },
        "preUpdate": {
            "targetNames": [
                "My::Example::Resource"
            ],
            "permissions": []
        },
        "preDelete": {
            "targetNames": [
                "Other::Example::Resource"
            ],
            "permissions": []
        }
    },
    "additionalProperties": false
}

修正が必須なのはhandlersの項目です。
作成、更新、削除時のカスタムロジックの対象リソースを指定します。
descriptionsourceUrldocumentationUrlも必須では無いですが、修正しておくと良いです。
ここで指定した内容はマネジメントコンソールからも確認可能です。
今回はS3とSQSが対象となるため、jsonを下記のように修正します。

   {
    "typeName": "MyCompany::Testing::Hook",
    "description": "Verifies S3 bucket and SQS queues properties before create and update",
    "sourceUrl": "https://mycorp.com/my-repo.git",
    "documentationUrl": "https://mycorp.com/documentation",
    "typeConfiguration": {
        "properties": {
            "minBuckets": {
                "description": "Minimum number of compliant buckets",
                "type": "string"
                
            },
            "minQueues": {
                "description": "Minimum number of compliant queues",
                "type": "string"
            },
            "encryptionAlgorithm": {
                "description": "Encryption algorithm for SSE",
                "default": "AES256",
                "type": "string"
            }
        },
        "required": [],
        "additionalProperties": false
    },
    "handlers": {
        "preCreate": {
            "targetNames": [
                "AWS::S3::Bucket",
                "AWS::SQS::Queue"
            ],
            "permissions": []
        },
        "preUpdate": {
            "targetNames": [
                "AWS::S3::Bucket",
                "AWS::SQS::Queue"
            ],
            "permissions": []
        }
    },
    "additionalProperties": false
}

この状態でcfn generateを実行します。
下記のように表示され、mycompany-testing-hook-configuration.jsonと呼ばれるファイルが増えました。

Generated files for MyCompany::Testing::hook

これで基本的な設定は完了なので、次は実行するロジックを書いていきます。
src/models.pysrc/handlers.pyが存在しますが、手を入れるのはhandlers.pyの方です。
src/models.pyの方はcfn generateによって変更され、直接いじることは推奨されません。
handlers.pyにはpre_create_handlerpre_update_handlerpre_delete_handlerの3つの関数があり、それぞれの# TODO: put code hereとなっている部分にロジックを書いていきます。
pre_create_handlerに書いたら作成前に、pre_update_handlerに書いたら更新前に、pre_delete_handlerに書いたら削除前にフックを差し込めます。
preとついていることからわかるように作成、更新、削除の後にフックを差し込むことはできず、前段階にしかフックは差し込めません。
下記がcfn generate後のhandlers.pyです。

import logging
from typing import Any, MutableMapping, Optional

from cloudformation_cli_python_lib import (
    BaseHookHandlerRequest,
    HandlerErrorCode,
    Hook,
    HookInvocationPoint,
    OperationStatus,
    ProgressEvent,
    SessionProxy,
    exceptions,
)

from .models import HookHandlerRequest, TypeConfigurationModel

# Use this logger to forward log messages to CloudWatch Logs.
LOG = logging.getLogger(__name__)
TYPE_NAME = "MyCompany::Testing::Hook"

hook = Hook(TYPE_NAME, TypeConfigurationModel)
test_entrypoint = hook.test_entrypoint


@hook.handler(HookInvocationPoint.CREATE_PRE_PROVISION)
def pre_create_handler(
        session: Optional[SessionProxy],
        request: HookHandlerRequest,
        callback_context: MutableMapping[str, Any],
        type_configuration: TypeConfigurationModel
) -> ProgressEvent:
    target_model = request.hookContext.targetModel
    progress: ProgressEvent = ProgressEvent(
        status=OperationStatus.IN_PROGRESS
    )
    # TODO: put code here

    # Example:
    try:
        # Reading the Resource Hook's target properties
        resource_properties = target_model.get("resourceProperties")

        if isinstance(session, SessionProxy):
            client = session.client("s3")
        # Setting Status to success will signal to cfn that the hook operation is complete
        progress.status = OperationStatus.SUCCESS
    except TypeError as e:
        # exceptions module lets CloudFormation know the type of failure that occurred
        raise exceptions.InternalFailure(f"was not expecting type {e}")
        # this can also be done by returning a failed progress event
        # return ProgressEvent.failed(HandlerErrorCode.InternalFailure, f"was not expecting type {e}")

    return progress


@hook.handler(HookInvocationPoint.UPDATE_PRE_PROVISION)
def pre_update_handler(
        session: Optional[SessionProxy],
        request: BaseHookHandlerRequest,
        callback_context: MutableMapping[str, Any],
        type_configuration: TypeConfigurationModel
) -> ProgressEvent:
    target_model = request.hookContext.targetModel
    progress: ProgressEvent = ProgressEvent(
        status=OperationStatus.IN_PROGRESS
    )
    # TODO: put code here

    # Example:
    try:
        # A Hook that does not allow a resource's encryption algorithm to be modified

        # Reading the Resource Hook's target current properties and previous properties
        resource_properties = target_model.get("resourceProperties")
        previous_properties = target_model.get("previousResourceProperties")

        if resource_properties.get("encryptionAlgorithm") != previous_properties.get("encryptionAlgorithm"):
            progress.status = OperationStatus.FAILED
            progress.message = "Encryption algorithm can not be changed"
        else:
            progress.status = OperationStatus.SUCCESS
    except TypeError as e:
        progress = ProgressEvent.failed(HandlerErrorCode.InternalFailure, f"was not expecting type {e}")

    return progress


@hook.handler(HookInvocationPoint.DELETE_PRE_PROVISION)
def pre_delete_handler(
        session: Optional[SessionProxy],
        request: BaseHookHandlerRequest,
        callback_context: MutableMapping[str, Any],
        type_configuration: TypeConfigurationModel
) -> ProgressEvent:
    # TODO: put code here
    return ProgressEvent(
        status=OperationStatus.SUCCESS
    )

サンプルコード(Step-2 Modeling Your HookのExample handlers.py)からpre_create_handlerの部分だけ抜きだすと下記のようになります。

def _validate_s3_bucket_encryption(bucket: MutableMapping[str, Any], required_encryption_algorithm: str) -> ProgressEvent:
    status = None
    message = ""
    error_code = None

    if bucket:
        bucket_name = bucket.get("BucketName")

        bucket_encryption = bucket.get("BucketEncryption")
        if bucket_encryption:
            server_side_encryption_rules = bucket_encryption.get("ServerSideEncryptionConfiguration")
            if server_side_encryption_rules:
                for rule in server_side_encryption_rules:
                    bucket_key_enabled = rule.get("BucketKeyEnabled")
                    if bucket_key_enabled:
                        server_side_encryption_by_default = rule.get("ServerSideEncryptionByDefault")

                        encryption_algorithm = server_side_encryption_by_default.get("SSEAlgorithm")
                        kms_key_id = server_side_encryption_by_default.get("KMSMasterKeyID")  # "KMSMasterKeyID" is name of the property for an AWS::S3::Bucket

                        if encryption_algorithm == required_encryption_algorithm:
                            if encryption_algorithm == "aws:kms" and not kms_key_id:
                                status = OperationStatus.FAILED
                                message = f"KMS Key ID not set for bucket with name: f{bucket_name}"
                            else:
                                status = OperationStatus.SUCCESS
                                message = f"Successfully invoked PreCreateHookHandler for AWS::S3::Bucket with name: {bucket_name}"
                        else:
                            status = OperationStatus.FAILED
                            message = f"SSE Encryption Algorithm is incorrect for bucket with name: {bucket_name}"
                    else:
                        status = OperationStatus.FAILED
                        message = f"Bucket key not enabled for bucket with name: {bucket_name}"

                    if status == OperationStatus.FAILED:
                        break
            else:
                status = OperationStatus.FAILED
                message = f"No SSE Encryption configurations for bucket with name: {bucket_name}"
        else:
            status = OperationStatus.FAILED
            message = f"Bucket Encryption not enabled for bucket with name: {bucket_name}"
    else:
        status = OperationStatus.FAILED
        message = "Resource properties for S3 Bucket target model are empty"

    if status == OperationStatus.FAILED:
        error_code = HandlerErrorCode.NonCompliant

    return ProgressEvent(
        status=status,
        message=message,
        errorCode=error_code
    )


def _validate_sqs_queue_encryption(queue: MutableMapping[str, Any]) -> ProgressEvent:
    if not queue:
        return ProgressEvent(
            status=OperationStatus.FAILED,
            message="Resource properties for SQS Queue target model are empty",
            errorCode=HandlerErrorCode.NonCompliant
        )
    queue_name = queue.get("QueueName")

    kms_key_id = queue.get("KmsMasterKeyId")  # "KmsMasterKeyId" is name of the property for an AWS::SQS::Queue
    if not kms_key_id:
        return ProgressEvent(
            status=OperationStatus.FAILED,
            message=f"Server side encryption turned off for queue with name: {queue_name}",
            errorCode=HandlerErrorCode.NonCompliant
        )

    return ProgressEvent(
        status=OperationStatus.SUCCESS,
        message=f"Successfully invoked PreCreateHookHandler for targetAWS::SQS::Queue with name: {queue_name}"
    )

@hook.handler(HookInvocationPoint.CREATE_PRE_PROVISION)
def pre_create_handler(
        session: Optional[SessionProxy],
        request: HookHandlerRequest,
        callback_context: MutableMapping[str, Any],
        type_configuration: TypeConfigurationModel
) -> ProgressEvent:
    target_name = request.hookContext.targetName
    if "AWS::S3::Bucket" == target_name:
        return _validate_s3_bucket_encryption(request.hookContext.targetModel.get("resourceProperties"), type_configuration.encryptionAlgorithm)
    elif "AWS::SQS::Queue" == target_name:
        return _validate_sqs_queue_encryption(request.hookContext.targetModel.get("resourceProperties"))
    else:
        raise exceptions.InvalidRequest(f"Unknown target type: {target_name}")

ロジックを記載したら、Privateレジストリに登録します。

cfn submit --set-default

上記コマンドによってPrivateレジストリに登録されアクティブ化済みの拡張機能として確認可能になります。
構成の編集をクリックして、拡張機能の設定を行ないます。
各登録済みフックはTargetStacksFailureModePropertiesといった項目を設定して使用することができ、これらはフックの有効化後にマネジメントコンソールなどから設定することができます。
TargetStacksALL(すべてのスタック操作でフックを適用する)かNONE(すべてのスタック操作でフックが適用されなくなる)から設定します。
ON/OFFだと思うとしっくりきますね。
FaulureModeFail(中断する)かWARN(続行しつつ、警告を送信する)で選択できます。
Propertiesはそれぞれのフックで独自にもたせることができる設定項目となります。
今回は下記のように設定します。

{
  "CloudFormationConfiguration": {
    "HookConfiguration": {
      "TargetStacks": "ALL",
      "FailureMode": "FAIL",
      "Properties": {
        "encryptionAlgorithm": "aws:kms",
        "minBuckets":  "3",
        "minQueues":  "1"
      }
    }
  }
}

登録、設定まで終わったのでCloudFormation Hooksの動作確認を実施します。
まず、下記テンプレートからスタックを作成します。

---
AWSTemplateFormatVersion: "2010-09-09"
Resources:
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties: {}

The following hook(s) failedと記載されている通り、フックが失敗してスタック作成自体も失敗していることがわかります。
この際、設定から表示する列にフックの呼び出しを追加することでリソース作成時にどのフックが呼び出されたかを確認することができます。
今回の場合ではきちんとMyCompany::Testing::Hookが呼び出されていますね! 次にS3に加えてSQSを増やしつつ、KMSで暗号化してスタックを作成してみます。

AWSTemplateFormatVersion: "2010-09-09"
Description: >
  This CloudFormation template provisions an encrypted S3 Bucket

Resources:
  EncryptedS3Bucket:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketName: !Sub 'encryptedbucket-${AWS::Region}-${AWS::AccountId}'
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: 'aws:kms'
              KMSMasterKeyID: !Ref EncryptionKey
            BucketKeyEnabled: true
            
  EncryptedQueue:
    Type: 'AWS::SQS::Queue'
    Properties:
      QueueName: !Sub 'encryptedqueue-${AWS::Region}-${AWS::AccountId}'
      KmsMasterKeyId: !Ref EncryptionKey

  EncryptionKey:
    Type: AWS::KMS::Key
    DeletionPolicy: Retain
    Properties:
      Description: KMS key used to encrypt the resource type artifacts
      EnableKeyRotation: true
      KeyPolicy:
        Version: "2012-10-17"
        Statement:
        - Sid: Enable full access for owning account
          Effect: Allow
          Principal:
            AWS: !Ref AWS::AccountId
          Action: kms:*
          Resource: "*"

Outputs:
  EncryptedBucketName:
    Value: !Ref EncryptedS3Bucket
  EncryptedQueueName:
    Value: !Ref EncryptedQueue

S3もSQSも暗号化されているので、問題無く作成されます。
下記のように更新してみます。
S3は暗号化を無効化しつつ、SQSについても遅延を設定します。

AWSTemplateFormatVersion: "2010-09-09"
Description: >
  This CloudFormation template provisions an encrypted S3 Bucket

Resources:
  EncryptedS3Bucket:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketName: !Sub 'encryptedbucket-${AWS::Region}-${AWS::AccountId}'
      # BucketEncryption:
      #   ServerSideEncryptionConfiguration:
      #     - ServerSideEncryptionByDefault:
      #         SSEAlgorithm: 'aws:kms'
      #         KMSMasterKeyID: !Ref EncryptionKey
      #       BucketKeyEnabled: true
            
  EncryptedQueue:
    Type: 'AWS::SQS::Queue'
    Properties:
      QueueName: !Sub 'encryptedqueue-${AWS::Region}-${AWS::AccountId}'
      KmsMasterKeyId: !Ref EncryptionKey
      DelaySeconds: 10 # 追加

  EncryptionKey:
    Type: AWS::KMS::Key
    DeletionPolicy: Retain
    Properties:
      Description: KMS key used to encrypt the resource type artifacts
      EnableKeyRotation: true
      KeyPolicy:
        Version: "2012-10-17"
        Statement:
        - Sid: Enable full access for owning account
          Effect: Allow
          Principal:
            AWS: !Ref AWS::AccountId
          Action: kms:*
          Resource: "*"

Outputs:
  EncryptedBucketName:
    Value: !Ref EncryptedS3Bucket
  EncryptedQueueName:
    Value: !Ref EncryptedQueue

S3バケット側でフックが失敗していることが確認できます。
意図通り動いていますね。

登録したフックを削除したい場合は下記コマンドで削除できます。

aws cloudformation deregister-type --arn <hook-arn>

まとめ

CloudFormation Hooksのカスタムフックを作成してみました。
それなりに手間は必要ですが、その分自由度が高くていろいろなことに使えるなと思いました。
興味がある方は是非ハンズオンもやってみて下さい!