Trying Out Serverless ETL of S3 Object Logs with CloudTrail and Firehose

I tried using CloudTrail data events and Firehose dynamic partitioning to build an ETL pipeline that collects S3 object change logs serverlessly and cost-efficiently.
2025.07.24

Amazon S3 can record the requests made to an Amazon S3 bucket as server access logs.

  • Amazon S3 server access log sample
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be amzn-s3-demo-bucket1 [06/Feb/2019:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 3E57427F3EXAMPLE REST.GET.VERSIONING - "GET /amzn-s3-demo-bucket1?versioning HTTP/1.1" 200 - 113 - 7 - "-" "S3Console/0.4" - s9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234= SigV4 ECDHE-RSA-AES128-GCM-SHA256 AuthHeader amzn-s3-demo-bucket1.s3.us-west-1.amazonaws.com TLSV1.2 arn:aws:s3:us-west-1:123456789012:accesspoint/example-AP Yes

https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/userguide/LogFormat.html
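
As a quick aside, this raw space-delimited format takes some effort to parse on its own. Below is a minimal Python sketch (not part of the pipeline built later) that pulls the leading fields out of the sample line above with a regular expression; the field names follow the log format documentation linked above.

import re

# Leading fields of an S3 server access log line:
# bucket_owner bucket [time] remote_ip requester request_id operation key "request_uri" status ...
ACCESS_LOG_RE = re.compile(
    r'^(?P<owner>\S+) (?P<bucket>\S+) \[(?P<time>[^\]]+)\] (?P<ip>\S+) '
    r'(?P<requester>\S+) (?P<request_id>\S+) (?P<operation>\S+) (?P<key>\S+) '
    r'"(?P<request_uri>[^"]*)" (?P<status>\S+)'
)

# The sample line from above (truncated after the status and error-code fields)
line = ('79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be '
        'amzn-s3-demo-bucket1 [06/Feb/2019:00:00:38 +0000] 192.0.2.3 '
        '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be '
        '3E57427F3EXAMPLE REST.GET.VERSIONING - '
        '"GET /amzn-s3-demo-bucket1?versioning HTTP/1.1" 200 -')

m = ACCESS_LOG_RE.match(line)
if m:
    print(m.group('bucket'), m.group('operation'), m.group('status'))
    # -> amzn-s3-demo-bucket1 REST.GET.VERSIONING 200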

Traditionally, these access logs could be analyzed with Amazon Athena, but growing scan volumes brought cost and performance challenges.

https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-s3-access-logs-to-identify-requests.html#querying-s3-access-logs-for-requests

Recently I had the chance to try an architecture that configures CloudTrail object-level logging for S3 object change operations and forwards the events to Firehose via EventBridge: a serverless ETL pipeline built entirely from managed services. This post introduces it.

Architecture diagram
by whisk

CloudTrail

Role: Configures object-level logging for the S3 bucket.

  • Enables object-level logging for the S3 bucket whose changes should be captured.
  • Restricting the trail to a single S3 bucket and to write events keeps CloudTrail trail costs down; a read-back check follows the excerpt below.
  S3ChangeNotifierCloudTrail:
    Type: AWS::CloudTrail::Trail
    Properties:
      S3BucketName: !Ref S3ChangeNotifierLogBucket
      IsLogging: true
      EnableLogFileValidation: false
      IncludeGlobalServiceEvents: false
      IsMultiRegionTrail: false   
      EventSelectors:
        - ReadWriteType: WriteOnly   
          IncludeManagementEvents: false
          DataResources:
            - { Type: AWS::S3::Object, Values: [!Sub 'arn:${AWS::Partition}:s3:::${TargetS3BucketName1}/'] }
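
Once the trail exists, its scoping can be read back with boto3 to confirm that only write data events for the target bucket are captured. A minimal sketch, assuming the trail name that the full template below assigns ('<stack-name>-S3ObjectLogTrailSeq'):

import boto3

cloudtrail = boto3.client('cloudtrail')

# Trail name assumed from the template: '<stack-name>-S3ObjectLogTrailSeq'
selectors = cloudtrail.get_event_selectors(
    TrailName='s3-cloudtrail-to-firehose-S3ObjectLogTrailSeq'
)
for sel in selectors['EventSelectors']:
    # Expect ReadWriteType == 'WriteOnly' and IncludeManagementEvents == False
    print(sel['ReadWriteType'], sel['IncludeManagementEvents'])
    for res in sel.get('DataResources', []):
        print(res['Type'], res['Values'])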

EventBridge

Role: Extracts only the required information from the logs CloudTrail records and passes it on to Firehose.

  • An EventBridge rule picks out the S3 change logs (object-level logs).
  • InputTransformer: Extracts only the fields needed (bucket name, event name, and so on) from the full CloudTrail record (the detail element), reducing the volume of data handled downstream.
  • InputTemplate: Shapes the result into single-line JSON with no embedded newlines and forwards it to Firehose; a Python sketch of this transformation follows the excerpt below.
  CloudTrailToFirehoseRule:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub '${AWS::StackName}-CloudTrailToFirehose'
      Description: "Send S3 CloudTrail events directly to Firehose for log storage"
      EventPattern:
        source: ["aws.s3"]
        "detail-type": ["AWS API Call via CloudTrail"]
        detail:
          eventSource: ["s3.amazonaws.com"]
          eventName:
            # Monitor various S3 operations
            - prefix: "PutObject"
            - prefix: "DeleteObject"
            - prefix: "CopyObject"
            - prefix: "CompleteMultipartUpload"
            - prefix: "AbortMultipartUpload"
            - prefix: "RestoreObject"
          requestParameters:
            bucketName: [!Ref TargetS3BucketName1]  # Filter by target bucket
      State: ENABLED
      Targets:
        - Arn: !GetAtt S3ChangeNotifierFirehose.Arn
          Id: "SendToFirehose"
          RoleArn: !GetAtt EventBridgeFirehoseRole.Arn
          # Transform CloudTrail event to compact format (~70% size reduction)
          InputTransformer:
            InputPathsMap:
              bucketName: "$.detail.requestParameters.bucketName"
              bucketNameFallback: "$.detail.resources[0].ARN"
              objectKey: "$.detail.requestParameters.key"
              eventName: "$.detail.eventName"
              eventTime: "$.detail.eventTime"
              eventID: "$.detail.eventID"
              userArn: "$.detail.userIdentity.arn"
              principalId: "$.detail.userIdentity.principalId"
              sourceIP: "$.detail.sourceIPAddress"
              awsRegion: "$.detail.awsRegion"
              requestID: "$.detail.requestID"
              userName: "$.detail.userIdentity.userName"
              resources: "$.detail.resources"
            # Output only essential fields to reduce storage costs
            InputTemplate: '{"bucketName": "<bucketName>", "bucketNameFallback": "<bucketNameFallback>", "objectKey": "<objectKey>", "eventName": "<eventName>", "eventTime": "<eventTime>", "eventID": "<eventID>", "userArn": "<userArn>", "principalId": "<principalId>", "sourceIP": "<sourceIP>", "awsRegion": "<awsRegion>", "requestID": "<requestID>", "userName": "<userName>", "resources": <resources>}'
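
To make the transformation concrete, the following Python sketch reproduces what the InputTransformer does, using an abbreviated, hypothetical CloudTrail event; the output is the single-line JSON record that Firehose receives.

import json

# Abbreviated, hypothetical CloudTrail event as EventBridge delivers it
event = {
    "detail": {
        "eventName": "PutObject",
        "eventTime": "2025-07-24T01:23:45Z",
        "eventID": "example-event-id",
        "awsRegion": "ap-northeast-1",
        "sourceIPAddress": "192.0.2.3",
        "requestID": "EXAMPLEREQUESTID",
        "userIdentity": {"arn": "arn:aws:iam::123456789012:user/alice",
                         "principalId": "AIDAEXAMPLE", "userName": "alice"},
        "requestParameters": {"bucketName": "my-target-bucket",
                              "key": "path/to/object.txt"},
        "resources": [{"type": "AWS::S3::Object",
                       "ARN": "arn:aws:s3:::my-target-bucket/path/to/object.txt"}],
    }
}

d = event["detail"]
compact = {
    "bucketName": d["requestParameters"]["bucketName"],
    "objectKey": d["requestParameters"]["key"],
    "eventName": d["eventName"],
    "eventTime": d["eventTime"],
    "principalId": d["userIdentity"]["principalId"],
    "userArn": d["userIdentity"]["arn"],
    "resources": d["resources"],
    # (remaining mapped fields omitted for brevity)
}
# One record per line, no embedded newlines -- the shape Firehose receives
print(json.dumps(compact, separators=(",", ":")))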

Firehose

Role: Stores the data received from EventBridge in S3, in a format and folder layout suited to analysis.

  • The MetadataExtraction dynamic-partitioning feature extracts the S3 bucket, operation type, and user information from each JSON record sent by EventBridge and uses them as partition keys.
  • AppendDelimiterToRecord stores the data in S3 as newline-delimited JSON (JSONL), a highly portable format; a sketch of the partition extraction follows the excerpt below.
  S3ChangeNotifierFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub '${AWS::StackName}-firehose-stream'
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3ChangeNotifierFirehoseBucket.Arn
        BufferingHints:
          SizeInMBs: 64
          IntervalInSeconds: 60
        CompressionFormat: GZIP
        RoleARN: !GetAtt FirehoseDeliveryRole.Arn
        # Dynamic partitioning: bucketName/eventName/principalId + date
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300
        Prefix: 'bucket=!{partitionKeyFromQuery:bucket_name}/event=!{partitionKeyFromQuery:event_name}/principal=!{partitionKeyFromQuery:principal_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
        ErrorOutputPrefix: 'errors/!{firehose:error-output-type}/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/'
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{bucket_name: (.bucketName // .bucketNameFallback // "unknown"), event_name: (.eventName // "unknown"), principal_id: (.principalId // .userArn // "unknown")}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: '\n'
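
The jq query relies on the // alternative operator, which falls through to the next value when a field is missing or null. Here is roughly equivalent Python (note that Python's "or" also skips empty strings, so it is an approximation), together with the S3 prefix that the resulting partition keys produce; the record values are purely illustrative.

from datetime import datetime, timezone

def extract_partitions(record: dict) -> dict:
    # Mirrors the MetadataExtractionQuery; jq's // falls through on missing/null
    return {
        "bucket_name": record.get("bucketName") or record.get("bucketNameFallback") or "unknown",
        "event_name": record.get("eventName") or "unknown",
        "principal_id": record.get("principalId") or record.get("userArn") or "unknown",
    }

record = {"bucketName": "my-target-bucket", "eventName": "PutObject",
          "principalId": "AIDAEXAMPLE"}
p = extract_partitions(record)
now = datetime.now(timezone.utc)

# Same layout as the Prefix setting above
prefix = (f"bucket={p['bucket_name']}/event={p['event_name']}/"
          f"principal={p['principal_id']}/year={now:%Y}/month={now:%m}/"
          f"day={now:%d}/hour={now:%H}/")
print(prefix)
# e.g. bucket=my-target-bucket/event=PutObject/principal=AIDAEXAMPLE/year=2025/...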

Summary

I used S3 object logs as the example here, but this architecture should be especially effective for the long-term storage and analysis of logs with relatively low update frequency. For instance, audit logs or data for trend analysis could be accumulated cost-effectively.

The pipeline built here can also serve as an entry point to more advanced data analytics platforms such as Amazon Redshift or S3 Tables (Apache Iceberg). Do give it a try if you have the chance.

https://dev.classmethod.jp/articles/20241004-amazon-kinesis-firehose-for-iceberg/

Reference

Full template

AWSTemplateFormatVersion: '2010-09-09'
Description: >
  S3 CloudTrail events to Firehose for log storage and processing.
  Monitors S3 object operations via CloudTrail, transforms events through EventBridge,
  and stores processed data in S3 via Kinesis Data Firehose with dynamic partitioning.
  Architecture: CloudTrail -> EventBridge (InputTransformer) -> Firehose (Dynamic Partitioning) -> S3

Parameters:
  TargetS3BucketName1:
    Type: String
    Description: Name of the EXISTING S3 bucket to monitor for write operations (PUT, DELETE, COPY, etc.) via CloudTrail logging.
    Default: s3-web-classmethod-m-ap-northeast-1-147179611693

  LogRetentionDays:
    Type: Number
    Default: 550
    Description: Number of days to retain logs and processed data
    MinValue: 1
    MaxValue: 3653
    ConstraintDescription: Must be between 1 and 3653 days.

Resources:
  # --- S3 Bucket for CloudTrail Logs ---
  S3ChangeNotifierLogBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub '${AWS::StackName}-ct-${AWS::AccountId}-${AWS::Region}'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      VersioningConfiguration:
        Status: Enabled
      LifecycleConfiguration:
        Rules:
          - Id: DeleteOldLogs
            Status: Enabled
            ExpirationInDays: !Ref LogRetentionDays  # Configurable retention period
          - Id: DeleteOldVersions
            Status: Enabled
            NoncurrentVersionExpirationInDays: 10    # Delete old versions after 10 days
          - Id: CleanupIncompleteMultipartUploads
            Status: Enabled
            AbortIncompleteMultipartUpload:
              DaysAfterInitiation: 1                 # Clean up failed multipart uploads after 1 day
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: CloudTrailLogs

  # S3 Bucket Policy for CloudTrail Access (ACL-disabled bucket optimized)
  S3ChangeNotifierLogBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref S3ChangeNotifierLogBucket
      PolicyDocument:
        Statement:
          # Allow CloudTrail to write log files (no ACL dependency)
          - Sid: AWSCloudTrailWrite
            Effect: Allow
            Principal:
              Service: cloudtrail.amazonaws.com
            Action: 
              - s3:PutObject
              - s3:GetBucketAcl  # Required for CloudTrail service validation
            Resource: 
              - !GetAtt S3ChangeNotifierLogBucket.Arn
              - !Sub "${S3ChangeNotifierLogBucket.Arn}/*"
            Condition:
              StringEquals:
                "AWS:SourceArn": !Sub "arn:${AWS::Partition}:cloudtrail:${AWS::Region}:${AWS::AccountId}:trail/${AWS::StackName}-S3ObjectLogTrailSeq"

  # CloudTrail for S3 Object-level Events
  S3ChangeNotifierCloudTrail:
    Type: AWS::CloudTrail::Trail
    DependsOn: [ S3ChangeNotifierLogBucketPolicy ]
    Properties:
      TrailName: !Sub '${AWS::StackName}-S3ObjectLogTrailSeq'
      S3BucketName: !Ref S3ChangeNotifierLogBucket
      IsLogging: true
      EnableLogFileValidation: false     # Disabled to reduce storage costs (~50% reduction)
      IncludeGlobalServiceEvents: false  # Only regional events
      IsMultiRegionTrail: false          # Single region trail
      EventSelectors:
        - ReadWriteType: WriteOnly       # Only write operations
          IncludeManagementEvents: false # Only data events
          DataResources:
            - { Type: AWS::S3::Object, Values: [!Sub 'arn:${AWS::Partition}:s3:::${TargetS3BucketName1}/'] }
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: S3ObjectLogging

  # --- Firehose Related Resources ---
  # S3 Bucket for Firehose Output
  S3ChangeNotifierFirehoseBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub '${AWS::StackName}-fh-${AWS::AccountId}-${AWS::Region}'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      VersioningConfiguration:
        Status: Enabled
      LifecycleConfiguration:
        Rules:
          - Id: DeleteOldFirehoseData
            Status: Enabled
            ExpirationInDays: !Ref LogRetentionDays  # Configurable retention period
          - Id: DeleteOldVersions
            Status: Enabled
            NoncurrentVersionExpirationInDays: 10    # Delete old versions after 10 days
          - Id: CleanupIncompleteMultipartUploads
            Status: Enabled
            AbortIncompleteMultipartUpload:
              DaysAfterInitiation: 1                 # Clean up failed multipart uploads after 1 day
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: FirehoseStorage

  # IAM Role for Firehose Delivery
  FirehoseDeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${AWS::StackName}-FirehoseDeliveryRole-${AWS::Region}'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: FirehoseDeliveryPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # S3 permissions for Firehose delivery
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3ChangeNotifierFirehoseBucket.Arn
                  - !Sub "${S3ChangeNotifierFirehoseBucket.Arn}/*"
              # CloudWatch Logs permissions for Firehose error logging
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/${AWS::StackName}-firehose-stream:*"
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: FirehoseDeliveryRole

  # CloudWatch Log Group for Firehose
  FirehoseLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/kinesisfirehose/${AWS::StackName}-firehose-stream'
      RetentionInDays: 14
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: FirehoseLogGroup

  # Kinesis Data Firehose Delivery Stream
  S3ChangeNotifierFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    DependsOn: FirehoseLogGroup
    Properties:
      DeliveryStreamName: !Sub '${AWS::StackName}-firehose-stream'
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3ChangeNotifierFirehoseBucket.Arn
        BufferingHints:
          SizeInMBs: 64
          IntervalInSeconds: 60
        CompressionFormat: GZIP
        RoleARN: !GetAtt FirehoseDeliveryRole.Arn
        # Dynamic partitioning: bucketName/eventName/principalId + date
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300
        Prefix: 'bucket=!{partitionKeyFromQuery:bucket_name}/event=!{partitionKeyFromQuery:event_name}/principal=!{partitionKeyFromQuery:principal_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/'
        ErrorOutputPrefix: 'errors/!{firehose:error-output-type}/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/'
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{bucket_name: (.bucketName // .bucketNameFallback // "unknown"), event_name: (.eventName // "unknown"), principal_id: (.principalId // .userArn // "unknown")}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: '\n'
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref FirehoseLogGroup
          LogStreamName: 'S3Delivery'
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: FirehoseStream

  # --- EventBridge Related Resources ---
  # IAM Role for EventBridge to Firehose
  EventBridgeFirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub '${AWS::StackName}-EventBridgeFirehoseRole-${AWS::Region}'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: EventBridgeFirehosePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # Allow EventBridge to send records to Firehose
              - Effect: Allow
                Action:
                  - firehose:PutRecord
                  - firehose:PutRecordBatch
                Resource: !GetAtt S3ChangeNotifierFirehose.Arn
      Tags:
        - Key: Purpose
          Value: S3ChangeNotification
        - Key: StackName
          Value: !Ref AWS::StackName
        - Key: Component
          Value: EventBridgeFirehoseRole

  # EventBridge Rule to Route CloudTrail Events to Firehose
  CloudTrailToFirehoseRule:
    Type: AWS::Events::Rule
    Properties:
      Name: !Sub '${AWS::StackName}-CloudTrailToFirehose'
      Description: "Send S3 CloudTrail events directly to Firehose for log storage"
      EventPattern:
        source: ["aws.s3"]
        "detail-type": ["AWS API Call via CloudTrail"]
        detail:
          eventSource: ["s3.amazonaws.com"]
          eventName:
            # Monitor various S3 operations
            - prefix: "PutObject"
            - prefix: "DeleteObject"
            - prefix: "CopyObject"
            - prefix: "CompleteMultipartUpload"
            - prefix: "AbortMultipartUpload"
            - prefix: "RestoreObject"
          requestParameters:
            bucketName: [!Ref TargetS3BucketName1]  # Filter by target bucket
      State: ENABLED
      Targets:
        - Arn: !GetAtt S3ChangeNotifierFirehose.Arn
          Id: "SendToFirehose"
          RoleArn: !GetAtt EventBridgeFirehoseRole.Arn
          # Transform CloudTrail event to compact format (~70% size reduction)
          InputTransformer:
            InputPathsMap:
              bucketName: "$.detail.requestParameters.bucketName"
              bucketNameFallback: "$.detail.resources[0].ARN"
              objectKey: "$.detail.requestParameters.key"
              eventName: "$.detail.eventName"
              eventTime: "$.detail.eventTime"
              eventID: "$.detail.eventID"
              userArn: "$.detail.userIdentity.arn"
              principalId: "$.detail.userIdentity.principalId"
              sourceIP: "$.detail.sourceIPAddress"
              awsRegion: "$.detail.awsRegion"
              requestID: "$.detail.requestID"
              userName: "$.detail.userIdentity.userName"
              resources: "$.detail.resources"
            # Output only essential fields to reduce storage costs
            InputTemplate: '{"bucketName": "<bucketName>", "bucketNameFallback": "<bucketNameFallback>", "objectKey": "<objectKey>", "eventName": "<eventName>", "eventTime": "<eventTime>", "eventID": "<eventID>", "userArn": "<userArn>", "principalId": "<principalId>", "sourceIP": "<sourceIP>", "awsRegion": "<awsRegion>", "requestID": "<requestID>", "userName": "<userName>", "resources": <resources>}'

Outputs:
  TargetS3BucketName:
    Description: Name of the S3 bucket being monitored
    Value: !Ref TargetS3BucketName1
    Export:
      Name: !Sub "${AWS::StackName}-TargetS3BucketName"

  CloudTrailLogBucketName:
    Description: Name of the S3 bucket storing CloudTrail logs
    Value: !Ref S3ChangeNotifierLogBucket
    Export:
      Name: !Sub "${AWS::StackName}-CloudTrailLogBucketName"

  FirehoseBucketName:
    Description: Name of the S3 bucket where Firehose delivers processed logs
    Value: !Ref S3ChangeNotifierFirehoseBucket
    Export:
      Name: !Sub "${AWS::StackName}-FirehoseBucketName"

  FirehoseDeliveryStreamName:
    Description: Name of the Kinesis Data Firehose delivery stream
    Value: !Ref S3ChangeNotifierFirehose
    Export:
      Name: !Sub "${AWS::StackName}-FirehoseDeliveryStreamName"

  CloudTrailName:
    Description: Name of the CloudTrail
    Value: !Ref S3ChangeNotifierCloudTrail
    Export:
      Name: !Sub "${AWS::StackName}-CloudTrailName"

  EventBridgeRuleName:
    Description: Name of the EventBridge rule routing CloudTrail events to Firehose
    Value: !Ref CloudTrailToFirehoseRule
    Export:
      Name: !Sub "${AWS::StackName}-EventBridgeRuleName"

Deployment example

aws cloudformation create-stack \
  --stack-name s3-cloudtrail-to-firehose \
  --template-body file://s3-cloudtrail-to-firehose.yaml \
  --parameters ParameterKey=TargetS3BucketName1,ParameterValue=your-bucket-name \
               ParameterKey=LogRetentionDays,ParameterValue=365 \
  --capabilities CAPABILITY_NAMED_IAM
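
After the stack is deployed, a quick smoke test is to write an object to the monitored bucket and, once CloudTrail's delivery latency plus the 60-second Firehose buffer has passed, check the delivery bucket for the expected partition. A hedged boto3 sketch with placeholder bucket names:

import time
import boto3

s3 = boto3.client('s3')

# Placeholders: the monitored bucket and the '<stack>-fh-<account>-<region>' delivery bucket
TARGET_BUCKET = 'your-bucket-name'
FIREHOSE_BUCKET = 's3-cloudtrail-to-firehose-fh-123456789012-ap-northeast-1'

# 1. Trigger a CloudTrail data event
s3.put_object(Bucket=TARGET_BUCKET, Key='smoke-test/hello.txt', Body=b'hello')

# 2. Wait out CloudTrail/EventBridge delivery latency (can be minutes) plus the 60 s buffer
time.sleep(300)

# 3. Look for GZIP objects under the dynamic partition for this bucket and event
resp = s3.list_objects_v2(
    Bucket=FIREHOSE_BUCKET,
    Prefix=f'bucket={TARGET_BUCKET}/event=PutObject/',
)
for obj in resp.get('Contents', []):
    print(obj['Key'])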

Chatbot integration

Although this part is no longer Lambda-less, a Lambda function triggered by the new objects Firehose delivers to S3 makes it possible to send notifications through Q Developer (Chatbot) and the like. A sample invocation payload follows the function.

import boto3
import json
import gzip
import io
import os
import logging
import urllib.parse
from collections import defaultdict, namedtuple

DEFAULT_VALUE = 'N/A'
MAX_DESCRIPTION_LENGTH = 3800
TRUNCATION_SUFFIX = "\n... (truncated)"
GroupKey = namedtuple("GroupKey", ["bucket_name", "actual_issuer_arn", "event_name"])

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')
sns_client = boto3.client('sns')

CHATBOT_SNS_TOPIC_ARN = os.environ['CHATBOT_SNS_TOPIC_ARN']

def extract_bucket_from_path(object_key):
    """S3キーパスからバケット名を抽出"""
    try:
        for part in object_key.split('/'):
            if part.startswith('bucket='):
                return part.replace('bucket=', '')
        return None
    except Exception as e:
        logger.error(f"Failed to extract bucket from path {object_key}: {e}")
        return None

def format_csv_message(group_key, csv_data, earliest_time):
    title = f":csv: S3 Activity - {group_key.bucket_name} ({group_key.event_name} by {group_key.actual_issuer_arn}, {earliest_time})"
    return {
        "version": "1.0", "source": "custom",
        "content": {
            "textType": "client-markdown", "title": title,
            "description": f"```\n{csv_data}\n```"
        }
    }

def send_to_sns(message, group_key):
    try:
        sns_client.publish(
            TopicArn=CHATBOT_SNS_TOPIC_ARN,
            Subject=f"S3 Activity: {group_key.bucket_name} - {group_key.event_name}",
            Message=json.dumps(message)
        )
        return True
    except Exception as e:
        logger.error(f"Failed to send SNS message: {e}")
        return False

def parse_and_group_log(log_bucket, object_key, target_app_bucket):
    grouped_events = defaultdict(list)
    processed_event_count = 0

    try:
        response = s3_client.get_object(Bucket=log_bucket, Key=object_key)
        with gzip.GzipFile(fileobj=io.BytesIO(response['Body'].read())) as f:
            content = f.read().decode('utf-8')

        # Standard JSONL processing
        lines = [line.strip() for line in content.strip().split('\n') if line.strip()]

        for line in lines:
            try:
                event_data = json.loads(line)

                bucket_name = event_data.get('bucketName') or event_data.get('bucketNameFallback', DEFAULT_VALUE)
                if bucket_name.startswith('arn:aws:s3:::'):
                    bucket_name = bucket_name.replace('arn:aws:s3:::', '')

                if bucket_name != target_app_bucket:
                    continue

                event_name = event_data.get('eventName', DEFAULT_VALUE)
                issuer_arn = event_data.get('userArn', event_data.get('principalId', DEFAULT_VALUE))
                group_key = GroupKey(bucket_name, issuer_arn, event_name)

                # Process the object key
                object_key_value = event_data.get('objectKey', '')
                object_arns = []

                if object_key_value:
                    # Single-object case
                    object_arns.append(f"arn:aws:s3:::{bucket_name}/{object_key_value}")
                else:
                    # If objectKey is empty, extract multiple objects from resources
                    resources = event_data.get('resources', [])
                    for resource in resources:
                        if resource.get('type') == 'AWS::S3::Object':
                            if resource.get('ARN'):
                                # Individual ARN present (a specific object)
                                object_arns.append(resource['ARN'])
                            elif resource.get('ARNPrefix'):
                                # ARNPrefix present (a batch of multiple objects)
                                arn_prefix = resource['ARNPrefix']
                                if arn_prefix.startswith('arn:aws:s3:::'):
                                    parts = arn_prefix.split('/', 1)
                                    if len(parts) > 1:
                                        display_path = parts[1] + " (Multiple Objects)"
                                    else:
                                        display_path = "(Multiple Objects)"
                                    object_arns.append(f"arn:aws:s3:::{bucket_name}/{display_path}")

                    # Fallback when resources contains nothing usable
                    if not object_arns:
                        object_arns.append(f"arn:aws:s3:::{bucket_name}/Unknown Key")

                # Create an event detail entry for each object ARN
                for object_arn in object_arns:
                    event_detail = {
                        'object_arn': object_arn,
                        'event_time': event_data.get('eventTime', DEFAULT_VALUE)
                    }
                    grouped_events[group_key].append(event_detail)
                    processed_event_count += 1

            except json.JSONDecodeError:
                continue
    except Exception as e:
        logger.error(f"Failed to parse log file {object_key}: {e}")
    return grouped_events, processed_event_count

def notify_grouped_events(grouped_events):
    if not grouped_events: 
        return 0
    sent_count = 0
    for group_key, events in grouped_events.items():
        # Extract object keys from the object ARNs in one pass
        object_keys = [
            e.get('object_arn', DEFAULT_VALUE).split('/', 1)[1] 
            if e.get('object_arn', '').startswith('arn:aws:s3:::') and '/' in e.get('object_arn', '')
            else e.get('object_arn', DEFAULT_VALUE)
            for e in events
        ]

        csv_block = "\n".join(object_keys)
        if len(csv_block) > MAX_DESCRIPTION_LENGTH:
            csv_block = csv_block[:MAX_DESCRIPTION_LENGTH - len(TRUNCATION_SUFFIX)] + TRUNCATION_SUFFIX

        # Compute the earliest event time
        event_times = [e.get('event_time') for e in events if e.get('event_time') != DEFAULT_VALUE]
        earliest_time = min(event_times) if event_times else DEFAULT_VALUE

        chatbot_message = format_csv_message(group_key, csv_block, earliest_time)
        if send_to_sns(chatbot_message, group_key):
            sent_count += 1
    return sent_count

def lambda_handler(event, context):
    try:
        logger.info("Lambda handler started")
        successful_notifications = 0

        # Already filtered by the EventBridge rule, so start processing directly
        detail = event.get('detail', {})
        log_bucket_name = detail.get('bucket', {}).get('name')
        object_key = detail.get('object', {}).get('key')

        logger.info(f"Processing S3 event: bucket={log_bucket_name}, key={object_key}")

        # Derive the monitored bucket name from the dynamic-partition path
        target_app_bucket = extract_bucket_from_path(object_key)
        if not target_app_bucket:
            logger.error(f"Could not extract target bucket from object key: {object_key}")
            return {'statusCode': 400, 'body': 'Invalid object key format'}

        logger.info(f"Extracted target bucket from path: {target_app_bucket}")

        decoded_key = urllib.parse.unquote_plus(object_key)
        grouped_events, event_count = parse_and_group_log(log_bucket_name, decoded_key, target_app_bucket)
        if grouped_events:
            successful_notifications = notify_grouped_events(grouped_events)
            logger.info(f"Sent {successful_notifications} notifications for {event_count} events")

        logger.info(f"Handler completed successfully: {successful_notifications} notifications")
        return {'statusCode': 200, 'body': f'Sent {successful_notifications} notifications.'}
    except Exception as e:
        logger.error(f"Lambda handler error: {str(e)}", exc_info=True)
        return {'statusCode': 500, 'body': f'Error: {str(e)}'}
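
For reference, the handler consumes the EventBridge "Object Created" event that S3 emits for the Firehose delivery bucket. A minimal, hypothetical payload, extending the listing above, together with a check of the partition-path parsing the handler depends on:

# Hypothetical EventBridge "Object Created" payload for the Firehose delivery bucket
sample_event = {
    "detail-type": "Object Created",
    "source": "aws.s3",
    "detail": {
        "bucket": {"name": "s3-cloudtrail-to-firehose-fh-123456789012-ap-northeast-1"},
        "object": {"key": ("bucket=my-target-bucket/event=PutObject/"
                           "principal=AIDAEXAMPLE/year=2025/month=07/day=24/hour=01/"
                           "example-part-0000.gz")},
    },
}

# extract_bucket_from_path() walks the key's 'bucket=' segment
key = sample_event["detail"]["object"]["key"]
assert extract_bucket_from_path(key) == "my-target-bucket"

# With CHATBOT_SNS_TOPIC_ARN set and AWS credentials available, the handler
# can then be exercised directly:
# lambda_handler(sample_event, None)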
