開発者マシンのセキュリティスキャン完全ガイド — Bumblebee・TruffleHog・OSV-Scannerで3つの脅威軸をカバーする

開発者マシンのセキュリティスキャン完全ガイド — Bumblebee・TruffleHog・OSV-Scannerで3つの脅威軸をカバーする

MCP設定・シークレット漏洩・既知CVEの3つの脅威軸に対して、Bumblebee・TruffleHog・OSV-Scannerの3つの無料ツールで開発者マシンをスキャンする方法を紹介します。CVSSだけでは判断できない脆弱性の優先度をCISA KEVとEPSSで補強するエンリッチメントスクリプトも作成しました。
2026.06.15

はじめに

MCP(Model Context Protocol)サーバーが200を超え、Coding Agentが当たり前になった今、攻撃面はproduction環境だけでなく開発者のローカルマシンにも広がっています。Perplexityの報告では、開発者マシンを狙ったサプライチェーン攻撃は今年60%増加しているとのことです。

「自分のマシンにどんなMCPサーバーが入っているか把握していますか?」「CursorやClaude Desktopの設定ファイルにトークンが平文で保存されていませんか?」「npm依存関係に既知の脆弱性はありませんか?」

こういった疑問から、開発者マシン向けのセキュリティスキャンツールを一通り調べて試してみました。結論として、3つのツールで3つの脅威軸をカバーできることがわかりました。

3つの脅威軸と対応ツール

開発者マシンのセキュリティリスクは、大きく3つの軸に分けられます。

脅威軸 内容 ツール コスト
サプライチェーン攻撃 改ざんされたパッケージ、悪意あるMCPサーバー Bumblebee 無料
シークレット漏洩 設定ファイル中の平文トークン、APIキー TruffleHog 無料
既知の脆弱性(CVE) 依存パッケージの公開済み脆弱性 OSV-Scanner 無料

それぞれの守備範囲は全く異なり、重複しません

developer-machine-security-scan-bumblebee-trufflehog-osv-scanner-tool-coverage

Bumblebee — MCP設定をスキャンできる唯一のツール

Bumblebeeとは

PerplexityがオープンソースとしてリリースしたGo製のバイナリで、開発者のローカルマシン上のパッケージ、IDE拡張機能、ブラウザ拡張機能、そしてMCP設定ファイルを読み取り専用でスキャンします。

重要なポイント:

  • 絶対にコードを実行しない — lockfileやメタデータを直接読むだけ。インストールスクリプトやパッケージマネージャーは起動しない
  • MCP設定ファイルをセキュリティサーフェスとして扱う初のOSSスキャナー
  • TanStack、SAP、Zapierのパッケージを汚染したShai-Huludワームへの対応から生まれた

スキャン対象

カテゴリ 対象
パッケージ npm, pnpm, Yarn, Bun, PyPI, Go modules, RubyGems, Composer
MCP設定 Claude Desktop, Cursor, Windsurf, VSCodiumなどの設定ファイル
IDE拡張 VS Code, Cursor, Windsurf, VSCodium
ブラウザ拡張 Chromium, Firefox

インストールと使い方

# Go 1.25+が必要
brew install go

# Bumblebeeインストール
go install github.com/perplexityai/bumblebee/cmd/bumblebee@latest

# PATHに追加(未設定の場合)
echo 'export PATH=$PATH:$HOME/go/bin' >> ~/.zshrc

# 動作確認
bumblebee selftest

# ベースラインスキャン
bumblebee scan --profile baseline

3つのスキャンプロファイルがあります:

プロファイル 用途
baseline 日常的なラップトップチェック
project 特定のリポジトリ・ワークスペースを対象にしたスキャン
deep インシデント対応時のフルスキャン

出力はNDJSON — そのままでは読めない

BumblebeeはNDJSON(Newline-Delimited JSON)で出力します。NDJSONとは、1行に1つのJSONオブジェクトを配置する形式で、通常のJSON配列と違い各行が独立してパースできるため、ストリーミング処理やパイプラインに適しています。

{"ecosystem":"mcp","package_name":"slack","source_type":"mcp-config",...}
{"ecosystem":"npm","package_name":"react","version":"18.2.0",...}

しかし、6.3MBのNDJSON出力を目視でチェックするのは現実的ではありません。

ラッパースクリプトで人間が読める出力に

そこで、Pythonのラッパースクリプトを作成しました。

bumblebee-summary.py のソースコード
#!/usr/bin/env python3
"""
Bumblebee NDJSON scan summary with colored output.

Usage:
    bumblebee scan --profile baseline 2>/dev/null | python3 bumblebee-summary.py
    bumblebee scan --profile baseline 2>/dev/null | python3 bumblebee-summary.py --verbose
"""

import sys
import json
import argparse
from collections import Counter

# ANSI colors
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
CYAN = "\033[96m"
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"

def colored(text, color):
    return f"{color}{text}{RESET}"

def severity_color(level):
    return {"critical": RED, "high": RED, "medium": YELLOW, "low": DIM}.get(
        level, RESET
    )

def categorize_mcp_source(source_file):
    if "claude-plugins-official" in source_file or "local-agent-mode" in source_file:
        return "first-party"
    if ".cursor" in source_file:
        return "cursor"
    if "claude_desktop_config" in source_file:
        return "claude-desktop"
    return "third-party"

def main():
    parser = argparse.ArgumentParser(description="Bumblebee scan summary")
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Show detailed MCP server list"
    )
    args = parser.parse_args()

    ecosystems = Counter()
    findings = []
    mcp_servers = []
    browser_exts = []
    editor_exts = []
    mcp_categories = Counter()

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        try:
            r = json.loads(line)
        except json.JSONDecodeError:
            continue

        # skip selftest fixtures
        if "selftest" in r.get("source_file", "") or "selftest" in r.get(
            "package_name", ""
        ):
            continue

        record_type = r.get("record_type", "")

        if record_type == "finding":
            findings.append(r)
            continue

        eco = r.get("ecosystem", "unknown")
        ecosystems[eco] += 1

        if eco == "mcp":
            cat = categorize_mcp_source(r.get("source_file", ""))
            mcp_categories[cat] += 1
            mcp_servers.append(
                {
                    "name": r.get("server_name") or r.get("package_name", "?"),
                    "manager": r.get("package_manager", ""),
                    "spec": r.get("requested_spec", ""),
                    "category": cat,
                }
            )
        elif eco == "browser-extension":
            browser_exts.append(r.get("package_name", ""))
        elif eco == "editor-extension":
            editor_exts.append(r.get("package_name", ""))

    total = sum(ecosystems.values())

    # Header
    print()
    print(colored("=" * 50, BOLD))
    print(colored("  Bumblebee Scan Summary", BOLD + CYAN))
    print(colored("=" * 50, BOLD))
    print()

    # Status
    if findings:
        status = colored(f"  FINDINGS: {len(findings)} known-bad match(es)", BOLD + RED)
    else:
        status = colored("  STATUS: CLEAN", BOLD + GREEN)
    print(status)
    print()

    # Findings detail
    if findings:
        print(colored("  --- Findings ---", BOLD + RED))
        for f in findings:
            sev = f.get("severity", "unknown")
            sc = severity_color(sev)
            print(
                f"  {colored(sev.upper(), BOLD + sc):>20s}  "
                f"{f.get('ecosystem', '?')}/{f.get('package_name', '?')}@{f.get('version', '?')}"
            )
            if f.get("catalog_id"):
                print(f"{'':>22s}  catalog: {f['catalog_id']}")
            if f.get("source_file"):
                print(colored(f"{'':>22s}  {f['source_file']}", DIM))
        print()

    # Inventory
    print(colored("  --- Inventory ---", BOLD))
    print(f"  {'Total records':.<30s} {total:>6,}")
    print()
    for eco, count in ecosystems.most_common():
        label = eco.replace("-", " ").title()
        print(f"    {label:.<28s} {count:>6,}")
    print()

    # MCP breakdown
    if mcp_servers:
        print(colored("  --- MCP Servers ---", BOLD))
        first = mcp_categories.get("first-party", 0)
        third = mcp_categories.get("third-party", 0)
        cursor = mcp_categories.get("cursor", 0)
        desktop = mcp_categories.get("claude-desktop", 0)

        print(f"    {colored('First-party (Claude managed)', DIM):.<50s} {first:>4}")
        if cursor:
            print(
                f"    {colored('Cursor config', YELLOW):.<50s} {colored(str(cursor), YELLOW):>4}"
            )
        if desktop:
            print(
                f"    {colored('Claude Desktop config', YELLOW):.<50s} {colored(str(desktop), YELLOW):>4}"
            )
        if third:
            print(
                f"    {colored('Third-party / unknown', RED):.<50s} {colored(str(third), BOLD + RED):>4}"
            )
        else:
            print(f"    {colored('Third-party / unknown', DIM):.<50s}    0")
        print()

        # Verbose: list all MCP servers
        if args.verbose:
            seen = set()
            print(colored("  --- MCP Server Details ---", BOLD))
            for cat_label, cat_key, color in [
                ("First-party", "first-party", DIM),
                ("Cursor", "cursor", YELLOW),
                ("Claude Desktop", "claude-desktop", YELLOW),
                ("Third-party", "third-party", RED),
            ]:
                servers = [s for s in mcp_servers if s["category"] == cat_key]
                if not servers:
                    continue
                print(f"\n    {colored(f'[{cat_label}]', BOLD + color)}")
                for s in servers:
                    key = (s["name"], s["spec"])
                    if key in seen:
                        continue
                    seen.add(key)
                    target = s["spec"] or "(local)"
                    print(f"      {s['name']:<22s} {colored(target, DIM)}")
            print()

    # Footer
    print(colored("-" * 50, DIM))
    note = "  Note: 0 findings = no known-bad catalog matches."
    print(colored(note, DIM))
    print(colored("  Run with --verbose for MCP server details.", DIM))
    print(colored("  Use --exposure-catalog for threat matching.", DIM))
    print()

if __name__ == "__main__":
    main()
bumblebee scan --profile baseline 2>/dev/null | python3 bumblebee-summary.py

出力例:

==================================================
  Bumblebee Scan Summary
==================================================

  STATUS: CLEAN

  --- Inventory ---
  Total records.................  7,087

    Npm.........................  6,946
    Pypi........................     57
    Mcp.........................     33
    Browser Extension...........     28
    Editor Extension............     22

  --- MCP Servers ---
    First-party (Claude managed)..............   33
    Third-party / unknown.....................    0

MCP設定は「First-party(Claude管理)」と「Third-party」に自動分類されます。

SCR-20260615-lgwk-redacted_dot_app

MCP設定のリスク分類

Bumblebeeを実行してわかったのは、どこから来たMCPサーバーかで、リスクが大きく異なるということです。

種類 リスク
Claude Cowork公式コネクタ Slack, Gmail, Notion, Google Calendar Anthropicが管理。OAuth経由。低リスク
ベンダー公式MCP Figma (mcp.figma.com), Linear (mcp.linear.app) ベンダーが管理。中リスク
GitHubからクローンしたMCPサーバー 個人開発のMCPサーバー 自己管理。SOC2なし、責任開示プログラムなし。高リスク

Exposure Catalogの課題

Bumblebeeの真価は、Exposure Catalog(既知の悪意あるパッケージリスト)とのマッチングにあります。

bumblebee scan --profile baseline --exposure-catalog catalog.json --findings-only

ただし、実用的なカタログは現時点ではPerplexity社内のものだけです。公開カタログやコミュニティ管理のカタログが出てくるまでは、Bumblebeeは「インベントリツール」としての価値がメインになります。

TruffleHog — シークレット漏洩を検出して検証する

なぜgitleaksではなくTruffleHogか

gitleaks TruffleHog
速度 最速(サブ秒) 遅め
検出 150+のregexパターン regex + エントロピー
検証 なし あり — 実際にAPIを叩いて有効か確認
スキャン対象 Gitリポジトリ Git + S3 + Docker + Slack + ファイルシステム

TruffleHogの決定的な優位点は検証機能です。「このAWSキーはパターンに合致する」ではなく「このAWSキーは今この瞬間、有効である」と教えてくれます。

インストールと使い方

brew install trufflehog

# 検証済みシークレットのみ表示
trufflehog filesystem ~/.claude/ --only-verified

# 特定ディレクトリをスキャン
trufflehog filesystem ~/.cursor/ --only-verified

# 開発リポジトリ全体
trufflehog filesystem ~/Developer/ --only-verified

--only-verifiedを付けると、ノイズが大幅に減ります。

こちらもラッパースクリプトで見やすく

TruffleHogもログとシークレットが混在して読みにくいため、ラッパースクリプトを作成しました。

trufflehog-summary.py のソースコード
#!/usr/bin/env python3
"""
TruffleHog stderr/stdout summary with colored output.

Usage:
    trufflehog filesystem ~/.claude/ --only-verified 2>&1 | python3 trufflehog-summary.py
    trufflehog filesystem ~/Developer/ 2>&1 | python3 trufflehog-summary.py
    trufflehog filesystem ~/Developer/ 2>&1 | python3 trufflehog-summary.py --verbose
"""

import sys
import json
import re
import argparse

# ANSI colors
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
CYAN = "\033[96m"
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"

def colored(text, color):
    return f"{color}{text}{RESET}"

def format_bytes(b):
    if b >= 1_000_000_000:
        return f"{b / 1_000_000_000:.1f} GB"
    if b >= 1_000_000:
        return f"{b / 1_000_000:.0f} MB"
    if b >= 1_000:
        return f"{b / 1_000:.0f} KB"
    return f"{b} B"

def parse_secret_line(line):
    """Parse a detected secret result line from TruffleHog stdout."""
    result = {}
    # TruffleHog outputs JSON for findings when using --json, otherwise plaintext
    try:
        data = json.loads(line)
        result = {
            "detector": data.get("DetectorType", data.get("detectorType", "?")),
            "verified": data.get("Verified", data.get("verified", False)),
            "source": data.get("SourceMetadata", {})
            .get("Data", {})
            .get("Filesystem", {})
            .get("file", "?"),
            "raw": data.get("Raw", "")[:40] + "...",
        }
        return result
    except (json.JSONDecodeError, AttributeError):
        pass

    # Plaintext format: "Found verified result 🐷🔑"
    if "Found" in line and "result" in line:
        result["raw_line"] = line.strip()
        result["verified"] = "verified" in line.lower()
        return result

    return None

def main():
    parser = argparse.ArgumentParser(description="TruffleHog scan summary")
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Show skipped file details"
    )
    args = parser.parse_args()

    errors = []
    secrets = []
    scan_stats = {}
    scan_dir = "?"
    finished = False

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue

        # Parse structured log lines (tab-separated)
        if "\ttrufflehog\t" in line:
            parts = line.split("\t")
            # Extract message and JSON payload
            msg = parts[3] if len(parts) > 3 else ""
            json_part = parts[4] if len(parts) > 4 else "{}"

            try:
                payload = json.loads(json_part)
            except json.JSONDecodeError:
                payload = {}

            if "error reading chunk" in msg:
                errors.append(
                    {
                        "path": payload.get("path", "?").split("/")[-1],
                        "mime": payload.get("mime", "?"),
                        "error": payload.get("error", "?"),
                    }
                )
            elif "running source" in msg:
                continue
            elif "finished scanning" in msg:
                finished = True
                scan_stats = {
                    "chunks": payload.get("chunks", 0),
                    "bytes": payload.get("bytes", 0),
                    "verified": payload.get("verified_secrets", 0),
                    "unverified": payload.get("unverified_secrets", 0),
                    "duration": payload.get("scan_duration", "?"),
                    "version": payload.get("trufflehog_version", "?"),
                }
            elif payload.get("unit"):
                scan_dir = payload["unit"]
            continue

        # Check for secret findings (non-log lines)
        secret = parse_secret_line(line)
        if secret:
            secrets.append(secret)

    verified = scan_stats.get("verified", len([s for s in secrets if s.get("verified")]))
    unverified = scan_stats.get("unverified", len([s for s in secrets if not s.get("verified")]))
    total_secrets = verified + unverified

    # Header
    print()
    print(colored("=" * 50, BOLD))
    print(colored("  TruffleHog Scan Summary", BOLD + CYAN))
    print(colored("=" * 50, BOLD))
    print()

    # Status
    if verified > 0:
        print(colored(f"  STATUS: {verified} VERIFIED SECRET(S) FOUND", BOLD + RED))
    elif unverified > 0:
        print(colored(f"  STATUS: {unverified} UNVERIFIED SECRET(S)", BOLD + YELLOW))
    elif finished:
        print(colored("  STATUS: CLEAN", BOLD + GREEN))
    else:
        print(colored("  STATUS: NO SCAN DATA FOUND", BOLD + YELLOW))
        print(colored("  Make sure to pipe both stderr and stdout:", DIM))
        print(colored("    trufflehog filesystem <dir> 2>&1 | python3 trufflehog-summary.py", DIM))
        print()
        return
    print()

    # Scan stats
    if scan_stats:
        print(colored("  --- Scan Stats ---", BOLD))
        print(f"    {'Directory':.<30s} {scan_dir}")
        print(f"    {'Chunks scanned':.<30s} {scan_stats['chunks']:>,}")
        print(f"    {'Bytes scanned':.<30s} {format_bytes(scan_stats['bytes'])}")
        print(f"    {'Duration':.<30s} {scan_stats['duration']}")
        print(f"    {'TruffleHog version':.<30s} {scan_stats['version']}")
        print()

    # Secrets
    print(colored("  --- Results ---", BOLD))
    v_color = RED + BOLD if verified > 0 else GREEN
    u_color = YELLOW if unverified > 0 else GREEN
    print(f"    {colored('Verified secrets', v_color):.<50s} {colored(str(verified), v_color)}")
    print(f"    {colored('Unverified secrets', u_color):.<50s} {colored(str(unverified), u_color)}")
    print()

    # Secret details
    if secrets:
        print(colored("  --- Secret Details ---", BOLD + RED))
        for s in secrets:
            if s.get("detector"):
                v_tag = colored("[VERIFIED]", BOLD + RED) if s["verified"] else colored("[unverified]", YELLOW)
                print(f"    {v_tag} {s['detector']}")
                print(colored(f"      file: {s.get('source', '?')}", DIM))
            elif s.get("raw_line"):
                print(f"    {s['raw_line']}")
        print()

    # Errors
    if errors:
        err_color = DIM
        print(colored(f"  --- Skipped Files ({len(errors)}, non-critical) ---", DIM))
        if args.verbose:
            for e in errors:
                print(colored(f"    {e['path']:<40s} {e['mime']}", DIM))
        else:
            print(colored(f"    {len(errors)} binary/unreadable file(s) skipped", DIM))
            print(colored("    Run with --verbose to see details", DIM))
        print()

    # Footer
    print(colored("-" * 50, DIM))
    print(colored("  Verified = secret confirmed active via API call.", DIM))
    print(colored("  Unverified = pattern match, may be rotated/fake.", DIM))
    print()

if __name__ == "__main__":
    main()
trufflehog filesystem ~/.claude/ --only-verified 2>&1 | python3 trufflehog-summary.py

ポイント: TruffleHogはログをstderrに、検出結果をstdoutに出力するため、2>&1で両方をパイプに流します。

出力例:

==================================================
  TruffleHog Scan Summary
==================================================

  STATUS: CLEAN

  --- Scan Stats ---
    Directory..................... ~/.claude/
    Chunks scanned................ 70,048
    Bytes scanned................. 552 MB
    Duration...................... 14.4s

  --- Results ---
    Verified secrets......................... 0
    Unverified secrets....................... 0

  --- Skipped Files (2, non-critical) ---
    2 binary/unreadable file(s) skipped

SCR-20260615-lkij-redacted_dot_app

OSV-Scanner — 依存パッケージのCVEを検出する

OSV-Scannerとは

Googleが開発した無料のOSSで、npm、PyPI、Go modules、RubyGems、Cargo、Mavenなど複数のエコシステムのlockfileを読み取り、OSVデータベース(NVD、GitHub Advisoriesなどを集約)と照合して既知の脆弱性を検出します。

brew install osv-scanner

# ディレクトリ配下のlockfileを再帰的に自動検出してスキャン
osv-scanner scan -r ~/Developer/

# 特定のlockfileを指定することも可能
osv-scanner scan --lockfile=pnpm-lock.yaml

-r(再帰)オプションでディレクトリを指定すると、package-lock.jsonpnpm-lock.yamlgo.sumrequirements.txtなどのlockfileを自動検出してスキャンします。

類似ツールとの比較

OSV-Scanner Snyk Grype
メンテナ Google Snyk社 Anchore
コスト 完全無料 フリーミアム(200テスト/月まで無料) 無料
コンテナスキャン なし あり あり(主力機能)
エコシステムカバレッジ 広い 広い 広い

個人開発者にはOSV-Scannerが最適です。アカウント不要、制限なし。

SCR-20260615-lmrf-redacted_dot_app

CVSSだけでは足りない — CISA KEVとEPSSで優先度を判断する

問題: 「Critical 30件」では行動できない

OSV-Scannerを~/Developer/に対して実行すると、908件の脆弱性、うち30件がCriticalと報告されました。しかし、ここで2つの問題があります:

  1. エコシステム別に表示されるため、深刻度順に見られない
  2. 30件のCriticalが本当に緊急かわからない — 実際に攻撃に使われているのか?すぐ対処が必要なのか?

CVSSの限界

CVSS(Common Vulnerability Scoring System)は理論的な最大影響度を測定します。

レーティング CVSSスコア 意味
Critical 9.0 - 10.0 リモートから認証なしで完全侵害可能
High 7.0 - 8.9 深刻だが条件付き(認証やユーザー操作が必要)
Medium 4.0 - 6.9 限定的な影響、または悪用が困難
Low 0.1 - 3.9 最小限の影響

しかし、CVSSは実際の悪用状況を反映しません。コードから呼び出されない関数のCritical脆弱性よりも、野生で悪用されているHigh脆弱性のほうが緊急です。

2つの追加データソース

データソース 提供元 教えてくれること
CISA KEV (Known Exploited Vulnerabilities) 米国CISA このCVEは実際の攻撃で悪用が確認されている。今すぐパッチを。
EPSS (Exploit Prediction Scoring System) FIRST.org このCVEが今後30日以内に悪用される確率は37%。

重要な点として、CISA KEVもEPSSもOSV-Scannerには組み込まれていません。別の公開データベースとして存在しており、クロスリファレンスするには外部APIを呼ぶ必要があります。

developer-machine-security-scan-bumblebee-trufflehog-osv-scanner-priority-flow

エンリッチメントスクリプトで優先度付き出力に

OSV-ScannerのJSON出力を受け取り、CISA KEVカタログ(無料JSONダウンロード)とEPSS API(FIRST.org、無料)を叩いて、3つのリスクティアに分類するPythonスクリプトを作成しました。

osv-enrich.py のソースコード
#!/usr/bin/env python3
"""
OSV-Scanner enrichment script with CISA KEV and EPSS data.

Reads osv-scanner JSON output, cross-references CISA KEV (Known Exploited
Vulnerabilities) and EPSS (Exploit Prediction Scoring System), then outputs
a prioritized summary grouped by real-world risk.

Usage:
    osv-scanner scan -r /path/to/project --format json 2>/dev/null | python3 osv-enrich.py
    osv-scanner scan -r /path/to/project --format json 2>/dev/null | python3 osv-enrich.py --verbose
    osv-scanner scan -r /path/to/project --format json 2>/dev/null | python3 osv-enrich.py --skip-epss
"""

import sys
import json
import argparse
import urllib.request
import urllib.error

# ANSI colors
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
CYAN = "\033[96m"
MAGENTA = "\033[95m"
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"

CISA_KEV_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
EPSS_API_URL = "https://api.first.org/data/v1/epss"
EPSS_BATCH_SIZE = 100

def colored(text, color):
    return f"{color}{text}{RESET}"

def fetch_json(url, timeout=30):
    req = urllib.request.Request(url, headers={"User-Agent": "osv-enrich/1.0"})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode())

def fetch_kev_set():
    """Download CISA KEV catalog and return set of CVE IDs."""
    sys.stderr.write(colored("  Fetching CISA KEV catalog...", DIM) + "\n")
    try:
        data = fetch_json(CISA_KEV_URL, timeout=15)
        vulns = data.get("vulnerabilities", [])
        kev_set = {v["cveID"] for v in vulns if "cveID" in v}
        sys.stderr.write(
            colored(f"  KEV catalog loaded: {len(kev_set)} CVEs\n", DIM)
        )
        return kev_set
    except Exception as e:
        sys.stderr.write(colored(f"  Warning: Failed to fetch KEV: {e}\n", YELLOW))
        return set()

def fetch_epss_scores(cve_ids):
    """Fetch EPSS scores for a list of CVE IDs. Returns dict of cve -> {epss, percentile}."""
    if not cve_ids:
        return {}

    sys.stderr.write(
        colored(f"  Fetching EPSS scores for {len(cve_ids)} CVEs...", DIM) + "\n"
    )
    scores = {}
    batches = [
        list(cve_ids)[i : i + EPSS_BATCH_SIZE]
        for i in range(0, len(cve_ids), EPSS_BATCH_SIZE)
    ]

    for i, batch in enumerate(batches):
        cve_param = ",".join(batch)
        url = f"{EPSS_API_URL}?cve={cve_param}"
        try:
            data = fetch_json(url, timeout=15)
            for entry in data.get("data", []):
                scores[entry["cve"]] = {
                    "epss": float(entry.get("epss", 0)),
                    "percentile": float(entry.get("percentile", 0)),
                }
        except Exception as e:
            sys.stderr.write(
                colored(f"  Warning: EPSS batch {i + 1} failed: {e}\n", YELLOW)
            )

    sys.stderr.write(colored(f"  EPSS scores loaded: {len(scores)} CVEs\n", DIM))
    return scores

def severity_from_osv(vuln):
    """Extract severity from OSV vulnerability object."""
    # Check database_specific
    db = vuln.get("database_specific", {})
    if db.get("severity"):
        return db["severity"].upper()

    # Check severity array
    for s in vuln.get("severity", []):
        score_str = s.get("score", "")
        # CVSS v3 vector string contains severity
        if "CVSS" in s.get("type", ""):
            try:
                # Parse base score from vector or score field
                if isinstance(score_str, str) and score_str.replace(".", "").isdigit():
                    score = float(score_str)
                elif isinstance(score_str, (int, float)):
                    score = float(score_str)
                else:
                    continue
                if score >= 9.0:
                    return "CRITICAL"
                if score >= 7.0:
                    return "HIGH"
                if score >= 4.0:
                    return "MEDIUM"
                return "LOW"
            except (ValueError, TypeError):
                continue

    return "UNKNOWN"

def extract_cve_ids(vuln):
    """Get all CVE aliases from a vulnerability."""
    ids = set()
    vid = vuln.get("id", "")
    if vid.startswith("CVE-"):
        ids.add(vid)
    for alias in vuln.get("aliases", []):
        if alias.startswith("CVE-"):
            ids.add(alias)
    return ids

def main():
    parser = argparse.ArgumentParser(description="OSV-Scanner enrichment with KEV + EPSS")
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Show all vulnerabilities"
    )
    parser.add_argument(
        "--skip-epss", action="store_true", help="Skip EPSS API calls (faster)"
    )
    args = parser.parse_args()

    # Parse OSV-Scanner JSON from stdin
    try:
        osv_data = json.load(sys.stdin)
    except json.JSONDecodeError:
        print(colored("Error: Invalid JSON. Make sure to use: osv-scanner scan -r <dir> --format json", RED))
        sys.exit(1)

    # Collect all vulnerabilities with package info
    vuln_records = []
    seen_vuln_ids = set()

    for result in osv_data.get("results", []):
        source = result.get("source", {}).get("path", "?")
        for pkg_entry in result.get("packages", []):
            pkg = pkg_entry.get("package", {})
            pkg_name = pkg.get("name", "?")
            pkg_version = pkg.get("version", "?")
            pkg_eco = pkg.get("ecosystem", "?")

            for vuln in pkg_entry.get("vulnerabilities", []):
                vid = vuln.get("id", "?")
                if vid in seen_vuln_ids:
                    continue
                seen_vuln_ids.add(vid)

                cve_ids = extract_cve_ids(vuln)
                severity = severity_from_osv(vuln)

                vuln_records.append(
                    {
                        "id": vid,
                        "cve_ids": cve_ids,
                        "severity": severity,
                        "package": f"{pkg_eco}/{pkg_name}@{pkg_version}",
                        "source": source,
                        "summary": vuln.get("summary", ""),
                    }
                )

    if not vuln_records:
        print()
        print(colored("=" * 56, BOLD))
        print(colored("  OSV-Scanner Enriched Summary", BOLD + CYAN))
        print(colored("=" * 56, BOLD))
        print()
        print(colored("  STATUS: CLEAN — no vulnerabilities found", BOLD + GREEN))
        print()
        return

    # Collect all CVE IDs for enrichment
    all_cves = set()
    for r in vuln_records:
        all_cves.update(r["cve_ids"])

    # Fetch enrichment data
    sys.stderr.write("\n")
    kev_set = fetch_kev_set()
    epss_scores = {} if args.skip_epss else fetch_epss_scores(all_cves)
    sys.stderr.write("\n")

    # Enrich records
    for r in vuln_records:
        r["in_kev"] = bool(r["cve_ids"] & kev_set)
        best_epss = 0.0
        best_percentile = 0.0
        for cve in r["cve_ids"]:
            if cve in epss_scores:
                e = epss_scores[cve]
                if e["epss"] > best_epss:
                    best_epss = e["epss"]
                    best_percentile = e["percentile"]
        r["epss"] = best_epss
        r["epss_percentile"] = best_percentile

    # Categorize by risk tier
    tier1_kev = [r for r in vuln_records if r["in_kev"]]
    tier2_high_epss = [
        r for r in vuln_records if not r["in_kev"] and r["epss"] >= 0.1
    ]
    tier3_rest = [
        r for r in vuln_records if not r["in_kev"] and r["epss"] < 0.1
    ]

    # Severity counts
    sev_counts = {}
    for r in vuln_records:
        s = r["severity"]
        sev_counts[s] = sev_counts.get(s, 0) + 1

    sev_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "UNKNOWN"]
    sev_colors = {
        "CRITICAL": RED + BOLD,
        "HIGH": RED,
        "MEDIUM": YELLOW,
        "LOW": DIM,
        "UNKNOWN": DIM,
    }

    # Output
    print()
    print(colored("=" * 56, BOLD))
    print(colored("  OSV-Scanner Enriched Summary", BOLD + CYAN))
    print(colored("=" * 56, BOLD))
    print()

    # Overall status
    if tier1_kev:
        print(colored(f"  STATUS: {len(tier1_kev)} ACTIVELY EXPLOITED", BOLD + RED))
    elif tier2_high_epss:
        print(colored(f"  STATUS: {len(tier2_high_epss)} HIGH EXPLOIT PROBABILITY", BOLD + YELLOW))
    else:
        print(colored(f"  STATUS: {len(vuln_records)} vuln(s), none actively exploited", BOLD + GREEN))
    print()

    # Severity breakdown
    print(colored("  --- By Severity (CVSS) ---", BOLD))
    for s in sev_order:
        if s in sev_counts:
            c = sev_colors.get(s, RESET)
            print(f"    {colored(s, c):.<50s} {colored(str(sev_counts[s]), c)}")
    print(f"    {'':.<38s} {'─' * 6}")
    print(f"    {'Total':.<38s} {len(vuln_records):>6}")
    print()

    # Tier 1: CISA KEV
    kev_label = colored("ACTIVELY EXPLOITED", BOLD + RED) if tier1_kev else colored("ACTIVELY EXPLOITED", DIM)
    kev_count = colored(str(len(tier1_kev)), BOLD + RED) if tier1_kev else "0"
    print(colored("  --- CISA KEV (confirmed exploited in the wild) ---", BOLD))
    print(f"    Count: {kev_count}")
    if tier1_kev:
        print()
        for r in sorted(tier1_kev, key=lambda x: -x["epss"]):
            sc = sev_colors.get(r["severity"], RESET)
            epss_str = f"EPSS {r['epss']:.0%}" if r["epss"] > 0 else ""
            print(f"    {colored('!!! FIX NOW', BOLD + RED)}  {colored(r['severity'], sc):<10s}  {r['package']}")
            cve_str = ", ".join(r["cve_ids"]) if r["cve_ids"] else r["id"]
            print(colored(f"{'':>16s}{cve_str}  {epss_str}", DIM))
            if r["summary"]:
                print(colored(f"{'':>16s}{r['summary'][:80]}", DIM))
    print()

    # Tier 2: High EPSS
    print(colored("  --- High Exploit Probability (EPSS >= 10%) ---", BOLD))
    print(f"    Count: {len(tier2_high_epss)}")
    if tier2_high_epss:
        print()
        for r in sorted(tier2_high_epss, key=lambda x: -x["epss"]):
            sc = sev_colors.get(r["severity"], RESET)
            epss_val = r["epss"]
            epss_label = colored(f"EPSS {epss_val:.0%}", BOLD + YELLOW)
            print(
                f"    {epss_label:>20s}  "
                f"{colored(r['severity'], sc):<10s}  {r['package']}"
            )
            cve_str = ", ".join(r["cve_ids"]) if r["cve_ids"] else r["id"]
            print(colored(f"{'':>22s}{cve_str}", DIM))
            if args.verbose and r["summary"]:
                print(colored(f"{'':>22s}{r['summary'][:80]}", DIM))
    print()

    # Tier 3: Rest
    print(colored("  --- Low Risk (EPSS < 10%) ---", BOLD))
    print(f"    Count: {len(tier3_rest)}")
    if args.verbose and tier3_rest:
        print()
        for r in sorted(tier3_rest, key=lambda x: (-x["epss"], x["severity"])):
            sc = sev_colors.get(r["severity"], RESET)
            epss_str = f"EPSS {r['epss']:.1%}" if r["epss"] > 0 else "EPSS n/a"
            cve_str = ", ".join(r["cve_ids"]) if r["cve_ids"] else r["id"]
            print(
                f"    {epss_str:>12s}  {colored(r['severity'], sc):<10s}  "
                f"{r['package']:<40s}  {colored(cve_str, DIM)}"
            )
    elif tier3_rest:
        # Compact summary by severity
        rest_by_sev = {}
        for r in tier3_rest:
            rest_by_sev.setdefault(r["severity"], []).append(r)
        for s in sev_order:
            if s in rest_by_sev:
                c = sev_colors.get(s, RESET)
                print(f"      {colored(s, c)}: {len(rest_by_sev[s])}")
        print(colored("      Run with --verbose to see all", DIM))
    print()

    # Footer
    print(colored("-" * 56, DIM))
    print(colored("  Risk tiers:", DIM))
    print(colored("    1. CISA KEV = confirmed exploited, fix immediately", DIM))
    print(colored("    2. EPSS >= 10% = likely to be exploited soon", DIM))
    print(colored("    3. EPSS < 10% = lower probability, monitor", DIM))
    if args.skip_epss:
        print(colored("  Note: EPSS scores skipped (--skip-epss)", YELLOW))
    print()

if __name__ == "__main__":
    main()
osv-scanner scan -r ~/Developer/ --format json 2>/dev/null | python3 osv-enrich.py

出力例:

========================================================
  OSV-Scanner Enriched Summary
========================================================

  STATUS: 908 vuln(s), none actively exploited

  --- By Severity (CVSS) ---
    CRITICAL........................................ 30
    HIGH............................................ 150
    MEDIUM.......................................... 500
    LOW............................................. 228

  --- CISA KEV (confirmed exploited in the wild) ---
    Count: 0

  --- High Exploit Probability (EPSS >= 10%) ---
    Count: 3

       EPSS 45%  HIGH      npm/example-pkg@1.2.3
                  CVE-2025-12345

  --- Low Risk (EPSS < 10%) ---
    Count: 905
      CRITICAL: 30
      HIGH: 148
      MEDIUM: 500
      LOW: 227
      Run with --verbose to see all

これで、908件の脆弱性が実際に行動すべき3-5件に絞り込めます。

スキャンツールのUX課題

今回3つのツールを試して感じた共通の課題は、出力が機械向けで人間に読めないことです。

ツール 出力形式 課題
Bumblebee NDJSON(6.3MB) パイプライン向け。ラッパーなしでは読めない
TruffleHog stderrにログ + stdoutに検出結果が混在 エラーログとスキャン統計が混ざる
OSV-Scanner テーブル表示 エコシステム別で深刻度順でない。908件を眺めても行動できない

これらのツールはSIEM連携やCI/CDパイプラインを想定しており、「個人開発者が手元で結果を確認する」ユースケースには向いていません。

今回作成したラッパースクリプト3本で、この課題を解消しました:

スクリプト 機能
bumblebee-summary.py NDJSON → カラー付きサマリー。MCP設定をFirst-party/Third-partyに分類
trufflehog-summary.py stderr+stdout混在 → CLEAN/FINDINGS表示 + スキャン統計
osv-enrich.py JSON → CISA KEV + EPSS enrichment → 3ティア優先度分類

まとめ

3ツールのセットアップコマンド

# Bumblebee(サプライチェーン + MCP)
go install github.com/perplexityai/bumblebee/cmd/bumblebee@latest

# TruffleHog(シークレット漏洩)
brew install trufflehog

# OSV-Scanner(既知CVE)
brew install osv-scanner

日常のスキャンコマンド

# サプライチェーン + MCP設定チェック
bumblebee scan --profile baseline 2>/dev/null | python3 bumblebee-summary.py

# シークレット漏洩チェック
trufflehog filesystem ~/.cursor/ --only-verified 2>&1 | python3 trufflehog-summary.py

# 依存パッケージのCVEチェック(優先度付き)
osv-scanner scan -r ~/Developer/ --format json 2>/dev/null | python3 osv-enrich.py

判断基準

  • CISA KEVに載っている → 今すぐパッチを当てる
  • EPSS ≥ 10% → 近日中に対処する
  • CVSS Critical/Highだが EPSS < 10% → 計画的に対処。慌てない
  • MCP設定にThird-partyサーバーがある → ソースコードとメンテナンス状況を確認する
  • 平文トークンが検出された → 即座にローテーション + 環境変数 or 1Password CLIに移行

Agent生態系が拡大するほど、開発者マシンのセキュリティは「個人の習慣」ではなく組織の衛生管理として扱う必要があります。3つの無料ツール + ラッパースクリプトで、その第一歩が踏み出せます。

この記事をシェアする

関連記事