Kiro ヘッドレスLambda + AssumeRoleでGuardDuty Findingを自動トリアージを試してみた

Kiro ヘッドレスLambda + AssumeRoleでGuardDuty Findingを自動トリアージを試してみた

前回LambdaでKiro CLIヘッドレスモードを動かしました。今回はAssumeRoleでCloudWatch Logs調査専用ロールに切り替え、GuardDuty FindingをVPCフローログのLogs Insightsクエリで自動トリアージを試みました。
2026.05.27

はじめに

前編ではLambdaコンテナ上でKiro CLIの --no-interactive モードを動かし、IAMロールで操作範囲を制御する仕組みを構築しました。

https://dev.classmethod.jp/articles/kiro-cli-headless-lambda-iam-use-aws/

Lambda環境ではプロファイル切替やSSO認証が使えないため、AssumeRoleで取得した一時クレデンシャルをKiroに渡す方式を採用しました。

前編との差分を整理します。

項目 前編 今回
IAM方式 Lambda実行ロールに直接権限付与 AssumeRoleで専用ロールに切替
許可操作 ec2:DescribeRegions等 Logs Insightsクエリ実行(StartQueryはフローログ限定)
ユースケース リージョン一覧取得 GuardDuty Findingのフローログ調査
後処理 なし VERDICT判定→自動アーカイブ/SNS通知

検証内容

前編からの差分のみ紹介します。Lambda実行環境(ECR/CodeBuild/Dockerfile)は前編のStack1をそのまま利用しています。今回の変更点はhandler.py(AssumeRole + 前処理/後処理の追加)と、IAMロール定義(stack2-lambda.yaml: KiroCloudWatchRole新設 + LambdaRole権限追加)の2つです。

アーキテクチャ

Kiro自体に広いAWS権限を渡さない設計がポイントです。Lambda実行ロールはGuardDutyの取得・アーカイブやSNS通知を担当し、KiroにはAssumeRole後の一時クレデンシャルとしてCloudWatch Logs調査用の権限だけを渡しています。

IAMロール設計(前編との差分)

項目 前編 今回
Lambda実行ロール権限 ssm:GetParameter + ec2:Describe* ssm:GetParameter + sts:AssumeRole + sns:Publish + guardduty:Get/Archive
Kiroが使えるAWS操作 ec2:DescribeRegions等(直接付与) Logs Insightsクエリ実行(StartQueryはフローログ限定)※AssumeRole経由
AssumeRole先 なし KiroCloudWatchRole(新設)

2ロール構成の役割分担は以下のとおりです。

ロール 権限 用途
KiroLambdaRole CloudWatch Logsへのログ出力, ssm:GetParameter, sts:AssumeRole, sns:Publish, guardduty:GetFindings/ArchiveFindings Lambda実行ロール
KiroCloudWatchRole logs:StartQuery(対象フローログ)+ GetQueryResults/StopQuery + DescribeLogGroups Kiroがuse_awsで操作する範囲

ReadOnlyAccessではなく、今回の調査に必要な操作へ絞った権限設計としました。StartQuery/GetQueryResults/StopQueryは対象ロググループARNに絞っています。DescribeLogGroupsはロググループ情報の確認に必要なため、ロググループARNのワイルドカードを許容しています。Kiroが新規にクエリを開始できる対象は、StartQueryのResource指定でVPCフローログに限定されます。

stack2-lambda.yaml(IAMロール定義)
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Kiro Headless Lambda (AssumeRole) - Stack 2: Lambda + CloudWatch Role + SNS'

Parameters:
  ImageUri:
    Type: String
    Description: ECR Image URI (from Stack 1 CodeBuild output)

Resources:
  # SNS Topic for alerts
  AlertTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: kiro-guardduty-alerts

  # Lambda実行ロール: 最小権限 (自身のログ + SSM + AssumeRole + SNS)
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: KiroLambdaRole
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: KiroLambdaMinimal
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: SsmApiKey
                Effect: Allow
                Action: ssm:GetParameter
                Resource: !Sub 'arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/kiro/headless/api-key'
              - Sid: KmsDecrypt
                # SSM SecureStringの復号用。実運用では対象KMSキーARNに絞ることを推奨
                Effect: Allow
                Action: kms:Decrypt
                Resource: '*'
              - Sid: AssumeTargetRoles
                Effect: Allow
                Action: sts:AssumeRole
                Resource: !GetAtt KiroCloudWatchRole.Arn
              - Sid: SnsPublish
                Effect: Allow
                Action: sns:Publish
                Resource: !Ref AlertTopic
              - Sid: GuardDutyReadArchive
                Effect: Allow
                Action:
                  - guardduty:GetFindings
                  - guardduty:ArchiveFindings
                Resource: !Sub 'arn:aws:guardduty:*:${AWS::AccountId}:detector/*'

  # AssumeRole先: VPCフローログのLogs Insights調査用
  KiroCloudWatchRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: KiroCloudWatchRole
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
            Action: sts:AssumeRole
      Policies:
        - PolicyName: FlowLogInsightsOnly
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: LogsInsightsQuery
                Effect: Allow
                Action:
                  - logs:StartQuery
                  - logs:GetQueryResults
                  - logs:StopQuery
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/vpc/sushiro-test-flow-logs:*'
              - Sid: DescribeLogGroups
                Effect: Allow
                Action: logs:DescribeLogGroups
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*'

  KiroLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: kiro-headless
      PackageType: Image
      Code:
        ImageUri: !Ref ImageUri
      Architectures:
        - arm64
      Role: !GetAtt LambdaRole.Arn
      Timeout: 600
      MemorySize: 512
      Environment:
        Variables:
          SNS_TOPIC_ARN: !Ref AlertTopic

Outputs:
  LambdaFunction:
    Value: !Ref KiroLambda
  LambdaArn:
    Value: !GetAtt KiroLambda.Arn
  KiroCloudWatchRoleArn:
    Value: !GetAtt KiroCloudWatchRole.Arn
    Export:
      Name: KiroCloudWatchRoleArn
  AlertTopicArn:
    Value: !Ref AlertTopic

handler.py: 前処理/後処理

前処理として、finding_idとdetector_idからGuardDuty Findingを取得し、アーカイブ済みであれば即スキップします(冪等性の確保)。Finding JSONはプロンプトに付与し、末尾にVERDICT出力指示を追加します。

AssumeRoleと環境変数注入が今回のポイントです。

creds = sts.assume_role(
    RoleArn=role_arn,
    RoleSessionName="kiro-headless",
)["Credentials"]

env = {**os.environ, "KIRO_API_KEY": api_key, "HOME": "/tmp"}
env.update({
    "AWS_ACCESS_KEY_ID": creds["AccessKeyId"],
    "AWS_SECRET_ACCESS_KEY": creds["SecretAccessKey"],
    "AWS_SESSION_TOKEN": creds["SessionToken"],
})

KiroプロセスにはAssumeRole後の一時クレデンシャルをAWS認証情報として渡します。AWS SDK/CLIの認証チェーンではこの認証情報が優先されるため、Lambda実行ロールのGuardDutyアーカイブやSNS通知権限をKiroに直接渡さない構成になっています。role_arn省略時はLambda実行ロールの認証情報がそのままKiroに渡ります。

後処理ではKiro出力からVERDICTを抽出し、以下のとおり分岐します。

VERDICT アクション
SAFE guardduty:ArchiveFindings
ALERT sns:Publish
パース失敗 sns:Publish(安全側に倒す)
handler.py全文
import subprocess
import os
import time
import re
import json

import boto3

_ansi_re = re.compile(r'\x1b\[[0-9;]*m')

ssm = boto3.client("ssm", region_name=os.environ.get("AWS_REGION", "us-east-1"))
sts = boto3.client("sts")
_api_key_cache = None

def get_api_key():
    global _api_key_cache
    if not _api_key_cache:
        _api_key_cache = os.environ.get("KIRO_API_KEY")
        if not _api_key_cache:
            resp = ssm.get_parameter(Name="/kiro/headless/api-key", WithDecryption=True)
            _api_key_cache = resp["Parameter"]["Value"]
    return _api_key_cache

def assume_role(role_arn):
    resp = sts.assume_role(RoleArn=role_arn, RoleSessionName="kiro-headless")
    creds = resp["Credentials"]
    return {
        "AWS_ACCESS_KEY_ID": creds["AccessKeyId"],
        "AWS_SECRET_ACCESS_KEY": creds["SecretAccessKey"],
        "AWS_SESSION_TOKEN": creds["SessionToken"],
    }

def parse_verdict(output):
    for line in reversed(output.strip().splitlines()):
        if line.startswith("VERDICT:"):
            return line.split(":", 1)[1].strip()
    return None

def handler(event, context):
    timings = {}
    t0 = time.time()

    # --- 前処理: GuardDuty Finding のアーカイブ判定 ---
    finding_id = event.get("finding_id")
    detector_id = event.get("detector_id")
    finding_json = None

    if finding_id and detector_id:
        gd_region = event.get("guardduty_region", "ap-northeast-1")
        gd = boto3.client("guardduty", region_name=gd_region)
        findings = gd.get_findings(DetectorId=detector_id, FindingIds=[finding_id])["Findings"]
        if not findings:
            return {"statusCode": 200, "result": "Finding not found, skipped.", "timings": {}}
        if findings[0]["Service"]["Archived"]:
            return {"statusCode": 200, "result": "Finding already archived, skipped.", "timings": {}}
        finding_json = json.dumps(findings[0], default=str)

    # --- プロンプト構築 ---
    prompt = event.get("prompt", "")
    if finding_json:
        prompt = f"{prompt}\n\nFinding JSON:\n{finding_json}"
    if not prompt or len(prompt.strip()) < 10:
        return {"statusCode": 400, "error": "Prompt too short (min 10 chars)"}

    # VERDICT出力を指示
    prompt += "\n\n最終行に必ず VERDICT:SAFE または VERDICT:ALERT を出力して。"

    # --- Kiro実行 ---
    api_key = get_api_key()
    env = {**os.environ, "KIRO_API_KEY": api_key, "HOME": "/tmp"}

    role_arn = event.get("role_arn")
    if role_arn:
        env.update(assume_role(role_arn))

    start = time.time()
    result = subprocess.run(
        ["/usr/local/bin/kiro-cli", "chat", "--no-interactive", "--trust-all-tools", prompt],
        capture_output=True, text=True, timeout=280, env=env,
    )
    timings["kiro_execution"] = round(time.time() - start, 2)

    if result.returncode != 0:
        timings["total"] = round(time.time() - t0, 2)
        return {
            "statusCode": 500,
            "error": result.stderr.strip() or f"kiro-cli exited with code {result.returncode}",
            "timings": timings,
        }

    output = _ansi_re.sub('', result.stdout.strip())

    # --- 後処理: VERDICT に基づく分岐 ---
    verdict = parse_verdict(output)
    action_taken = None

    if finding_id and detector_id:
        gd_region = event.get("guardduty_region", "ap-northeast-1")
        gd = boto3.client("guardduty", region_name=gd_region)
        sns_topic = os.environ.get("SNS_TOPIC_ARN")

        if verdict == "SAFE":
            gd.archive_findings(DetectorId=detector_id, FindingIds=[finding_id])
            action_taken = "archived"
        elif verdict == "ALERT":
            if sns_topic:
                sns = boto3.client("sns", region_name=os.environ.get("AWS_REGION", "us-east-1"))
                sns.publish(
                    TopicArn=sns_topic,
                    Subject=f"GuardDuty ALERT: {event.get('finding_id', 'unknown')}",
                    Message=output,
                )
            action_taken = "sns_notified"
        else:
            # パース失敗 → 安全側に倒してSNS通知
            if sns_topic:
                sns = boto3.client("sns", region_name=os.environ.get("AWS_REGION", "us-east-1"))
                sns.publish(
                    TopicArn=sns_topic,
                    Subject=f"GuardDuty VERDICT_UNKNOWN: {event.get('finding_id', 'unknown')}",
                    Message=f"VERDICT parse failed.\n\n{output}",
                )
            action_taken = "sns_notified_parse_fail"

    timings["total"] = round(time.time() - t0, 2)

    return {
        "statusCode": 200,
        "prompt": event.get("prompt", ""),
        "role_arn": role_arn,
        "verdict": verdict,
        "action_taken": action_taken,
        "result": output,
        "timings": timings,
    }

GuardDutyトリアージデモ

テスト用に dig guarddutyc2activityb.com を実行し、Backdoor:EC2/C&CActivity.B!DNS を検出させました。

GuardDuty検出結果

Kiroの調査プロセスは以下の流れでした。まずLogs Insightsクエリを実行しましたが、最初のクエリでは結果を得られませんでした。ロググループの作成時刻を確認してエポック秒を補正し、正しい時間範囲で再実行しています。同時間帯の外向きTCP通信を確認したところ、確認範囲ではAWS公開IPレンジ宛の443/TCPが中心でした。非標準ポートへの通信は0件。最終的に「今回の確認範囲では追加の通信痕跡なし」として VERDICT:SAFE を出力しました。

このような試行錯誤を自律的に行える点が、単純な固定クエリLambdaとの違いです。

確認項目 結果
C2ドメインへのDNSクエリ あり。GuardDutyが検出
同時間帯の外向きTCP通信 明確に不審な通信は確認できず
外部向けTCP port 443 確認範囲ではAWS IPレンジ宛が中心
非標準ポート通信 0件
判定 今回の確認範囲では追加の通信痕跡なし。VERDICT:SAFE

VERDICT自動アーカイブの実証

Finding JSON付きプロンプトで、調査→判定→アーカイブまでの一連の流れをテストしました。

{
  "verdict": "SAFE",
  "action_taken": "archived",
  "timings": {"kiro_execution": 50.59, "total": 53.24}
}

53秒で調査→判定→アーカイブが完了しました。検索範囲をエポック秒で明示したことで、ロググループ作成時刻の確認や時刻補正の試行錯誤が不要になっています。

VERDICT自動アーカイブ生ログ(マスク済み)

以下はKiroの出力原文です。一部に断定的な表現がありますが、VPCフローログの性質を踏まえ、本文では「今回の確認範囲では」と表現しています。

{
  "statusCode": 200,
  "prompt": "このGuardDuty Findingを調査して。ap-northeast-1のロググループ /vpc/sushiro-test-flow-logs をLogs Insightsでクエリし、該当インスタンスからC2ドメインへの実TCP通信があるか確認して。検索範囲: start-time=1779807062 end-time=1779814262。実害有無を判定して。",
  "role_arn": "arn:aws:iam::123456789012:role/KiroCloudWatchRole",
  "verdict": "SAFE",
  "action_taken": "archived",
  "result": "> ## GuardDuty Finding 調査\n\n### Finding概要\n\n| 項目 | 値 |\n|------|-----|\n| Type | Backdoor:EC2/C&CActivity.B!DNS |\n| Severity | 8.0 (High) |\n| Instance | i-0abc1234def56789 (10.0.1.100 / xx.xx.xx.xx) |\n| C2ドメイン | guarddutyc2activityb.com |\n| アクション | DNS_REQUEST (UDP) |\n| Blocked | false |\n| 初回/最終検知 | 2026-05-26T15:51:02Z |\n\nこのFindingはDNSクエリのみを検知しています。実際にTCP通信(データ送受信)があったかをVPC Flow Logsで確認します。\n\n### VPC Flow Logs クエリ\n\nC2ドメインへの実TCP通信があるか、該当インスタンスのENI eni-0abc1234def56789 からの通信を確認します。\nRunning aws cli command (using tool: aws):\n\nService name: logs\nOperation name: start-query\nParameters: \n- --start-time: \"1779807062\"\n- --log-group-name: \"/vpc/sushiro-test-flow-logs\"\n- --end-time: \"1779814262\"\n- --query-string: \"fields @timestamp, srcAddr, dstAddr, srcPort, dstPort, protocol, action, bytes, packets | filter srcAddr = '10.0.1.100' and protocol = 6 | sort @timestamp desc | limit 50\"\nRegion: ap-northeast-1\nLabel: Query VPC Flow Logs for TCP traffic from the instance - Completed in 5.432s\n\nI will run the following command: sleep 5 (using tool: shell)\nPurpose: Wait for CloudWatch Logs Insights query to complete\n\n - Completed in 5.17s\n\nRunning aws cli command (using tool: aws):\n\nService name: logs\nOperation name: get-query-results\nParameters: \n- --query-id: \"c21b65c4-737c-441f-b1a8-70968b700f16\"\nRegion: ap-northeast-1\nLabel: Get query results for TCP traffic from the instance - Completed in 5.325s\n\n> ### 分析結果\n\nVPC Flow Logsから、インスタンス 10.0.1.100 のTCP通信(protocol=6)を確認しました。通信先は以下のIPアドレスです:\n\n| 宛先IP | ポート | 特徴 |\n|--------|--------|------|\n| 52.xxx.xxx.106 | 443 | 最も頻繁。約1分間隔で継続的に通信(keepalive的) |\n| 52.xxx.xxx.15 | 443 | AWS系IP。SSM/CloudFormation等のAWSサービス通信 |\n| 52.xxx.xxx.146 | 443 | AWS系IP |\n| 52.xxx.xxx.191 | 443 | AWS系IP |\n| 52.xxx.xxx.57 | 443 | AWS系IP |\n| 52.xxx.xxx.50 | 443 | AWS系IP |\n| 52.xxx.xxx.20 | 443 | S3エンドポイント(52.219.x.x はS3) |\n| 52.xxx.xxx.146 | 443 | S3エンドポイント |\n| 3.xxx.xxx.148 | 443 | AWS系IP(ap-northeast-1) |\n\n### 判定\n\n実害なし(テスト環境のGuardDutyサンプルFinding)\n\n根拠:\n\n1. C2ドメインへの実TCP通信は確認されない: guarddutyc2activityb.com はGuardDutyのテスト用ドメインとして知られています。FindingのアクションタイプはDNS_REQUEST(UDP)のみで、このドメインのIPへのTCP接続はFlow Logsに存在しません。\n\n2. 通信先はすべてAWSサービスIP: 検出されたTCP通信はすべて 52.195.x.x(EC2/SSM等のAWSサービスエンドポイント)、52.219.x.x(S3)、3.112.x.x(AWS ap-northeast-1)宛のポート443通信であり、正常なAWSサービス通信です。\n\n3. テスト環境の証拠:\n   - スタック名: GuardDutyTestEc2\n   - インスタンス名: guardduty-test\n   - IAMプロファイル名に GuardDutyTestEc2 を含む\n   - インスタンス起動からFinding検知まで約6分(テスト実行パターン)\n\n4. DNSクエリのみでデータ流出なし: DNS解決は行われたが、解決先IPへのTCPセッション確立(実際のC2通信)は発生していません。\n\nVERDICT:SAFE",
  "timings": {"kiro_execution": 50.59, "total": 53.24}
}

コスト

項目 費用
Kiro クレジット (0.42 credit × $0.02) $0.008
Lambda (ARM64, 512MB, 53秒) $0.0004
CloudWatch Logs Insights 今回は少量のため実質僅少。ログ量に応じて別途発生
合計 約 $0.009 / 回(Logs Insights除く)

※ Kiro Pro ($20/月, 1,000 credits) で計算

Logs Insightsを除くと、コストの大部分はKiroのクレジット消費です。実運用ではログ量・検索期間に応じてLogs Insightsの料金も別途確認が必要です。

まとめ

AssumeRoleでKiroがクエリを開始できる対象をVPCフローログに絞り、GuardDuty Findingのトリアージ判断にKiroが使えることを確認できました。今回はVPCフローログで確認できる範囲に不審な外向き通信が見当たらないことを判定条件としています。より詳細な通信ログが取得できる環境であれば、それらも判定に活用いただくことをご検討ください。自動アーカイブを試しましたが、実環境に導入する場合、Kiroの判定精度を十分に評価してから有効化することをおすすめします。

今回はシンプルなLambda利用を試しましたが、Step Functionsを活用したマルチエージェントによる調査や、EventBridge連携などの応用も試みてみたいと思います。

参考リンク

この記事をシェアする

関連記事