Amazon SESの送信イベントを送信情報に基づき分割してS3バケット上に保存する

2023.11.06

初めに

以前Amazon SESから送信したメールのログ(イベント)をS3バケットに格納する方法を紹介させていただきました。
現在ではVirtual Delivery Managerでログを確認できますが、より長期もしくは詳細に確認したい場合は引き続き変更セットを割り当てS3等にログを配布する必要があります。

同一の検証ID内ではイベント種別によってログの配布先を分離することができますが同一イベント同士で別々の配布先に渡すことができないためシステム毎に分けてファイル出力したい場合は何らか別の手段を用意する必要があります。

システムごとに利用アドレスが異なる場合検証IDをメールアドレス単位にすることで対応できますが、複数システムで共通のアドレスを使っている場合もありますし利用するアドレスによっては認証が手間であったりもします。

Kinesis Data Firehoseによる動的パーティショニング

Amazon Kinesis Data Firehoseによる動的パーティショニングと呼ばれる機能があり、こちらを用いることで配布先がS3の場合データの実際の値をもとにパーティションの値を決定することができます。

ただこの値を指定できるのはキープレフィックスとなるためバケット自体は別のものとすることができない点はご注意ください。

https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/dynamic-partitioning.html
パーティショニングキーの作成でサポートされているメソッドは次のとおりです。
インライン解析 - このメソッドは、JSON 形式のデータレコードからパーティショニングするためのキーの抽出で、Amazon Kinesis Data Firehose 組み込みサポートメカニズムである jq パーサーを使用します。
AWS Lambda 関数 - このメソッドは、指定された AWS Lambda 関数を使用して、パーティショニングに必要なデータフィールドを抽出して返します 。

動的パーティショニングではFirehose組み込みのjqパーサ、もしくはLambdaによるパラメータ抽出・変換の2種類のいずれかが利用可能です。

jqパーサーでも十分な場合が多いと思いますがより複雑な処理をしたい場合やJSONではない場合はLambda関数を利用して加工する事になりそうです。

今回はデータソースがJSONかつシンプルな抽出ですのでjqパーサーを利用します。

追加料金

動的パーティショニングの利用には通常の配信に加え以下の追加料金が発生します(執筆時点での東京リージョンのもの)。
最新の情報は料金ページの確認をお願いいたします。

項目 金額
配信された GB あたり 0.032USD
配信された 1,000 S3 オブジェクトあたり 0.008USD
JQ 処理、1 時間あたり (オプション) 0.112USD

バッファを900秒(=15分)にしておけば配信オブジェクトは1日辺りでも96*{{最大分割数}}個、JQ処理は今回のシンプルな例であれば70ms弱/件だったので小規模なシステムであればそこまでかからないでしょうか?

ただ発生イベント量による部分は大きいと思いますので事前に予想外の料金とならないかは確認しておきましょう。

差し替え時の注意

既に配信ストリームが存在する場合、動的パーティショニングは既存の配信ストリームに対して有効化を行うことができず作り直す必要があります。

幸いAmazon SES変更セットには複数のイベント先の割り当てが可能ですので一旦動的パーティショニングが有無両立させて不要であれば以前の配信ストリームを削除という形が無難でしょう。

設定

利用するキーの決定

以下は以前別件の記事で利用した送信のログです。

今回は送信システム毎に異なるSMTPユーザ(IAMユーザ)を利用しているものと仮定を置いてそれを示す値であるses:caller-identityの値を利用して分けてみます。

送信元IP毎に分類したいのであればses:source-ipを利用するなど各環境事情に合わせて必要なキーを選択しましょう。

{
  "eventType": "Delivery",
  "mail": {
    "timestamp": "2023-08-14T08:46:58.328Z",
    "source": "postfix@example.com",
    "sourceArn": "arn:aws:ses:ap-northeast-1:xxxxxxxxxxxx:identity/example.com",
    "sendingAccountId": "xxxxxxxxxxxx",
    "messageId": "01060189f33a1918-152cfea8-fd15-49d8-a778-e6445c97986c-000000",
    "destination": [
      "receive@example.com"
    ],
    "headersTruncated": false,
    "headers": [
      {
        "name": "Received",
        "value": "from ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal (ec2-43-207-xxx-xxx.ap-northeast-1.compute.amazonaws.com [43.207.xxx.xxx]) by email-smtp.amazonaws.com with SMTP (SimpleEmailService-d-RJ9XEHDO0) id tSZrKRumQhOCiKN7EFdt for receive@example.com; Mon, 14 Aug 2023 08:46:58 +0000 (UTC)"
      },
      {
        "name": "Received",
        "value": "from mail.example.com (localhost [127.0.0.1]) by ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal (Postfix) with SMTP id 8C3C8CBABF3 for <receive@example.com>; Mon, 14 Aug 2023 08:46:44 +0000 (UTC)"
      },
      {
        "name": "From",
        "value": "postfix@example.com"
      },
      {
        "name": "To",
        "value": "receive@example.com"
      },
      {
        "name": "Subject",
        "value": "from ses"
      },
      {
        "name": "Message-Id",
        "value": "<20230814084650.8C3C8CBABF3@ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal>"
      },
      {
        "name": "Date",
        "value": "Mon, 14 Aug 2023 08:46:44 +0000 (UTC)"
      }
    ],
    "commonHeaders": {
      "from": [
        "postfix@example.com"
      ],
      "date": "Mon, 14 Aug 2023 08:46:44 +0000 (UTC)",
      "to": [
        "receive@example.com"
      ],
      "messageId": "01060189f33a1918-152cfea8-fd15-49d8-a778-e6445c97986c-000000",
      "subject": "from ses"
    },
    "tags": {
      "ses:operation": [
        "SendSmtpEmail"
      ],
      "ses:configuration-set": [
        "examplecom-configuration"
      ],
      "ses:source-ip": [
        "43.207.xxx.xxx"
      ],
      "ses:from-domain": [
        "example.com"
      ],
      "ses:caller-identity": [
        "ses-smtp-user.20230808-xxxxx"
      ],
      "ses:outgoing-ip": [
        "23.251.234.12"
      ]
    }
  },
  "delivery": {
    "timestamp": "2023-08-14T08:47:31.069Z",
    "processingTimeMillis": 32741,
    "recipients": [
      "receive@example.com"
    ],
    "smtpResponse": "250 OK cj0lrkkn6rgabtc1jrhn7e05idpcfeqaplkvnh81",
    "reportingMTA": "e234-12.smtp-out.ap-northeast-1.amazonses.com"
  }
}

CloudFormationテンプレート

最初に記載した以前の記事のテンプレートを拡張する形で作成します。差分(追加)部分はハイライトで表示しています。

AWSTemplateFormatVersion: 2010-09-09
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label: 
          default: General
        Parameters:
          - NoColonDomain
          - LogExpirationInDays
      - Label:
          default: Amazon SNS
        Parameters:
          - BaounceFeedbackAddress
          - ComplaintFeedbackAddress
Parameters:
  NoColonDomain:
    Description: The domain name without the "." ex) example.com -> examplecom
    Type: String
  LogExpirationInDays:
    Description: Log expiration (day)
    Type: Number
  BaounceFeedbackAddress:
    Description: E-mail address for bounce feedback. empty if not needed
    Type: String
  ComplaintFeedbackAddress:
    Description: E-mail address for complaint feedback. empty if not needed
    Type: String


Conditions:
  ExistBounceFeedbackAddress: !Not [!Equals ["", !Ref BaounceFeedbackAddress]]
  ExistComplaintFeedbackAddress: !Not [!Equals ["", !Ref ComplaintFeedbackAddress]]
Resources:
  #----------------------
  #--- SES Configuration
  #----------------------
  ConfigurationSet:
    Type: AWS::SES::ConfigurationSet
    Properties:
      Name: !Sub ${NoColonDomain}-configuration
      DeliveryOptions:
        TlsPolicy: REQUIRE
  MailLogDeliverEvent:
    Type: AWS::SES::ConfigurationSetEventDestination
    Properties:
      ConfigurationSetName: !Ref ConfigurationSet
      EventDestination:
        Name: !Sub ${NoColonDomain}-ses-log-deliver
        Enabled: True
        MatchingEventTypes:
          - delivery
          - reject
          - bounce
          - complaint
        KinesisFirehoseDestination: 
          DeliveryStreamARN: !GetAtt [SESLogDeliver, Arn]
          IAMRoleARN: !GetAtt [SESMailDeliverRole, Arn]
  SESMailDeliverRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub ${NoColonDomain}-ses-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
            Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: ses.amazonaws.com
      Path: /
      Policies:
        - PolicyName: !Sub ${NoColonDomain}-ses-policy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              Effect: Allow
              Action:
                - firehose:PutRecordBatch
              Resource: !GetAtt [SESLogDeliver, Arn]
  #----------------------
  #--- S3
  #---------------------- 
  MailLogBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${NoColonDomain}-mail-log
      PublicAccessBlockConfiguration:
        BlockPublicAcls: True
        BlockPublicPolicy: True
        IgnorePublicAcls: True
        RestrictPublicBuckets: True    
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - BucketKeyEnabled: True
            ServerSideEncryptionByDefault: 
              SSEAlgorithm: AES256
      LifecycleConfiguration:
        Rules:
          - Id: !Sub Delete-After-${LogExpirationInDays}Days	
            ExpirationInDays: !Ref LogExpirationInDays
            Status: Enabled
  #----------------------
  #--- Firehose
  #----------------------
  SESLogDeliver:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties: 
      DeliveryStreamName: !Sub ${NoColonDomain}-ses-log-deliver
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration: 
        BucketARN: !GetAtt [MailLogBucket, Arn]
        BufferingHints: 
          IntervalInSeconds: 900
          SizeInMBs: 128
        CloudWatchLoggingOptions:
          Enabled: True
          #NOTE: LogGroup側からRefで撮りたいが${SESLogDeliver}がLogGroup側で欲しく循環参照がかかるのでベタ書きする
          LogGroupName: !Sub /aws/kinesisfirehose/${NoColonDomain}-ses-log-deliver
          LogStreamName: S3Delivery
        Prefix: "!{partitionKeyFromQuery:userName}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/"
        PartitioningConfiguration:
          Enabled: True
          RetryOptions:
            DurationInSeconds: 300
        ProcessingConfiguration:
          Enabled: True
          Processors:
          - Type: MetadataExtraction
            Parameters:
              - ParameterName: MetadataExtractionQuery
                ParameterValue: '{userName: .mail.tags."ses:caller-identity"[0]}'
              - ParameterName: JsonParsingEngine
                ParameterValue: JQ-1.6
        CompressionFormat: GZIP
        ErrorOutputPrefix: "!{firehose:error-output-type}/"            
        RoleARN: !GetAtt [FirehoseRole, Arn]
  FirehoseCWLogGroup:
    Type: AWS::Logs::LogGroup
    Properties: 
      LogGroupName: !Sub /aws/kinesisfirehose/${SESLogDeliver}
      RetentionInDays: !Ref LogExpirationInDays
  FirehoseCWLogStream:
    Type: AWS::Logs::LogStream
    Properties: 
      LogGroupName: !Ref FirehoseCWLogGroup
      LogStreamName: S3Delivery
  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub ${NoColonDomain}-firehose-role
      Path: /
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
      Policies:
        - PolicyName: !Sub ${NoColonDomain}-firehose-policy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:PutObject'
                Resource: !Sub ${MailLogBucket.Arn}/*

  #----------------------
  #--- SNS
  #----------------------
  BounceFeedbackTopic:
    Condition: ExistBounceFeedbackAddress
    Type: AWS::SNS::Topic
    Properties: 
      TopicName: !Sub ${NoColonDomain}-bounce-feedback-topic
      DisplayName: !Sub ${NoColonDomain}-bounce-feedback-topic
  BounceFeedbackSubscription:
    Condition: ExistBounceFeedbackAddress
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref BounceFeedbackTopic
      Protocol: email
      Endpoint: !Ref BaounceFeedbackAddress
  ComplaintFeedbackTopic:
    Condition: ExistComplaintFeedbackAddress
    Type: AWS::SNS::Topic
    Properties: 
      TopicName: !Sub ${NoColonDomain}-complaint-feedback-topic
      DisplayName: !Sub ${NoColonDomain}-complaint-feedback-topic
  ComplaintFeedbackSubscription:
    Condition: ExistComplaintFeedbackAddress
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref ComplaintFeedbackTopic
      Protocol: email
      Endpoint: !Ref ComplaintFeedbackAddress

残念ながらCloudFormationの挙動として動的パーティショニングを有効化した際に置換が行われるわけではなくエラーで止まるようなので、すでに以前の記事のテンプレートを適用している場合は別の論理IDで作成する等で意図的に別のリソースとして作成する必要があります。

動的パーティショニング部分の設定はマネジメントコンソールでみるとこのような形となっております。

動作確認

2ユーザ分SMTP用のユーザを準備して送信します。

メール送信自体は通常通り送信するだけですので省略しますが同じバッファ間隔(900秒)に収まるように送信しています。

時間をおいて配信先バケットを確認してみると別々のファイルと指定配布されていることが確認できます。

$ aws s3 ls s3://xxxxx-mail-log/ses-smtp-user.20231106-104205/2023/11/06/
2023-11-06 12:51:54        894 examplecom-ses-dynamic-log-deliver-2-2023-11-06-03-29-19-917bfda5-190a-38f9-b39c-3bae3c65eaae.gz
$ aws s3 ls s3://xxxxx-mail-log/ses-smtp-user.20231106-122830/2023/11/06/
2023-11-06 12:51:54        892 examplecom-ses-dynamic-log-deliver-2-2023-11-06-03-30-22-769eb168-80b7-3ba7-87bc-71b47b35f6a3.gz

メトリクスを確認してもパーティションが2つに別れてることが確認できます。

終わりに

Amazon Kinesis Data Firehoseの動的パーティショニング機能を用いて値に応じてログを別のファイルに出力してみました。Lambda関数で処理を組み込んだりせずとも標準機能でできるのが魅力です。

権限的をどうしたいかや後続でどのような処理を行うかで事前に分けておくか、事後で一括で処理しつつその中で分けていくかは異なるかと思いますので実現したい事によって使い分けていきましょう。