この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
AWS事業本部 梶原@福岡オフィスです。
S3に出力されるRedshiftの監査ログをオブジェクト作成時イベント通知でLambdaを起動し自動でタグ付けを行ったので、ソース等を共有します。
ユースケースとしては単純にタグ付けしたいだけの場合ももちろんありますが、タグ付けすることによりライフサイクル設定時の条件とすることが可能となり、プリフィックスで指定できない場合の条件として、タグの値を使用することができます。
データ量が多くなりがちなユーザーアクティビティログなどについては、ライフサイクルを短くすることで、保存期間の調整などが可能となります
また、複数のRedshiftから同一のバケットにログ出力している場合なども、クラスタ毎に条件を分けることも可能となります。
Redshiftの監査ログに限らずS3に保存されたタイミングでオブジェクトにタグ付けを行いたいと思う機会は結構あるので、結構応用がきくかと思います
注意
Redshiftのログ作成のタイミングは定期的また大量に呼び出されないですが、オブジェクト作成が大量に頻繁にあるようなバケットの場合はLambdaの実行回数などの上限、また、Lambdaの実行、S3APIの呼び出しは従量課金となりますので、意図しない動きなどにご注意しつつご検証の上ご使用ください
説明
既存のバケットにS3イベント通知を作成する方法はこちらのブログで説明しているの割愛します。
CloudFormation一撃で作成できるようにしていますので、ブログの最後のテンプレートでRedshiftのログ出力先バケットを指定してスタックを作成してください。
パラメータ
Parameters:
NotificationBucket:
Type: String
Description: S3 bucket log output destination for Redshift.
Prefix:
Type: String
Default: redshift/
Description: Prefix log output destination for Redshift. Prefix of the object key name for filtering rules.
Suffix:
Type: String
Default: .gz
Description: Suffix of the object key name for filtering rules
NotificationBucket: オブジェクト作成時にLambda関数を呼び出す既存のバケットになります
Prefix: redshiftのログの出力先のディレクトリを指定してください (redshift/
としています)
Suffix: GZIP形式で保存されるので.gz
を指定します
Redshiftの監査ログのタグ付け関数
RedshiftLogsTaggingFunction:
Type: 'AWS::Lambda::Function'
Properties:
Code:
ZipFile: |
import json
import urllib.parse
import posixpath
import boto3
print('Loading function')
s3 = boto3.client('s3')
def lambda_handler(event, context):
# print("Received event: " + json.dumps(event, indent=2))
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
try:
# key redshift log
# AWSLogs/AccountID/ServiceName/Region/Year/Month/Day/AccountID_ServiceName_Region_ClusterName_LogType_Timestamp.gz
fileName = posixpath.basename(key)
accountID, serviceName, region, clusterName, logType, timestamp = fileName.split('_')
timestamp, ext = timestamp.split('.')
print(accountID, serviceName, region, clusterName, logType, timestamp, ext)
response = s3.get_object_tagging(
Bucket = bucket,
Key = key
)
tagSet = response['TagSet']
print(tagSet)
tags = {tag['Key']: tag['Value'] for tag in tagSet}
# print(tags)
tags['ClusterName'] = clusterName
tags['LogType'] = logType
tags['Timestamp'] = timestamp
tagSetNew = [{'Key': k, 'Value': v} for k, v in tags.items()]
print(tagSetNew)
response = s3.put_object_tagging(
Bucket = bucket,
Key = key,
Tagging = {
'TagSet': tagSetNew
},
)
# print(response)
return(response)
except Exception as e:
print(e)
print('Error tagging object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
Handler: index.lambda_handler
Role: !GetAtt RedshiftLogsTaggingFunctionRole.Arn
Runtime: python3.8
Timeout: 60
Redshiftの監査ログファイル名は
特定の形式(AWSLogs/AccountID/ServiceName/Region/Year/Month/Day/AccountID_ServiceName_Region_ClusterName_LogType_Timestamp.gz)
参考 https://docs.aws.amazon.com/ja_jp/redshift/latest/mgmt/db-auditing.html
で保存されますので、作成イベントで呼ばれた際にファイル名を分解しタグつけを行っています。
- ファイル名を分解して、各タグの値とする
- オブジェクトの既存のタグを取得する
- 既存タグに、タグ、タグ値を追加する(ClusterName, LogType, Timestamp)
- オブジェクトにタグ付けを行う
tags['ClusterName'] = clusterName
tags['LogType'] = logType
tags['Timestamp'] = timestamp
の部分をカスタマイズすれば、独自のタグ付けが行えますので、適時変更してください
Redshidtログのタグ付け関数用のRole
RedshiftLogsTaggingFunctionRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 's3:GetObjectTagging'
- 's3:PutObjectTagging'
Resource: !Sub 'arn:aws:s3:::${NotificationBucket}/*'
- Effect: Allow
Action:
- 'logs:CreateLogGroup'
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: 'arn:aws:logs:*:*:*'
S3バケットオブジェクトのタグ取得's3:GetObjectTagging'またタグ付け's3:PutObjectTagging'を許可しています
動作検証
スタックが正常に作成されたら。指定したバケットにRedshiftの監査ログ形式でオブジェクト(ファイル)をアップロードしてみてください。
動作検証としてredshift/の配下にRedshiftのログファイルを保存してみます。
保存タイミングでLambda関数が呼び出され自動でタグ付けされます。
まとめ
オブジェクト作成のタイミングでタグ付けしたいときに役に立つと嬉しいです。
Template
AWSTemplateFormatVersion: 2010-09-09
Description: >-
Tagging the Redshift log
Parameters:
NotificationBucket:
Type: String
Description: S3 bucket log output destination for Redshift.
Prefix:
Type: String
Default: redshift/
Description: Prefix log output destination for Redshift. Prefix of the object key name for filtering rules.
Suffix:
Type: String
Default: .gz
Description: Suffix of the object key name for filtering rules
Resources:
RedshiftLogsTaggingFunction:
Type: 'AWS::Lambda::Function'
Properties:
Code:
ZipFile: |
import json
import urllib.parse
import posixpath
import boto3
print('Loading function')
s3 = boto3.client('s3')
def lambda_handler(event, context):
# print("Received event: " + json.dumps(event, indent=2))
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
try:
# key redshift log
# AWSLogs/AccountID/ServiceName/Region/Year/Month/Day/AccountID_ServiceName_Region_ClusterName_LogType_Timestamp.gz
fileName = posixpath.basename(key)
accountID, serviceName, region, clusterName, logType, timestamp = fileName.split('_')
timestamp, ext = timestamp.split('.')
print(accountID, serviceName, region, clusterName, logType, timestamp, ext)
response = s3.get_object_tagging(
Bucket = bucket,
Key = key
)
tagSet = response['TagSet']
print(tagSet)
tags = {tag['Key']: tag['Value'] for tag in tagSet}
# print(tags)
tags['ClusterName'] = clusterName
tags['LogType'] = logType
tags['Timestamp'] = timestamp
tagSetNew = [{'Key': k, 'Value': v} for k, v in tags.items()]
print(tagSetNew)
response = s3.put_object_tagging(
Bucket = bucket,
Key = key,
Tagging = {
'TagSet': tagSetNew
},
)
# print(response)
return(response)
except Exception as e:
print(e)
print('Error tagging object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
Handler: index.lambda_handler
Role: !GetAtt RedshiftLogsTaggingFunctionRole.Arn
Runtime: python3.8
Timeout: 60
RedshiftLogsTaggingFunctionRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 's3:GetObjectTagging'
- 's3:PutObjectTagging'
Resource: !Sub 'arn:aws:s3:::${NotificationBucket}/*'
- Effect: Allow
Action:
- 'logs:CreateLogGroup'
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: 'arn:aws:logs:*:*:*'
LambdaInvokePermission:
Type: 'AWS::Lambda::Permission'
Properties:
FunctionName: !GetAtt RedshiftLogsTaggingFunction.Arn
Action: 'lambda:InvokeFunction'
Principal: s3.amazonaws.com
SourceAccount: !Ref 'AWS::AccountId'
SourceArn: !Sub 'arn:aws:s3:::${NotificationBucket}'
LambdaTrigger:
Type: 'Custom::LambdaTrigger'
DependsOn: LambdaInvokePermission
Properties:
ServiceToken: !GetAtt PutBucketNotificationFunction.Arn
Id: !Sub
- S3LambdaNotif-${UniqueId}
- UniqueId: !Select [0, !Split ['-', !Select [2, !Split [/, !Ref 'AWS::StackId']]]]
Bucket: !Ref NotificationBucket
Prefix: !Ref Prefix
Suffix: !Ref Suffix
LambdaArn: !GetAtt RedshiftLogsTaggingFunction.Arn
PutBucketNotificationFunction:
Type: 'AWS::Lambda::Function'
Properties:
Handler: index.lambda_handler
Role: !GetAtt PutBucketNotificationFunctionRole.Arn
Code:
ZipFile: |
import json
import boto3
import cfnresponse
SUCCESS = "SUCCESS"
FAILED = "FAILED"
print('Loading function')
s3 = boto3.resource('s3')
def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=2))
responseData={}
try:
if event['RequestType'] == 'Delete':
print("Request Type:",event['RequestType'])
Id=event['ResourceProperties']['Id']
Bucket=event['ResourceProperties']['Bucket']
delete_notification(Id, Bucket)
print("Sending response to custom resource after Delete")
elif event['RequestType'] == 'Create' or event['RequestType'] == 'Update':
print("Request Type:",event['RequestType'])
Id=event['ResourceProperties']['Id']
Prefix=event['ResourceProperties']['Prefix']
Suffix=event['ResourceProperties']['Suffix']
LambdaArn=event['ResourceProperties']['LambdaArn']
Bucket=event['ResourceProperties']['Bucket']
add_notification(Id, Prefix, Suffix, LambdaArn, Bucket)
responseData={'Bucket':Bucket}
print("Sending response to custom resource")
responseStatus = 'SUCCESS'
except Exception as e:
print('Failed to process:', e)
responseStatus = 'FAILED'
responseData = {'Failure': 'Something bad happened.'}
cfnresponse.send(event, context, responseStatus, responseData)
def add_notification(Id, Prefix, Suffix, LambdaArn, Bucket):
bucket_notification = s3.BucketNotification(Bucket)
print(bucket_notification.lambda_function_configurations)
lambda_function_configurations = bucket_notification.lambda_function_configurations
if lambda_function_configurations is None:
lambda_function_configurations = []
else:
lambda_function_configurations = [e for e in lambda_function_configurations if e['Id'] != Id]
lambda_config = {}
lambda_config['Id'] = Id
lambda_config['LambdaFunctionArn'] = LambdaArn
lambda_config['Events'] = ['s3:ObjectCreated:*']
lambda_config['Filter'] = {'Key': {'FilterRules': [
{'Name': 'Prefix', 'Value': Prefix},
{'Name': 'Suffix', 'Value': Suffix}
]}
}
lambda_function_configurations.append(lambda_config)
print(lambda_function_configurations)
put_bucket_notification(bucket_notification, lambda_function_configurations)
print("Put request completed....")
def delete_notification(Id, Bucket):
bucket_notification = s3.BucketNotification(Bucket)
print(bucket_notification.lambda_function_configurations)
lambda_function_configurations = bucket_notification.lambda_function_configurations
if lambda_function_configurations is not None:
lambda_function_configurations = [e for e in lambda_function_configurations if e['Id'] != Id]
print(lambda_function_configurations)
put_bucket_notification(bucket_notification, lambda_function_configurations)
print("Delete request completed....")
def put_bucket_notification(BucketNotification, LambdaFunctionConfigurations):
notification_configuration = {}
if LambdaFunctionConfigurations is not None:
notification_configuration['LambdaFunctionConfigurations'] = LambdaFunctionConfigurations
if BucketNotification.queue_configurations is not None:
notification_configuration['QueueConfigurations'] = BucketNotification.queue_configurations
if BucketNotification.topic_configurations is not None:
notification_configuration['TopicConfigurations'] = BucketNotification.topic_configurations
print(notification_configuration)
response = BucketNotification.put(
NotificationConfiguration= notification_configuration
)
Runtime: python3.8
Timeout: 50
PutBucketNotificationFunctionRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 's3:GetBucketNotification'
- 's3:PutBucketNotification'
Resource: !Sub 'arn:aws:s3:::${NotificationBucket}'
- Effect: Allow
Action:
- 'logs:CreateLogGroup'
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: 'arn:aws:logs:*:*:*'
参考情報
データベース監査ログ作成 https://docs.aws.amazon.com/ja_jp/redshift/latest/mgmt/db-auditing.html
CloudFormation 一撃で既存のS3バケットでAWS LambdaのS3のオブジェクト作成通知を追加作成してみた