【F-Secure Radar API + CodePipeline】Webスキャンの実行と結果判定を自動化してみた

F-Secure Radarが提供しているF-Secure Radar APIを使用して、CodePipelineにWebスキャンを組み込んでみました。 本記事ではその手順を記していきます。
2019.09.09

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

以前、F-secure Radar の API を使って脆弱性診断を行う記事を2本ほど書きました。

F-Secure Radar API を CodePipeline に組み込んで脆弱性診断を自動実行してみた

【F-Secure Radar API + CodePipeline】脆弱性診断の実行から結果取得、判定まで全自動でやってみた

その時はシステムスキャンを実行しましたが、今回は Web スキャンを実行してみましたので記事にしていきます。

Web スキャンを CI/CD パイプラインに組み込む事によってアプリケーションをデプロイする度に脆弱性診断を実施することが出来るようになります。

やること

構成図としては以下になります。

基本的な流れは以前 と同じですが、CodeDeploy でデプロイを実行した後に Web スキャンを実行します。

使用するAPI

3つの API を使用します。

その他の API は以下で確認出来ますのでお時間ある方は是非ご参照ください。swaggerで記述されておりとても見やすいです。

F-Secure Radar API

PUT /api/latest/vulnerabilityscans/{scanId}/start

指定した ID のスキャンを開始します。システムスキャンと共通で使用できます。

Start the vulnerability scan

GET /api/latest/vulnerabilityscans/{scanId}/info

指定した ID のスキャン情報を取得します。こちらもシステムスキャンと共通で使用できます。

Get the scan information

GET /api/latest/webscans/{scanId}/reports/{reportId}/statistics/vulnerabilitiesandfindings

スキャン ID とレポート ID を指定し、検出された脆弱性の個数などを取得します。こちらはシステムスキャンとは API が異なります。

Get Web Scans vulnerabilities and findings

やってみた

事前準備

事前準備として、以下を実施しておきます。

  • F-Secure Radar のスキャン設定
  • F-Secure Radar API キーの取得
  • CodePipeline で基本的なパイプライン作成

詳細な手順は以下を参照ください。

F-Secure Radar API を CodePipeline に組み込んで脆弱性診断を自動実行してみた

チュートリアル: シンプルなパイプラインを作成する (CodeCommit リポジトリの場合)

Lambda 関数の作成

invoke-web-scan という名前で以下の Lambda を作成しました。

import os
import urllib.request
import boto3
import json

code_pipeline = boto3.client('codepipeline')

APIACCESSKEY = os.environ['ApiAccessKey']
APISECRETKEY = os.environ['ApiSecretKey']

WEBSCANID = os.environ['WebScanId']

THRESHOLD_HIGH = os.environ['ThresholdHigh']
THRESHOLD_MEDIUM = os.environ['ThresholdMedium']
THRESHOLD_LOW = os.environ['ThresholdLow']
THRESHOLD_INFORMATION = os.environ['ThresholdInformation']

def call_api(url, method, headers):
    # APIを呼び出す
    req = urllib.request.Request(
        url,
        method=method,
        headers=headers
        )
    with urllib.request.urlopen(req) as res:
        return res.read().decode('utf-8')

def continue_job(jobId, previousReportId):
    # continuationTokenを返却してジョブを継続
    print('Job continue')
    continuationTokenOut = json.dumps(
        {
            'previousJobId' : jobId,
            'previousReportId' : previousReportId
        }
    )
    code_pipeline.put_job_success_result(jobId=jobId, continuationToken=continuationTokenOut)

def judge_result(jobId, result):
    # スキャン結果を判定
    print('Judge Result')
    print(str(result))
    for item in result:
        print(str(item))
        if item['VulnRiskLevel'] == "High" and item['TotalCount'] > int(THRESHOLD_HIGH):
            code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: High)', 'type': 'JobFailed'})
            return
        elif item['VulnRiskLevel'] == "Medium" and item['TotalCount'] > int(THRESHOLD_MEDIUM):
            code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: Medium)', 'type': 'JobFailed'})
            return
        elif item['VulnRiskLevel'] == "Low" and item['TotalCount'] > int(THRESHOLD_LOW):
            code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: Low)', 'type': 'JobFailed'})
            return
        elif item['VulnRiskLevel'] == "Information" and item['TotalCount'] > int(THRESHOLD_INFORMATION):
            code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: Information)', 'type': 'JobFailed'})
            return

    # 各レベルに設定したしきい値を超えなければOK
    code_pipeline.put_job_success_result(jobId=jobId)

def lambda_handler(event, context):
    print('----------')
    print('Start Function')
    try:
        jobId = event['CodePipeline.job']['id']
        jobData = event['CodePipeline.job']['data']

        # スキャンID取得
        scanId = WEBSCANID

        # continuationTokenの有無をチェック
        if 'continuationToken' in jobData:
            # スキャン情報を取得
            url = "https://api.radar.f-secure.com/api/latest/vulnerabilityscans/" + scanId + "/info"
            headers = {
                "Content-Type": "application/json",
                "ApiAccessKey": APIACCESSKEY,
                "ApiSecretKey": APISECRETKEY
            }
            res = call_api(url, "GET", headers)

            # レスポンスから'LastReportId'を取り出す
            data = json.loads(res)
            reportId = data['LastReportId']

            # continuationTokenから'previousReportId'を取り出す
            continuationTokenIn = json.loads(jobData['continuationToken'])
            previousReportId = continuationTokenIn['previousReportId']

            print("reportId:" + str(reportId) + ", previousReportId:" + str(previousReportId))

            """LastReportIdとpreviousReportIdを比較
            スキャンが完了するとLastReportIdの値が更新されるため、
            LastReportIdとpreviousReportIdを比較して異なる場合はスキャンが完了したものと判定する
            """
            if previousReportId == reportId:
                # 値が同じ場合はジョブ継続
                continue_job(jobId, previousReportId)
            else:
                # スキャン結果を取得
                print('Get ScanResult')
                url = "https://api.radar.f-secure.com/api/latest/webscans/" + scanId + "/reports/" + reportId + "/statistics/vulnerabilitiesandfindings"
                headers = {
                    "Content-Type": "application/json",
                    "ApiAccessKey": APIACCESSKEY,
                    "ApiSecretKey": APISECRETKEY
                }
                res = call_api(url, "GET", headers)

                # 結果判定
                judge_result(jobId, json.loads(res))

                print('Finish Scan')
        else:
            print('Start Scan')
            # スキャン開始
            url = "https://api.radar.f-secure.com/api/latest/vulnerabilityscans/" + scanId + "/start"
            headers = {
                "Content-Type": "application/json",
                "ApiAccessKey": APIACCESSKEY,
                "ApiSecretKey": APISECRETKEY
            }

            res = call_api(url, "PUT", headers)

            data = json.loads(res)
            lastReportId = data['LastReportId']

            continue_job(jobId, lastReportId)

    except Exception as e:
        # 予期せぬエラー
        print(e)
        code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Function failed due to exception.', 'type': 'JobFailed'})

    print('Finish Function')
    print('----------')
    return

システムスキャンの時 とほぼ同じコードの為説明は割愛します。

Lambda を実行するロールに CodePipeline へのアクセスを許可するポリシーをつけるのをお忘れなく。

 

API キーや、結果判定時に使うための値は環境変数にセットしています。

パイプラインの編集

CodePipeline のコンソール画面より、[編集する]→[ステージを追加する]にて以下アクションを追加します。

 

以下の様にデプロイ後にスキャンが実行されるパイプラインになります。

動かしてみる

では、パイプラインを動かしていきます。

ソースを更新してプッシュすると、パイプラインが動きます。

 

少し待つと InvokeWebScan が進行中になります。スキャンには時間がかかるのでしばらく待ちましょう。

 

しばらくすると、InvokeWebScan が完了しました。

Lambda の環境変数に設定しているしきい値を超える脆弱性は検出されなかった様です。

 

メール通知の設定もしていたので、結果がメールで届いていました。Low が一個だけ検出された様です。

本筋とは関係ありませんが、設定するだけでメールでお手軽に結果が確認できるのは嬉しいポイントです。

 

上記コードで結果取得 API で取得した値をログに吐き出しているため、CloudWatch でも同様の内容が確認できました。

同じく Low が1つですね。

まとめ

今回はF-secure Radar の API を使った Web スキャンを試してみました。

アプリケーションをデプロイする度に脆弱性診断が自動実行できるのは非常に嬉しいポイントだと思いますので、興味ある方は是非パイプラインに組み込んでみては如何でしょうか。

F-Secure Radar API は非常に数があるため、触っていて面白いです。

 

本記事がどなたかのお役にたてば幸いです。

以上、AWS 事業本部の大前でした。