【CloudFormation】SQS + Lambda で通知集約させてみた

2021.11.26

AWSでアラートなどを通知させる際に、似た内容が何件も来てしまうことがありませんか? 似た内容は、集約して通知してくれると嬉しいので、SQS + Lambdaで通知の集約を考えてみました。

構成

考えた構成は以下のようになります。

キュー追加をトリガーとして、Lambdaを実行させます。Lambdaでキューを受け取り、キューの数を通知させます。 アラーム後、重複してLambda起動が行われないように、Lambdaの同時実行数は1とします。

CloudFormationで構築

上記アーキテクチャをCloudFormationで作成してみました。

tempalte.yml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Aggregation of notifications

Parameters:
  Email:
    Type: String
    Description: "Email destination address"
    AllowedPattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
    MinLength: 5
  MailSubject:
    Type: String
    Description: "Email Subject"
    Default: "Notifications"
  MailContent:
    Type: String
    Description: "Email Content"
    Default: "{} cases sent from SQS!!"

Resources:
  # Mail送信用の SNS Topic
  MailSnsTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: cfn-notify
      Subscription:
        - Endpoint: !Ref Email
          Protocol: email

  LambdaRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        - 'arn:aws:iam::aws:policy/AmazonSNSFullAccess'
      MaxSessionDuration: 3600
      Path: "/"
  
  LambdaFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.7
      Handler: index.lambda_handler
      Timeout: 900
      ReservedConcurrentExecutions: 1
      Role: !GetAtt LambdaRole.Arn
      Environment:
        Variables:
          SQS_NAME: !GetAtt NotifySQS.QueueName
          TOPIC_ARN: !Ref MailSnsTopic
          SUBJECT: !Ref MailSubject
          CONTENT: !Ref MailContent
      InlineCode: |
        import boto3
        import logging
        import json
        import time
        import os

        logger = logging.getLogger()
        logger.setLevel(logging.INFO)

        sqs_name = os.environ.get('SQS_NAME') 
        sqs = boto3.resource('sqs')
        queue = sqs.get_queue_by_name(QueueName=sqs_name)
        client = boto3.client('sns')
        topicarn = os.environ.get("TOPIC_ARN") 

        subject = os.environ.get('SUBJECT')
        content =  os.environ.get('CONTENT')

        def pub_sns(total):
            global topicarn, subject, content 
            client.publish(
                TopicArn=topicarn,
                Subject=subject,
                Message=content.format(total),
            )

        def wait_sqs(num):
            while True:
                msg_list = queue.receive_messages(MaxNumberOfMessages=10)
                if len(msg_list) == 0 :
                    break
                tmp = 0
                for msg in msg_list:
                    msg.delete()
                    tmp+=1
                logger.info(str(tmp) +"cases GET!")
                num += tmp
                time.sleep(60)
            return num
             
        def lambda_handler(event, context):
            total = wait_sqs(0)
            if total != 0 :
                pub_sns(total)
            return "Aggregate {} SQS cases".format(total)
  LambdaEvent:
    Type: 'AWS::Lambda::Permission'
    Properties:
      Action: 'lambda:InvokeFunction'
      FunctionName: !Ref LambdaFunction
      Principal: 'sns.amazonaws.com'
      SourceAccount: !Ref AWS::AccountId
      SourceArn: !Ref LambdaSNSTopic
  
  LambdaSNSTopic:
    Type: AWS::SNS::Topic
    Properties:
      Subscription:
        - Endpoint:
            Fn::GetAtt:
              - "LambdaFunction"
              - "Arn"
          Protocol: "lambda"
      TopicName: "LambdaSNSTopic"

  # SQS 作成
  NotifySQS:
    Type: AWS::SQS::Queue
  
  # SQS policy
  SQSPolicy: 
    Type: AWS::SQS::QueuePolicy
    Properties: 
      Queues: 
        - !Ref NotifySQS
      PolicyDocument: 
        Statement: 
          - 
            Action: 
              - "SQS:*"
            Effect: "Allow"
            Resource: 
              Fn::GetAtt:
                - "NotifySQS"
                - "Arn"
            Principal:  
              AWS: 
                - "*"  
 

  # SQS Alarm作成
  QueueAlarm: 
    Type: AWS::CloudWatch::Alarm
    Properties: 
      AlarmDescription: "Alarm if queue depth is one over"
      Namespace: "AWS/SQS"
      MetricName: "ApproximateNumberOfMessagesVisible"
      Dimensions: 
        - 
          Name: "QueueName"
          Value: 
            Fn::GetAtt: 
              - "NotifySQS"
              - "QueueName"
      Statistic: "Minimum"
      Period: "60"
      EvaluationPeriods: "1"
      Threshold: "1"
      ComparisonOperator: "GreaterThanOrEqualToThreshold"
      AlarmActions: 
        - 
          Ref: LambdaSNSTopic
      InsufficientDataActions: 
        - 
          Ref: LambdaSNSTopic

通知集約の検証

では、実際にスタックを作成して、通知が集約されるかを検証してみます。

スタックの作成

先ほどのCloudFormationテンプレートのスタックをコンソールから作成します。 作成時に、通知先のメールアドレスと通知内容(件名と本文)が求められます。本文の{}箇所には集約数が埋め込まれます。

スタック作成後、SQSやLmabdaなどが作成されます。 この時、作成したSQSを通知先のターゲットに指定することで、SQSに通知を一旦溜め込み通知を集約させることができます。

また、通知先のメールアドレスにConfirm subscriptionのメールが送信されているので、承認しておきます。

大量通知の準備

検証のため、Security Hubの"HIGH"と"MEDIUM"の検出結果を大量に通知させるようにします。 以前紹介したCloudFormationを利用すると一撃でEventBridgeが作成されます。

作成されたEventBridgeのターゲットを先ほど作成したSQSに変更します。

大量の通知をさせる

InspectorのルールパッケージCIS Operating System Security Configuration Benchmarks-1.0でデフォルトのEC2に対して評価することで大量に"HIGH"と"MEDIUM"を検出できます。

では、実際に評価を開始し、検出終了まで待つと、大量に"HIGH"と"MEDIUM"の検出が行われます。
また、通知先のメールアドレスを確認すると、"HIGH"と"MEDIUM"の数が通知されていることが確認できます。

最後に

通知集約を行うために、キューイングをトリガーとして、Lambda直列実行も行なってみましたが、上手くできませんでした。
色々検討した結果、今の構成に落ち着き、実装できました。