
Trend Cloud One File Storage SecurityからGuardDuty Malware Protection for S3への移行を試してみた
こんにちは、シマです。
先日、Trend Cloud One File Storage Security(以降C1FSS)のEOLにより、後継製品であるTrend Vision One File Security(V1FSS)への移行を試してみた記事を公開しました。
今回は別の選択肢として、Amazon GuardDuty Malware Protection for Amazon S3(以降GuardDuty S3)への移行を検証してみます。C1FSSユーザーの中には、コストやお手軽さを重視してAWSサービスへの統合を検討される方もいらっしゃるかと思います。実際の移行手順を確認していきましょう。
移行先について
GuardDuty S3は小規模環境ではランニングコスト面で有利です。
ただし、C1FSSからの移行は実装ロジックが異なるため、新規構築に近い作業となります。また、スキャン結果に応じた処理は自分で実装し、運用やメンテナンスをする必要があります。隔離処理等の実装が難しい場合には、トレンドマイクロ社が推奨しているTrend Vision One File Security(V1FSS)への移行もご検討ください。
今回はGuardDuty S3への移行検証を進めていきます。
移行手順
C1FSSとGuardDuty S3はそれぞれ独立した実装ロジックを持つため、競合する可能性があります。そのため、今回は先にC1FSSを無効化してからGuardDuty S3を実装する流れを取ります。C1FSSの状態としては、異常ファイルを隔離S3バケットに移動する処理を含んでいるものとします。
それでは、具体的な移行手順を見ていきましょう。
1. C1FSSの無効化
C1FSSの管理コンソールから、対象のバケットを選択し、「Delete」ボタンをクリックします。
表示された「Stack Name」をメモしておき、「Delete」ボタンをクリックします。
AWS管理コンソールでCloudFormationの画面から、先ほどメモした「Stack Name」のスタックを削除します。
複数バケットを設定している場合は、すべてのバケット設定を削除してから、C1FSSの管理コンソールからScanner Stackを削除します。対象のScanner Stackを選択し、「Delete」ボタンをクリックします。
表示された「Stack Name」をメモしておき、「Delete」ボタンをクリックします。
AWS管理コンソールでCloudFormationの画面から、先ほどメモした「Stack Name」のスタックを削除します。
C1FSSで不正プログラム検出時に別のバケットへ隔離する方式を実装していた場合は、別途削除が必要です。デフォルトでは「cloudone-filestorage-plugin-action-promote-or-quarantine」という名称なので、AWS管理コンソールでCloudFormationの画面から、対象スタックを削除します。
2. GuardDuty S3の有効化
AWS管理コンソールからGuardDutyのページを開き、「S3 の Malware Protection」から、「有効にする」ボタンをクリックします。
「S3を参照」ボタンからスキャン対象のS3バケットを選択します。また、「オブジェクトにタグを付ける」が選択されていることを確認します。
サービスロールはデフォルトのまま、新規で作成しています。必要に応じてロール名を変更し、「有効にする」ボタンをクリックします。
保護されたバケット一覧に対象バケットが表示され、保護が開始されていることを確認します。
3. EventBridge + Lambdaで検出後の処理を実装
GuardDuty S3ではタグの付与までは自動で実装されますが、それ以降の処理は自分で実装する必要があります。今回は、EventBridgeとLambdaで実装しました。
AWS管理コンソールよりLambdaのページを開き「関数の作成」を押下します。
任意の関数名を入力し、ランタイムは「Python 3.13」を選択し、「関数の作成」ボタンをクリックします。
「コード」タブ内にコードを入力し、「Deploy」を押下します。
コードは実施したい内容に合わせて作成してください。サンプルコードを記載します。
このサンプルコードはS3タグを確認し、「GuardDutyMalwareScanStatus: THREATS_FOUND」が付与されていたらオブジェクトの移動や削除を行います。
サンプルコード
import boto3
import json
import os
import logging
from datetime import datetime, timezone
from botocore.exceptions import ClientError
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 環境変数から設定を取得
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN', '')
TAG_KEY = os.environ.get('TAG_KEY', 'GuardDutyMalwareScanStatus')
TAG_VALUE_THREAT = os.environ.get('TAG_VALUE_THREAT', 'THREATS_FOUND')
DELETE_ORIGINAL = os.environ.get('DELETE_ORIGINAL', 'true').lower() == 'true'
# AWSクライアントの初期化
s3_client = boto3.client('s3')
sns_client = boto3.client('sns') if SNS_TOPIC_ARN else None
def send_notification(message, subject):
"""SNS通知を送信"""
if not sns_client or not SNS_TOPIC_ARN:
logger.info("SNS通知はスキップされました(SNS_TOPIC_ARNが未設定)")
return
try:
sns_client.publish(
TopicArn=SNS_TOPIC_ARN,
Subject=subject,
Message=message
)
logger.info(f"SNS通知を送信しました: {subject}")
except ClientError as e:
logger.error(f"SNS通知の送信に失敗: {e}")
def get_object_metadata(bucket_name, object_key):
"""オブジェクトのメタデータを取得"""
try:
response = s3_client.head_object(Bucket=bucket_name, Key=object_key)
return {
'ContentType': response.get('ContentType', 'unknown'),
'ContentLength': response.get('ContentLength', 0),
'LastModified': response.get('LastModified', '').isoformat() if response.get('LastModified') else '',
'ETag': response.get('ETag', ''),
'ServerSideEncryption': response.get('ServerSideEncryption', 'None'),
'StorageClass': response.get('StorageClass', 'STANDARD')
}
except ClientError as e:
logger.error(f"メタデータ取得エラー: {e}")
return {}
def quarantine_object(bucket_name, object_key, metadata, quarantine_bucket):
"""オブジェクトを隔離バケットに移動"""
timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
quarantine_key = f"{timestamp}/{bucket_name}/{object_key}"
try:
# メタデータをタグとして準備
tagging = {
'TagSet': [
{'Key': 'OriginalBucket', 'Value': bucket_name},
{'Key': 'OriginalKey', 'Value': object_key},
{'Key': 'QuarantineDate', 'Value': timestamp},
{'Key': 'MalwareStatus', 'Value': TAG_VALUE_THREAT},
{'Key': 'FileSize', 'Value': str(metadata.get('ContentLength', 0))}
]
}
# サイズ制限(タグの値は256文字まで)
for tag in tagging['TagSet']:
if len(tag['Value']) > 256:
tag['Value'] = tag['Value'][:253] + '...'
# オブジェクトをコピー
copy_source = {'Bucket': bucket_name, 'Key': object_key}
s3_client.copy_object(
CopySource=copy_source,
Bucket=quarantine_bucket,
Key=quarantine_key,
TaggingDirective='REPLACE',
Tagging=f"OriginalBucket={bucket_name}&OriginalKey={object_key}&QuarantineDate={timestamp}"
)
logger.info(f"オブジェクトを隔離しました: {bucket_name}/{object_key} -> {quarantine_bucket}/{quarantine_key}")
# 元のオブジェクトを削除(設定による)
if DELETE_ORIGINAL:
s3_client.delete_object(Bucket=bucket_name, Key=object_key)
logger.info(f"元のオブジェクトを削除しました: {bucket_name}/{object_key}")
else:
logger.info(f"元のオブジェクトは保持されます: {bucket_name}/{object_key}")
return True, quarantine_key
except ClientError as e:
logger.error(f"隔離処理エラー: {e}")
return False, None
def lambda_handler(event, context):
"""メインハンドラー"""
try:
# イベント情報をログ出力
logger.info(f"受信イベント: {json.dumps(event)}")
# EventBridgeイベントの解析
if 'detail' not in event:
logger.error("イベントに'detail'フィールドがありません")
return {
'statusCode': 400,
'body': json.dumps('Invalid event format')
}
detail = event['detail']
# バケット名とオブジェクトキーの取得
bucket_name = detail.get('bucket', {}).get('name')
object_key = detail.get('object', {}).get('key')
if not bucket_name or not object_key:
logger.error(f"必須パラメータが不足: bucket={bucket_name}, key={object_key}")
return {
'statusCode': 400,
'body': json.dumps('Missing required parameters')
}
# EventBridgeから隔離バケットを取得(入力トランスフォーマーで追加される)
quarantine_bucket = detail.get('quarantine_bucket', '')
if not quarantine_bucket:
# フォールバック:命名規則で自動決定
quarantine_bucket = f"{bucket_name}-quarantine"
logger.warning(f"隔離バケットが指定されていないため、命名規則を使用: {quarantine_bucket}")
logger.info(f"処理対象: {bucket_name}/{object_key} -> 隔離先: {quarantine_bucket}")
# タグの確認
try:
response = s3_client.get_object_tagging(
Bucket=bucket_name,
Key=object_key
)
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
logger.warning(f"オブジェクトが見つかりません: {bucket_name}/{object_key}")
return {
'statusCode': 404,
'body': json.dumps('Object not found')
}
raise
tags = {tag['Key']: tag['Value'] for tag in response.get('TagSet', [])}
logger.info(f"オブジェクトタグ: {tags}")
# マルウェアが検出されたか確認
if tags.get(TAG_KEY) == TAG_VALUE_THREAT:
logger.warning(f"マルウェアを検出: {bucket_name}/{object_key}")
# メタデータ取得
metadata = get_object_metadata(bucket_name, object_key)
# 隔離処理
success, quarantine_key = quarantine_object(bucket_name, object_key, metadata, quarantine_bucket)
if success:
# 通知の送信
notification_message = f"""
マルウェアが検出され、隔離されました。
検出情報:
- 元のバケット: {bucket_name}
- 元のオブジェクト: {object_key}
- ファイルサイズ: {metadata.get('ContentLength', 0)} bytes
- 検出時刻: {datetime.now(timezone.utc).isoformat()}
- 隔離先: {quarantine_bucket}/{quarantine_key}
- 元ファイル削除: {'はい' if DELETE_ORIGINAL else 'いいえ'}
詳細はCloudWatchログをご確認ください。
"""
send_notification(
notification_message,
f"[GuardDuty] マルウェア検出通知: {object_key}"
)
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Malware quarantined successfully',
'bucket': bucket_name,
'key': object_key,
'quarantine_location': f"{quarantine_bucket}/{quarantine_key}"
})
}
else:
# 隔離失敗時の通知
error_message = f"""
マルウェアが検出されましたが、隔離に失敗しました。
検出情報:
- バケット: {bucket_name}
- オブジェクト: {object_key}
- 検出時刻: {datetime.now(timezone.utc).isoformat()}
CloudWatchログで詳細を確認し、手動で対応してください。
"""
send_notification(
error_message,
f"[GuardDuty] マルウェア隔離失敗: {object_key}"
)
return {
'statusCode': 500,
'body': json.dumps('Quarantine failed')
}
else:
logger.info(f"マルウェアは検出されませんでした: {bucket_name}/{object_key}")
return {
'statusCode': 200,
'body': json.dumps('No malware detected')
}
except Exception as e:
logger.error(f"予期しないエラー: {str(e)}", exc_info=True)
# エラー通知
if sns_client and SNS_TOPIC_ARN:
try:
error_notification = f"""
Lambda関数で予期しないエラーが発生しました。
エラー内容: {str(e)}
関数名: {context.function_name}
リクエストID: {context.aws_request_id}
CloudWatchログで詳細を確認してください。
"""
send_notification(
error_notification,
"[GuardDuty] Lambda処理エラー"
)
except:
pass
return {
'statusCode': 500,
'body': json.dumps(f'Internal error: {str(e)}')
}
隔離アクションに伴うオブジェクト複製が、ファイル容量次第ではデフォルトの3秒以内に終わらないことがあるため、「設定」タブからタイムアウト値を30秒に変更しました。
必要に応じて環境変数にSNSトピックのARNを設定します。コード内の処理で不正プログラムを検出時にSNSで通知を行います。キーSNS_TOPIC_ARN
にARNを設定してください(例:arn:aws:sns:ap-northeast-1:123456789012:malware-alerts
)。
左ペインの「アクセス権限」から実行ロール名をクリックし、実行ロールに権限を追加します。
今回追加した権限は次のとおりです。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObjectTagging",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::【スキャン対象バケット名】/*"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectTagging"
],
"Resource": "arn:aws:s3:::【隔離バケット名】/*"
},
{
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": "arn:aws:sns:*:*:*"
}
]
}
EventBridgeの画面から、ルールを作成していきます。
任意の名前を入力し、「次へ」を押下します。
画面下部のメソッド欄で「カスタムパターン (JSON エディタ)」を選択し、スキャン対象バケットにタグが付与されたイベントを検出するイベントパターンを入力して、「次へ」を押下します。
{
"source": ["aws.s3"],
"detail-type": ["Object Tags Added"],
"detail": {
"bucket": {
"name": ["【スキャン対象バケット名】"]
}
}
}
ターゲットは先ほど作成したLambda関数を指定し、「追加設定」を開きます。
Lambda関数で必要な設定をしていきます。「ターゲット入力を設定」で「入力トランスフォーマー」を選択します。
「入力パス」に次のように入力します。
{
"bucket_name": "$.detail.bucket.name",
"object_key": "$.detail.object.key",
"detail": "$.detail"
}
「テンプレート」には隔離バケット名を含めて入力し、「次へ」を押下します。Lambda関数のコードでこのテンプレートから渡された隔離バケット名に隔離を行います。
{
"detail": {
"bucket": {
"name": "<bucket_name>"
},
"object": {
"key": "<object_key>"
},
"quarantine_bucket": "【隔離バケット名】"
}
}
タグについては特に設定せずに「次へ」を押下します。
設定内容を確認し、「ルールの作成」を押下します。
4. 動作確認
Eicarテストファイルを使用して、正常に動作していることを確認します。
スキャン対象バケットにEicarテストファイルとテキストファイルをアップロードして挙動を確認しました。
テキストファイルは隔離されず、Eicarテストファイルは隔離バケットに隔離されました。
今回はLambdaからSNSで通知を飛ばすように設定しているため、不正プログラム検出時にはメール通知が届きます。
まとめ
C1FSSからGuardDuty S3マルウェア保護への移行手順を確認しました。
LambdaやEventBridge周りの設定が少し複雑かもしれませんが、自分で実装できるため柔軟な対応が可能です。一方で、Pythonのアップデートへの対応等、メンテナンスも必要になるので、運用が難しい場合はV1FSSも候補に入ると思います。
本記事がどなたかのお役に立てれば幸いです。