
Kiro ヘッドレスLambda + AssumeRoleでGuardDuty Findingを自動トリアージを試してみた
はじめに
前編ではLambdaコンテナ上でKiro CLIの --no-interactive モードを動かし、IAMロールで操作範囲を制御する仕組みを構築しました。
Lambda環境ではプロファイル切替やSSO認証が使えないため、AssumeRoleで取得した一時クレデンシャルをKiroに渡す方式を採用しました。
前編との差分を整理します。
| 項目 | 前編 | 今回 |
|---|---|---|
| IAM方式 | Lambda実行ロールに直接権限付与 | AssumeRoleで専用ロールに切替 |
| 許可操作 | ec2:DescribeRegions等 | Logs Insightsクエリ実行(StartQueryはフローログ限定) |
| ユースケース | リージョン一覧取得 | GuardDuty Findingのフローログ調査 |
| 後処理 | なし | VERDICT判定→自動アーカイブ/SNS通知 |
検証内容
前編からの差分のみ紹介します。Lambda実行環境(ECR/CodeBuild/Dockerfile)は前編のStack1をそのまま利用しています。今回の変更点はhandler.py(AssumeRole + 前処理/後処理の追加)と、IAMロール定義(stack2-lambda.yaml: KiroCloudWatchRole新設 + LambdaRole権限追加)の2つです。
アーキテクチャ
Kiro自体に広いAWS権限を渡さない設計がポイントです。Lambda実行ロールはGuardDutyの取得・アーカイブやSNS通知を担当し、KiroにはAssumeRole後の一時クレデンシャルとしてCloudWatch Logs調査用の権限だけを渡しています。
IAMロール設計(前編との差分)
| 項目 | 前編 | 今回 |
|---|---|---|
| Lambda実行ロール権限 | ssm:GetParameter + ec2:Describe* | ssm:GetParameter + sts:AssumeRole + sns:Publish + guardduty:Get/Archive |
| Kiroが使えるAWS操作 | ec2:DescribeRegions等(直接付与) | Logs Insightsクエリ実行(StartQueryはフローログ限定)※AssumeRole経由 |
| AssumeRole先 | なし | KiroCloudWatchRole(新設) |
2ロール構成の役割分担は以下のとおりです。
| ロール | 権限 | 用途 |
|---|---|---|
| KiroLambdaRole | CloudWatch Logsへのログ出力, ssm:GetParameter, sts:AssumeRole, sns:Publish, guardduty:GetFindings/ArchiveFindings | Lambda実行ロール |
| KiroCloudWatchRole | logs:StartQuery(対象フローログ)+ GetQueryResults/StopQuery + DescribeLogGroups | Kiroがuse_awsで操作する範囲 |
ReadOnlyAccessではなく、今回の調査に必要な操作へ絞った権限設計としました。StartQuery/GetQueryResults/StopQueryは対象ロググループARNに絞っています。DescribeLogGroupsはロググループ情報の確認に必要なため、ロググループARNのワイルドカードを許容しています。Kiroが新規にクエリを開始できる対象は、StartQueryのResource指定でVPCフローログに限定されます。
stack2-lambda.yaml(IAMロール定義)
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Kiro Headless Lambda (AssumeRole) - Stack 2: Lambda + CloudWatch Role + SNS'
Parameters:
ImageUri:
Type: String
Description: ECR Image URI (from Stack 1 CodeBuild output)
Resources:
# SNS Topic for alerts
AlertTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: kiro-guardduty-alerts
# Lambda実行ロール: 最小権限 (自身のログ + SSM + AssumeRole + SNS)
LambdaRole:
Type: AWS::IAM::Role
Properties:
RoleName: KiroLambdaRole
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: KiroLambdaMinimal
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: SsmApiKey
Effect: Allow
Action: ssm:GetParameter
Resource: !Sub 'arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/kiro/headless/api-key'
- Sid: KmsDecrypt
# SSM SecureStringの復号用。実運用では対象KMSキーARNに絞ることを推奨
Effect: Allow
Action: kms:Decrypt
Resource: '*'
- Sid: AssumeTargetRoles
Effect: Allow
Action: sts:AssumeRole
Resource: !GetAtt KiroCloudWatchRole.Arn
- Sid: SnsPublish
Effect: Allow
Action: sns:Publish
Resource: !Ref AlertTopic
- Sid: GuardDutyReadArchive
Effect: Allow
Action:
- guardduty:GetFindings
- guardduty:ArchiveFindings
Resource: !Sub 'arn:aws:guardduty:*:${AWS::AccountId}:detector/*'
# AssumeRole先: VPCフローログのLogs Insights調査用
KiroCloudWatchRole:
Type: AWS::IAM::Role
Properties:
RoleName: KiroCloudWatchRole
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: sts:AssumeRole
Policies:
- PolicyName: FlowLogInsightsOnly
PolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: LogsInsightsQuery
Effect: Allow
Action:
- logs:StartQuery
- logs:GetQueryResults
- logs:StopQuery
Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/vpc/sushiro-test-flow-logs:*'
- Sid: DescribeLogGroups
Effect: Allow
Action: logs:DescribeLogGroups
Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*'
KiroLambda:
Type: AWS::Lambda::Function
Properties:
FunctionName: kiro-headless
PackageType: Image
Code:
ImageUri: !Ref ImageUri
Architectures:
- arm64
Role: !GetAtt LambdaRole.Arn
Timeout: 600
MemorySize: 512
Environment:
Variables:
SNS_TOPIC_ARN: !Ref AlertTopic
Outputs:
LambdaFunction:
Value: !Ref KiroLambda
LambdaArn:
Value: !GetAtt KiroLambda.Arn
KiroCloudWatchRoleArn:
Value: !GetAtt KiroCloudWatchRole.Arn
Export:
Name: KiroCloudWatchRoleArn
AlertTopicArn:
Value: !Ref AlertTopic
handler.py: 前処理/後処理
前処理として、finding_idとdetector_idからGuardDuty Findingを取得し、アーカイブ済みであれば即スキップします(冪等性の確保)。Finding JSONはプロンプトに付与し、末尾にVERDICT出力指示を追加します。
AssumeRoleと環境変数注入が今回のポイントです。
creds = sts.assume_role(
RoleArn=role_arn,
RoleSessionName="kiro-headless",
)["Credentials"]
env = {**os.environ, "KIRO_API_KEY": api_key, "HOME": "/tmp"}
env.update({
"AWS_ACCESS_KEY_ID": creds["AccessKeyId"],
"AWS_SECRET_ACCESS_KEY": creds["SecretAccessKey"],
"AWS_SESSION_TOKEN": creds["SessionToken"],
})
KiroプロセスにはAssumeRole後の一時クレデンシャルをAWS認証情報として渡します。AWS SDK/CLIの認証チェーンではこの認証情報が優先されるため、Lambda実行ロールのGuardDutyアーカイブやSNS通知権限をKiroに直接渡さない構成になっています。role_arn省略時はLambda実行ロールの認証情報がそのままKiroに渡ります。
後処理ではKiro出力からVERDICTを抽出し、以下のとおり分岐します。
| VERDICT | アクション |
|---|---|
| SAFE | guardduty:ArchiveFindings |
| ALERT | sns:Publish |
| パース失敗 | sns:Publish(安全側に倒す) |
handler.py全文
import subprocess
import os
import time
import re
import json
import boto3
_ansi_re = re.compile(r'\x1b\[[0-9;]*m')
ssm = boto3.client("ssm", region_name=os.environ.get("AWS_REGION", "us-east-1"))
sts = boto3.client("sts")
_api_key_cache = None
def get_api_key():
global _api_key_cache
if not _api_key_cache:
_api_key_cache = os.environ.get("KIRO_API_KEY")
if not _api_key_cache:
resp = ssm.get_parameter(Name="/kiro/headless/api-key", WithDecryption=True)
_api_key_cache = resp["Parameter"]["Value"]
return _api_key_cache
def assume_role(role_arn):
resp = sts.assume_role(RoleArn=role_arn, RoleSessionName="kiro-headless")
creds = resp["Credentials"]
return {
"AWS_ACCESS_KEY_ID": creds["AccessKeyId"],
"AWS_SECRET_ACCESS_KEY": creds["SecretAccessKey"],
"AWS_SESSION_TOKEN": creds["SessionToken"],
}
def parse_verdict(output):
for line in reversed(output.strip().splitlines()):
if line.startswith("VERDICT:"):
return line.split(":", 1)[1].strip()
return None
def handler(event, context):
timings = {}
t0 = time.time()
# --- 前処理: GuardDuty Finding のアーカイブ判定 ---
finding_id = event.get("finding_id")
detector_id = event.get("detector_id")
finding_json = None
if finding_id and detector_id:
gd_region = event.get("guardduty_region", "ap-northeast-1")
gd = boto3.client("guardduty", region_name=gd_region)
findings = gd.get_findings(DetectorId=detector_id, FindingIds=[finding_id])["Findings"]
if not findings:
return {"statusCode": 200, "result": "Finding not found, skipped.", "timings": {}}
if findings[0]["Service"]["Archived"]:
return {"statusCode": 200, "result": "Finding already archived, skipped.", "timings": {}}
finding_json = json.dumps(findings[0], default=str)
# --- プロンプト構築 ---
prompt = event.get("prompt", "")
if finding_json:
prompt = f"{prompt}\n\nFinding JSON:\n{finding_json}"
if not prompt or len(prompt.strip()) < 10:
return {"statusCode": 400, "error": "Prompt too short (min 10 chars)"}
# VERDICT出力を指示
prompt += "\n\n最終行に必ず VERDICT:SAFE または VERDICT:ALERT を出力して。"
# --- Kiro実行 ---
api_key = get_api_key()
env = {**os.environ, "KIRO_API_KEY": api_key, "HOME": "/tmp"}
role_arn = event.get("role_arn")
if role_arn:
env.update(assume_role(role_arn))
start = time.time()
result = subprocess.run(
["/usr/local/bin/kiro-cli", "chat", "--no-interactive", "--trust-all-tools", prompt],
capture_output=True, text=True, timeout=280, env=env,
)
timings["kiro_execution"] = round(time.time() - start, 2)
if result.returncode != 0:
timings["total"] = round(time.time() - t0, 2)
return {
"statusCode": 500,
"error": result.stderr.strip() or f"kiro-cli exited with code {result.returncode}",
"timings": timings,
}
output = _ansi_re.sub('', result.stdout.strip())
# --- 後処理: VERDICT に基づく分岐 ---
verdict = parse_verdict(output)
action_taken = None
if finding_id and detector_id:
gd_region = event.get("guardduty_region", "ap-northeast-1")
gd = boto3.client("guardduty", region_name=gd_region)
sns_topic = os.environ.get("SNS_TOPIC_ARN")
if verdict == "SAFE":
gd.archive_findings(DetectorId=detector_id, FindingIds=[finding_id])
action_taken = "archived"
elif verdict == "ALERT":
if sns_topic:
sns = boto3.client("sns", region_name=os.environ.get("AWS_REGION", "us-east-1"))
sns.publish(
TopicArn=sns_topic,
Subject=f"GuardDuty ALERT: {event.get('finding_id', 'unknown')}",
Message=output,
)
action_taken = "sns_notified"
else:
# パース失敗 → 安全側に倒してSNS通知
if sns_topic:
sns = boto3.client("sns", region_name=os.environ.get("AWS_REGION", "us-east-1"))
sns.publish(
TopicArn=sns_topic,
Subject=f"GuardDuty VERDICT_UNKNOWN: {event.get('finding_id', 'unknown')}",
Message=f"VERDICT parse failed.\n\n{output}",
)
action_taken = "sns_notified_parse_fail"
timings["total"] = round(time.time() - t0, 2)
return {
"statusCode": 200,
"prompt": event.get("prompt", ""),
"role_arn": role_arn,
"verdict": verdict,
"action_taken": action_taken,
"result": output,
"timings": timings,
}
GuardDutyトリアージデモ
テスト用に dig guarddutyc2activityb.com を実行し、Backdoor:EC2/C&CActivity.B!DNS を検出させました。

Kiroの調査プロセスは以下の流れでした。まずLogs Insightsクエリを実行しましたが、最初のクエリでは結果を得られませんでした。ロググループの作成時刻を確認してエポック秒を補正し、正しい時間範囲で再実行しています。同時間帯の外向きTCP通信を確認したところ、確認範囲ではAWS公開IPレンジ宛の443/TCPが中心でした。非標準ポートへの通信は0件。最終的に「今回の確認範囲では追加の通信痕跡なし」として VERDICT:SAFE を出力しました。
このような試行錯誤を自律的に行える点が、単純な固定クエリLambdaとの違いです。
| 確認項目 | 結果 |
|---|---|
| C2ドメインへのDNSクエリ | あり。GuardDutyが検出 |
| 同時間帯の外向きTCP通信 | 明確に不審な通信は確認できず |
| 外部向けTCP port 443 | 確認範囲ではAWS IPレンジ宛が中心 |
| 非標準ポート通信 | 0件 |
| 判定 | 今回の確認範囲では追加の通信痕跡なし。VERDICT:SAFE |
VERDICT自動アーカイブの実証
Finding JSON付きプロンプトで、調査→判定→アーカイブまでの一連の流れをテストしました。
{
"verdict": "SAFE",
"action_taken": "archived",
"timings": {"kiro_execution": 50.59, "total": 53.24}
}
53秒で調査→判定→アーカイブが完了しました。検索範囲をエポック秒で明示したことで、ロググループ作成時刻の確認や時刻補正の試行錯誤が不要になっています。
VERDICT自動アーカイブ生ログ(マスク済み)
以下はKiroの出力原文です。一部に断定的な表現がありますが、VPCフローログの性質を踏まえ、本文では「今回の確認範囲では」と表現しています。
{
"statusCode": 200,
"prompt": "このGuardDuty Findingを調査して。ap-northeast-1のロググループ /vpc/sushiro-test-flow-logs をLogs Insightsでクエリし、該当インスタンスからC2ドメインへの実TCP通信があるか確認して。検索範囲: start-time=1779807062 end-time=1779814262。実害有無を判定して。",
"role_arn": "arn:aws:iam::123456789012:role/KiroCloudWatchRole",
"verdict": "SAFE",
"action_taken": "archived",
"result": "> ## GuardDuty Finding 調査\n\n### Finding概要\n\n| 項目 | 値 |\n|------|-----|\n| Type | Backdoor:EC2/C&CActivity.B!DNS |\n| Severity | 8.0 (High) |\n| Instance | i-0abc1234def56789 (10.0.1.100 / xx.xx.xx.xx) |\n| C2ドメイン | guarddutyc2activityb.com |\n| アクション | DNS_REQUEST (UDP) |\n| Blocked | false |\n| 初回/最終検知 | 2026-05-26T15:51:02Z |\n\nこのFindingはDNSクエリのみを検知しています。実際にTCP通信(データ送受信)があったかをVPC Flow Logsで確認します。\n\n### VPC Flow Logs クエリ\n\nC2ドメインへの実TCP通信があるか、該当インスタンスのENI eni-0abc1234def56789 からの通信を確認します。\nRunning aws cli command (using tool: aws):\n\nService name: logs\nOperation name: start-query\nParameters: \n- --start-time: \"1779807062\"\n- --log-group-name: \"/vpc/sushiro-test-flow-logs\"\n- --end-time: \"1779814262\"\n- --query-string: \"fields @timestamp, srcAddr, dstAddr, srcPort, dstPort, protocol, action, bytes, packets | filter srcAddr = '10.0.1.100' and protocol = 6 | sort @timestamp desc | limit 50\"\nRegion: ap-northeast-1\nLabel: Query VPC Flow Logs for TCP traffic from the instance - Completed in 5.432s\n\nI will run the following command: sleep 5 (using tool: shell)\nPurpose: Wait for CloudWatch Logs Insights query to complete\n\n - Completed in 5.17s\n\nRunning aws cli command (using tool: aws):\n\nService name: logs\nOperation name: get-query-results\nParameters: \n- --query-id: \"c21b65c4-737c-441f-b1a8-70968b700f16\"\nRegion: ap-northeast-1\nLabel: Get query results for TCP traffic from the instance - Completed in 5.325s\n\n> ### 分析結果\n\nVPC Flow Logsから、インスタンス 10.0.1.100 のTCP通信(protocol=6)を確認しました。通信先は以下のIPアドレスです:\n\n| 宛先IP | ポート | 特徴 |\n|--------|--------|------|\n| 52.xxx.xxx.106 | 443 | 最も頻繁。約1分間隔で継続的に通信(keepalive的) |\n| 52.xxx.xxx.15 | 443 | AWS系IP。SSM/CloudFormation等のAWSサービス通信 |\n| 52.xxx.xxx.146 | 443 | AWS系IP |\n| 52.xxx.xxx.191 | 443 | AWS系IP |\n| 52.xxx.xxx.57 | 443 | AWS系IP |\n| 52.xxx.xxx.50 | 443 | AWS系IP |\n| 52.xxx.xxx.20 | 443 | S3エンドポイント(52.219.x.x はS3) |\n| 52.xxx.xxx.146 | 443 | S3エンドポイント |\n| 3.xxx.xxx.148 | 443 | AWS系IP(ap-northeast-1) |\n\n### 判定\n\n実害なし(テスト環境のGuardDutyサンプルFinding)\n\n根拠:\n\n1. C2ドメインへの実TCP通信は確認されない: guarddutyc2activityb.com はGuardDutyのテスト用ドメインとして知られています。FindingのアクションタイプはDNS_REQUEST(UDP)のみで、このドメインのIPへのTCP接続はFlow Logsに存在しません。\n\n2. 通信先はすべてAWSサービスIP: 検出されたTCP通信はすべて 52.195.x.x(EC2/SSM等のAWSサービスエンドポイント)、52.219.x.x(S3)、3.112.x.x(AWS ap-northeast-1)宛のポート443通信であり、正常なAWSサービス通信です。\n\n3. テスト環境の証拠:\n - スタック名: GuardDutyTestEc2\n - インスタンス名: guardduty-test\n - IAMプロファイル名に GuardDutyTestEc2 を含む\n - インスタンス起動からFinding検知まで約6分(テスト実行パターン)\n\n4. DNSクエリのみでデータ流出なし: DNS解決は行われたが、解決先IPへのTCPセッション確立(実際のC2通信)は発生していません。\n\nVERDICT:SAFE",
"timings": {"kiro_execution": 50.59, "total": 53.24}
}
コスト
| 項目 | 費用 |
|---|---|
| Kiro クレジット (0.42 credit × $0.02) | $0.008 |
| Lambda (ARM64, 512MB, 53秒) | $0.0004 |
| CloudWatch Logs Insights | 今回は少量のため実質僅少。ログ量に応じて別途発生 |
| 合計 | 約 $0.009 / 回(Logs Insights除く) |
※ Kiro Pro ($20/月, 1,000 credits) で計算
Logs Insightsを除くと、コストの大部分はKiroのクレジット消費です。実運用ではログ量・検索期間に応じてLogs Insightsの料金も別途確認が必要です。
まとめ
AssumeRoleでKiroがクエリを開始できる対象をVPCフローログに絞り、GuardDuty Findingのトリアージ判断にKiroが使えることを確認できました。今回はVPCフローログで確認できる範囲に不審な外向き通信が見当たらないことを判定条件としています。より詳細な通信ログが取得できる環境であれば、それらも判定に活用いただくことをご検討ください。自動アーカイブを試しましたが、実環境に導入する場合、Kiroの判定精度を十分に評価してから有効化することをおすすめします。
今回はシンプルなLambda利用を試しましたが、Step Functionsを活用したマルチエージェントによる調査や、EventBridge連携などの応用も試みてみたいと思います。









