「VirusTotal」でドメインの健全性を定期的に自動でチェックする

2020.04.13

はじめに

こんにちは。大阪オフィスの林です。

「VirusTotal」とはファイルやウェブサイトのマルウェア検査を行うウェブサイトです。ファイルをVirusTotalにアップロードしたりウェブサイトのURLを指定すれば、そのファイルやウェブサイトが「マルウェアを含むかどうか」検査することが出来ます。今回はドメインに対しての健全性をチェックするような仕組みを作る機会がありましたので実装までの内容をまとめておきたいと思います。

構成概要

今回の構成概要をご説明します。詳細は後述します。

① Lambda呼び出し

今回はCloudWatchEventsのスケジュール実行をトリガーとしてLambda(Python3)を呼び出すよう実装します。

② ドメインのリストを取得

チェックしたいドメイン名のリストを予めS3に格納しておきます。取得したリストの先頭から順にドメイン名を取得し、次工程以降のチェックのパラメータとします。

③ APIキック

「VirusTotal」には『URLのチェック』や『ドメインのチェック』を行うための各種APIが用意されています。今回は「②」で取得したドメインをパラメータにドメインのチェックを行うAPIをキックします。

④ レポートを取得

実行したAPIのレポートを取得します。

⑤ レポートをチェック

取得したレポートをチェックします。

⑥ ログを格納

取得したレポートをS3に格納します。

⑦ 異常時通知

取得したレポートに異常が見受けられる場合、SNSへ通知します。

⑧ メール通知

SNSTopicに紐づくメールアドレスに通知を行います。


ザックリ概要はこんなところです。

事前準備

「VirusTotal」に登録

APIを使用するためには予めVirusTotalでユーザー登録を行う必要があります。
VirusTotalのトップページにアクセスし、画面右上の「Sign in」を選択します。

画面左下の「New? Join the community」を選択します。

必要事項を入力後、「Join us」を選択します。

上記登録時に設定したメールアドレス宛に確認メールが届くのでリンクをクリックします。

ここまでで登録自体は完了ですが、APIKEYの確認方法まで続けて説明します。ログインした状態で画面右上の人型アイコンをクリックし、「API key」を選択します。

APIKeyが表示されます。

登録直後は『PublicAPI』というユーザーレベルでAPIを使用することが出来ます。『PublicAPI』はフリーのプランなのですが下記のような制限があります。制限がネックにならない場合はそのまま『PublicAPI』で使って頂いて問題ないかと思います。今回の実装でも『PublicAPI』のままで進めます。

    期間
    APIリクエスト上限
    1分間 4回
    1日 1,000回
    1か月 30,000回

※20200517追記

APIv3での実装は下記をご参照ください。

Lambdaのコード作成

今回「python3.7」を使ってコードを作成していきます。コードの全体は本記事の最後に載せておきますので、本項ではポイントとなる部分だけ説明します。

・使用したライブラリ

標準ライブラリの他、「requests」および「requests」の依存関係のライブラリと「pytz」はzipで予めアップロードしています。

import os
import json
import boto3
import logging
import requests
import re
import time
from pytz import timezone
from datetime import datetime

・環境変数の利用

ハードコードしたくないような値はLambdaの環境変数として指定しておきます。

apikey = os.environ['apikey']
snstopic = os.environ['snstopic']
bucket_name = os.environ['bucket_name']
domainlist = os.environ['domainlist']

・ドメインのリストを取得

S3のオブジェクトを取得し、for domain in domains:で1行ずつdomainという変数に格納してループさせています。

s3 = boto3.client('s3')
response = s3.get_object(Bucket=bucket_name, Key=domainlist)
body = response['Body'].read()
bodystr = body.decode('utf-8')
domains = bodystr.split("\n")
for domain in domains:
 (処理)

・APIキック&レポートを取得

上述したとおり、フリープランである『PublicAPI』では1分間のAPIリクエストが4回に制限されています。チェック自体は数秒で完結するため、チェックするドメインが4つ以下であれば全く問題ないのですが4つより多くのドメインをチェックしようとすると、1分以内に5回目のAPIをキックすることになり、5つ目のドメインチェックからエラーとなってしまいます。そのための回避方法として今回は4つ目および4の倍数の行を処理した後は1分間のWailを持たせることで制限によるエラーを回避させることとしました。Lambdaは実行時間による課金でもありますが微々たるものなので、今回は実行時間課金は許容することとします。

i = 1
if i % 4 != 0:
   (処理)
   i = i + 1
else:
   (処理)
   i = i + 1
   time.sleep(60)

・レポートのチェック&異常時通知

今回2つの観点で異常を判定することとしました。

  • リクエストの失敗
  • positivesの値
  • 1つ目のリクエストの失敗はそのままなのですが、APIをキックしリクエストが正常に完了すると"response_code": 1が返されます。まずはこの"response_code": 1が含まれているかどうかを見て正常に処理が進められたかどうかを判定します。

    pt = '"response_code": 1'
    if pt in response.text:
      (処理)
    else:
      response = client.publish(
      TopicArn = snstopic,
      Message = 'ドメイン名「' + domain + '」はリクエスト失敗',
      Subject = 'ドメイン名「' + domain + '」はリクエスト失敗'
      )
      key = 'Log/' + Shapjst_now  + '/RequestfailedDomain/' + domain + '.log'
    

    次に、positivesですが、色々と調べるとこのpositivesは、アンチウイルスソフトが異常を検知した数を表しているようなのでこの値が「0(正常)」か「0以外(異常)」に応じて正常か異常かを判定することとしました。※この値での異常検知はもう少し深堀が必要かもしれませんが一旦positivesで判定することとします。

    if re.search('"positives": [^0]' , response.text):
      response = client.publish(
        TopicArn = snstopic,
        Message = 'ドメイン名「' + domain + '」は怪しい',
        Subject = 'ドメイン名「' + domain + '」は怪しい'
        )
    else:
      (処理)
    

    ・ログを格納

    #異常の判定に応じてS3のプレフィックスを変える。
    (判定処理)
      (判定A)
      key = 'Log/' + Shapjst_now  + '/BadDomain/' + domain + '.log'
      (判定B)
      key = 'Log/' + Shapjst_now  + '/GoodDomain/' + domain + '.log'
      (判定C)
      key = 'Log/' + Shapjst_now  + '/RequestfailedDomain/' + domain + '.log'
    
    obj = s3.Object(bucket_name,key)
    obj.put( Body = json.dumps(j, indent=4) )
    

    CloudWatchEventsを設定

    CloudWatchEventsからトリガー設定を行います。

    イベントソースから「スケジュール」を選択し、Cronの式を入れます。Cron式はUTC時間で指定します。次にターゲットで作成したLambda関数を指定します。

    実行してみる

    CloudWatchEventsで任意の実行日時を設定しておきます。実行時間になるとLambdaが実行されます。結果としてはS3のログと異常時に通知されたメールから確認することが出来ます。

    ログの確認

    指定したバケットを参照します。指定したバケットに「Log」フォルダが作成されていることが分かります。「Log」の中身を見ていきます。

    実行された日時のフォルダ作成されていることが分かります。中身を見ていきます。

    正常/異常の判定に応じて、フォルダが作成されていることが分かります。

    ここでは「BadDomain」のログを見ていきたいと思います。

    positivesが「0以外(異常)」であることが分かります。

    メールの確認

    異常なドメインと判定されたチェック結果はメールでも通知されます。

    まとめ

    今回はドメインに対してのレポート取得やチェックを実施しましたが、他にも幾つかAPIが用意されているので、他のAPIを触ってみるのも面白そうですね!

    以上、大阪オフィスの林がお送りしました!

    コード

    もっと最適な書き方があるかもしれませんが大目に見て頂ければと思います(。•́︿•̀。)

    import os
    import json
    import boto3
    import logging
    import requests
    import re
    import time
    from pytz import timezone
    from datetime import datetime
    
    def lambda_handler(event, context):
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
    
        apikey = os.environ['apikey']
        snstopic = os.environ['snstopic']
        bucket_name = os.environ['bucket_name']
        domainlist = os.environ['domainlist']
        
        s3 = boto3.client('s3')
        
        
        response = s3.get_object(Bucket=bucket_name, Key=domainlist)
        body = response['Body'].read()
    
        url = 'https://www.virustotal.com/vtapi/v2/domain/report'
        client = boto3.client('sns')
                
        bodystr = body.decode('utf-8')
        domains = bodystr.split("\n")
    
        i = 1
    
        pt = '"response_code": 1'
        
        s3 = boto3.resource('s3') 
        utc_now = datetime.now(timezone('UTC'))
        jst_now = utc_now.astimezone(timezone('Asia/Tokyo'))
        Shapjst_now = jst_now.strftime('%Y-%m-%d-%H-%M')
        
        for domain in domains:
            if i % 4 != 0:
                domain = domain.replace('\r', '')
                params = {'apikey':apikey,'domain':domain}
                response = requests.get(url, params=params)
                j = response.json()
                if pt in response.text:
                    if re.search('"positives": [^0]' , response.text):
                        response = client.publish(
                            TopicArn = snstopic,
                            Message = json.dumps(j, indent=4),
                            Subject = 'ドメイン名「' + domain + '」は怪しい'
                        )
                        key = 'Log/' + Shapjst_now  + '/BadDomain/' + domain + '.log'
                    else:
                        key = 'Log/' + Shapjst_now  + '/GoodDomain/' + domain + '.log'
                else:
                    response = client.publish(
                        TopicArn = snstopic,
                        Message = json.dumps(j, indent=4),
                        Subject = 'ドメイン名「' + domain + '」はリクエスト失敗'
                    )
                    key = 'Log/' + Shapjst_now  + '/RequestfailedDomain/' + domain + '.log'
                obj = s3.Object(bucket_name,key)
                obj.put( Body = json.dumps(j, indent=4) )
                i = i + 1
            else:
                domain = domain.replace('\r', '')
                params = {'apikey':apikey,'domain':domain}
                response = requests.get(url, params=params)
                j = response.json()
                if pt in response.text:
                    if re.search('"positives": [^0]' , response.text):
                        response = client.publish(
                            TopicArn = snstopic,
                            Message = json.dumps(j, indent=4),
                            Subject = 'ドメイン名「' + domain + '」は怪しい'
                        )
                        key = 'Log/' + Shapjst_now  + '/BadDomain/' + domain + '.log'
                    else:
                        key = 'Log/' + Shapjst_now  + '/GoodDomain/' + domain + '.log'
                else:
                    response = client.publish(
                        TopicArn = snstopic,
                        Message = json.dumps(j, indent=4),
                        Subject = 'ドメイン名「' + domain + '」はリクエスト失敗'
                    )
                    key = 'Log/' + Shapjst_now  + '/RequestfailedDomain/' + domain + '.log'
                obj = s3.Object(bucket_name,key)
                obj.put( Body = json.dumps(j, indent=4) )
                i = i + 1
                time.sleep(60)