
【速報】Claude Managed Agents Multiagent Orchestrationを試してみた
こんにちは、せーのです。
Claude Managed Agents の「Multiagent Orchestration」 は、公式ブログ上 Public Beta として案内されている、複数エージェントの委任と並列実行を API から組み立てる機能です。
1体のコーディネーターが複数のスペシャリストに仕事を振り並列で動かせること、Primary / Session のイベントで追跡しやすいこと、公式ドキュメントの制限、Python SDK と API を試した結果を整理します。
Multiagent Orchestrationとは
Multiagent Orchestration は、1つの コーディネーター エージェントが、複数の スペシャリスト エージェントにタスクを委任して、並列・分散で処理させる仕組みです。
ざっくり言うと、セッションのなかで「リードが仕事を割り振り、スペシャリストがそれぞれのコンテキストで動き、結果を集約する」イメージです。公式ドキュメントでは、次のようなポイントが強調されています。
- すべてのエージェントが 同じコンテナとファイルシステム を共有する
- 各エージェントは セッションスレッド という独立したコンテキストで動く
- コーディネーターが必要に応じてスペシャリストを呼び出す
- 追跡可能性(全スレッドのイベントを参照できる)
一方、いわゆる「1エージェントに全部やらせる」構成だと、コンテキストが一気に肥えたり、役割の切り替えが曖昧になったりしがちです。
違いを整理すると、シングルエージェントは「1本の会話・1本の脳みそで完走させる」もの、Multiagent Orchestrationは「分割された脳みそが、共有ディスクの上で協調する」もの、というイメージです。
アーキテクチャのイメージ
公式ドキュメントの説明に沿った構成イメージは次のとおりです。
Session (primary thread)
├── Coordinator Agent (例: Engineering Lead)
│ ├── → Thread A: Code Reviewer Agent
│ ├── → Thread B: Test Writer Agent
│ └── → Thread C: Security Agent (必要なら)
│ (同一コンテナ、共有ファイルシステム)
└── Primary thread に全活動の集約ビューが流れる
Primary thread と Session thread(おさらい)
ストリーミングで追いかける対象が2系統ある、という理解をしておくとよさそうです。
| 種類 | 説明 | イベントストリームのイメージ |
|---|---|---|
| Primary thread | コーディネーターの活動 + 全スレッドの集約ビュー | /v1/sessions/:id/events/stream |
| Session thread | 各エージェントの詳細活動 | /v1/sessions/:id/threads/:thread_id/stream |
スレッドは 永続的 で、コーディネーターが同じスペシャリストを再度呼ぶと、前回の履歴を引きずる、という挙動もドキュメント上で説明されています。
違いを整理すると、Primary threadは「全体のダイジェスト番組」、Session threadは「各スペシャリストの密着ドキュメント」、というイメージです。
制限(ドキュメント上の数字)
触る前に押さえておきたい上限です。
- 同時実行スレッド数: 最大 25
- roster のユニークエージェント数: 最大 20
- 委任の深さ: 1レベルのみ(コーディネーター → スペシャリスト。スペシャリストからのさらなる委任は不可)
- コーディネーターは同一エージェントの複数コピーを呼べる(
{type: "self"}で自己コピーも可能)
「とりあえず無限にツリーで委任」はできないので、設計はフラットに割り切るのが筋よさそうです。
効果的なパターン(公式の整理)
公式ドキュメントでは、次のようなパターンが例として挙がっていました。
- 並列化(Parallelization): 独立したサブタスクを複数エージェントに同時処理させ、コーディネーターが統合する
- 特化(Specialization): ドメイン特化の system やツールを持つエージェントにルーティングする
- エスカレーション(Escalation): 難しいサブタスクだけ高性能モデル(例: Opus)のエージェントに渡し、単純なものは安価なモデルで処理する
「コードレビュー + テスト生成」のように役割が分かれているタスクは、並列化 × 特化 の例として相性がよいです。
やってみた
試したのは 2026-05-07 時点の API です(以降は仕様や SDK が変わる可能性があります)。
前提(アクセス)
- 申請不要で、Anthropic API キーがあれば利用できました
- 管理コンソール側の Admin 権限は不要でした
試した構成
「Engineering Lead(コーディネーター / Opus)」が、「Code Reviewer(Sonnet)」と「Test Writer(Sonnet)」に同じ Python スニペットのレビューとテスト生成を依頼する、という最小構成です。
サンプルコード(再現手順)
以下は実際に実行したスクリプトの抜粋です。エージェント ID などを state.json に書き出しながら step1 → step6 の順で動かしています。
依存関係と環境変数
anthropic>=0.95.0
python-dotenv>=1.0.0
httpx>=0.27.0
プロジェクト直下に .env を置き、ANTHROPIC_API_KEY を設定してください。Step 6 の REST 呼び出しでは Managed Agents 用に anthropic-beta: managed-agents-2026-04-01 ヘッダを付けています(公式ドキュメントの該当ベータ名に合わせて更新してください)。
common.py(クライアントと state)
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
STATE_FILE = Path(__file__).parent / "state.json"
LOGS_DIR = Path(__file__).parent / "logs"
def get_client():
import anthropic
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY が設定されていません。.env を確認してください。")
return anthropic.Anthropic(api_key=api_key)
def load_state() -> dict:
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text())
return {}
def save_state(updates: dict) -> dict:
state = load_state()
state.update(updates)
STATE_FILE.write_text(json.dumps(state, indent=2, ensure_ascii=False))
return state
def setup_logger(step_name: str) -> logging.Logger:
LOGS_DIR.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = LOGS_DIR / f"{step_name}_{timestamp}.txt"
logger = logging.getLogger(step_name)
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")
sh = logging.StreamHandler(sys.stdout)
sh.setFormatter(fmt)
logger.addHandler(sh)
fh = logging.FileHandler(log_file, encoding="utf-8")
fh.setFormatter(fmt)
logger.addHandler(fh)
logger.info(f"ログ出力先: {log_file}")
return logger
Step 1: スペシャリスト2体を作成
from common import get_client, save_state, setup_logger
log = setup_logger("step1")
def main():
client = get_client()
log.info("=== Code Reviewer を作成 ===")
reviewer = client.beta.agents.create(
name="Code Reviewer",
model="claude-sonnet-4-6",
system="""You are a senior code reviewer.
Review code for:
- Security vulnerabilities
- Performance issues
- Code quality and readability
- Best practices
Provide specific, actionable feedback with line numbers.""",
tools=[{"type": "agent_toolset_20260401"}],
)
log.info(f"Reviewer Agent ID: {reviewer.id}")
log.info("=== Test Writer を作成 ===")
test_writer = client.beta.agents.create(
name="Test Writer",
model="claude-sonnet-4-6",
system="""You are a test engineer specializing in writing comprehensive unit tests.
Write pytest tests that cover:
- Happy path
- Edge cases
- Error conditions
- Boundary values""",
tools=[{"type": "agent_toolset_20260401"}],
)
log.info(f"Test Writer Agent ID: {test_writer.id}")
save_state({
"reviewer_id": reviewer.id,
"test_writer_id": test_writer.id,
})
log.info("state.json に保存しました。次は Step 2 を実行してください。")
if __name__ == "__main__":
main()
Step 2: コーディネーター(multiagent は extra_body)
SDK の型に multiagent が無い場合でも、extra_body で API に渡せます。
from common import get_client, load_state, save_state, setup_logger
log = setup_logger("step2")
def main():
state = load_state()
reviewer_id = state.get("reviewer_id")
test_writer_id = state.get("test_writer_id")
if not reviewer_id or not test_writer_id:
log.error("reviewer_id / test_writer_id がありません。Step 1 を先に実行してください。")
return
client = get_client()
log.info("=== Engineering Lead(コーディネーター)を作成 ===")
coordinator = client.beta.agents.create(
name="Engineering Lead",
model="claude-opus-4-7",
system="""You are an engineering lead.
When given code to review:
1. First, delegate to the Code Reviewer agent for security and quality review
2. Then, delegate to the Test Writer agent to create tests
3. Synthesize both results into a final report
Always delegate to both agents in parallel when possible.""",
tools=[{"type": "agent_toolset_20260401"}],
extra_body={
"multiagent": {
"type": "coordinator",
"agents": [
{"type": "agent", "id": reviewer_id},
{"type": "agent", "id": test_writer_id},
],
}
},
)
log.info(f"Coordinator Agent ID: {coordinator.id}")
save_state({"coordinator_id": coordinator.id})
log.info("state.json を更新しました。次は Step 3 を実行してください。")
if __name__ == "__main__":
main()
Step 3: Environment と Session
from common import get_client, load_state, save_state, setup_logger
log = setup_logger("step3")
def main():
state = load_state()
coordinator_id = state.get("coordinator_id")
if not coordinator_id:
log.error("coordinator_id がありません。Step 2 を先に実行してください。")
return
client = get_client()
log.info("=== Environment を作成 ===")
environment = client.beta.environments.create(name="code-review-env")
log.info(f"Environment ID: {environment.id}")
log.info("=== Session を作成 ===")
session = client.beta.sessions.create(
agent=coordinator_id,
environment_id=environment.id,
)
log.info(f"Session ID: {session.id}")
save_state({
"environment_id": environment.id,
"session_id": session.id,
})
log.info("state.json を更新しました。次は Step 4 を実行してください。")
if __name__ == "__main__":
main()
Step 4: タスク投入(content はオブジェクト配列)
ここが重要です。 user.message の content は文字列ではなく [{"type": "text", "text": "..."}] 形式にします。文字列のままだと 400 になります。
from common import get_client, load_state, setup_logger
log = setup_logger("step4")
CODE_TO_REVIEW = '''
def calculate_discount(price, discount_percent):
discount = price * discount_percent / 100
final_price = price - discount
return final_price
def process_order(items, user_id):
total = 0
for item in items:
total += item['price']
discount = calculate_discount(total, 10)
return {'user': user_id, 'total': discount}
'''
def main():
state = load_state()
session_id = state.get("session_id")
if not session_id:
log.error("session_id がありません。Step 3 を先に実行してください。")
return
client = get_client()
log.info("=== タスクを送信 ===")
client.beta.sessions.events.send(
session_id,
events=[{
"type": "user.message",
"content": [{
"type": "text",
"text": (
"Please review this Python code and generate tests:\n\n"
f"```python\n{CODE_TO_REVIEW}\n```"
),
}],
}],
)
log.info("送信済み。次は Step 5 でストリームを観察してください。")
if __name__ == "__main__":
main()
Step 5: Primary thread をストリームで観察
from common import get_client, load_state, setup_logger
log = setup_logger("step5")
def main():
state = load_state()
session_id = state.get("session_id")
if not session_id:
log.error("session_id がありません。Step 3 を先に実行してください。")
return
log.info(f"Session ID: {session_id}")
log.info("=== Primary Thread のイベントストリーム ===")
client = get_client()
with client.beta.sessions.events.stream(session_id) as stream:
for event in stream:
match event.type:
case "session.thread_created":
agent_name = getattr(event, "agent_name", "unknown")
log.info(f">> 新スレッド作成: {agent_name}")
case "session.thread_status_running":
agent_name = getattr(event, "agent_name", "")
log.info(f">> スレッド実行中: {agent_name}")
case "session.thread_status_idle":
agent_name = getattr(event, "agent_name", "")
session_thread_id = getattr(event, "session_thread_id", None)
stop_reason = getattr(event, "stop_reason", None)
log.info(f">> スレッド待機: {agent_name} | stop_reason={stop_reason}")
if session_thread_id is None:
log.info(">> Primary が idle → セッション完了")
break
case "agent.thread_message_received":
from_agent = getattr(event, "from_agent_name", "unknown")
content = getattr(event, "content", [])
log.info(f"\n>> {from_agent} から結果を受信:")
for block in content:
if getattr(block, "type", None) == "text":
preview = block.text[:400]
log.info(f" {preview}{'...' if len(block.text) > 400 else ''}")
break
case "agent.thread_message_sent":
to_agent = getattr(event, "to_agent_name", "unknown")
log.info(f">> コーディネーターが {to_agent} に委任")
case _:
log.debug(f" [event] {event.type}")
log.info("=== ストリーム終了 ===")
if __name__ == "__main__":
main()
Step 6: スレッド一覧・イベントは REST(SDK 未対応の場合)
当時の環境では sessions.threads が SDK に無かったため、httpx で HTTP GET しています。
import os
import httpx
from dotenv import load_dotenv
from common import load_state, setup_logger
log = setup_logger("step6")
BASE_URL = "https://api.anthropic.com/v1"
BETA_HEADER = "managed-agents-2026-04-01"
def get_headers() -> dict:
return {
"x-api-key": os.environ["ANTHROPIC_API_KEY"],
"anthropic-version": "2023-06-01",
"anthropic-beta": BETA_HEADER,
"content-type": "application/json",
}
def list_threads(session_id: str) -> list:
resp = httpx.get(
f"{BASE_URL}/sessions/{session_id}/threads",
headers=get_headers(),
timeout=30,
)
resp.raise_for_status()
return resp.json().get("data", [])
def list_thread_events(session_id: str, thread_id: str) -> list:
resp = httpx.get(
f"{BASE_URL}/sessions/{session_id}/threads/{thread_id}/events",
headers=get_headers(),
timeout=30,
)
resp.raise_for_status()
return resp.json().get("data", [])
def main():
load_dotenv()
if not os.environ.get("ANTHROPIC_API_KEY"):
log.error("ANTHROPIC_API_KEY が設定されていません。")
return
state = load_state()
session_id = state.get("session_id")
if not session_id:
log.error("session_id がありません。Step 3 を先に実行してください。")
return
log.info("=== 全スレッド一覧 ===")
threads = list_threads(session_id)
log.info(f"合計スレッド数: {len(threads)}")
for thread in threads:
agent_name = thread.get("agent", {}).get("name", "unknown")
status = thread.get("status", "unknown")
parent = thread.get("parent_thread_id", None)
thread_id = thread.get("id", "")
log.info(f" [{agent_name}] id={thread_id} status={status} | parent={parent}")
log.info("")
log.info("=== 各スレッドのイベント ===")
for thread in threads:
agent_name = thread.get("agent", {}).get("name", "unknown")
thread_id = thread.get("id", "")
log.info(f"\n--- {agent_name} (id={thread_id}) ---")
events = list_thread_events(session_id, thread_id)
if not events:
log.info(" (イベントなし)")
continue
for ev in events:
log.info(f" [{ev.get('type', 'unknown')}] {ev.get('processed_at', '')}")
log.info("\n=== 完了 ===")
if __name__ == "__main__":
main()
上記の Step 4 まででタスクが走り始め、Step 5 で Primary のストリーム、Step 6 で各スレッドの REST 取得、という流れです。
実行結果: Primary thread で見えたこと
Primary thread のストリームでは、スレッドの作成 → 実行 → 各スペシャリストからの返信プレビューまで一気通貫で追えました。
委任は並列だったか?
並列でした。 Engineering Lead は Code Reviewer と Test Writer を、だいたい 0.5秒差 でほぼ同時に起動していました。
スレッド数
3本でした(Engineering Lead 1 + Code Reviewer 1 + Test Writer 1)。
ざっくりタイムライン(体感のボトルネック)
| フェーズ | ざっくり時間 |
|---|---|
| Engineering Lead が思考して委任 | 約12秒 |
| Code Reviewer | 約40秒 |
| Test Writer(tool_use が多め) | 約2分 |
| Engineering Lead が合成 | 約1分 |
| 合計 | 約3分15秒 |
並列化の効き方で言うと、待ち時間の上限は遅い方のスペシャリスト(この例では Test Writer) に寄ります。直列なら各スペシャリストの時間が足し算になります。
Primary thread のログ抜粋
イベントの流れの例です(スレッド ID は省略)。
05:57:25 [DEBUG] [event] agent.thinking
05:57:27 [DEBUG] [event] span.model_request_end
05:57:27 [INFO] >> 新スレッド作成: Code Reviewer
05:57:27 [INFO] >> コーディネーターが Code Reviewer に委任
05:57:27 [INFO] >> 新スレッド作成: Test Writer
05:57:27 [INFO] >> コーディネーターが Test Writer に委任
05:57:27 [INFO] >> スレッド実行中: Code Reviewer
05:57:27 [INFO] >> スレッド実行中: Test Writer
05:58:12 [INFO] >> Code Reviewer から結果を受信:
## Code Review Report
...(コード品質・セキュリティ指摘)
05:59:33 [INFO] >> Test Writer から結果を受信:
Here is the complete `test_orders.py` (45 tests, all passing...)
06:00:44 [INFO] >> スレッド待機: Engineering Lead | stop_reason={'type': 'end_turn'}
06:00:44 [DEBUG] [event] session.status_idle
実行結果: Session thread で増える情報
Primary thread だけでも「いつ誰が動いたか」「結果の先頭プレビュー」は追えます。
一方、次のような 細かい中身 は Session thread 側に降りないと見えませんでした。
agent.thinkingの中身agent.tool_use/agent.tool_resultの詳細- モデルリクエストの開始・終了タイミング
違いを整理すると、Primaryは「進行状況と成果のダイジェスト」、Session threadは「各スペシャリストの作業ログ」、というイメージです。
スレッド一覧で見えた親子関係(抜粋)
[Engineering Lead] id=sthr_01xxxx... status=idle | parent=None
[Code Reviewer] id=sthr_01yyyy... status=idle | parent=sthr_01xxxx...
[Test Writer] id=sthr_01zzzz... status=idle | parent=sthr_01xxxx...
Python SDK と API(2026-05 時点のメモ)
Managed Agents は進みが早いので、公式の HTTP 例と SDK の対応を突き合わせると詰まりにくいです。
anthropicPython SDK は v0.95.0 以降でclient.beta.agentsなどが揃ってくる想定です(古い版では未実装のことがあります)agents.create()のmultiagentが SDK の型に無い場合は、extra_body={"multiagent": ...}で API に渡せる場合があります- スレッド一覧は
GET /v1/sessions/{session_id}/threadsのように REST で直接呼ぶ必要がありました - エージェントの破棄は
agents.archive()(deleteではない場合があります)
コーディネーターの system prompt の効き
コーディネーターの system に 「可能なら両方に並列で委任」 と書いた条件では、Code Reviewer と Test Writer がほぼ同時に起動しました。書かないと直列寄りになる可能性は残ります(条件を変えた比較は未実施です)。
補足: always_ask とツール承認
サブエージェント側で always_ask やカスタムツール結果が必要な場合、primary thread にクロスポストされる、という説明があります。session_thread_id でスレッドを識別し、user.tool_confirmation を送るとサーバーが適切なスレッドにルーティングする、という流れです。
Claude Code の Agent Teams との違い
Claude Code の Agent Teams も「複数エージェントが協調する」のですが、エディタ/CLI 上の開発ワークフロー(Team Lead と Teammates、リポジトリや MCP と一体で動く)です。Multiagent Orchestration は Managed Agents API で、アプリから agents / sessions / 環境を組み立て、イベントストリームで追う側の話です。名前が似ているので、ドキュメントや記事を読むときは Claude Code と Managed Agents(API) のどちらの話かを切り分けると混乱しにくいです。
まとめ
分割して並列に走らせる設計は、Primary / Session のイベントで 進行と中身の両方を追いやすいです。一方で SDK と API の対応差はまだ出やすいので、実装では 公式ドキュメントの HTTP と、利用中の SDK の版・型 をセットで確認するのが安全です。
要点
- Multiagent Orchestration は、コーディネーターがスペシャリストに委任する Public Beta(公式ブログの表記)
- Primary thread と Session thread の2系統で見える設計になっており、追跡性とデバッグのしやすさが強い
- 委任は1階層、スレッド数や roster に上限があるので、設計はフラットに割り切るのが現実的
使えそうなユースケース
- コードレビュー + テスト生成のように、独立した専門作業を並列化したいとき
- スペシャリストごとに system / ツールを分けた方が品質が上がる とき(セキュリティ / パフォーマンス / ドキュメントなど)
注意点
- 同時スレッド上限(25) と roster(20) は、長時間・大量委任の設計では早めに意識した方がよさそう
- SDKのバージョン と
user.messageの payload 形 は最初に確認(ここで詰まりやすい) - ドキュメントの表現とブログの Public Beta 表記が一致しない箇所が出る場合があります。実装時は 最新の公式ページ を優先してください
試した結果やハマりどころがあれば、SNS などで共有してもらえると助かります。









