S3に保存されたRedshiftの監査ログを作成イベント通知(Lambda)で自動でタグ付けしてみた

2021.08.06

AWS事業本部 梶原@福岡オフィスです。
S3に出力されるRedshiftの監査ログをオブジェクト作成時イベント通知でLambdaを起動し自動でタグ付けを行ったので、ソース等を共有します。

ユースケースとしては単純にタグ付けしたいだけの場合ももちろんありますが、タグ付けすることによりライフサイクル設定時の条件とすることが可能となり、プリフィックスで指定できない場合の条件として、タグの値を使用することができます。
データ量が多くなりがちなユーザーアクティビティログなどについては、ライフサイクルを短くすることで、保存期間の調整などが可能となります
また、複数のRedshiftから同一のバケットにログ出力している場合なども、クラスタ毎に条件を分けることも可能となります。

Redshiftの監査ログに限らずS3に保存されたタイミングでオブジェクトにタグ付けを行いたいと思う機会は結構あるので、結構応用がきくかと思います

注意

Redshiftのログ作成のタイミングは定期的また大量に呼び出されないですが、オブジェクト作成が大量に頻繁にあるようなバケットの場合はLambdaの実行回数などの上限、また、Lambdaの実行、S3APIの呼び出しは従量課金となりますので、意図しない動きなどにご注意しつつご検証の上ご使用ください

説明

既存のバケットにS3イベント通知を作成する方法はこちらのブログで説明しているの割愛します。

CloudFormation一撃で作成できるようにしていますので、ブログの最後のテンプレートでRedshiftのログ出力先バケットを指定してスタックを作成してください。

パラメータ

Parameters:
  NotificationBucket:
    Type: String
    Description: S3 bucket log output destination for Redshift.
  Prefix:
    Type: String
    Default: redshift/
    Description: Prefix log output destination for Redshift. Prefix of the object key name for filtering rules.
  Suffix:
    Type: String
    Default: .gz
    Description: Suffix of the object key name for filtering rules

NotificationBucket: オブジェクト作成時にLambda関数を呼び出す既存のバケットになります
Prefix: redshiftのログの出力先のディレクトリを指定してください (redshift/ としています)
Suffix: GZIP形式で保存されるので.gzを指定します

Redshiftの監査ログのタグ付け関数

  RedshiftLogsTaggingFunction:
    Type: 'AWS::Lambda::Function'
    Properties:
      Code:
        ZipFile: |
          import json
          import urllib.parse
          import posixpath
          import boto3

          print('Loading function')

          s3 = boto3.client('s3')

          def lambda_handler(event, context):
              # print("Received event: " + json.dumps(event, indent=2))

              # Get the object from the event and show its content type
              bucket = event['Records'][0]['s3']['bucket']['name']
              key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
              try:
                  # key redshift log
                  # AWSLogs/AccountID/ServiceName/Region/Year/Month/Day/AccountID_ServiceName_Region_ClusterName_LogType_Timestamp.gz
                  fileName = posixpath.basename(key)
                  accountID, serviceName, region, clusterName, logType, timestamp = fileName.split('_')
                  timestamp, ext =  timestamp.split('.')
                  print(accountID, serviceName, region, clusterName, logType, timestamp, ext)

                  response = s3.get_object_tagging(
                      Bucket = bucket,
                      Key = key
                  )
                  tagSet = response['TagSet']
                  print(tagSet)
                  
                  tags = {tag['Key']: tag['Value'] for tag in tagSet}
                  # print(tags)
                          
                  tags['ClusterName'] = clusterName
                  tags['LogType'] = logType
                  tags['Timestamp'] = timestamp

                  tagSetNew = [{'Key': k, 'Value': v} for k, v in tags.items()]
                  print(tagSetNew)
                  
                  response = s3.put_object_tagging(
                      Bucket = bucket,
                      Key = key,
                      Tagging = {
                          'TagSet': tagSetNew
                      },
                  )
                  # print(response)
                  
                  return(response)
                  
              except Exception as e:
                  print(e)
                  print('Error tagging object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
                  raise e
      Handler: index.lambda_handler
      Role: !GetAtt RedshiftLogsTaggingFunctionRole.Arn
      Runtime: python3.8
      Timeout: 60

Redshiftの監査ログファイル名は

特定の形式(AWSLogs/AccountID/ServiceName/Region/Year/Month/Day/AccountID_ServiceName_Region_ClusterName_LogType_Timestamp.gz)

参考 https://docs.aws.amazon.com/ja_jp/redshift/latest/mgmt/db-auditing.html

で保存されますので、作成イベントで呼ばれた際にファイル名を分解しタグつけを行っています。

  1. ファイル名を分解して、各タグの値とする
  2. オブジェクトの既存のタグを取得する
  3. 既存タグに、タグ、タグ値を追加する(ClusterName, LogType, Timestamp)
  4. オブジェクトにタグ付けを行う
    tags['ClusterName'] = clusterName
    tags['LogType'] = logType
    tags['Timestamp'] = timestamp

の部分をカスタマイズすれば、独自のタグ付けが行えますので、適時変更してください

Redshidtログのタグ付け関数用のRole

  RedshiftLogsTaggingFunctionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:GetObjectTagging'
                  - 's3:PutObjectTagging'
                Resource: !Sub 'arn:aws:s3:::${NotificationBucket}/*'
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: 'arn:aws:logs:*:*:*'

S3バケットオブジェクトのタグ取得's3:GetObjectTagging'またタグ付け's3:PutObjectTagging'を許可しています

動作検証

スタックが正常に作成されたら。指定したバケットにRedshiftの監査ログ形式でオブジェクト(ファイル)をアップロードしてみてください。

動作検証としてredshift/の配下にRedshiftのログファイルを保存してみます。

保存タイミングでLambda関数が呼び出され自動でタグ付けされます。

まとめ

オブジェクト作成のタイミングでタグ付けしたいときに役に立つと嬉しいです。

Template

AWSTemplateFormatVersion: 2010-09-09
Description: >-
  Tagging the Redshift log
Parameters:
  NotificationBucket:
    Type: String
    Description: S3 bucket log output destination for Redshift.
  Prefix:
    Type: String
    Default: redshift/
    Description: Prefix log output destination for Redshift. Prefix of the object key name for filtering rules.
  Suffix:
    Type: String
    Default: .gz
    Description: Suffix of the object key name for filtering rules

Resources:
  RedshiftLogsTaggingFunction:
    Type: 'AWS::Lambda::Function'
    Properties:
      Code:
        ZipFile: |
          import json
          import urllib.parse
          import posixpath
          import boto3

          print('Loading function')

          s3 = boto3.client('s3')

          def lambda_handler(event, context):
              # print("Received event: " + json.dumps(event, indent=2))

              # Get the object from the event and show its content type
              bucket = event['Records'][0]['s3']['bucket']['name']
              key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
              try:
                  # key redshift log
                  # AWSLogs/AccountID/ServiceName/Region/Year/Month/Day/AccountID_ServiceName_Region_ClusterName_LogType_Timestamp.gz
                  fileName = posixpath.basename(key)
                  accountID, serviceName, region, clusterName, logType, timestamp = fileName.split('_')
                  timestamp, ext =  timestamp.split('.')
                  print(accountID, serviceName, region, clusterName, logType, timestamp, ext)

                  response = s3.get_object_tagging(
                      Bucket = bucket,
                      Key = key
                  )
                  tagSet = response['TagSet']
                  print(tagSet)
                  
                  tags = {tag['Key']: tag['Value'] for tag in tagSet}
                  # print(tags)
                          
                  tags['ClusterName'] = clusterName
                  tags['LogType'] = logType
                  tags['Timestamp'] = timestamp

                  tagSetNew = [{'Key': k, 'Value': v} for k, v in tags.items()]
                  print(tagSetNew)
                  
                  response = s3.put_object_tagging(
                      Bucket = bucket,
                      Key = key,
                      Tagging = {
                          'TagSet': tagSetNew
                      },
                  )
                  # print(response)
                  
                  return(response)
                  
              except Exception as e:
                  print(e)
                  print('Error tagging object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
                  raise e
      Handler: index.lambda_handler
      Role: !GetAtt RedshiftLogsTaggingFunctionRole.Arn
      Runtime: python3.8
      Timeout: 60

  RedshiftLogsTaggingFunctionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:GetObjectTagging'
                  - 's3:PutObjectTagging'
                Resource: !Sub 'arn:aws:s3:::${NotificationBucket}/*'
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: 'arn:aws:logs:*:*:*'

  LambdaInvokePermission:
    Type: 'AWS::Lambda::Permission'
    Properties:
      FunctionName: !GetAtt RedshiftLogsTaggingFunction.Arn
      Action: 'lambda:InvokeFunction'
      Principal: s3.amazonaws.com
      SourceAccount: !Ref 'AWS::AccountId'
      SourceArn: !Sub 'arn:aws:s3:::${NotificationBucket}'

  LambdaTrigger:
    Type: 'Custom::LambdaTrigger'
    DependsOn: LambdaInvokePermission
    Properties:
      ServiceToken: !GetAtt PutBucketNotificationFunction.Arn
      Id: !Sub
        - S3LambdaNotif-${UniqueId}
        - UniqueId: !Select [0, !Split ['-', !Select [2, !Split [/, !Ref 'AWS::StackId']]]]
      Bucket: !Ref NotificationBucket
      Prefix: !Ref Prefix
      Suffix: !Ref Suffix
      LambdaArn: !GetAtt RedshiftLogsTaggingFunction.Arn

  PutBucketNotificationFunction:
    Type: 'AWS::Lambda::Function'
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt PutBucketNotificationFunctionRole.Arn
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse

          SUCCESS = "SUCCESS"
          FAILED = "FAILED"

          print('Loading function')
          s3 = boto3.resource('s3')

          def lambda_handler(event, context):
              print("Received event: " + json.dumps(event, indent=2))
              responseData={}
              try:
                  if event['RequestType'] == 'Delete':
                      print("Request Type:",event['RequestType'])
                      Id=event['ResourceProperties']['Id']
                      Bucket=event['ResourceProperties']['Bucket']

                      delete_notification(Id, Bucket)
                      print("Sending response to custom resource after Delete")
                  elif event['RequestType'] == 'Create' or event['RequestType'] == 'Update':
                      print("Request Type:",event['RequestType'])
                      Id=event['ResourceProperties']['Id']
                      Prefix=event['ResourceProperties']['Prefix']
                      Suffix=event['ResourceProperties']['Suffix']
                      LambdaArn=event['ResourceProperties']['LambdaArn']
                      Bucket=event['ResourceProperties']['Bucket']
                      
                      add_notification(Id, Prefix, Suffix, LambdaArn, Bucket)
                      responseData={'Bucket':Bucket}
                      print("Sending response to custom resource")
                  responseStatus = 'SUCCESS'
              except Exception as e:
                  print('Failed to process:', e)
                  responseStatus = 'FAILED'
                  responseData = {'Failure': 'Something bad happened.'}
              cfnresponse.send(event, context, responseStatus, responseData)

          def add_notification(Id, Prefix, Suffix, LambdaArn, Bucket):
              bucket_notification = s3.BucketNotification(Bucket)
              print(bucket_notification.lambda_function_configurations)

              lambda_function_configurations = bucket_notification.lambda_function_configurations

              if lambda_function_configurations is None:
                  lambda_function_configurations = []
              else:
                  lambda_function_configurations = [e for e in lambda_function_configurations if e['Id'] != Id]

              lambda_config = {}
              lambda_config['Id'] = Id
              lambda_config['LambdaFunctionArn'] = LambdaArn
              lambda_config['Events'] = ['s3:ObjectCreated:*']
              lambda_config['Filter'] = {'Key': {'FilterRules': [
                      {'Name': 'Prefix', 'Value': Prefix},
                      {'Name': 'Suffix', 'Value': Suffix}
                      ]}
              }
              
              lambda_function_configurations.append(lambda_config)
              print(lambda_function_configurations)
              
              put_bucket_notification(bucket_notification, lambda_function_configurations)
              
              print("Put request completed....")
            
          def delete_notification(Id, Bucket):

              bucket_notification = s3.BucketNotification(Bucket)
              print(bucket_notification.lambda_function_configurations)

              lambda_function_configurations = bucket_notification.lambda_function_configurations

              if lambda_function_configurations is not None:
                  lambda_function_configurations = [e for e in lambda_function_configurations if e['Id'] != Id]

              print(lambda_function_configurations)

              put_bucket_notification(bucket_notification, lambda_function_configurations)

              print("Delete request completed....")

          def put_bucket_notification(BucketNotification, LambdaFunctionConfigurations):

              notification_configuration = {}
              if LambdaFunctionConfigurations is not None:
                  notification_configuration['LambdaFunctionConfigurations'] = LambdaFunctionConfigurations
              
              if BucketNotification.queue_configurations is not None:
                  notification_configuration['QueueConfigurations'] = BucketNotification.queue_configurations

              if BucketNotification.topic_configurations is not None:
                  notification_configuration['TopicConfigurations'] = BucketNotification.topic_configurations

              print(notification_configuration)
              
              response = BucketNotification.put(
                NotificationConfiguration= notification_configuration
              )
      Runtime: python3.8
      Timeout: 50

  PutBucketNotificationFunctionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:GetBucketNotification'
                  - 's3:PutBucketNotification'
                Resource: !Sub 'arn:aws:s3:::${NotificationBucket}'
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: 'arn:aws:logs:*:*:*'

参考情報

データベース監査ログ作成 https://docs.aws.amazon.com/ja_jp/redshift/latest/mgmt/db-auditing.html

CloudFormation 一撃で既存のS3バケットでAWS LambdaのS3のオブジェクト作成通知を追加作成してみた