初めに
以前Amazon SESから送信したメールのログ(イベント)をS3バケットに格納する方法を紹介させていただきました。
現在ではVirtual Delivery Managerでログを確認できますが、より長期もしくは詳細に確認したい場合は引き続き変更セットを割り当てS3等にログを配布する必要があります。
同一の検証ID内ではイベント種別によってログの配布先を分離することができますが同一イベント同士で別々の配布先に渡すことができないためシステム毎に分けてファイル出力したい場合は何らか別の手段を用意する必要があります。
システムごとに利用アドレスが異なる場合検証IDをメールアドレス単位にすることで対応できますが、複数システムで共通のアドレスを使っている場合もありますし利用するアドレスによっては認証が手間であったりもします。
Kinesis Data Firehoseによる動的パーティショニング
Amazon Kinesis Data Firehoseによる動的パーティショニングと呼ばれる機能があり、こちらを用いることで配布先がS3の場合データの実際の値をもとにパーティションの値を決定することができます。
ただこの値を指定できるのはキープレフィックスとなるためバケット自体は別のものとすることができない点はご注意ください。
https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/dynamic-partitioning.html
パーティショニングキーの作成でサポートされているメソッドは次のとおりです。
インライン解析 - このメソッドは、JSON 形式のデータレコードからパーティショニングするためのキーの抽出で、Amazon Kinesis Data Firehose 組み込みサポートメカニズムである jq パーサーを使用します。
AWS Lambda 関数 - このメソッドは、指定された AWS Lambda 関数を使用して、パーティショニングに必要なデータフィールドを抽出して返します 。
動的パーティショニングではFirehose組み込みのjqパーサ、もしくはLambdaによるパラメータ抽出・変換の2種類のいずれかが利用可能です。
jqパーサーでも十分な場合が多いと思いますがより複雑な処理をしたい場合やJSONではない場合はLambda関数を利用して加工する事になりそうです。
今回はデータソースがJSONかつシンプルな抽出ですのでjqパーサーを利用します。
追加料金
動的パーティショニングの利用には通常の配信に加え以下の追加料金が発生します(執筆時点での東京リージョンのもの)。
最新の情報は料金ページの確認をお願いいたします。
項目 | 金額 |
---|---|
配信された GB あたり | 0.032USD |
配信された 1,000 S3 オブジェクトあたり | 0.008USD |
JQ 処理、1 時間あたり (オプション) | 0.112USD |
バッファを900秒(=15分)にしておけば配信オブジェクトは1日辺りでも96*{{最大分割数}}
個、JQ処理は今回のシンプルな例であれば70ms弱/件だったので小規模なシステムであればそこまでかからないでしょうか?
ただ発生イベント量による部分は大きいと思いますので事前に予想外の料金とならないかは確認しておきましょう。
差し替え時の注意
既に配信ストリームが存在する場合、動的パーティショニングは既存の配信ストリームに対して有効化を行うことができず作り直す必要があります。
幸いAmazon SES変更セットには複数のイベント先の割り当てが可能ですので一旦動的パーティショニングが有無両立させて不要であれば以前の配信ストリームを削除という形が無難でしょう。
設定
利用するキーの決定
以下は以前別件の記事で利用した送信のログです。
今回は送信システム毎に異なるSMTPユーザ(IAMユーザ)を利用しているものと仮定を置いてそれを示す値であるses:caller-identity
の値を利用して分けてみます。
送信元IP毎に分類したいのであればses:source-ip
を利用するなど各環境事情に合わせて必要なキーを選択しましょう。
{
"eventType": "Delivery",
"mail": {
"timestamp": "2023-08-14T08:46:58.328Z",
"source": "postfix@example.com",
"sourceArn": "arn:aws:ses:ap-northeast-1:xxxxxxxxxxxx:identity/example.com",
"sendingAccountId": "xxxxxxxxxxxx",
"messageId": "01060189f33a1918-152cfea8-fd15-49d8-a778-e6445c97986c-000000",
"destination": [
"receive@example.com"
],
"headersTruncated": false,
"headers": [
{
"name": "Received",
"value": "from ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal (ec2-43-207-xxx-xxx.ap-northeast-1.compute.amazonaws.com [43.207.xxx.xxx]) by email-smtp.amazonaws.com with SMTP (SimpleEmailService-d-RJ9XEHDO0) id tSZrKRumQhOCiKN7EFdt for receive@example.com; Mon, 14 Aug 2023 08:46:58 +0000 (UTC)"
},
{
"name": "Received",
"value": "from mail.example.com (localhost [127.0.0.1]) by ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal (Postfix) with SMTP id 8C3C8CBABF3 for <receive@example.com>; Mon, 14 Aug 2023 08:46:44 +0000 (UTC)"
},
{
"name": "From",
"value": "postfix@example.com"
},
{
"name": "To",
"value": "receive@example.com"
},
{
"name": "Subject",
"value": "from ses"
},
{
"name": "Message-Id",
"value": "<20230814084650.8C3C8CBABF3@ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal>"
},
{
"name": "Date",
"value": "Mon, 14 Aug 2023 08:46:44 +0000 (UTC)"
}
],
"commonHeaders": {
"from": [
"postfix@example.com"
],
"date": "Mon, 14 Aug 2023 08:46:44 +0000 (UTC)",
"to": [
"receive@example.com"
],
"messageId": "01060189f33a1918-152cfea8-fd15-49d8-a778-e6445c97986c-000000",
"subject": "from ses"
},
"tags": {
"ses:operation": [
"SendSmtpEmail"
],
"ses:configuration-set": [
"examplecom-configuration"
],
"ses:source-ip": [
"43.207.xxx.xxx"
],
"ses:from-domain": [
"example.com"
],
"ses:caller-identity": [
"ses-smtp-user.20230808-xxxxx"
],
"ses:outgoing-ip": [
"23.251.234.12"
]
}
},
"delivery": {
"timestamp": "2023-08-14T08:47:31.069Z",
"processingTimeMillis": 32741,
"recipients": [
"receive@example.com"
],
"smtpResponse": "250 OK cj0lrkkn6rgabtc1jrhn7e05idpcfeqaplkvnh81",
"reportingMTA": "e234-12.smtp-out.ap-northeast-1.amazonses.com"
}
}
CloudFormationテンプレート
最初に記載した以前の記事のテンプレートを拡張する形で作成します。差分(追加)部分はハイライトで表示しています。
AWSTemplateFormatVersion: 2010-09-09
Metadata:
AWS::CloudFormation::Interface:
ParameterGroups:
- Label:
default: General
Parameters:
- NoColonDomain
- LogExpirationInDays
- Label:
default: Amazon SNS
Parameters:
- BaounceFeedbackAddress
- ComplaintFeedbackAddress
Parameters:
NoColonDomain:
Description: The domain name without the "." ex) example.com -> examplecom
Type: String
LogExpirationInDays:
Description: Log expiration (day)
Type: Number
BaounceFeedbackAddress:
Description: E-mail address for bounce feedback. empty if not needed
Type: String
ComplaintFeedbackAddress:
Description: E-mail address for complaint feedback. empty if not needed
Type: String
Conditions:
ExistBounceFeedbackAddress: !Not [!Equals ["", !Ref BaounceFeedbackAddress]]
ExistComplaintFeedbackAddress: !Not [!Equals ["", !Ref ComplaintFeedbackAddress]]
Resources:
#----------------------
#--- SES Configuration
#----------------------
ConfigurationSet:
Type: AWS::SES::ConfigurationSet
Properties:
Name: !Sub ${NoColonDomain}-configuration
DeliveryOptions:
TlsPolicy: REQUIRE
MailLogDeliverEvent:
Type: AWS::SES::ConfigurationSetEventDestination
Properties:
ConfigurationSetName: !Ref ConfigurationSet
EventDestination:
Name: !Sub ${NoColonDomain}-ses-log-deliver
Enabled: True
MatchingEventTypes:
- delivery
- reject
- bounce
- complaint
KinesisFirehoseDestination:
DeliveryStreamARN: !GetAtt [SESLogDeliver, Arn]
IAMRoleARN: !GetAtt [SESMailDeliverRole, Arn]
SESMailDeliverRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub ${NoColonDomain}-ses-role
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
Action: sts:AssumeRole
Effect: Allow
Principal:
Service: ses.amazonaws.com
Path: /
Policies:
- PolicyName: !Sub ${NoColonDomain}-ses-policy
PolicyDocument:
Version: 2012-10-17
Statement:
Effect: Allow
Action:
- firehose:PutRecordBatch
Resource: !GetAtt [SESLogDeliver, Arn]
#----------------------
#--- S3
#----------------------
MailLogBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub ${NoColonDomain}-mail-log
PublicAccessBlockConfiguration:
BlockPublicAcls: True
BlockPublicPolicy: True
IgnorePublicAcls: True
RestrictPublicBuckets: True
BucketEncryption:
ServerSideEncryptionConfiguration:
- BucketKeyEnabled: True
ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
LifecycleConfiguration:
Rules:
- Id: !Sub Delete-After-${LogExpirationInDays}Days
ExpirationInDays: !Ref LogExpirationInDays
Status: Enabled
#----------------------
#--- Firehose
#----------------------
SESLogDeliver:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: !Sub ${NoColonDomain}-ses-log-deliver
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt [MailLogBucket, Arn]
BufferingHints:
IntervalInSeconds: 900
SizeInMBs: 128
CloudWatchLoggingOptions:
Enabled: True
#NOTE: LogGroup側からRefで撮りたいが${SESLogDeliver}がLogGroup側で欲しく循環参照がかかるのでベタ書きする
LogGroupName: !Sub /aws/kinesisfirehose/${NoColonDomain}-ses-log-deliver
LogStreamName: S3Delivery
Prefix: "!{partitionKeyFromQuery:userName}/!{timestamp:yyyy}/!{timestamp:MM}/!{timestamp:dd}/"
PartitioningConfiguration:
Enabled: True
RetryOptions:
DurationInSeconds: 300
ProcessingConfiguration:
Enabled: True
Processors:
- Type: MetadataExtraction
Parameters:
- ParameterName: MetadataExtractionQuery
ParameterValue: '{userName: .mail.tags."ses:caller-identity"[0]}'
- ParameterName: JsonParsingEngine
ParameterValue: JQ-1.6
CompressionFormat: GZIP
ErrorOutputPrefix: "!{firehose:error-output-type}/"
RoleARN: !GetAtt [FirehoseRole, Arn]
FirehoseCWLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub /aws/kinesisfirehose/${SESLogDeliver}
RetentionInDays: !Ref LogExpirationInDays
FirehoseCWLogStream:
Type: AWS::Logs::LogStream
Properties:
LogGroupName: !Ref FirehoseCWLogGroup
LogStreamName: S3Delivery
FirehoseRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub ${NoColonDomain}-firehose-role
Path: /
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Action: sts:AssumeRole
Effect: Allow
Principal:
Service: firehose.amazonaws.com
Policies:
- PolicyName: !Sub ${NoColonDomain}-firehose-policy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 's3:PutObject'
Resource: !Sub ${MailLogBucket.Arn}/*
#----------------------
#--- SNS
#----------------------
BounceFeedbackTopic:
Condition: ExistBounceFeedbackAddress
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub ${NoColonDomain}-bounce-feedback-topic
DisplayName: !Sub ${NoColonDomain}-bounce-feedback-topic
BounceFeedbackSubscription:
Condition: ExistBounceFeedbackAddress
Type: AWS::SNS::Subscription
Properties:
TopicArn: !Ref BounceFeedbackTopic
Protocol: email
Endpoint: !Ref BaounceFeedbackAddress
ComplaintFeedbackTopic:
Condition: ExistComplaintFeedbackAddress
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub ${NoColonDomain}-complaint-feedback-topic
DisplayName: !Sub ${NoColonDomain}-complaint-feedback-topic
ComplaintFeedbackSubscription:
Condition: ExistComplaintFeedbackAddress
Type: AWS::SNS::Subscription
Properties:
TopicArn: !Ref ComplaintFeedbackTopic
Protocol: email
Endpoint: !Ref ComplaintFeedbackAddress
残念ながらCloudFormationの挙動として動的パーティショニングを有効化した際に置換が行われるわけではなくエラーで止まるようなので、すでに以前の記事のテンプレートを適用している場合は別の論理IDで作成する等で意図的に別のリソースとして作成する必要があります。
動的パーティショニング部分の設定はマネジメントコンソールでみるとこのような形となっております。
動作確認
2ユーザ分SMTP用のユーザを準備して送信します。
メール送信自体は通常通り送信するだけですので省略しますが同じバッファ間隔(900秒)に収まるように送信しています。
時間をおいて配信先バケットを確認してみると別々のファイルと指定配布されていることが確認できます。
$ aws s3 ls s3://xxxxx-mail-log/ses-smtp-user.20231106-104205/2023/11/06/
2023-11-06 12:51:54 894 examplecom-ses-dynamic-log-deliver-2-2023-11-06-03-29-19-917bfda5-190a-38f9-b39c-3bae3c65eaae.gz
$ aws s3 ls s3://xxxxx-mail-log/ses-smtp-user.20231106-122830/2023/11/06/
2023-11-06 12:51:54 892 examplecom-ses-dynamic-log-deliver-2-2023-11-06-03-30-22-769eb168-80b7-3ba7-87bc-71b47b35f6a3.gz
メトリクスを確認してもパーティションが2つに別れてることが確認できます。
終わりに
Amazon Kinesis Data Firehoseの動的パーティショニング機能を用いて値に応じてログを別のファイルに出力してみました。Lambda関数で処理を組み込んだりせずとも標準機能でできるのが魅力です。
権限的をどうしたいかや後続でどのような処理を行うかで事前に分けておくか、事後で一括で処理しつつその中で分けていくかは異なるかと思いますので実現したい事によって使い分けていきましょう。