Trying out serverless ETL of S3 object logs with CloudTrail and Firehose
Amazon S3 can record requests made to an S3 bucket in the form of server access logs.
- Sample Amazon S3 server access log entry
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be amzn-s3-demo-bucket1 [06/Feb/2019:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 3E57427F3EXAMPLE REST.GET.VERSIONING - "GET /amzn-s3-demo-bucket1?versioning HTTP/1.1" 200 - 113 - 7 - "-" "S3Console/0.4" - s9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234= SigV4 ECDHE-RSA-AES128-GCM-SHA256 AuthHeader amzn-s3-demo-bucket1.s3.us-west-1.amazonaws.com TLSV1.2 arn:aws:s3:us-west-1:123456789012:accesspoint/example-AP Yes
These access logs have traditionally been analyzed with Amazon Athena, but as the amount of log data to scan grows, cost and performance become a concern.
This post introduces a serverless ETL pipeline, built only from managed services, that I recently had the chance to try: CloudTrail object-level logging (data events) is enabled for S3 write operations, and the recorded events are forwarded to Firehose via EventBridge.
CloudTrail
Role: enables object-level logging (data events) for the target S3 bucket.
- Configures object-level logging for the S3 bucket whose changes we want to capture.
- The trail is limited to a single S3 bucket and to write events only, which keeps CloudTrail data event costs down. (A small verification sketch follows the excerpt below.)
S3ChangeNotifierCloudTrail:
  Type: AWS::CloudTrail::Trail
  Properties:
    S3BucketName: !Ref S3ChangeNotifierLogBucket
    IsLogging: true
    EnableLogFileValidation: false
    IncludeGlobalServiceEvents: false
    IsMultiRegionTrail: false
    EventSelectors:
      - ReadWriteType: WriteOnly
        IncludeManagementEvents: false
        DataResources:
          - { Type: AWS::S3::Object, Values: [!Sub 'arn:${AWS::Partition}:s3:::${TargetS3BucketName1}/'] }
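As a quick check after deployment, the trail's event selectors can be inspected with boto3 to confirm that only object-level write events are recorded (a minimal sketch; the trail name and region are assumptions based on the template and the deployment example later in this post):

import boto3

cloudtrail = boto3.client('cloudtrail', region_name='ap-northeast-1')

# Trail name created by the template: '<stack-name>-S3ObjectLogTrailSeq' (adjust to your stack)
resp = cloudtrail.get_event_selectors(TrailName='s3-cloudtrail-to-firehose-S3ObjectLogTrailSeq')

for selector in resp.get('EventSelectors', []):
    # Expect ReadWriteType=WriteOnly and IncludeManagementEvents=False
    print(selector['ReadWriteType'], selector['IncludeManagementEvents'])
    for data_resource in selector.get('DataResources', []):
        # Expect Type=AWS::S3::Object and the monitored bucket ARN prefix
        print(data_resource['Type'], data_resource['Values'])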
EventBridge
Role: extracts only the required fields from the events recorded by CloudTrail and hands them to Firehose.
- An EventBridge rule picks up the S3 change events (object-level logs).
- InputTransformer: extracts only the fields we need (bucket name, event name, and so on) from the full CloudTrail event (the detail element), reducing the amount of data handled downstream.
- InputTemplate: reshapes the event into single-line JSON with no embedded newlines before forwarding it to Firehose. (A sketch for testing the event pattern follows the excerpt below.)
CloudTrailToFirehoseRule:
  Type: AWS::Events::Rule
  Properties:
    Name: !Sub '${AWS::StackName}-CloudTrailToFirehose'
    Description: "Send S3 CloudTrail events directly to Firehose for log storage"
    EventPattern:
      source: ["aws.s3"]
      "detail-type": ["AWS API Call via CloudTrail"]
      detail:
        eventSource: ["s3.amazonaws.com"]
        eventName:
          # Monitor various S3 operations
          - prefix: "PutObject"
          - prefix: "DeleteObject"
          - prefix: "CopyObject"
          - prefix: "CompleteMultipartUpload"
          - prefix: "AbortMultipartUpload"
          - prefix: "RestoreObject"
        requestParameters:
          bucketName: [!Ref TargetS3BucketName1] # Filter by target bucket
    State: ENABLED
    Targets:
      - Arn: !GetAtt S3ChangeNotifierFirehose.Arn
        Id: "SendToFirehose"
        RoleArn: !GetAtt EventBridgeFirehoseRole.Arn
        # Transform CloudTrail event to compact format (~70% size reduction)
        InputTransformer:
          InputPathsMap:
            bucketName: "$.detail.requestParameters.bucketName"
            bucketNameFallback: "$.detail.resources[0].ARN"
            objectKey: "$.detail.requestParameters.key"
            eventName: "$.detail.eventName"
            eventTime: "$.detail.eventTime"
            eventID: "$.detail.eventID"
            userArn: "$.detail.userIdentity.arn"
            principalId: "$.detail.userIdentity.principalId"
            sourceIP: "$.detail.sourceIPAddress"
            awsRegion: "$.detail.awsRegion"
            requestID: "$.detail.requestID"
            userName: "$.detail.userIdentity.userName"
            resources: "$.detail.resources"
          # Output only essential fields to reduce storage costs
          InputTemplate: '{"bucketName": "<bucketName>", "bucketNameFallback": "<bucketNameFallback>", "objectKey": "<objectKey>", "eventName": "<eventName>", "eventTime": "<eventTime>", "eventID": "<eventID>", "userArn": "<userArn>", "principalId": "<principalId>", "sourceIP": "<sourceIP>", "awsRegion": "<awsRegion>", "requestID": "<requestID>", "userName": "<userName>", "resources": <resources>}'
Firehose
Role: stores the data received from EventBridge in S3, in a format and folder layout that is easy to analyze.
- MetadataExtraction with dynamic partitioning pulls the S3 bucket, operation type, and user information out of the JSON records sent by EventBridge and uses them as partition keys.
- AppendDelimiterToRecord appends a newline to each record so the output lands in S3 as newline-delimited JSON (JSONL), a highly portable format. (A sketch of the resulting partition prefix follows the excerpt below.)
S3ChangeNotifierFirehose:
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    DeliveryStreamName: !Sub '${AWS::StackName}-firehose-stream'
    DeliveryStreamType: DirectPut
    ExtendedS3DestinationConfiguration:
      BucketARN: !GetAtt S3ChangeNotifierFirehoseBucket.Arn
      BufferingHints:
        SizeInMBs: 64
        IntervalInSeconds: 60
      CompressionFormat: GZIP
      RoleARN: !GetAtt FirehoseDeliveryRole.Arn
      # Dynamic partitioning keys: bucketName/eventName/principalId + date
      DynamicPartitioningConfiguration:
        Enabled: true
        RetryOptions:
          DurationInSeconds: 300
      Prefix: 'bucket=!{partitionKeyFromQuery:bucket_name}/event=!{partitionKeyFromQuery:event_name}/principal=!{partitionKeyFromQuery:principal_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
      ErrorOutputPrefix: 'errors/!{firehose:error-output-type}/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/'
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Type: MetadataExtraction
            Parameters:
              - ParameterName: MetadataExtractionQuery
                ParameterValue: '{bucket_name: (.bucketName // .bucketNameFallback // "unknown"), event_name: (.eventName // "unknown"), principal_id: (.principalId // .userArn // "unknown")}'
              - ParameterName: JsonParsingEngine
                ParameterValue: JQ-1.6
          - Type: AppendDelimiterToRecord
            Parameters:
              - ParameterName: Delimiter
                ParameterValue: '\n'
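To see where a given record will land, the MetadataExtractionQuery fallbacks (jq's // operator) and the Prefix expression can be approximated in a few lines of Python (an illustrative sketch; the record is a made-up example of the EventBridge output, and Python's or is only a rough stand-in for jq's //):

from datetime import datetime, timezone

# Illustrative record in the shape emitted by the EventBridge InputTemplate above
record = {
    "bucketName": "your-bucket-name",
    "bucketNameFallback": "arn:aws:s3:::your-bucket-name",
    "eventName": "PutObject",
    "principalId": "AIDAEXAMPLE",
    "userArn": "arn:aws:iam::123456789012:user/example-user",
}

# Same fallback idea as the jq query: first available value wins, otherwise "unknown"
bucket_name = record.get("bucketName") or record.get("bucketNameFallback") or "unknown"
event_name = record.get("eventName") or "unknown"
principal_id = record.get("principalId") or record.get("userArn") or "unknown"

now = datetime.now(timezone.utc)
prefix = (
    f"bucket={bucket_name}/event={event_name}/principal={principal_id}/"
    f"year={now:%Y}/month={now:%m}/day={now:%d}/hour={now:%H}/"
)
print(prefix)  # e.g. bucket=your-bucket-name/event=PutObject/principal=AIDAEXAMPLE/year=.../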
Summary
This post used S3 object-level logs as the example, but the architecture should be particularly well suited to long-term retention and analysis of logs that are updated relatively infrequently. For example, it could be a low-cost, efficient way to accumulate audit logs or data for trend analysis.
The pipeline built here can also serve as an entry point to more advanced analytics platforms such as Amazon Redshift or S3 Tables (Apache Iceberg). Give it a try if you have the chance.
Reference
Full template
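For example, once an Athena (Glue) table is defined over the Firehose output bucket with the partition columns used above, the JSONL data can be queried directly (a hedged sketch; the table name s3_object_logs, the database, and the result bucket are hypothetical and must match your own setup):

import boto3

athena = boto3.client('athena', region_name='ap-northeast-1')

# Hypothetical table over the Firehose output with partition columns
# (bucket, event, principal, year, month, day, hour) as strings
query = """
SELECT event, COUNT(*) AS operations
FROM s3_object_logs
WHERE bucket = 'your-bucket-name'
  AND year = '2025' AND month = '01'
GROUP BY event
ORDER BY operations DESC
"""

resp = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://your-athena-results-bucket/'},
)
print(resp['QueryExecutionId'])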
AWSTemplateFormatVersion: '2010-09-09'
Description: >
  S3 CloudTrail events to Firehose for log storage and processing.
  Monitors S3 object operations via CloudTrail, transforms events through EventBridge,
  and stores processed data in S3 via Kinesis Data Firehose with dynamic partitioning.
  Architecture: CloudTrail -> EventBridge (InputTransformer) -> Firehose (Dynamic Partitioning) -> S3

Parameters:
  TargetS3BucketName1:
    Type: String
    Description: Name of the EXISTING S3 bucket to monitor for write operations (PUT, DELETE, COPY, etc.) via CloudTrail logging.
    Default: s3-web-classmethod-m-ap-northeast-1-147179611693
  LogRetentionDays:
    Type: Number
    Default: 550
    Description: Number of days to retain logs and processed data
    MinValue: 1
    MaxValue: 3653
    ConstraintDescription: Must be between 1 and 3653 days.

Resources:
  # --- S3 Bucket for CloudTrail Logs ---
  S3ChangeNotifierLogBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub '${AWS::StackName}-ct-${AWS::AccountId}-${AWS::Region}'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      VersioningConfiguration:
        Status: Enabled
      LifecycleConfiguration:
        Rules:
          - Id: DeleteOldLogs
            Status: Enabled
            ExpirationInDays: !Ref LogRetentionDays # Configurable retention period
          - Id: DeleteOldVersions
            Status: Enabled
            NoncurrentVersionExpirationInDays: 10 # Delete old versions after 10 days
          - Id: CleanupIncompleteMultipartUploads
            Status: Enabled
            AbortIncompleteMultipartUpload:
              DaysAfterInitiation: 1 # Clean up failed multipart uploads after 1 day
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: CloudTrailLogs

  # S3 Bucket Policy for CloudTrail Access (ACL-disabled bucket optimized)
  S3ChangeNotifierLogBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref S3ChangeNotifierLogBucket
      PolicyDocument:
        Statement:
          # Allow CloudTrail to write log files (no ACL dependency)
          - Sid: AWSCloudTrailWrite
            Effect: Allow
            Principal:
              Service: cloudtrail.amazonaws.com
            Action:
              - s3:PutObject
              - s3:GetBucketAcl # Required for CloudTrail service validation
            Resource:
              - !GetAtt S3ChangeNotifierLogBucket.Arn
              - !Sub "${S3ChangeNotifierLogBucket.Arn}/*"
            Condition:
              StringEquals:
                "AWS:SourceArn": !Sub "arn:${AWS::Partition}:cloudtrail:${AWS::Region}:${AWS::AccountId}:trail/${AWS::StackName}-S3ObjectLogTrailSeq"

  # CloudTrail for S3 Object-level Events
  S3ChangeNotifierCloudTrail:
    Type: AWS::CloudTrail::Trail
    DependsOn: [ S3ChangeNotifierLogBucketPolicy ]
    Properties:
      TrailName: !Sub '${AWS::StackName}-S3ObjectLogTrailSeq'
      S3BucketName: !Ref S3ChangeNotifierLogBucket
      IsLogging: true
      EnableLogFileValidation: false # Disabled to reduce storage costs (~50% reduction)
      IncludeGlobalServiceEvents: false # Only regional events
      IsMultiRegionTrail: false # Single region trail
      EventSelectors:
        - ReadWriteType: WriteOnly # Only write operations
          IncludeManagementEvents: false # Only data events
          DataResources:
            - { Type: AWS::S3::Object, Values: [!Sub 'arn:${AWS::Partition}:s3:::${TargetS3BucketName1}/'] }
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: S3ObjectLogging

  # --- Firehose Related Resources ---
  # S3 Bucket for Firehose Output
  S3ChangeNotifierFirehoseBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub '${AWS::StackName}-fh-${AWS::AccountId}-${AWS::Region}'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      VersioningConfiguration:
        Status: Enabled
      LifecycleConfiguration:
        Rules:
          - Id: DeleteOldFirehoseData
            Status: Enabled
            ExpirationInDays: !Ref LogRetentionDays # Configurable retention period
          - Id: DeleteOldVersions
            Status: Enabled
            NoncurrentVersionExpirationInDays: 10 # Delete old versions after 10 days
          - Id: CleanupIncompleteMultipartUploads
            Status: Enabled
            AbortIncompleteMultipartUpload:
              DaysAfterInitiation: 1 # Clean up failed multipart uploads after 1 day
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: FirehoseStorage

  # IAM Role for Firehose Delivery
  FirehoseDeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${AWS::StackName}-FirehoseDeliveryRole-${AWS::Region}'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: FirehoseDeliveryPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # S3 permissions for Firehose delivery
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3ChangeNotifierFirehoseBucket.Arn
                  - !Sub "${S3ChangeNotifierFirehoseBucket.Arn}/*"
              # CloudWatch Logs permissions for Firehose error logging
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/${AWS::StackName}-firehose-stream:*"
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: FirehoseDeliveryRole

  # CloudWatch Log Group for Firehose
  FirehoseLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/kinesisfirehose/${AWS::StackName}-firehose-stream'
      RetentionInDays: 14
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: FirehoseLogGroup

  # Kinesis Data Firehose Delivery Stream
  S3ChangeNotifierFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    DependsOn: FirehoseLogGroup
    Properties:
      DeliveryStreamName: !Sub '${AWS::StackName}-firehose-stream'
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3ChangeNotifierFirehoseBucket.Arn
        BufferingHints:
          SizeInMBs: 64
          IntervalInSeconds: 60
        CompressionFormat: GZIP
        RoleARN: !GetAtt FirehoseDeliveryRole.Arn
        # Dynamic partitioning keys: bucketName/eventName/principalId + date
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300
        Prefix: 'bucket=!{partitionKeyFromQuery:bucket_name}/event=!{partitionKeyFromQuery:event_name}/principal=!{partitionKeyFromQuery:principal_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
        ErrorOutputPrefix: 'errors/!{firehose:error-output-type}/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/'
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{bucket_name: (.bucketName // .bucketNameFallback // "unknown"), event_name: (.eventName // "unknown"), principal_id: (.principalId // .userArn // "unknown")}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: '\n'
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref FirehoseLogGroup
          LogStreamName: 'S3Delivery'
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: FirehoseStream

  # --- EventBridge Related Resources ---
  # IAM Role for EventBridge to Firehose
  EventBridgeFirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${AWS::StackName}-EventBridgeFirehoseRole-${AWS::Region}'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: EventBridgeFirehosePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # Allow EventBridge to send records to Firehose
              - Effect: Allow
                Action:
                  - firehose:PutRecord
                  - firehose:PutRecordBatch
                Resource: !GetAtt S3ChangeNotifierFirehose.Arn
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: EventBridgeFirehoseRole

  # EventBridge Rule to Route CloudTrail Events to Firehose
  CloudTrailToFirehoseRule:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub '${AWS::StackName}-CloudTrailToFirehose'
      Description: "Send S3 CloudTrail events directly to Firehose for log storage"
      EventPattern:
        source: ["aws.s3"]
        "detail-type": ["AWS API Call via CloudTrail"]
        detail:
          eventSource: ["s3.amazonaws.com"]
          eventName:
            # Monitor various S3 operations
            - prefix: "PutObject"
            - prefix: "DeleteObject"
            - prefix: "CopyObject"
            - prefix: "CompleteMultipartUpload"
            - prefix: "AbortMultipartUpload"
            - prefix: "RestoreObject"
          requestParameters:
            bucketName: [!Ref TargetS3BucketName1] # Filter by target bucket
      State: ENABLED
      Targets:
        - Arn: !GetAtt S3ChangeNotifierFirehose.Arn
          Id: "SendToFirehose"
          RoleArn: !GetAtt EventBridgeFirehoseRole.Arn
          # Transform CloudTrail event to compact format (~70% size reduction)
          InputTransformer:
            InputPathsMap:
              bucketName: "$.detail.requestParameters.bucketName"
              bucketNameFallback: "$.detail.resources[0].ARN"
              objectKey: "$.detail.requestParameters.key"
              eventName: "$.detail.eventName"
              eventTime: "$.detail.eventTime"
              eventID: "$.detail.eventID"
              userArn: "$.detail.userIdentity.arn"
              principalId: "$.detail.userIdentity.principalId"
              sourceIP: "$.detail.sourceIPAddress"
              awsRegion: "$.detail.awsRegion"
              requestID: "$.detail.requestID"
              userName: "$.detail.userIdentity.userName"
              resources: "$.detail.resources"
            # Output only essential fields to reduce storage costs
            InputTemplate: '{"bucketName": "<bucketName>", "bucketNameFallback": "<bucketNameFallback>", "objectKey": "<objectKey>", "eventName": "<eventName>", "eventTime": "<eventTime>", "eventID": "<eventID>", "userArn": "<userArn>", "principalId": "<principalId>", "sourceIP": "<sourceIP>", "awsRegion": "<awsRegion>", "requestID": "<requestID>", "userName": "<userName>", "resources": <resources>}'

Outputs:
  TargetS3BucketName:
    Description: Name of the S3 bucket being monitored
    Value: !Ref TargetS3BucketName1
    Export:
      Name: !Sub "${AWS::StackName}-TargetS3BucketName"
  CloudTrailLogBucketName:
    Description: Name of the S3 bucket storing CloudTrail logs
    Value: !Ref S3ChangeNotifierLogBucket
    Export:
      Name: !Sub "${AWS::StackName}-CloudTrailLogBucketName"
  FirehoseBucketName:
    Description: Name of the S3 bucket where Firehose delivers processed logs
    Value: !Ref S3ChangeNotifierFirehoseBucket
    Export:
      Name: !Sub "${AWS::StackName}-FirehoseBucketName"
  FirehoseDeliveryStreamName:
    Description: Name of the Kinesis Data Firehose delivery stream
    Value: !Ref S3ChangeNotifierFirehose
    Export:
      Name: !Sub "${AWS::StackName}-FirehoseDeliveryStreamName"
  CloudTrailName:
    Description: Name of the CloudTrail
    Value: !Ref S3ChangeNotifierCloudTrail
    Export:
      Name: !Sub "${AWS::StackName}-CloudTrailName"
  EventBridgeRuleName:
    Description: Name of the EventBridge rule routing CloudTrail events to Firehose
    Value: !Ref CloudTrailToFirehoseRule
    Export:
      Name: !Sub "${AWS::StackName}-EventBridgeRuleName"
Deployment example
aws cloudformation create-stack \
  --stack-name s3-cloudtrail-to-firehose \
  --template-body file://s3-cloudtrail-to-firehose.yaml \
  --parameters ParameterKey=TargetS3BucketName1,ParameterValue=your-bucket-name \
               ParameterKey=LogRetentionDays,ParameterValue=365 \
  --capabilities CAPABILITY_NAMED_IAM
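After the stack reaches CREATE_COMPLETE, the generated resource names (buckets, Firehose stream, trail, rule) can be read back from the stack outputs, for example with boto3 (the region is an assumption):

import boto3

cfn = boto3.client('cloudformation', region_name='ap-northeast-1')

# List the Outputs defined by the template once stack creation has finished
resp = cfn.describe_stacks(StackName='s3-cloudtrail-to-firehose')
for output in resp['Stacks'][0].get('Outputs', []):
    print(f"{output['OutputKey']}: {output['OutputValue']}")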
Chatbot integration
This part is no longer Lambda-free, but by adding a Lambda function triggered when Firehose writes new objects to S3, you can also send notifications through Amazon Q Developer (AWS Chatbot).
import boto3
import json
import gzip
import io
import os
import logging
import urllib.parse
from collections import defaultdict, namedtuple

DEFAULT_VALUE = 'N/A'
MAX_DESCRIPTION_LENGTH = 3800
TRUNCATION_SUFFIX = "\n... (truncated)"
GroupKey = namedtuple("GroupKey", ["bucket_name", "actual_issuer_arn", "event_name"])

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
CHATBOT_SNS_TOPIC_ARN = os.environ['CHATBOT_SNS_TOPIC_ARN']

def extract_bucket_from_path(object_key):
    """Extract the monitored bucket name from the dynamic-partitioned S3 key path"""
    try:
        for part in object_key.split('/'):
            if part.startswith('bucket='):
                return part.replace('bucket=', '')
        return None
    except Exception as e:
        logger.error(f"Failed to extract bucket from path {object_key}: {e}")
        return None

def format_csv_message(group_key, csv_data, earliest_time):
    title = f":csv: S3 Activity - {group_key.bucket_name} ({group_key.event_name} by {group_key.actual_issuer_arn}, {earliest_time})"
    return {
        "version": "1.0", "source": "custom",
        "content": {
            "textType": "client-markdown", "title": title,
            "description": f"```\n{csv_data}\n```"
        }
    }

def send_to_sns(message, group_key):
    try:
        sns_client.publish(
            TopicArn=CHATBOT_SNS_TOPIC_ARN,
            Subject=f"S3 Activity: {group_key.bucket_name} - {group_key.event_name}",
            Message=json.dumps(message)
        )
        return True
    except Exception as e:
        logger.error(f"Failed to send SNS message: {e}")
        return False

def parse_and_group_log(log_bucket, object_key, target_app_bucket):
    grouped_events = defaultdict(list)
    processed_event_count = 0
    try:
        response = s3_client.get_object(Bucket=log_bucket, Key=object_key)
        with gzip.GzipFile(fileobj=io.BytesIO(response['Body'].read())) as f:
            content = f.read().decode('utf-8')
        # Standard JSONL processing
        lines = [line.strip() for line in content.strip().split('\n') if line.strip()]
        for line in lines:
            try:
                event_data = json.loads(line)
                bucket_name = event_data.get('bucketName') or event_data.get('bucketNameFallback', DEFAULT_VALUE)
                if bucket_name.startswith('arn:aws:s3:::'):
                    bucket_name = bucket_name.replace('arn:aws:s3:::', '')
                if bucket_name != target_app_bucket:
                    continue
                event_name = event_data.get('eventName', DEFAULT_VALUE)
                issuer_arn = event_data.get('userArn', event_data.get('principalId', DEFAULT_VALUE))
                group_key = GroupKey(bucket_name, issuer_arn, event_name)
                # Object key handling
                object_key_value = event_data.get('objectKey', '')
                object_arns = []
                if object_key_value:
                    # Single object
                    object_arns.append(f"arn:aws:s3:::{bucket_name}/{object_key_value}")
                else:
                    # When objectKey is empty, extract the objects from resources
                    resources = event_data.get('resources', [])
                    for resource in resources:
                        if resource.get('type') == 'AWS::S3::Object':
                            if resource.get('ARN'):
                                # Individual ARN (a specific object)
                                object_arns.append(resource['ARN'])
                            elif resource.get('ARNPrefix'):
                                # ARNPrefix (multiple objects shown as one entry)
                                arn_prefix = resource['ARNPrefix']
                                if arn_prefix.startswith('arn:aws:s3:::'):
                                    parts = arn_prefix.split('/', 1)
                                    if len(parts) > 1:
                                        display_path = parts[1] + " (Multiple Objects)"
                                    else:
                                        display_path = "(Multiple Objects)"
                                    object_arns.append(f"arn:aws:s3:::{bucket_name}/{display_path}")
                # Fallback when resources contained nothing usable
                if not object_arns:
                    object_arns.append(f"arn:aws:s3:::{bucket_name}/Unknown Key")
                # Create an event detail entry for each object ARN
                for object_arn in object_arns:
                    event_detail = {
                        'object_arn': object_arn,
                        'event_time': event_data.get('eventTime', DEFAULT_VALUE)
                    }
                    grouped_events[group_key].append(event_detail)
                    processed_event_count += 1
            except json.JSONDecodeError:
                continue
    except Exception as e:
        logger.error(f"Failed to parse log file {object_key}: {e}")
    return grouped_events, processed_event_count

def notify_grouped_events(grouped_events):
    if not grouped_events:
        return 0
    sent_count = 0
    for group_key, events in grouped_events.items():
        # Extract the object keys from the object_arn values
        object_keys = [
            e.get('object_arn', DEFAULT_VALUE).split('/', 1)[1]
            if e.get('object_arn', '').startswith('arn:aws:s3:::') and '/' in e.get('object_arn', '')
            else e.get('object_arn', DEFAULT_VALUE)
            for e in events
        ]
        csv_block = "\n".join(object_keys)
        if len(csv_block) > MAX_DESCRIPTION_LENGTH:
            csv_block = csv_block[:MAX_DESCRIPTION_LENGTH - len(TRUNCATION_SUFFIX)] + TRUNCATION_SUFFIX
        # Determine the earliest event time in the group
        event_times = [e.get('event_time') for e in events if e.get('event_time') != DEFAULT_VALUE]
        earliest_time = min(event_times) if event_times else DEFAULT_VALUE
        chatbot_message = format_csv_message(group_key, csv_block, earliest_time)
        if send_to_sns(chatbot_message, group_key):
            sent_count += 1
    return sent_count

def lambda_handler(event, context):
    try:
        logger.info("Lambda handler started")
        successful_notifications = 0
        # Events are already filtered by the EventBridge rule, so start processing directly
        detail = event.get('detail', {})
        log_bucket_name = detail.get('bucket', {}).get('name')
        object_key = detail.get('object', {}).get('key')
        logger.info(f"Processing S3 event: bucket={log_bucket_name}, key={object_key}")
        # Resolve the monitored bucket name from the dynamic partition path
        target_app_bucket = extract_bucket_from_path(object_key)
        if not target_app_bucket:
            logger.error(f"Could not extract target bucket from object key: {object_key}")
            return {'statusCode': 400, 'body': 'Invalid object key format'}
        logger.info(f"Extracted target bucket from path: {target_app_bucket}")
        decoded_key = urllib.parse.unquote_plus(object_key)
        grouped_events, event_count = parse_and_group_log(log_bucket_name, decoded_key, target_app_bucket)
        if grouped_events:
            successful_notifications = notify_grouped_events(grouped_events)
            logger.info(f"Sent {successful_notifications} notifications for {event_count} events")
        logger.info(f"Handler completed successfully: {successful_notifications} notifications")
        return {'statusCode': 200, 'body': f'Sent {successful_notifications} notifications.'}
    except Exception as e:
        logger.error(f"Lambda handler error: {str(e)}", exc_info=True)
        return {'statusCode': 500, 'body': f'Error: {str(e)}'}
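For reference, the handler above expects the EventBridge "Object Created" notification that fires when Firehose writes an object to its destination bucket. The following illustrative sketch shows the shape it parses (all values are made up):

# Illustrative EventBridge S3 "Object Created" event as consumed by lambda_handler
sample_event = {
    "detail-type": "Object Created",
    "source": "aws.s3",
    "detail": {
        "bucket": {"name": "example-stack-fh-123456789012-ap-northeast-1"},
        "object": {
            # Dynamic-partitioned key written by Firehose; the Lambda extracts
            # the monitored bucket name from the 'bucket=' path element
            "key": "bucket=your-bucket-name/event=PutObject/principal=AIDAEXAMPLE/"
                   "year=2025/month=01/day=01/hour=00/example-stack-firehose-stream-1-2025-01-01-00-00-00-abcd.gz"
        },
    },
}

detail = sample_event["detail"]
print(detail["bucket"]["name"], detail["object"]["key"])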