Cloud One File Storage Securityの検出をSlackに通知する

2022.11.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。たかやまです。

今回はCloud One File Storage Security(c1fss)のウイルス検出結果をSlackに通知する方法をご紹介したいと思います。

メール向けの通知についてはこちらのブログをご確認ください。

やってみた

Webhookの作成

こちらを参考に事前にWebhook URLを用意してください。

SAMデプロイ

c1fssのslack通知のプラグインはServerless Application Repositoryで公開されています。
今回はこちらのプラグインをご紹介します。

リンク先からDeployを選択します。

以下の項目を入力していきます。
SlackChannelSlackUsernameはカスタムインテグレーション*1のIncoming Webhookでのみ有効な値になります。Slack AppでIncomming Webhookを作成した場合は入力不要になります。
1 現在非推奨

  • ScanResultTopicARN : c1fssストレージスタックのSNS Topic ARN
  • SlackChannel : slack送信先チャンネル(カスタムインテグレーションでのみ有効)
  • SlackURL : 作成したWebhook URL
  • SlackUsername : slack通知で表示するユーザ名(カスタムインテグレーションでのみ有効)

アプリケーションが作成されていればOKです!

テスト検知

アプリケーションが作成されたらこちらのテスト検知方法を実施します。

ローカルPCにウイルス対策ソフトが入っていると、検知に使うeicarファイルが削除される可能性があります。なのでここではCloudshellを使ってeicarファイルをS3にアップロードしていきます。

curl -O https://secure.eicar.org/eicar.com
aws s3 cp eicar.com s3://<YOUR_BUCKET>

ウイルス検知がされるとこのような通知が来ます。
(画像と名前はSlack側で設定しています)

供養

実はこのSAMに気づかず、自分で通知用のLambdaを書いていました...
せっかくなので作成したスクリプトを供養がてらこちらに載せておきます。

index.py

import json
import logging
import os
from datetime import datetime, timedelta, timezone

import urllib3

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

JST = timezone(timedelta(hours=+9), "JST")
http = urllib3.PoolManager()
url = os.environ.get("WEBHOOK_URL")


def lambda_handler(event, context):
    sns_json = json.loads(event["Records"][0]["Sns"]["Message"])
    logger.info(sns_json)
    payload = check_message(sns_json)
    if payload:
        post_slack(url, json.dumps(payload))
    else:
        pass


def check_message(sns_json: dict):
    file_url = sns_json["file_url"]
    timestamp = datetime.fromtimestamp(sns_json["timestamp"], JST).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    if sns_json["scanner_status"] == 0 and sns_json["scanning_result"]["Findings"]:
        logger.info("c1fss virus detected")
        findings = json.dumps(sns_json["scanning_result"]["Findings"][0], indent=2)
        return {
            "attachments": [
                {
                    "color": "#e60033",
                    "blocks": [
                        {
                            "type": "header",
                            "text": {
                                "type": "plain_text",
                                "text": ":rotating_light:Virus detected",
                                "emoji": True,
                            },
                        },
                        {
                            "type": "section",
                            "text": {
                                "type": "mrkdwn",
                                "text": f"File URL: <{file_url}|{file_url}>",
                            },
                        },
                        {
                            "type": "section",
                            "fields": [
                                {
                                    "type": "mrkdwn",
                                    "text": f"Finding:\n                                 },
                                {"type": "mrkdwn", "text": f"Timestamp:\n {timestamp}"},
                            ],
                        },
                    ],
                }
            ]
        }
    elif sns_json["scanner_status"] != 0:
        logger.info("c1fss error status")
        status_message = sns_json["scanner_status_message"]
        return {
            "attachments": [
                {
                    "color": "#e60033",
                    "blocks": [
                        {
                            "type": "header",
                            "text": {
                                "type": "plain_text",
                                "text": ":rotating_light:Error Status",
                                "emoji": True,
                            },
                        },
                        {
                            "type": "section",
                            "text": {
                                "type": "mrkdwn",
                                "text": f"File URL: <{file_url}|{file_url}>",
                            },
                        },
                        {
                            "type": "section",
                            "fields": [
                                {
                                    "type": "mrkdwn",
                                    "text": f"Error:\n {status_message}",
                                },
                                {"type": "mrkdwn", "text": f"Timestamp:\n {timestamp}"},
                            ],
                        },
                    ],
                }
            ]
        }
    else:
        logger.info("c1fss successful scan")


def post_slack(url, payload):

    try:
        res = http.request("POST", url, body=payload)
    except res.exceptions.RequestExceptioexceptions.RequestExceptionn as e:
        logger.error(e)
        raise
    else:
        logger.info({"request_result": {"status": res.status, "data": res.data}})

通知内容はこんな感じ。

ウイルス検知時以外に、エラーコード検知時も通知するようにしています。
今回ご紹介したSAMはウイルス検知時のみ通知する内容のため、c1fssエラー時にも通知したい場合は上記のようなコードをSAM内のLambdaに追記いただければと思います。

c1fssの通知形式はこちらになります。

最後に

Trend Micro製品は結構AWSでテンプレートを提供していることがあるので先に調べてみるのがよいかもしれません。使えるものは使っていきましょう!私のような悲しみをうまないために。。

以上、たかやま(@nyan_kotaroo)でした。