Agent RegistryにA2Aエージェントを登録してStrands Agentsから動的に呼び出してみた

Agent RegistryにA2Aエージェントを登録してStrands Agentsから動的に呼び出してみた

2026.04.22

はじめに

こんにちは、スーパーマーケットが好きなコンサル部の神野(じんの)です。

前回の記事ではAgent RegistryにAgent Skillsを登録して、Strands Agentsから動的に取り込む方法を紹介しました。

https://dev.classmethod.jp/articles/aws-agent-registry-dynamic-skills-strands-agents/

今回はその続編で、A2A 版です。AgentCore RuntimeにデプロイしたStrands AgentsのA2AサーバーをTerraformでサクッと2体作って、Agent Registryに登録し、オーケストレーターエージェントからRegistry経由で動的に呼び出すところまでやっていきます。

イメージとしてはこんな感じです。

CleanShot 2026-04-22 at 10.12.06@2x

オーケストレーター側はA2A相手のURLやツール定義を一切持たずに、Registryで動的に他のエージェントの情報を参照できるので、対応させたいエージェントが増減してもコードはそのまま使えるというのが嬉しいポイントかと思います。

組織全体でA2Aエージェントが増えていくほど、オーケストレーターでできることも自動で増えていく感じですね。もちろん動的に参照する分、実行時のレイテンシや何が呼ばれるかの予測しづらさとはトレードオフなので、そこは用途次第という感じです。できることが増えるとオーケストレーターのチューニングも大変になりそうですね・・・

前提

今回使用したバージョンは下記の通りです。

項目 バージョン・設定
Python 3.12
パッケージ管理 uv
Terraform >= 1.5.0
AWS Provider >= 6.25.0
strands-agents[a2a] >= 1.36.0
strands-agents-tools >= 0.5.0
リージョン us-east-1
モデル us.anthropic.claude-haiku-4-5-20251001-v1:0

AgentCore RuntimeのA2AサーバーへはIAM (SigV4) で認証します。

前回の記事で作成した自動承認のRegistryをそのまま使います。Registry IDを控えておいてください。Registryの作成方法は前回の記事を参照してください。

今回使用した完全なコードはこちらのリポジトリに置いてあります。手元で試す場合はcloneしてから進めてください。

https://github.com/yuu551/a2a-agentcore-strands

git clone https://github.com/yuu551/a2a-agentcore-strands.git
cd a2a-agentcore-strands
uv sync

全体の流れ

実装の流れは下記のように進めていきます!

  1. TerraformでA2AサーバーをECR&AgentCore Runtimeへデプロイ(Calculator / Weather)
  2. Agent RegistryにA2A Agent Cardを登録
  3. ローカルのオーケストレーターエージェントがRegistryを検索してA2Aサーバーに接続
  4. オーケストレーターもAgentCore Runtimeに載せて、Runtime→Runtimeで呼び出す

ファイル構成はこんな感じです。

プロジェクト構成
.
├── register_a2a.py             # A2A Agent CardをRegistryへ登録
├── registry_a2a_loader.py      # RegistryからA2AClientToolProviderを組み立てる
├── a2a_client.py               # ローカルからオーケストレーターを動かす
├── invoke_orchestrator.py      # Runtime上のオーケストレーターをSSEで呼び出す
├── sigv4_auth.py               # httpx用 SigV4 Auth
└── terraform/
    ├── versions.tf
    ├── variables.tf
    ├── iam.tf
    ├── runtime.tf
    ├── outputs.tf
    └── agent/
        ├── calculator/         # A2A: 計算エージェント
   ├── server.py
   ├── Dockerfile
   └── requirements.txt
        ├── weather/            # A2A: 天気エージェント
   ├── server.py
   ├── Dockerfile
   └── requirements.txt
        └── orchestrator/       # HTTP: オーケストレーター
            ├── server.py
            ├── registry_a2a_loader.py  # ルートからコピー
            ├── sigv4_auth.py           # ルートからコピー
            ├── Dockerfile
            └── requirements.txt

A2Aサーバーの実装

計算エージェント

Strands AgentsのA2AServerでAgentをラップしてFastAPIアプリとして公開します。計算ツールはStrandsで提供されている strands_tools.calculator をそのまま使います。

AgentCore RuntimeはA2A用に環境変数 AGENTCORE_RUNTIME_URL を注入してくれるので、それをA2AServerの http_url に渡します。Agent Cardの url フィールドにこのURLが入り、クライアント側はそこへJSON-RPCを投げる流れになります。

terraform/agent/calculator/server.py
from __future__ import annotations

import os

import uvicorn
from strands import Agent
from strands.models import BedrockModel
from strands.multiagent.a2a import A2AServer
from strands_tools.calculator import calculator

MODEL_ID = "us.anthropic.claude-haiku-4-5-20251001-v1:0"
REGION = os.environ.get("AWS_REGION", "us-east-1")
RUNTIME_URL = os.environ.get("AGENTCORE_RUNTIME_URL", "http://127.0.0.1:9000/")

model = BedrockModel(model_id=MODEL_ID, region_name=REGION)

agent = Agent(
    name="計算エージェント",
    description="四則演算・代数・微積分・記号計算を行うエージェントです。",
    model=model,
    tools=[calculator],
    callback_handler=None,
)

server = A2AServer(
    agent=agent,
    host="0.0.0.0",
    port=9000,
    http_url=RUNTIME_URL,
    serve_at_root=True,
)

app = server.to_fastapi_app()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=9000)

AgentCore RuntimeでA2Aを動かすにはポート9000・ルートパスが必須です。/ping/.well-known/agent-card.json はA2AServerが自動でエンドポイントを作成してくれるので、自前で書く必要はありません。

天気エージェント

もう1体は天気情報を返すエージェントです。今回はモックで固定データを返す関数をツール化します。

terraform/agent/weather/server.py
from __future__ import annotations

import os

import uvicorn
from strands import Agent, tool
from strands.models import BedrockModel
from strands.multiagent.a2a import A2AServer

MODEL_ID = "us.anthropic.claude-haiku-4-5-20251001-v1:0"
REGION = os.environ.get("AWS_REGION", "us-east-1")
RUNTIME_URL = os.environ.get("AGENTCORE_RUNTIME_URL", "http://127.0.0.1:9000/")

_MOCK_WEATHER = {
    "tokyo": {"condition": "Sunny", "temperature_c": 22, "humidity": 55},
    "osaka": {"condition": "Cloudy", "temperature_c": 20, "humidity": 65},
    "sapporo": {"condition": "Snowy", "temperature_c": -2, "humidity": 80},
    "fukuoka": {"condition": "Rainy", "temperature_c": 18, "humidity": 75},
    "naha": {"condition": "Sunny", "temperature_c": 27, "humidity": 70},
}

@tool
def get_weather(city: str) -> dict:
    """日本の都市の現在の天気情報を返します。対応都市: Tokyo, Osaka, Sapporo, Fukuoka, Naha。"""
    data = _MOCK_WEATHER.get(city.strip().lower())
    if data is None:
        return {"error": f"Unknown city: {city}. Supported: {list(_MOCK_WEATHER)}"}
    return {"city": city, **data}

model = BedrockModel(model_id=MODEL_ID, region_name=REGION)

agent = Agent(
    name="天気エージェント",
    description="日本の主要都市の現在の天気情報を返すエージェントです。",
    model=model,
    tools=[get_weather],
    callback_handler=None,
)

server = A2AServer(
    agent=agent,
    host="0.0.0.0",
    port=9000,
    http_url=RUNTIME_URL,
    serve_at_root=True,
)

app = server.to_fastapi_app()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=9000)

共通のDockerfile

2体とも同じ構成のDockerfileを使用します。AgentCore RuntimeはARM64必須なので --platform=linux/arm64 を忘れないようにしましょう。

terraform/agent/calculator/Dockerfile
FROM --platform=linux/arm64 public.ecr.aws/docker/library/python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY server.py .

EXPOSE 9000

CMD ["python", "server.py"]
terraform/agent/calculator/requirements.txt
strands-agents[a2a]>=1.36.0
strands-agents-tools>=0.5.0
fastapi>=0.115.0
uvicorn>=0.32.0

Terraformでデプロイ

ECR 2つとDockerビルド&プッシュ、AgentCore Runtime で動く2体のエージェントをで作ります。Runtimeの protocol_configurationserver_protocol = "A2A" を指定します。

terraform/runtime.tf
locals {
  agents = {
    calculator = { source_dir = "${path.module}/agent/calculator" }
    weather    = { source_dir = "${path.module}/agent/weather" }
  }
}

resource "aws_ecr_repository" "calculator" {
  name                 = "${var.project_name}-calculator-${random_string.suffix.result}"
  image_tag_mutability = "MUTABLE"
  force_delete         = true
}

resource "aws_ecr_repository" "weather" {
  name                 = "${var.project_name}-weather-${random_string.suffix.result}"
  image_tag_mutability = "MUTABLE"
  force_delete         = true
}

resource "null_resource" "calculator_build_push" {
  triggers = {
    dockerfile   = filemd5("${local.agents.calculator.source_dir}/Dockerfile")
    server       = filemd5("${local.agents.calculator.source_dir}/server.py")
    requirements = filemd5("${local.agents.calculator.source_dir}/requirements.txt")
  }

  provisioner "local-exec" {
    working_dir = local.agents.calculator.source_dir
    command     = <<-EOT
      aws ecr get-login-password --region ${data.aws_region.current.region} | \
        docker login --username AWS --password-stdin ${aws_ecr_repository.calculator.repository_url}
      docker buildx build --platform linux/arm64 \
        -t ${aws_ecr_repository.calculator.repository_url}:latest --push .
    EOT
  }

  depends_on = [aws_ecr_repository.calculator]
}

# weather_build_push も同じ構造なので省略

resource "aws_bedrockagentcore_agent_runtime" "calculator" {
  agent_runtime_name = replace("${var.project_name}_calculator_${random_string.suffix.result}", "-", "_")
  description        = "Calculator A2A server"
  role_arn           = aws_iam_role.agent_runtime.arn

  agent_runtime_artifact {
    container_configuration {
      container_uri = "${aws_ecr_repository.calculator.repository_url}:latest"
    }
  }

  network_configuration {
    network_mode = "PUBLIC"
  }

  protocol_configuration {
    server_protocol = "A2A"
  }

  depends_on = [
    null_resource.calculator_build_push,
    aws_iam_role_policy.runtime_ecr,
    aws_iam_role_policy.runtime_logs,
    aws_iam_role_policy.runtime_bedrock,
  ]
}

# aws_bedrockagentcore_agent_runtime.weather も同じ構造なので省略

Runtimeの実行ロールはECR pull・CloudWatch Logs書き込み・Bedrock InvokeModelの3種類を付与します。

terraform/iam.tf
resource "aws_iam_role" "agent_runtime" {
  name = "${var.project_name}-runtime-${random_string.suffix.result}"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Action    = "sts:AssumeRole"
      Principal = { Service = "bedrock-agentcore.amazonaws.com" }
      Condition = {
        StringEquals = { "aws:SourceAccount" = data.aws_caller_identity.current.account_id }
      }
    }]
  })
}

resource "aws_iam_role_policy" "runtime_bedrock" {
  name = "bedrock-invoke"
  role = aws_iam_role.agent_runtime.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Action = ["bedrock:InvokeModel", "bedrock:InvokeModelWithResponseStream"]
      Resource = "*"
    }]
  })
}

# runtime_ecr / runtime_logs は省略。詳細はGitHubのリポジトリを参照してください

outputsではA2A用にRuntimeのARNを出しておきます。

terraform/outputs.tf
output "calculator_runtime_arn" {
  value = aws_bedrockagentcore_agent_runtime.calculator.agent_runtime_arn
}

output "weather_runtime_arn" {
  value = aws_bedrockagentcore_agent_runtime.weather.agent_runtime_arn
}

クライアント側に渡すURLは https://bedrock-agentcore.<region>.amazonaws.com/runtimes/<arn>/invocations の形で組み立てます(ARNは urllib.parse.quote でURLエンコードします)。

デプロイします。

cd terraform
terraform init
terraform apply

デプロイが終わったら、このエージェントたちをRegistryに登録しておきます。

A2A Agent CardをRegistryに登録

A2A descriptorの構造

Agent RegistryのdescriptorTypeには A2A が用意されています。A2A用のdescriptorは a2a.agentCard.inlineContent にAgent Card JSONを入れる形です。

登録前にRuntimeから実際のAgent Cardを取得して、その中身をRegistryへ登録します。この登録によって、クライアント側はエージェント名・ スキル・接続先URLなどを把握できるようになるイメージです。

A2Aの詳細は以前書いたブログをご参照ください。

https://dev.classmethod.jp/articles/strands-agents-amazon-bedrock-agentcore-a2a/

SigV4 認証ヘルパー

Runtimeは bedrock-agentcore のIAM認証なので、Agent Card取得もJSON-RPC呼び出しもSigV4署名が必要です。httpxのAuthを自作します。

sigv4_auth.py
from __future__ import annotations

import boto3
import httpx
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

class SigV4HttpxAuth(httpx.Auth):
    requires_request_body = True

    def __init__(self, region: str, service: str = "bedrock-agentcore") -> None:
        credentials = boto3.Session().get_credentials()
        if credentials is None:
            raise RuntimeError("No AWS credentials found.")
        self._signer = SigV4Auth(credentials, service, region)

    def auth_flow(self, request: httpx.Request):
        aws_request = AWSRequest(
            method=request.method,
            url=str(request.url),
            data=request.content,
            headers=dict(request.headers),
        )
        self._signer.add_auth(aws_request)
        for key, value in aws_request.headers.items():
            request.headers[key] = value
        yield request

requires_request_body = True にしないとJSON-RPCのボディが署名対象に含まれず、POST時に4xxになるので注意です。

登録スクリプト

register_a2a.py
"""Register A2A agents (Calculator / Weather) to AWS Agent Registry."""
from __future__ import annotations

import json
import logging
import os
import time
from urllib.parse import quote

import boto3
import httpx

from sigv4_auth import SigV4HttpxAuth

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

REGISTRY_ID = os.environ["AGENT_REGISTRY_ID"]
REGION = os.environ.get("AWS_REGION", "us-east-1")

def runtime_base_url(arn: str) -> str:
    return f"https://bedrock-agentcore.{REGION}.amazonaws.com/runtimes/{quote(arn, safe='')}/invocations"

def fetch_agent_card(base_url: str, auth: SigV4HttpxAuth) -> dict:
    card_url = f"{base_url}/.well-known/agent-card.json"
    response = httpx.get(card_url, auth=auth, timeout=30.0)
    response.raise_for_status()
    return response.json()

def wait_until_not(control, registry_id: str, record_id: str, transient: str, *, timeout: float = 30.0) -> str:
    deadline = time.monotonic() + timeout
    while True:
        status = control.get_registry_record(registryId=registry_id, recordId=record_id)["status"]
        if status != transient:
            return status
        if time.monotonic() > deadline:
            raise TimeoutError(f"record {record_id} stuck in {transient}")
        time.sleep(1.0)

def register(control, name: str, description: str, card: dict) -> str:
    created = control.create_registry_record(
        registryId=REGISTRY_ID,
        name=name,
        description=description,
        recordVersion="1.0",
        descriptorType="A2A",
        descriptors={"a2a": {"agentCard": {"inlineContent": json.dumps(card)}}},
    )
    record_id = created["recordArn"].rsplit("/", 1)[-1]
    logger.info("Created record %s (status=%s)", record_id, created["status"])

    wait_until_not(control, REGISTRY_ID, record_id, "CREATING")
    control.submit_registry_record_for_approval(registryId=REGISTRY_ID, recordId=record_id)
    status = wait_until_not(control, REGISTRY_ID, record_id, "SUBMITTING")
    logger.info("Record %s reached status %s", record_id, status)
    return record_id

def main() -> None:
    control = boto3.client("bedrock-agentcore-control", region_name=REGION)
    auth = SigV4HttpxAuth(region=REGION)

    targets = [
        ("calculator-a2a", "Calculator A2A agent (SymPy)", os.environ["CALCULATOR_RUNTIME_ARN"]),
        ("weather-a2a", "Weather A2A agent (mock for Japan cities)", os.environ["WEATHER_RUNTIME_ARN"]),
    ]

    for name, description, arn in targets:
        base = runtime_base_url(arn)
        card = fetch_agent_card(base, auth)
        logger.info("Fetched agent card for %s: %s", name, card["name"])
        record_id = register(control, name, description, card)
        print(f"{name}: {record_id}")

if __name__ == "__main__":
    main()

Runtime ARNからbase URLを組み立て、Agent Cardを取ってきて、そのままRegistryへ登録します。このRegistryは自動承認にしているのでcreate → submit → 自動承認で APPROVED になります。

実行コマンド
export AGENT_REGISTRY_ID="xxx"
export CALCULATOR_RUNTIME_ARN="$(cd terraform && terraform output -raw calculator_runtime_arn)"
export WEATHER_RUNTIME_ARN="$(cd terraform && terraform output -raw weather_runtime_arn)"
uv run python register_a2a.py
実行結果
INFO: Fetched agent card for calculator-a2a: 計算エージェント
INFO: Created record ul4ncy9CGo0p (status=CREATING)
INFO: Record ul4ncy9CGo0p reached status APPROVED
INFO: Fetched agent card for weather-a2a: 天気エージェント
INFO: Created record vZspJcEwxhy5 (status=CREATING)
INFO: Record vZspJcEwxhy5 reached status APPROVED
calculator-a2a: ul4ncy9CGo0p
weather-a2a: vZspJcEwxhy5

Registryから動的にA2Aエージェントをロードする

A2Aローダー

RegistryからA2Aレコードを検索して、Agent Cardの url フィールドを取り出し、A2AClientToolProvider を組み立てるローダーです。

registry_a2a_loader.py
"""Search AWS Agent Registry for A2A records and build A2AClientToolProvider."""
from __future__ import annotations

import json
import logging
from dataclasses import dataclass
from typing import Any

import boto3
from strands_tools.a2a_client import A2AClientToolProvider

from sigv4_auth import SigV4HttpxAuth

logger = logging.getLogger(__name__)

@dataclass
class RegistryA2ALoaderConfig:
    registry_id: str
    region_name: str = "us-east-1"
    max_results: int = 10

class RegistryA2ALoader:
    def __init__(self, config: RegistryA2ALoaderConfig) -> None:
        self._config = config
        self._client = boto3.client("bedrock-agentcore", region_name=config.region_name)

    def search(self, query: str) -> A2AClientToolProvider:
        response = self._client.search_registry_records(
            registryIds=[self._config.registry_id],
            searchQuery=query,
            maxResults=self._config.max_results,
            filters={"descriptorType": {"$eq": "A2A"}},
        )
        records = response.get("registryRecords") or []
        logger.info("Fetched %d A2A record(s) from registry", len(records))

        urls: list[str] = []
        for record in records:
            url = self._extract_url(record)
            if url:
                urls.append(url)
                logger.info("Loaded A2A: %s -> %s", record.get("name"), url[:80])

        auth = SigV4HttpxAuth(region=self._config.region_name)
        return A2AClientToolProvider(
            known_agent_urls=urls,
            httpx_client_args={"auth": auth},
        )

    @staticmethod
    def _extract_url(record: dict[str, Any]) -> str | None:
        descriptors = record.get("descriptors") or {}
        a2a_desc = descriptors.get("a2a") or {}
        card_json = (a2a_desc.get("agentCard") or {}).get("inlineContent")
        if not card_json:
            return None
        card = json.loads(card_json)
        return card.get("url")

Registry検索 → Agent Cardの url 抽出 → A2AClientToolProvider に渡す、というシンプルな流れです。httpx_client_args で認証を設定します。

オーケストレーター

ローカルのStrands AgentにRegistryから取得したA2Aツールを紐づけて、複合タスクを実行してみます。
東京の天気を教えてください。その後、現在の気温(摂氏)を華氏に変換してください(華氏 = 摂氏 × 9 / 5 + 32)。といった指示を依頼してみます。

a2a_client.py
"""Orchestrator Strands Agent that loads A2A tools from Agent Registry."""
from __future__ import annotations

import logging
import os

from strands import Agent
from strands.models import BedrockModel

from registry_a2a_loader import RegistryA2ALoader, RegistryA2ALoaderConfig

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

MODEL_ID = "us.anthropic.claude-haiku-4-5-20251001-v1:0"

def main() -> None:
    region = os.environ.get("AWS_REGION", "us-east-1")

    config = RegistryA2ALoaderConfig(
        registry_id=os.environ["AGENT_REGISTRY_ID"],
        region_name=region,
    )
    loader = RegistryA2ALoader(config)
    provider = loader.search("calculator weather")

    model = BedrockModel(model_id=MODEL_ID, region_name=region)

    agent = Agent(
        model=model,
        tools=provider.tools,
        system_prompt=(
            "You are an orchestrator that delegates tasks to remote A2A agents. "
            "Use the A2A tools to send requests to the appropriate agent discovered from the registry."
        ),
    )

    user_input = (
        "東京の天気を教えてください。その後、現在の気温(摂氏)を華氏に変換してください(華氏 = 摂氏 × 9 / 5 + 32)。"
    )

    logger.info("Query: %s", user_input)
    result = agent(user_input)
    print("\n--- result ---")
    print(result.message)

if __name__ == "__main__":
    main()

動作確認

実装できたので動かしてみます。IDは事前に作成したRegistryを指定します。

実行コマンド
AGENT_REGISTRY_ID="xxx" uv run python a2a_client.py
実行ログ
registry_a2a_loader INFO: Fetched 2 A2A record(s) from registry
registry_a2a_loader INFO: Loaded A2A: calculator-a2a -> https://bedrock-agentcore.us-east-1.amazonaws.com/runtimes/...
registry_a2a_loader INFO: Loaded A2A: weather-a2a -> https://bedrock-agentcore.us-east-1.amazonaws.com/runtimes/...
strands_tools.a2a_client INFO: Successfully discovered and cached agent card for (Weather)
strands_tools.a2a_client INFO: Sending message to (Weather)
strands_tools.a2a_client INFO: Successfully discovered and cached agent card for (Calculator)
strands_tools.a2a_client INFO: Sending message to (Calculator)
実行結果
## 東京の天気情報
- 天気: 晴れ
- 気温: 22°C
- 湿度: 55%

## 気温の変換
摂氏22°C を華氏に変換:
計算式:華氏 = 摂氏 × 9 / 5 + 32
22 × 9 ÷ 5 + 32 = 71.6°F

したがって、東京の現在の気温 22°C は華氏では 71.6°F です!

オーケストレーターがRegistryから取得したA2Aツールを使って、天気エージェント → 計算エージェントと順番に呼び出し、複合タスクを完結できました!

エージェント側のコードには個別のツール定義を一切書いておらず、Registryから動的に取得したエンドポイント情報だけで連携できているのは良いですね。

オーケストレーターもAgentCore Runtimeに載せる

ここまではオーケストレーターをローカルで動かしてきましたが、同じロジックはHTTPプロトコルのAgentCore Runtimeに載せてそのままサーバー化できます。bedrock-agentcore SDKの BedrockAgentCoreApp でラップするだけでOKです。

terraform/agent/orchestrator/server.py
from __future__ import annotations

import logging
import os

from bedrock_agentcore.runtime import BedrockAgentCoreApp
from strands import Agent
from strands.models import BedrockModel

from registry_a2a_loader import RegistryA2ALoader, RegistryA2ALoaderConfig

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

MODEL_ID = "us.anthropic.claude-haiku-4-5-20251001-v1:0"
REGION = os.environ.get("AWS_REGION", "us-east-1")
REGISTRY_ID = os.environ["AGENT_REGISTRY_ID"]

app = BedrockAgentCoreApp()

@app.entrypoint
async def handler(payload: dict, context):
    query = payload.get("prompt")
    if not query:
        yield {"error": "`prompt` is required"}
        return

    search_query = payload.get("search_query", query)

    config = RegistryA2ALoaderConfig(registry_id=REGISTRY_ID, region_name=REGION)
    loader = RegistryA2ALoader(config)
    provider = loader.search(search_query)

    model = BedrockModel(model_id=MODEL_ID, region_name=REGION)
    agent = Agent(
        model=model,
        tools=provider.tools,
        system_prompt=(
            "You are an orchestrator that delegates tasks to remote A2A agents."
        ),
    )

    async for event in agent.stream_async(query):
        yield event

if __name__ == "__main__":
    app.run()

BedrockAgentCoreApp を設定して、Registry経由でA2A連携するシンプルなエージェントを実装します。

Dockerfileも下記のように実装します。

terraform/agent/orchestrator/Dockerfile
FROM --platform=linux/arm64 public.ecr.aws/docker/library/python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir aws-opentelemetry-distro==0.12.2

COPY server.py .
COPY registry_a2a_loader.py .
COPY sigv4_auth.py .

EXPOSE 8080

CMD ["opentelemetry-instrument", "python", "server.py"]

Terraform側は server_protocol = "HTTP" で新しいRuntimeを追加します。Registry IDは環境変数で設定します。

terraform/runtime.tf(抜粋)
resource "aws_bedrockagentcore_agent_runtime" "orchestrator" {
  agent_runtime_name = replace("${var.project_name}_orchestrator_${random_string.suffix.result}", "-", "_")
  description        = "Orchestrator HTTP server calling A2A agents via Registry"
  role_arn           = aws_iam_role.orchestrator_runtime.arn

  agent_runtime_artifact {
    container_configuration {
      container_uri = "${aws_ecr_repository.orchestrator.repository_url}:latest"
    }
  }

  network_configuration {
    network_mode = "PUBLIC"
  }

  protocol_configuration {
    server_protocol = "HTTP"
  }

  environment_variables = {
    AGENT_REGISTRY_ID = var.agent_registry_id
  }
}

GetAgentCard 権限も必要

他のAgentCore Runtimeに対してA2Aで接続するとき、bedrock-agentcore:InvokeAgentRuntime だけでは足りません。A2A仕様で最初に GET /.well-known/agent-card.json を叩いてエージェント情報を取得するのですが、このAPIには別のアクション bedrock-agentcore:GetAgentCard が割り当てられています。

Registry検索まではできるのに、Agent Card取得段階で403 Forbidden、というパターンで引っかかったので情報共有として記しておきます。注意したいですね。

terraform/iam.tf(抜粋)
resource "aws_iam_role_policy" "orchestrator_invoke_runtime" {
  name = "invoke-a2a-runtime"
  role = aws_iam_role.orchestrator_runtime.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Action = [
        "bedrock-agentcore:InvokeAgentRuntime",
        "bedrock-agentcore:InvokeAgentRuntimeForUser",
        "bedrock-agentcore:GetAgentCard",  # ← A2A Agent Card取得に必要
      ]
      Resource = [
        aws_bedrockagentcore_agent_runtime.calculator.agent_runtime_arn,
        "${aws_bedrockagentcore_agent_runtime.calculator.agent_runtime_arn}/runtime-endpoint/*",
        aws_bedrockagentcore_agent_runtime.weather.agent_runtime_arn,
        "${aws_bedrockagentcore_agent_runtime.weather.agent_runtime_arn}/runtime-endpoint/*",
      ]
    }]
  })
}

オーケストレーターを呼び出す(Runtime経由)

AgentCoreにデプロイしたオーケストレーターを呼び出してみます。
レスポンスを整形する簡単なクライアント invoke_orchestrator.py を用意して実行します。

invoke_orchestrator.py
"""Invoke the Orchestrator Runtime and pretty-print the SSE stream."""
from __future__ import annotations

import json
import os
import sys

import boto3

REGION = os.environ.get("AWS_REGION", "us-east-1")

def stream_events(body: bytes):
    for chunk in body.decode("utf-8", errors="replace").split("\n\n"):
        chunk = chunk.strip()
        if not chunk.startswith("data:"):
            continue
        try:
            yield json.loads(chunk[5:].strip())
        except json.JSONDecodeError:
            continue

def pretty_print(body: bytes) -> None:
    tool_use: dict[str, dict] = {}
    for event in stream_events(body):
        if not isinstance(event, dict):
            continue
        ev = event.get("event") or {}

        if "contentBlockStart" in ev:
            tu = (ev["contentBlockStart"].get("start") or {}).get("toolUse")
            if tu:
                tool_use[tu["toolUseId"]] = {"name": tu.get("name"), "input": ""}
                print(f"\n[tool_call] {tu.get('name')}", flush=True)

        if "contentBlockDelta" in ev:
            delta = ev["contentBlockDelta"].get("delta") or {}
            if "text" in delta:
                sys.stdout.write(delta["text"])
                sys.stdout.flush()
            if "toolUse" in delta:
                frag = delta["toolUse"].get("input", "")
                for tu in tool_use.values():
                    tu["input"] += frag
                    break

        if "contentBlockStop" in ev:
            for tu in list(tool_use.values()):
                if tu["input"]:
                    try:
                        args = json.loads(tu["input"])
                        print(f"  args: {json.dumps(args, ensure_ascii=False)[:200]}", flush=True)
                    except json.JSONDecodeError:
                        pass
                    tu["input"] = ""

        if "messageStop" in ev:
            print(flush=True)

def main() -> None:
    arn = os.environ["ORCHESTRATOR_RUNTIME_ARN"]
    prompt = " ".join(sys.argv[1:]) or "東京の天気を教えて、気温を華氏に変換して。"
    search_query = os.environ.get("SEARCH_QUERY", "calculator weather")

    client = boto3.client("bedrock-agentcore", region_name=REGION)
    response = client.invoke_agent_runtime(
        agentRuntimeArn=arn,
        qualifier="DEFAULT",
        payload=json.dumps({"prompt": prompt, "search_query": search_query}).encode(),
        contentType="application/json",
    )
    pretty_print(response["response"].read())

if __name__ == "__main__":
    main()

これで動作確認してみます。

実行コマンド
export ORCHESTRATOR_RUNTIME_ARN="$(cd terraform && terraform output -raw orchestrator_runtime_arn)"
uv run python invoke_orchestrator.py "東京の天気を教えて、気温を華氏に変換して。"
実行結果
東京の天気情報と気温の華氏変換をお手伝いします。まず、利用可能なエージェントを確認させてください。
[tool_call] a2a_list_discovered_agents

完璧です!天気エージェントと計算機エージェントが利用可能です。東京の天気情報を取得し、その後気温を華氏に変換します。
[tool_call] a2a_send_message
  args: {"message_text": "東京の現在の天気情報を教えてください。", "target_agent_url": "https://.../a2a_demo_weather_.../invocations/"}

素晴らしい!東京の天気情報を取得できました。次に、気温の22°Cを華氏に変換します。
[tool_call] a2a_send_message
  args: {"message_text": "22度セルシウスを華氏に変換してください。計算式は (22 * 9/5) + 32 です。", "target_agent_url": "https://.../a2a_demo_calculator_.../invocations/"}

完璧です!東京の天気情報と気温の華氏への変換が完了しました。

## 📍 東京の天気情報

**現在の天気:**
- **天気:** 晴れ(Sunny)
- **気温:** 22°C **71.6°F(華氏)**
- **湿度:** 55%

良い天気が続いており、過ごしやすい気温のようですね。22度セルシウスは華氏では71.6度で、室温としては快適な温度です。

a2a_list_discovered_agents でRegistry由来のエージェント一覧を取得 → 天気エージェントへメッセージ → 計算エージェントへメッセージ、という流れが確認できますね!

Runtime側のCloudWatch Logsを覗くと、Registry検索 → Agent Card取得 → JSON-RPC送信、という内部の流れも実行されているのが確認できました!

Runtime内オーケストレーターのログ(抜粋)
Fetched 2 A2A record(s) from registry
Loaded A2A: calculator-a2a -> https://bedrock-agentcore.us-east-1.amazonaws.com/...
Loaded A2A: weather-a2a -> https://bedrock-agentcore.us-east-1.amazonaws.com/...
Successfully discovered and cached agent card for ...weather...
Successfully discovered and cached agent card for ...calculator...
Sending message to ...weather...
Sending message to ...calculator...

おわりに

前回のSkills版に続いて、今回はA2A版の動的ロードを試しました。
オーケストレーターのようなエージェントがA2Aしたい相手の情報を直接持たずとも、Registryで参照して動的に必要なものと繋げるのは面白いですね。繋げるものが増えた時に管理しやすそうです。組織として承認されたエージェントを一元管理する土台として便利そうですね。(RegistryはHTTPSが未対応なのが残念なのでいつか対応して欲しいですね・・・)

本記事が少しでも参考になりましたら幸いです。最後までご覧いただきありがとうございました!

この記事をシェアする

関連記事