AWS Security Hubで、ワークフローのステータスが「NEW」の項目を「NOTIFIED」に一括で変更する

AWS Security Hubで、ワークフローのステータスが「NEW」の項目を「NOTIFIED」に一括で変更する

AWS Security Hubの通知がたくさん来る。あとで見るから、新規だけ通知してほしい。みたいなときに。
Clock Icon2022.04.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS Security Hubを有効にすると、ワークフローのステータスが「NEW」の項目が出現することが多いでしょう。 この状態で、「NEWの内容をSlackに通知する」を実現すると、定期的に通知が来ちゃいます。

通常の運用フローであれば問題ないのですが、AWS Security Hubを有効にした直後などの場合は、ひたすら大量の通知が定期的に来るのでかえって分かりづらくなる場合があります。 (すぐに全部の内容を確認して対応すればOKですが、すべてにすぐ対応できるとも限りません。)

そこで今回は、ワークフローのステータスを一括で「NOTIFIED」に変更するスクリプトを作ってみました。 これによって、新規のNG項目だけSlackに通知される動作になることを期待します。

おすすめの方

  • AWS Security Hubの内容をboto3で取得したい方
  • AWS Security Hubのワークフローのステータスをboto3で変更したい方

重要

本記事は、あくまでも、「新規のNG項目だけSlackに通知される動作」にするために、「ワークフローのステータスを一括でNOTIFIEDにする」という内容です。 定期的な大量通知を防ぎたいだけです。この運用でOKなのであれば、何の問題もありません。

また、ワークフローのステータスを「NOTIFIED」に変えたら作業終了ではありません。 それぞれの項目を確認して、必要に応じて対応し、「RESOLVED or SUPPRESSED」にする作業は別途必要です。

AWS Security Hubで、ワークフローのステータスを変更する

スクリプト

取得して更新するだけです。取得条件の重要度をコメントアウト状態で記載しています。念のため重要度ごとに作業したい場合などにご利用ください。

また、件数が大量にある場合は、RateLimitに引っかかる可能性があるため、sleep()しています。

import time
import boto3


securityhub = boto3.client('securityhub')


def main():
    findings = get_findings()

    print(f'count: {len(findings)}')

    for item in findings:
        update_findings(item['Id'], item['ProductArn'])
        time.sleep(0.2)

def get_findings():
    findings = []
    token = None

    while True:
        options = {
            'Filters': {
                'ComplianceStatus': [
                    {'Value': 'FAILED', 'Comparison': 'EQUALS'},
                ],
                'SeverityLabel': [
                    {'Value': 'CRITICAL', 'Comparison': 'EQUALS'},
                    # {'Value': 'HIGH', 'Comparison': 'EQUALS'},
                    # {'Value': 'MEDIUM', 'Comparison': 'EQUALS'},
                    # {'Value': 'LOW', 'Comparison': 'EQUALS'},
                ],
                'WorkflowStatus': [
                    {'Value': 'NEW', 'Comparison': 'EQUALS'},
                ]
            }
        }
        if token is not None:
            options['NextToken'] = token

        resp = securityhub.get_findings(**options)

        findings += resp['Findings']
        if 'NextToken' not in resp:
            break
        token = resp['NextToken']

        time.sleep(0.2)

    return findings


def update_findings(finding_id, finding_product_arn):
    resp = securityhub.batch_update_findings(
        FindingIdentifiers=[
            {
                'Id': finding_id,
                'ProductArn': finding_product_arn,
            },
        ],
        Workflow={
            'Status': 'NOTIFIED'
        },
    )


if __name__=='__main__':
    main()

実行する

python app.py

件数が多い場合は少し時間がかかりますが、そのうち終わります。(仮に100件の場合、100*0.2s=20秒)

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.