[AWS Service Catalog]共有した製品のエラーを管理者に通知する仕組みを考えてみた

2022.08.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Service Catalogのポートフォリオ共有機能を使って製品をマルチアカウントに展開した後、メンバーアカウント側の製品エラーとなった場合に通知させたいと思ったので仕組みを考えてみました。

共有したポートフォリオの製品情報は管理者から見れない

Service Catalogをマルチアカウントで運用する時、共有機能を使うことでメンバーアカウントにポートフォリオを共有できます。

管理者が作成したポートフォリオを、ユーザーが自由に利用できるという非常に便利な機能です。

しかしこの機能を利用すると、共有した製品がどのように利用されているか管理者から確認する方法がありません。そのため、共有された製品の起動などを管理者で検知したい場合は、自分で仕組みを実装する必要があります。

Service Catalogを運用している時、ポートフォリオを共有して提供しているのに、エラーとなっていて使ってもらえない…なんてことがあるかもしれません。

そんな時気付けるよう製品の起動や更新時に失敗時に管理者へ通知したいと考えました。

考えた実装

どうにか製品利用時のエラーを管理者に通知させたいと思い、考えた構成がこちらです。

共有するポートフォリオに通知の制約を設定し、宛先は管理アカウントの受信用トピック(Receive Topic)に向けています。共有先でも制約は共有されるため、製品を起動すると管理アカウントの受信用トピックにイベントが発行されます。

CloudFormationの通知はイベントが全て発行されるのですが、受け取りたいものはスタック作成・更新の失敗です。そのためイベントをLambda(Filter Lambda)でフィルターしてからメール送信という流れです。

本当であればSNSのサブスクリプションフィルターでフィルターをかけたいのですが、CloudFormationの通知はテキスト形式で送られてくるためLambdaを採用しています。このLambdaにはCFnスタックの全イベントが送られてきますが、デフォルトでは同時実行数の上限が1000回と耐えられる想定のためSQS等は挟んでいません。

• AWS リージョンあたりのデフォルトの同時実行数の制限は、いかなる時点においても 1,000 回の呼び出しです。 Lambda 関数の同時実行数の制限の引き上げをリクエストする

やってみる

それでは実際に通知させてみます。

前提

  • Organizations環境
  • 共有元ポートフォリオがあること

CloudFormationの実行

まず最初は通知に必要な以下リソースをCloudFormationで作成します。

  • 受信用SNSトピック(Receive Topic)
  • 通知用SNSトピック(Send Topic)
  • フィルター用Lambda(Filter Lambda)
    • Python3.8で実装

Lambdaのコードはサンプル程度にお考えください。

管理あか東京リージョンで以下のテンプレートを使ってリソースを作成します。入力パラメータはそれぞれの環境に合わせて入力してください。

  • MailAddress:通知先のメールアドレス
  • OrgId:組織ID
AWSTemplateFormatVersion: "2010-09-09"
Description: CFn Alert aggregater

Parameters:
  MailAddress:
    Type: String
    Description: Enter the email address to be notified
  OrgId:
    Type: String
    Description: Enter the organization ID you want to receive CFn events

Conditions:
  MainTargetRegion: !Equals [!Ref AWS::Region, ap-northeast-1]

Resources:
  # CloudFormationの通知を受けるSNSトピック
  CFnEventReceiveSNSTopic:
    Type: AWS::SNS::Topic
    Properties:
      Subscription:
        - Endpoint: !Sub "arn:aws:lambda:ap-northeast-1:${AWS::AccountId}:function:AggregatFilterFunction"
          Protocol: "lambda"
      TopicName: "CFnEventReceiveSNSTopic"
  CFnEventReceiveSNSTopicPolicy:
    Type: AWS::SNS::TopicPolicy
    Properties:
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: "Allow"
            Principal:
              AWS: "*"
            Action: "SNS:Publish"
            Resource: !Ref CFnEventReceiveSNSTopic
            Condition:
              StringEquals:
                aws:PrincipalOrgID: !Ref OrgId
      Topics:
        - !Ref CFnEventReceiveSNSTopic
  # メール送信用SNSトピック
  SendAlertSNSTopic:
    Type: AWS::SNS::Topic
    Condition: MainTargetRegion
    Properties:
      Subscription:
        - Endpoint: !Ref MailAddress
          Protocol: "email"
      TopicName: "SendAlertSNSTopic"
  # 通知を集約、フィルターするLambda
  AggregatFilterLambdaFunction:
    Condition: MainTargetRegion
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: "AggregatFilterFunction"
      Role: !GetAtt "LambdaExecutionRole.Arn"
      Runtime: "python3.8"
      Handler: index.lambda_handler
      Environment:
        Variables:
          SendAlertSNSTopicArn: !Ref SendAlertSNSTopic
      Timeout: "180"
      Code:
        ZipFile: |
          import json
          import boto3
          import os
          import logging

          logger = logging.getLogger()
          logger.setLevel("INFO")
          sns_topic_arn=os.environ["SendAlertSNSTopicArn"]

          def lambda_handler(event, context):
              logger.info(event)
              massage=event["Records"][0]["Sns"]["Message"]
              check_status_list=[
                  "ROLLBACK_COMPLETE",
                  "ROLLBACK_FAILED",
                  "UPDATE_FAILED",
                  "UPDATE_ROLLBACK_FAILED"
                  ]
              
              is_status=False

              for check_status in check_status_list:
                  if check_status in massage:
                      is_status = True
              if is_status:
                  sns_publish(massage)

          def sns_publish(massage):
              params = {
              'TopicArn': sns_topic_arn,
              'Subject': 'Service Catalog Product(CloudFormation Stack) Failed Alert',
              'Message': massage
              }
              sns = boto3.client('sns')
              sns.publish(**params)
  AggregatFilterLambdaFunctionPermission:
    Condition: MainTargetRegion
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt AggregatFilterLambdaFunction.Arn
      Action: lambda:InvokeFunction
      Principal: sns.amazonaws.com
      SourceAccount: !Ref "AWS::AccountId"

  # Lambda用IAMロール
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Condition: MainTargetRegion
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: output-log-policy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
        - PolicyName: sns-publish-policy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - sns:Publish
                Resource: "*"
        - PolicyName: assume-role-policy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - sts:AssumeRole
                Resource: "*"

組織IDはSNSトピックのリソースベースポリシーに許可を与えるために利用します。通知するイベントはスタックの最後に出力されるであろう以下の4つに絞っています。

  • ROLLBACK_COMPLETE
  • ROLLBACK_FAILED
  • UPDATE_FAILED
  • UPDATE_ROLLBACK_FAILED

CloudFormationを展開できたら、イベントを受け取るリソースの実装は完了です。

通知制約の設定

ポートフォリオに対して通知の制約を設定します。任意のポートフォリオで制約のタブから「制約の作成」を開いてください。

制約を設定する製品を選んで、通知を選択します。今回はサンプルでVPC Flow Logsを設定する製品を選んでいます。

通知先として、先ほどCloudFormationで作成した受信用SNSトピックを選択します。

これで通知制約の設定は完了です。

ポートフォリオの共有

次にポートフォリオをメンバーアカウントへ共有します。共有方法については以下ブログに記載していますので、「Organization組織内への共有」を参照してください。

共有ができたら、メンバーアカウント側のポートフォリオでアクセス権限を付与してください。

製品のエラーを通知する

それでは準備が整ったので製品のエラーを通知させてみます。共有されたポートフォリオにアクセス権限を付与すると、製品の起動ができます。

この状態で、起動時にエラーが発生すると、Service Catalogのコンソール上では失敗のステータスに変わります。

同時にSNSから以下のようなメールが通知されました。

とりあえずエラーを通知できればいいよというケースであれば、StackIDからアカウントとリージョンが判断できるためメンバーアカウントへログインすれば確認はできます。ただし、ここで通知される内容はCloudFormationのスタックイベントであるため、Service Catalogの情報を通知に含めることができません。

もしプロビジョニングされた製品名やバージョンを取得したい場合、Lambdaの中で工夫して取得する必要があります。

通知にService Catalogの情報を含めたい場合

と、いうわけで若干無理やりですが、プロビジョニングされた製品名やバージョンを取得して通知してみました。上記の通知内容で良い方は飛ばしてください。

試してみたい方はAggregatFilterFunction内のコードを置き換えてご利用ください。あくまでサンプルですので、ご利用は自己責任でお願いします。

長くなってしまったので折りたたんでいます。

AggregatFilterFunction.py(クリックすると展開されます)
import boto3
import os
import logging
from boto3.session import Session

logger = logging.getLogger()
logger.setLevel("INFO")
sns_topic_arn=os.environ["SendAlertSNSTopicArn"]

def lambda_handler(event, context):
    logger.info('[START] lambda_handler')
    logger.info(event)
    massage=event["Records"][0]["Sns"]["Message"]
    check_status_list=[
        "ROLLBACK_COMPLETE",
        "ROLLBACK_FAILED",
        "UPDATE_FAILED",
        "UPDATE_ROLLBACK_FAILED"
        ]

    is_status=False
    # ステータスが特定のものだけ通知
    for check_status in check_status_list:
        if check_status in massage:
            is_status = True
    if is_status:
        mes_l=(massage.split('\n'))
        #eventからスタックの情報取得
        stack_info=get_stack_info(mes_l)
        # Get Client
        member_session = get_member_session(stack_info["account_id"])
        #プロビジョニングされた製品の情報取得
        pro_pd_info=get_provisioned_product_info(member_session,stack_info["region"],stack_info["product_id"])
        #プロビジョニングされた製品のアーティファクト(バージョン)取得
        pro_artifact_info=get_provisioning_artifact_info(member_session,stack_info["region"],pro_pd_info)
        send_mes=create_mes(stack_info,pro_pd_info,pro_artifact_info)
        sns_publish(send_mes)
    logger.info('[END] lambda_handler')

def sns_publish(massage):
    params = {
    'TopicArn': sns_topic_arn,
    'Subject': 'Service Catalog Product(CloudFormation Stack) Failed Alert',
    'Message': massage
    }
    sns = boto3.client('sns')
    sns.publish(**params)
    
def get_member_session(target_account_id):
    logger.info('[START] get_member_session')
    sts_connection = boto3.client('sts')
    try:
        # Assume Role
        role_arn = "arn:aws:iam::%s:role/AWSControlTowerExecution" % target_account_id
        role_session_name = "CROSS_ACCOUNT_ACCESS_FROM_CT_MANAGEMENT_ACCOUNT"
        logger.info("- RoleArn=%s" % role_arn)
        logger.info("- RoleSessionName=%s" % role_session_name)
        target = sts_connection.assume_role(
            RoleArn=role_arn,
            RoleSessionName=role_session_name,
        )
    except Exception as e:
        logger.error(e)
        exit()
    else:
        member_session = Session(
            aws_access_key_id=target['Credentials']['AccessKeyId'],
            aws_secret_access_key=target['Credentials']['SecretAccessKey'],
            aws_session_token=target['Credentials']['SessionToken']
        )
        logger.info('[END] get_member_session')
        return member_session

def get_stack_info(mes_l):
    stack_info={}
    #リストからStackIdのみを取得
    ext_stack_id = [s for s in mes_l if "StackId" in s][0]
    #不要な文字列を削除
    stack_id=ext_stack_id.replace('StackId=', '').replace("'","")
    
    #StackIdから情報を取得
    stack_id_l=(stack_id.split(':'))
    stack_info["region"]=stack_id_l[3]
    stack_info["account_id"]=stack_id_l[4].replace(' ', '')
    stack_info["product_id"]="pp-"+stack_id_l[5].split("/")[1].split("-")[3]
    
    #リストからResourceStatusのみを取得
    ext_res_status = [s for s in mes_l if "ResourceStatus" in s][0]
    stack_info["resource_status"]=ext_res_status.replace('ResourceStatus=', '').replace("'","")
    logger.info(f"stack_info:${stack_info}")
    return stack_info

def get_provisioned_product_info(member_session,region,provisioned_product_id):
    sc = member_session.client('servicecatalog',region_name=region)
    #プロビジョニングされた製品の情報取得
    pro_pd_info=sc.describe_provisioned_product(Id=provisioned_product_id)
    logger.info(f"pro_pd_info:${pro_pd_info}")
    return pro_pd_info

def get_provisioning_artifact_info(member_session,region,pro_pd_info):
    sc = member_session.client('servicecatalog',region_name=region)
    artifact_id=pro_pd_info.get("ProvisionedProductDetail").get("ProvisioningArtifactId")
    product_id=pro_pd_info.get("ProvisionedProductDetail").get("ProductId")
    
    #プロビジョニングされた製品のアーティファクト(バージョン)取得
    pro_artifact_info=sc.describe_provisioning_artifact(
        ProductId=product_id,
        ProvisioningArtifactId=artifact_id
        )
    logger.info(f"pro_artifact_info:${pro_artifact_info}")
    return pro_artifact_info

def create_mes(stack_info,pro_pd_info,pro_artifact_info):
    #get stack_info
    account_id=stack_info.get("account_id")
    region=stack_info.get("region")
    product_id=stack_info.get("product_id")
    resource_status=stack_info.get("resource_status")
    # get pro_pd_info
    pro_pd_name=pro_pd_info.get("ProvisionedProductDetail").get("Name")
    # get pro_artifact_info
    version=pro_artifact_info.get("ProvisioningArtifactDetail").get("Name")
    description=pro_artifact_info.get("ProvisioningArtifactDetail").get("Description")
    
    ret = f"Service Catalogの製品でエラーが発生しました。\n\n"
    ret += f"・AWSアカウント: {account_id}\n"
    ret += f"・リージョン: {region}\n"
    ret += f"・スタックのステータス: {resource_status}\n"
    ret += f"・プロビジョニングされた製品名: {pro_pd_name}\n"
    ret += f"・プロビジョニングされた製品ID: {product_id}\n"
    ret += f"・製品のバージョン: {version}\n"
    ret += f"・製品バージョンの詳細: {description}\n"
    logger.info(f"create_mes:${ret}")
    return ret

コードを説明すると長くなってしまうので、簡単な処理イメージを記載しておきます。

  1. StackIdからアカウント・リージョン・プロビジョニングされた製品IDを取得
  2. 取得したアカウント・リージョンをもとにメンバーアカウントへAssumeRole
  3. プロビジョニングされた製品IDからプロビジョニングされた製品の詳細を取得
  4. プロビジョニングされた製品の詳細から、アーティファクトIDを取得
  5. アーティファクトIDからアーティファクト詳細情報を取得
  6. 取得した情報をもとに通知メッセージを作成・送信

管理アカウントでは共有先でプロビジョニングされた製品の情報は取得できないため、通知元アカウントのAWSControlTowerExecutionのロールにAssumeRoleしています。(Control Tower環境ではデフォルトで作成されるため)

そのためControl Tower環境であることが前提となっていますが、メンバーアカウントにService CatalogのRead権限を持ったロールがあれば以下を利用したいロール名に置き換えてもOKです。

# Assume Role
role_arn = "arn:aws:iam::%s:role/AWSControlTowerExecution" % target_account_id
role_session_name = "CROSS_ACCOUNT_ACCESS_FROM_CT_MANAGEMENT_ACCOUNT"
logger.info("- RoleArn=%s" % role_arn)
logger.info("- RoleSessionName=%s" % role_session_name)
target = sts_connection.assume_role(
    RoleArn=role_arn,
    RoleSessionName=role_session_name,
)

置き換えた場合の通知結果はこちらになります。プロビジョニングされた製品やバージョンの情報も含めて通知できました。

その他必要な情報等あれば、適宜コードを修正してください。

マルチリージョンで利用したい場合

シングルリージョンで利用する方は飛ばしてもらって大丈夫です。

Service Catalogをマルチリージョンで利用するケースを考えて、CloudFormationをStackSetsで展開できるように作成しています。

StackSetsで展開する場合、以下のように受信用SNSトピックが各リージョンに作成され、フィルター用Lambdaと通知用SNSトピックは東京リージョンに作成されます。

StackSetsでマルチリージョン展開するときの手順ですが、以下の設定を守ってください。

  • リージョン指定で東京リージョンを一番上にする
  • デプロイオプションは「順次」にする

これはLambdaと通知用SNSトピックを東京リージョンのみ作成するようにしているため、順序性を持たせる必要があるためです。もしLambdaと通知用SNSトピックを東京リージョン以外に作成したい場合は、テンプレートのConditions記載のリージョンを変更してください。

Conditions:
  MainTargetRegion: !Equals [!Ref AWS::Region, ap-northeast-1]

おわりに

通知の制約を活かしてプロビジョニングされた製品のエラーを管理者に通知してみました。この仕組みであれば、共有するポートフォリオの通知制約を設定するだけで済みそうです。

ただし、スタックイベントにはService Catalogの情報がないため、必要であればサンプルに載せたように作り込みが必要です。また、規模が大きくなりLambdaの同時実行数が懸念になりそうであれば、受信用トピックとLambdaの間にSQSを入れることも検討ください。

今回は製品の作成や更新のエラーを取得するための実装でしたが、Lambdaの処理を変更することでよく使われる製品のカウントにも使えるかなと思いました。

どうにかService Catalogの製品利用を通知したい方のお役に立てれば幸いです。

参考