Cloud One File Storage Securityを使ってS3にアップされたマルウェア、異常ファイルをチェックしてみた (カスタム処理編)

2022.05.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、コンサル部@大阪オフィスのTodaです。

Trend Micro社が提供するCloud Oneには7点のサービスが存在します。
今回はクラウドストレージのファイルチェックが出来るFile Storage Securityを操作してみました。

前回、File Storage Security(以降C1FSS)にてオブジェクトスキャンの実行をおこない、判定に合わせてバケットポリシーでの非表示化、トレンドマイクロ社が公開しているスクリプトを使って隔離S3バケットに移動をおこなってみました。
今回は、利用用途が少ないかもしれませんが、異常判定を受けたファイルの削除と、あらかじめオブジェクトタグに「file_id」(データベース照会用のID)を仕込んで取り出す処理をLambdaを使っておこなってみたいと思います。

注意事項

当ページで表示しているスクリプトはサンプルのため動作保証がございません。
参考、スクリプト実装時は自己責任に於いてのご利用ください。

当スクリプトはトレンドマイクロ社が公開しているスクリプトを参考に作成しております。
https://github.com/trendmicro/cloudone-filestorage-plugins/tree/master/post-scan-actions/aws-python-promote-or-quarantine

異常ファイルの削除

通常はバケットポリシーの非公開ルールの設定 または 隔離S3バケットへの移動 にて異常ファイルの無力化が出来ると考えますが、判定時に削除する事はできるのか気になったため独自にLambda処理を作って試してみました。
C1FSSでスキャンを実行した結果はSNSを利用して処理に連携されます。
上記、SNSにサブスクリプションを追加して削除処理を埋め込みます。

Lambda処理の準備

SNSから受け取るメッセージを元に対象の判定確認とオブジェクト削除する関数を構築します。
関数のランタイムはPython3.9にておこない、スキャン対象のS3バケットの変更権限があるIAMロールを設定します。

■ 判定確認・削除処理 サンプルコード

import boto3
import re
import json
import urllib.parse

s3_client = boto3.client('s3')

S3_DOMAIN_PATTERN = 's3(\..+)?\.amazonaws.com'

def parse_s3_object_url(url_string):
    """
    S3バケットURLを分解してバケット名とオブジェクト名を取得
    
    Parameters
    ----------
    url_string : string
        S3バケットURL

    Returns
    -------
    bucket : string
        バケット名
    object_key : string
        オブジェクト名
        
    Note
    ------
    当スクリプトはトレンドマイクロ社が提供するcloudone-filestorage-pluginsを参考にしています。
    https://github.com/trendmicro/cloudone-filestorage-plugins/tree/master/post-scan-actions/aws-python-promote-or-quarantine
    """
    url = urllib.parse.urlparse(url_string)
    if re.fullmatch(S3_DOMAIN_PATTERN, url.netloc):
        bucket = url.path.split('/')[1]
        s3_object = '/'.join(url.path.split('/')[2:])
    else:
        bucket = url.netloc.split('.')[0]
        s3_object = url.path[1:]
    object_key = urllib.parse.unquote_plus(s3_object)
    return bucket, object_key

def lambda_handler(event, context):
    for record in event['Records']:
        message = json.loads(record['Sns']['Message'])
        
        # スキャンの状態が0以外の場合は処理スキップ
        if message['scanner_status'] != 0:
            print('Skip: ', message['scanner_status_message'])
            continue
        
        # 異常ファイルかの判定、正常の場合はスキップ
        scanning_result = message['scanning_result']
        findings = scanning_result.get('Findings')
        if not findings :
            print('Skip: no issues found')
            continue

        # S3のオブジェクトURLを分解
        src_bucket, object_key = parse_s3_object_url(message['file_url'])
        
        # S3のオブジェクト削除
        s3_client.delete_object(Bucket=src_bucket, Key=object_key)

作成後は、エンドポイントの情報が必要になるため[関数のARN]の値を取得します。

関数のARN

SNSの設定

C1FSSを設定した際にSNSに「Storage-TM-FileStorageSecurity-ScanResultTopic-XXXXX」というトピックが生成されます。
詳細画面から[サブスクリプション]タグを開き、[サブスクリプション]の作成をクリックします。

SNSの設定1

サブスクリプションの作成にて下記項目を入力して[サブスクリプション]の作成をクリックします。

  • プロトコル: Lambdaを選択
  • エンドポイント: 上記で取得したLambda関数のARNを指定

SNSの設定2

上記にて設定操作が完了になります。
異常ファイルをS3にアップロードしてみてファイルが消える事を確認します。

オブジェクトのタグに埋め込んだIDを取得してみる

システムにてファイルアップロードをおこなった際に、データベースにファイルの情報を書き込んだりするケースがあるかと思います。
C1FSSで異常ファイルを確認した場合、データベース側にも更新をかけたいという要望があると思います。
今回、オブジェクトのタグにfile_id(データベース照会用のID)を設定して値を取得する方法を試してみました。
※サンプルコードはデータベースへの更新処理は含んでおりません。

Lambda処理の準備

削除処理と同様にSNSから受け取るメッセージを元に対象の判定確認とタグ情報を参照する関数を構築します。
関数のランタイムはPython3.9にておこないます。

■ 判定確認・タグ参照処理 サンプルコード

import boto3
import re
import json
import urllib.parse

s3_client = boto3.client('s3')

S3_DOMAIN_PATTERN = 's3(\..+)?\.amazonaws.com'

def parse_s3_object_url(url_string):
    """
    S3バケットURLを分解してバケット名とオブジェクト名を取得
    
    Parameters
    ----------
    url_string : string
        S3バケットURL

    Returns
    -------
    bucket : string
        バケット名
    object_key : string
        オブジェクト名
        
    Note
    ------
    当スクリプトはトレンドマイクロ社が提供するcloudone-filestorage-pluginsを参考にしています。
    https://github.com/trendmicro/cloudone-filestorage-plugins/tree/master/post-scan-actions/aws-python-promote-or-quarantine
    """
    url = urllib.parse.urlparse(url_string)
    if re.fullmatch(S3_DOMAIN_PATTERN, url.netloc):
        bucket = url.path.split('/')[1]
        s3_object = '/'.join(url.path.split('/')[2:])
    else:
        bucket = url.netloc.split('.')[0]
        s3_object = url.path[1:]
    object_key = urllib.parse.unquote_plus(s3_object)
    return bucket, object_key

def lambda_handler(event, context):
    for record in event['Records']:
        message = json.loads(record['Sns']['Message'])
        
        # スキャンの状態が0以外の場合は処理スキップ
        if message['scanner_status'] != 0:
            print('Skip: ', message['scanner_status_message'])
            continue
        
        # 異常ファイルかの判定、正常の場合はスキップ
        scanning_result = message['scanning_result']
        findings = scanning_result.get('Findings')
        if not findings :
            print('Skip: no issues found')
            continue
        
        # S3のオブジェクトURLを分解
        src_bucket, object_key = parse_s3_object_url(message['file_url'])
        
        # S3オブジェクトのタグ取得
        response = s3_client.get_object_tagging(Bucket=src_bucket, Key=object_key)
        for tag_recode in response['TagSet']:

            if tag_recode['Key'] == 'file_id':
                # Amazon DynamoDBやその他処理にファイルスキャンで異常判定を連携
                # tag_recode['Value'] に識別子(ID)が入る
                print(tag_recode['Value'])

作成後は、エンドポイントの情報が必要になるため[関数のARN]の値を取得します。

SNSの設定

上記、削除処理のSNS設定と同じようにサブスクリプションを追加します。
設定後は異常ファイルをS3にアップロードして、対象関数のCloudWatch Logsにて「file_id」が出力されていることを確認します。
データベースに反映する場合は、取得した「file_id」を利用して変更処理を実装していきます。

さいごに

今回はC1FSSのスキャン結果を利用してカスタム処理をしてみました。
少しでもお客様の実現したいことの参考になればと考えております。