この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Let's 脆弱性診断、大前です。
皆さん、F-Secure Radar 使ってますでしょうか?
前回のあらすじ
先日、F-Secure Radar API を使った記事を書いてみました。
上記記事で作った仕組みは以下になります。
細かい動きは上記記事を参照いただければと思いますが、ざっくりとした流れはこうです。
- ソースの変更をプッシュすると事前に定義しておいた脆弱性診断が自動で実行される
- 脆弱性診断が終わったら結果を確認し、デプロイしても問題ないと判断したら CodePipeline 上で承認作業を行う
- 変更がデプロイされる
API を呼び出す Lambda を書いてあげれば簡単に作れる仕組みですので、是非試してみてください。
本エントリのゴール
今回はもう一手間かけて、脆弱性診断結果の確認とデプロイ要否の判断も自動化してみようと思います。
言い換えると、「ソースの変更をプッシュするだけで脆弱性診断→結果の確認→デプロイ実施or中止まで全自動でやってくれる仕組み」です。
脆弱性診断が終わるのを待って結果を見にいったりしなくて良いのは最高ですね。
今回やりたい仕組み
今回やりたい仕組みは以下です。
ユーザがソースを変更したら CodePipeline が起動してスキャン開始 API を叩くまでは前回と同じですが、その先が異なります。
- スキャン開始 API を実行した Lambda は continuationToken を CodePipeline に返却
- CodePipeline から同じ Lambda 関数が再度実行される
- スキャン開始後はスキャン情報取得 API を実行し、スキャンが終わっているかどうかをチェックする。まだ実行中であれば continuationToken を返却
- 2と3をスキャンが終了するまで繰り返す
- スキャンが終了したらスキャン結果に対して判定を行い、「success」もしくは「failed」のステータスを CodePipeline に返却する
Lambda からレスポンスを返却する際、continuationToken と呼ばれる文字列をレスポンスに含めることにより、同じ Lambda 関数を繰り返し呼び出すことが出来ます。
詳細は以下を参照ください。
CodePipeline で パイプラインに AWS Lambda 関数を呼び出す
やってみる
それでは実際にやってみます。
基本的な CodePipeline の設定や F-Secure Radar 上でのスキャン設定は前回のものを使いまわしますので、詳細な設定は前回の記事をご覧ください。
やることは以下です。
- Lambda 関数の作成
- CodePipeline の修正
- テスト
1. Lambda 関数の作成
以下の Lambda 関数を作成しました。前回の記事と同じく、CodePipeline に関するポリシーをアタッチすることもお忘れなく。
ポイントとなる箇所だけ説明していきます。
import os
import urllib.request
import boto3
import json
code_pipeline = boto3.client('codepipeline')
APIACCESSKEY = os.environ['ApiAccessKey']
APISECRETKEY = os.environ['ApiSecretKey']
WEBSCAN = "webscans"
WEBSCANID = os.environ['WebScanId']
SYSTEMSCAN = "systemscans"
SYSTEMSCANID = os.environ['SystemScanId']
THRESHOLD_HIGH = os.environ['ThresholdHigh']
THRESHOLD_MEDIUM = os.environ['ThresholdMedium']
THRESHOLD_LOW = os.environ['ThresholdLow']
THRESHOLD_INFORMATION = os.environ['ThresholdInformation']
def get_scan_id(scantype):
# CodePipelineから渡されるユーザーパラメータでスキャンタイプを判別し返却
if scantype == WEBSCAN:
scanId = WEBSCANID
elif scantype == SYSTEMSCAN:
scanId = SYSTEMSCANID
else:
# 予期していないパラメータ値が渡された場合は失敗にする
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Function failed due to unexpected parameter.', 'type': 'JobFailed'})
return
return scanId
def call_api(url, method, headers):
# APIを呼び出す
req = urllib.request.Request(
url,
method=method,
headers=headers
)
with urllib.request.urlopen(req) as res:
return res.read().decode('utf-8')
def continue_job(jobId, previousReportId):
# continuationTokenを返却してジョブを継続
print('Job continue')
continuationTokenOut = json.dumps(
{
'previousJobId' : jobId,
'previousReportId' : previousReportId
}
)
code_pipeline.put_job_success_result(jobId=jobId, continuationToken=continuationTokenOut)
def judge_result(jobId, result):
# スキャン結果を判定
print('Judge Result')
for item in result:
if item['VulnRiskLevel'] == "High" and item['TotalCount'] > int(THRESHOLD_HIGH):
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: High)', 'type': 'JobFailed'})
return
elif item['VulnRiskLevel'] == "Medium" and item['TotalCount'] > int(THRESHOLD_MEDIUM):
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: Medium)', 'type': 'JobFailed'})
return
elif item['VulnRiskLevel'] == "Low" and item['TotalCount'] > int(THRESHOLD_LOW):
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: Low)', 'type': 'JobFailed'})
return
elif item['VulnRiskLevel'] == "Information" and item['TotalCount'] > int(THRESHOLD_INFORMATION):
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: Information)', 'type': 'JobFailed'})
return
# 各レベルに設定したしきい値を超えなければOK
code_pipeline.put_job_success_result(jobId=jobId)
def lambda_handler(event, context):
print('----------')
print('Start Function')
try:
# スキャンタイプのセット
jobId = event['CodePipeline.job']['id']
jobData = event['CodePipeline.job']['data']
# スキャンID取得
scanId = get_scan_id(event['CodePipeline.job']['data']['actionConfiguration']['configuration']['UserParameters'])
# continuationTokenの有無をチェック
if 'continuationToken' in jobData:
# スキャン情報を取得
url = "https://api.radar.f-secure.com/api/latest/vulnerabilityscans/" + scanId + "/info"
headers = {
"Content-Type": "application/json",
"ApiAccessKey": APIACCESSKEY,
"ApiSecretKey": APISECRETKEY
}
res = call_api(url, "GET", headers)
# レスポンスから'LastReportId'を取り出す
data = json.loads(res)
reportId = data['LastReportId']
# continuationTokenから'previousReportId'を取り出す
continuationTokenIn = json.loads(jobData['continuationToken'])
previousReportId = continuationTokenIn['previousReportId']
print("reportId:" + str(reportId) + ", previousReportId:" + str(previousReportId))
"""LastReportIdとpreviousReportIdを比較
スキャンが完了するとLastReportIdの値が更新されるため、
LastReportIdとpreviousReportIdを比較して異なる場合はスキャンが完了したものと判定する
"""
if previousReportId == reportId:
# 値が同じ場合はジョブ継続
continue_job(jobId, previousReportId)
else:
# スキャン結果を取得
print('Get ScanResult')
url = "https://api.radar.f-secure.com/api/latest/systemscans/" + scanId + "/reports/" + reportId + "/vulnerabilitiesandfindings/counters"
headers = {
"Content-Type": "application/json",
"ApiAccessKey": APIACCESSKEY,
"ApiSecretKey": APISECRETKEY
}
res = call_api(url, "GET", headers)
# 結果判定
judge_result(jobId, json.loads(res))
print('Finish Scan')
else:
print('Start Scan')
# スキャン開始
url = "https://api.radar.f-secure.com/api/latest/vulnerabilityscans/" + scanId + "/start"
headers = {
"Content-Type": "application/json",
"ApiAccessKey": APIACCESSKEY,
"ApiSecretKey": APISECRETKEY
}
res = call_api(url, "PUT", headers)
data = json.loads(res)
lastReportId = data['LastReportId']
continue_job(jobId, lastReportId)
except Exception as e:
# 予期せぬエラー
print(e)
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Function failed due to exception.', 'type': 'JobFailed'})
print('Finish Function')
print('----------')
return
スキャン開始 API の呼び出し
前回と同じです。今回は他にも API を叩くため、API 呼び出し処理を外出ししています。
print('Start Scan')
# スキャン開始
url = "https://api.radar.f-secure.com/api/latest/vulnerabilityscans/" + scanId + "/start"
headers = {
"Content-Type": "application/json",
"ApiAccessKey": APIACCESSKEY,
"ApiSecretKey": APISECRETKEY
}
res = call_api(url, "PUT", headers)
def call_api(url, method, headers):
# APIを呼び出す
req = urllib.request.Request(
url,
method=method,
headers=headers
)
with urllib.request.urlopen(req) as res:
return res.read().decode('utf-8')
continuationToken の返却
スキャン開始 API のレスポンスから "LastReportId" を取得し、CodePipeline から渡される jobId と一緒に continuationToken として返却します。
"LastReportId" は「最後に実行したスキャンのレポートID」であり、後でスキャンが完了したかを調べる際に使いたいので continuationToken に含ませています。
data = json.loads(res)
lastReportId = data['LastReportId']
continue_job(jobId, lastReportId)
def continue_job(jobId, previousReportId):
# continuationTokenを返却してジョブを継続
print('Job continue')
continuationTokenOut = json.dumps(
{
'previousJobId' : jobId,
'previousReportId' : previousReportId
}
)
code_pipeline.put_job_success_result(jobId=jobId, continuationToken=continuationTokenOut)
スキャン情報の取得
スキャンの情報を取得する API を実行します。API 仕様については以下を参照ください。
# スキャン情報を取得
url = "https://api.radar.f-secure.com/api/latest/vulnerabilityscans/" + scanId + "/info"
headers = {
"Content-Type": "application/json",
"ApiAccessKey": APIACCESSKEY,
"ApiSecretKey": APISECRETKEY
}
res = call_api(url, "GET", headers)
スキャンが終了しているかチェック
スキャン情報取得 API のレスポンスより "LastReportId" を取り出し、continuationToken に含まれている ID と比較します。
スキャンが完了すると "LastReportId" が更新されるため、値が異なっていた場合にはスキャンが完了したと判断する事が出来ます。
値が同じ場合にはまた continuationToken を返却します。
# レスポンスから'LastReportId'を取り出す
data = json.loads(res)
reportId = data['LastReportId']
# continuationTokenから'previousReportId'を取り出す
continuationTokenIn = json.loads(jobData['continuationToken'])
previousReportId = continuationTokenIn['previousReportId']
print("reportId:" + str(reportId) + ", previousReportId:" + str(previousReportId))
"""LastReportIdとpreviousReportIdを比較
スキャンが完了するとLastReportIdの値が更新されるため、
LastReportIdとpreviousReportIdを比較して異なる場合はスキャンが完了したものと判定する
"""
if previousReportId == reportId:
# 値が同じ場合はジョブ継続
continue_job(jobId, previousReportId)
スキャン結果の判定
スキャンが完了したらスキャン結果を取得する API を実行し、デプロイをして良いか判定を行います。
各レベル(High, Medium, Low, Information)でそれぞれ何個脆弱性が検出されたかを取得する API を使いましたが、判定に使用したい情報に合わせて呼び出す API を変えていただければと思います。
# スキャン結果を取得
print('Get ScanResult')
url = "https://api.radar.f-secure.com/api/latest/systemscans/" + scanId + "/reports/" + reportId + "/vulnerabilitiesandfindings/counters"
headers = {
"Content-Type": "application/json",
"ApiAccessKey": APIACCESSKEY,
"ApiSecretKey": APISECRETKEY
}
res = call_api(url, "GET", headers)
# 結果判定
judge_result(jobId, json.loads(res))
print('Finish Scan')
今回使用した API では以下のようなレスポンスが返却されます。
[
{
"VulnRiskLevel": "High",
"NewCount": 0,
"FixedCount": 0,
"TotalCount": 0
},
{
"VulnRiskLevel": "Medium",
"NewCount": 0,
"FixedCount": 0,
"TotalCount": 2
},
{
"VulnRiskLevel": "Low",
"NewCount": 0,
"FixedCount": 0,
"TotalCount": 1
},
{
"VulnRiskLevel": "Information",
"NewCount": 0,
"FixedCount": 0,
"TotalCount": 3
}
]
スキャン結果を取得後、判定ロジックでデプロイして良いかどうかを判定しています。
今回は取得した各レベルの "TotalCount" に対してしきい値を設定し、その値を超える脆弱性が検出されたらデプロイ不可としました。
しきい値を環境変数から設定するようにしているため、ソースを修正する事なく OK/NG の判定ラインを変える事が出来るようにしました。
def judge_result(jobId, result):
# スキャン結果を判定
print('Judge Result')
for item in result:
if item['VulnRiskLevel'] == "High" and item['TotalCount'] > int(THRESHOLD_HIGH):
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: High)', 'type': 'JobFailed'})
return
elif item['VulnRiskLevel'] == "Medium" and item['TotalCount'] > int(THRESHOLD_MEDIUM):
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: Medium)', 'type': 'JobFailed'})
return
elif item['VulnRiskLevel'] == "Low" and item['TotalCount'] > int(THRESHOLD_LOW):
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: Low)', 'type': 'JobFailed'})
return
elif item['VulnRiskLevel'] == "Information" and item['TotalCount'] > int(THRESHOLD_INFORMATION):
code_pipeline.put_job_failure_result(jobId=jobId, failureDetails={'message': 'Failed due to number of vulnerabilities.(Type: Information)', 'type': 'JobFailed'})
return
# 各レベルに設定したしきい値を超えなければOK
code_pipeline.put_job_success_result(jobId=jobId)
2. CodePipeline の修正
Lambda 関数を作成したので、CodePipeline も修正していきます。
前回同様、Source と Deploy の間にステージを追加します。
(前回記事の際に追加したステージが残っている場合は削除しましょう)
アクショングループを追加します。
以下のアクションを作成しました。
ユーザーパラメーターに "systemscans" と設定しているのは、将来的に Web スキャンでも同じ関数を使い回したいためです。
渡された値を元にソース側でスキャンの種類を判定するようにしています。
最終的にこのようになりました。
3. テスト
では実際に動かしていきます。
パイプラインを動かす前に、Lambda の環境変数でしきい値を設定します。
今回は以下の様に設定してみました。
「Information」は10個まで許容し、それ以外は1個でも検出されたらアウトにしてみます。
ソースを適当に変更してプッシュするとパイプラインが動き始めます。
お茶でも飲んでゆっくり待ちましょう。
CloudWatch を覗いてみると、「Start Scan」が実行された後、Lambda 関数が繰り返し実行されている事も確認する事ができます。
スキャンが終わりメールが来ました。
Medium が 2個、Low が 1個検出されているため、設定したルール上は失敗になっているはずです。
CodePipeline を見に行くと、ちゃんと失敗になっています!
もちろん後続のデプロイ処理は実行されていません。
失敗メッセージもちゃんと出ています。
まとめ
F-Secure Radar を CodePipeline に組み込み、Lambda のcontinuationToken を使用する事で脆弱性診断の実行から結果判定まで自動で行う事が出来ました!
前回より Lambda が少し複雑になりましたが、「結果を確認しにいって手で承認」といったプロセスを自動化出来たのは良かったと思います。
本エントリがどなたかのお役にたてば幸いです。
以上、AWS事業本部の大前でした!