Cloud One File Storage Securityを使ったスキャンで検知した時にメール通知したい

2024.02.01

こんにちは、シマです。
皆さんはCloud One File Storage Security(以降C1FSS)と噛まずに言えますか?私は言えません。今回は、C1FSSでスキャンした結果、悪意のあるファイルと判定された場合にだけメール通知する方法について記載します。

C1FSSとは

トレンドマイクロ社が提供するセキュリティサービスの1つで、Amazon S3等のファイルに対してマルウェアスキャン機能を提供してくれます。詳細については以下の記事がとてもわかりやすくまとまっていますのでご参照ください。

今回の構成

今回の構成は以下のようになります。
図中の上枠内はC1FSSの実装部分になるため、以下により構築は済んでいる前提です。

その環境に対して構成図下枠内を追加していきます。

設定追加

IAMポリシー追加

LambdaにSNSをPublishする権限を追加するためにポリシーを作成します。
AWS管理コンソールのIAM画面から「ポリシー」→「ポリシーの作成」を押下します。
サービスは「SNS」を選択し、書き込み内にある「Publish」にチェックをつけ、リソースでは「すべて」を選択し、「次へ」を押下します。

任意のポリシー名を入力し、「ポリシーの作成」ボタンを押下します。

SNSトピック作成

メール送信のためにSNSトピックを作成します。
AWS管理コンソールのSNS画面から「トピック」→「トピックの作成」を押下します。

「スタンダード」を選択し、任意の名前を入力したら、「トピックの作成」を押下します。

「サブスクリプションの作成」を押下します。

プロトコルは「Eメール」を選択し、エンドポイントにメールアドレスを入力し、「サブスクリプションの作成」を押下します。

メールアドレス宛にサブスクリプションの確認メールが来ているので、「Confirm Subscription」をクリックしておきます。

Lambda関数作成

実際に処理をするLambda関数を作成していきます。
AWS管理コンソールのLambda画面から「関数の作成」を押下します。
任意の関数名を入力し、ランタイムで「Python 3.12」を選択し、「関数の作成」を押下します。
「トリガーを追加」をクリックします。
ソースはSNSを指定し、SNS topicでは「Storage-TM-FileStorageSecurity-ScanResultTopic」から始まるものを指定し、追加を押下します。
上記SNS topicはC1FSS構築(ストレージ登録)時に作成されるものなので、厳密にはC1FSS構築(ストレージ登録)をしたCloudFormationの対象スタックにて、「ScanResultTopicARN」として出力されているものを選択します。

Lambdaの画面から、「設定」→「アクセス権限」を選択し、表示されているロール名をクリックします。

表示されたロールの画面で「許可を追加」→「ポリシーをアタッチ」をクリックします。

本手順の最初に作成したIAMポリシーを選択し、「許可を追加」を押下します。

Lambdaの画面に戻り、「コード」から実行するためのコードを入力し、「Deploy」を押下します。サンプルコードは以下に記載します。

lambda_function.py
import json
import boto3
import re
import urllib.parse

sns_client = boto3.client('sns')
topic_arn = "arn:aws:sns:ap-northeast-1:【AWSアカウントID】:xxxxxxxx"

S3_DOMAIN_PATTERN = 's3(\..+)?\.amazonaws.com'

def parse_s3_object_url(url_string):
    url = urllib.parse.urlparse(url_string)

    if re.fullmatch(S3_DOMAIN_PATTERN, url.netloc):
        bucket = url.path.split('/')[1]
        s3_object = '/'.join(url.path.split('/')[2:])
    else:
        bucket = url.netloc.split('.')[0]
        s3_object = url.path[1:]
    object_key = urllib.parse.unquote_plus(s3_object)

    return bucket, object_key

def lambda_handler(event, context):
    for record in event['Records']:
        message = json.loads(record['Sns']['Message'])

        if message['scanner_status'] != 0:
            print('Skip: ', message['scanner_status_message'])
            continue
        
        src_bucket, object_key = parse_s3_object_url(message['file_url'])

        scanning_result = message['scanning_result']
        findings = scanning_result.get('Findings')

        if findings:
            mail_title = "[testC1FSS] 検知しました"
            message = f"検知しました。\nバケット名: {src_bucket}\nファイル名: {object_key}\n"
            for fd in findings:
                malware = fd.get('malware')
                type = fd.get('type')
                message += f"malware: {malware}\ntype: {type}\n"

            mail = sns_client.publish(
                    TopicArn=topic_arn,
                    Subject=mail_title,    
                    Message=message
                    )

    return {
        'statusCode': 200
    }

※コード内7行目の「topic_arn」は手順「SNSトピック作成」で作成したものに書き換えます。

試してみた

対象S3バケットへ無害なテキストファイルと、有害なファイルとしてEicarテストファイルを格納して挙動を確認しました。無害なテキストファイル格納時にはメール送信は行われず、Eicarテストファイル格納時には以下のようにメールが送信されることを確認しました。

最後に

今回はC1FSSで検知した時にメール通知する設定について記載しました。
スキャン結果に関わらずJSON形式でメールを通知する方式やSlackに通知する方式は記事になっていましたが、本ケースは記事が見つからなかったので記事にしました。

本記事がどなたかのお役に立てれば幸いです。