【速報】Claude Managed Agents Multiagent Orchestrationを試してみた

【速報】Claude Managed Agents Multiagent Orchestrationを試してみた

2026.05.07

こんにちは、せーのです。

Claude Managed Agents の「Multiagent Orchestration」 は、公式ブログ上 Public Beta として案内されている、複数エージェントの委任と並列実行を API から組み立てる機能です。

1体のコーディネーターが複数のスペシャリストに仕事を振り並列で動かせること、Primary / Session のイベントで追跡しやすいこと、公式ドキュメントの制限、Python SDK と API を試した結果を整理します。

Multiagent Orchestrationとは

Multiagent Orchestration は、1つの コーディネーター エージェントが、複数の スペシャリスト エージェントにタスクを委任して、並列・分散で処理させる仕組みです。

ざっくり言うと、セッションのなかで「リードが仕事を割り振り、スペシャリストがそれぞれのコンテキストで動き、結果を集約する」イメージです。公式ドキュメントでは、次のようなポイントが強調されています。

  • すべてのエージェントが 同じコンテナとファイルシステム を共有する
  • 各エージェントは セッションスレッド という独立したコンテキストで動く
  • コーディネーターが必要に応じてスペシャリストを呼び出す
  • 追跡可能性(全スレッドのイベントを参照できる)

一方、いわゆる「1エージェントに全部やらせる」構成だと、コンテキストが一気に肥えたり、役割の切り替えが曖昧になったりしがちです。

違いを整理すると、シングルエージェントは「1本の会話・1本の脳みそで完走させる」もの、Multiagent Orchestrationは「分割された脳みそが、共有ディスクの上で協調する」もの、というイメージです。

アーキテクチャのイメージ

公式ドキュメントの説明に沿った構成イメージは次のとおりです。

Session (primary thread)
├── Coordinator Agent (例: Engineering Lead)
│   ├── → Thread A: Code Reviewer Agent
│   ├── → Thread B: Test Writer Agent
│   └── → Thread C: Security Agent  (必要なら)
│       (同一コンテナ、共有ファイルシステム)
└── Primary thread に全活動の集約ビューが流れる

Primary thread と Session thread(おさらい)

ストリーミングで追いかける対象が2系統ある、という理解をしておくとよさそうです。

種類 説明 イベントストリームのイメージ
Primary thread コーディネーターの活動 + 全スレッドの集約ビュー /v1/sessions/:id/events/stream
Session thread 各エージェントの詳細活動 /v1/sessions/:id/threads/:thread_id/stream

スレッドは 永続的 で、コーディネーターが同じスペシャリストを再度呼ぶと、前回の履歴を引きずる、という挙動もドキュメント上で説明されています。

違いを整理すると、Primary threadは「全体のダイジェスト番組」、Session threadは「各スペシャリストの密着ドキュメント」、というイメージです。

制限(ドキュメント上の数字)

触る前に押さえておきたい上限です。

  • 同時実行スレッド数: 最大 25
  • roster のユニークエージェント数: 最大 20
  • 委任の深さ: 1レベルのみ(コーディネーター → スペシャリスト。スペシャリストからのさらなる委任は不可)
  • コーディネーターは同一エージェントの複数コピーを呼べる({type: "self"} で自己コピーも可能)

「とりあえず無限にツリーで委任」はできないので、設計はフラットに割り切るのが筋よさそうです。

効果的なパターン(公式の整理)

公式ドキュメントでは、次のようなパターンが例として挙がっていました。

  • 並列化(Parallelization): 独立したサブタスクを複数エージェントに同時処理させ、コーディネーターが統合する
  • 特化(Specialization): ドメイン特化の system やツールを持つエージェントにルーティングする
  • エスカレーション(Escalation): 難しいサブタスクだけ高性能モデル(例: Opus)のエージェントに渡し、単純なものは安価なモデルで処理する

「コードレビュー + テスト生成」のように役割が分かれているタスクは、並列化 × 特化 の例として相性がよいです。

やってみた

試したのは 2026-05-07 時点の API です(以降は仕様や SDK が変わる可能性があります)。

前提(アクセス)

  • 申請不要で、Anthropic API キーがあれば利用できました
  • 管理コンソール側の Admin 権限は不要でした

試した構成

「Engineering Lead(コーディネーター / Opus)」が、「Code Reviewer(Sonnet)」と「Test Writer(Sonnet)」に同じ Python スニペットのレビューとテスト生成を依頼する、という最小構成です。

サンプルコード(再現手順)

以下は実際に実行したスクリプトの抜粋です。エージェント ID などを state.json に書き出しながら step1 → step6 の順で動かしています。

依存関係と環境変数

anthropic>=0.95.0
python-dotenv>=1.0.0
httpx>=0.27.0

プロジェクト直下に .env を置き、ANTHROPIC_API_KEY を設定してください。Step 6 の REST 呼び出しでは Managed Agents 用に anthropic-beta: managed-agents-2026-04-01 ヘッダを付けています(公式ドキュメントの該当ベータ名に合わせて更新してください)。

common.py(クライアントと state)

import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path

from dotenv import load_dotenv

load_dotenv()

STATE_FILE = Path(__file__).parent / "state.json"
LOGS_DIR = Path(__file__).parent / "logs"

def get_client():
    import anthropic
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        raise ValueError("ANTHROPIC_API_KEY が設定されていません。.env を確認してください。")
    return anthropic.Anthropic(api_key=api_key)

def load_state() -> dict:
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())
    return {}

def save_state(updates: dict) -> dict:
    state = load_state()
    state.update(updates)
    STATE_FILE.write_text(json.dumps(state, indent=2, ensure_ascii=False))
    return state

def setup_logger(step_name: str) -> logging.Logger:
    LOGS_DIR.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = LOGS_DIR / f"{step_name}_{timestamp}.txt"

    logger = logging.getLogger(step_name)
    logger.setLevel(logging.DEBUG)
    logger.handlers.clear()

    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S")

    sh = logging.StreamHandler(sys.stdout)
    sh.setFormatter(fmt)
    logger.addHandler(sh)

    fh = logging.FileHandler(log_file, encoding="utf-8")
    fh.setFormatter(fmt)
    logger.addHandler(fh)

    logger.info(f"ログ出力先: {log_file}")
    return logger

Step 1: スペシャリスト2体を作成

from common import get_client, save_state, setup_logger

log = setup_logger("step1")

def main():
    client = get_client()

    log.info("=== Code Reviewer を作成 ===")
    reviewer = client.beta.agents.create(
        name="Code Reviewer",
        model="claude-sonnet-4-6",
        system="""You are a senior code reviewer.
Review code for:
- Security vulnerabilities
- Performance issues
- Code quality and readability
- Best practices

Provide specific, actionable feedback with line numbers.""",
        tools=[{"type": "agent_toolset_20260401"}],
    )
    log.info(f"Reviewer Agent ID: {reviewer.id}")

    log.info("=== Test Writer を作成 ===")
    test_writer = client.beta.agents.create(
        name="Test Writer",
        model="claude-sonnet-4-6",
        system="""You are a test engineer specializing in writing comprehensive unit tests.
Write pytest tests that cover:
- Happy path
- Edge cases
- Error conditions
- Boundary values""",
        tools=[{"type": "agent_toolset_20260401"}],
    )
    log.info(f"Test Writer Agent ID: {test_writer.id}")

    save_state({
        "reviewer_id": reviewer.id,
        "test_writer_id": test_writer.id,
    })
    log.info("state.json に保存しました。次は Step 2 を実行してください。")

if __name__ == "__main__":
    main()

Step 2: コーディネーター(multiagentextra_body

SDK の型に multiagent が無い場合でも、extra_body で API に渡せます。

from common import get_client, load_state, save_state, setup_logger

log = setup_logger("step2")

def main():
    state = load_state()
    reviewer_id = state.get("reviewer_id")
    test_writer_id = state.get("test_writer_id")

    if not reviewer_id or not test_writer_id:
        log.error("reviewer_id / test_writer_id がありません。Step 1 を先に実行してください。")
        return

    client = get_client()

    log.info("=== Engineering Lead(コーディネーター)を作成 ===")
    coordinator = client.beta.agents.create(
        name="Engineering Lead",
        model="claude-opus-4-7",
        system="""You are an engineering lead.
When given code to review:
1. First, delegate to the Code Reviewer agent for security and quality review
2. Then, delegate to the Test Writer agent to create tests
3. Synthesize both results into a final report

Always delegate to both agents in parallel when possible.""",
        tools=[{"type": "agent_toolset_20260401"}],
        extra_body={
            "multiagent": {
                "type": "coordinator",
                "agents": [
                    {"type": "agent", "id": reviewer_id},
                    {"type": "agent", "id": test_writer_id},
                ],
            }
        },
    )
    log.info(f"Coordinator Agent ID: {coordinator.id}")

    save_state({"coordinator_id": coordinator.id})
    log.info("state.json を更新しました。次は Step 3 を実行してください。")

if __name__ == "__main__":
    main()

Step 3: Environment と Session

from common import get_client, load_state, save_state, setup_logger

log = setup_logger("step3")

def main():
    state = load_state()
    coordinator_id = state.get("coordinator_id")

    if not coordinator_id:
        log.error("coordinator_id がありません。Step 2 を先に実行してください。")
        return

    client = get_client()

    log.info("=== Environment を作成 ===")
    environment = client.beta.environments.create(name="code-review-env")
    log.info(f"Environment ID: {environment.id}")

    log.info("=== Session を作成 ===")
    session = client.beta.sessions.create(
        agent=coordinator_id,
        environment_id=environment.id,
    )
    log.info(f"Session ID: {session.id}")

    save_state({
        "environment_id": environment.id,
        "session_id": session.id,
    })
    log.info("state.json を更新しました。次は Step 4 を実行してください。")

if __name__ == "__main__":
    main()

Step 4: タスク投入(content はオブジェクト配列)

ここが重要です。 user.messagecontent は文字列ではなく [{"type": "text", "text": "..."}] 形式にします。文字列のままだと 400 になります。

from common import get_client, load_state, setup_logger

log = setup_logger("step4")

CODE_TO_REVIEW = '''
def calculate_discount(price, discount_percent):
    discount = price * discount_percent / 100
    final_price = price - discount
    return final_price

def process_order(items, user_id):
    total = 0
    for item in items:
        total += item['price']
    discount = calculate_discount(total, 10)
    return {'user': user_id, 'total': discount}
'''

def main():
    state = load_state()
    session_id = state.get("session_id")

    if not session_id:
        log.error("session_id がありません。Step 3 を先に実行してください。")
        return

    client = get_client()

    log.info("=== タスクを送信 ===")
    client.beta.sessions.events.send(
        session_id,
        events=[{
            "type": "user.message",
            "content": [{
                "type": "text",
                "text": (
                    "Please review this Python code and generate tests:\n\n"
                    f"```python\n{CODE_TO_REVIEW}\n```"
                ),
            }],
        }],
    )
    log.info("送信済み。次は Step 5 でストリームを観察してください。")

if __name__ == "__main__":
    main()

Step 5: Primary thread をストリームで観察

from common import get_client, load_state, setup_logger

log = setup_logger("step5")

def main():
    state = load_state()
    session_id = state.get("session_id")

    if not session_id:
        log.error("session_id がありません。Step 3 を先に実行してください。")
        return

    log.info(f"Session ID: {session_id}")
    log.info("=== Primary Thread のイベントストリーム ===")

    client = get_client()

    with client.beta.sessions.events.stream(session_id) as stream:
        for event in stream:
            match event.type:
                case "session.thread_created":
                    agent_name = getattr(event, "agent_name", "unknown")
                    log.info(f">> 新スレッド作成: {agent_name}")

                case "session.thread_status_running":
                    agent_name = getattr(event, "agent_name", "")
                    log.info(f">> スレッド実行中: {agent_name}")

                case "session.thread_status_idle":
                    agent_name = getattr(event, "agent_name", "")
                    session_thread_id = getattr(event, "session_thread_id", None)
                    stop_reason = getattr(event, "stop_reason", None)
                    log.info(f">> スレッド待機: {agent_name} | stop_reason={stop_reason}")
                    if session_thread_id is None:
                        log.info(">> Primary が idle → セッション完了")
                        break

                case "agent.thread_message_received":
                    from_agent = getattr(event, "from_agent_name", "unknown")
                    content = getattr(event, "content", [])
                    log.info(f"\n>> {from_agent} から結果を受信:")
                    for block in content:
                        if getattr(block, "type", None) == "text":
                            preview = block.text[:400]
                            log.info(f"   {preview}{'...' if len(block.text) > 400 else ''}")
                            break

                case "agent.thread_message_sent":
                    to_agent = getattr(event, "to_agent_name", "unknown")
                    log.info(f">> コーディネーターが {to_agent} に委任")

                case _:
                    log.debug(f"   [event] {event.type}")

    log.info("=== ストリーム終了 ===")

if __name__ == "__main__":
    main()

Step 6: スレッド一覧・イベントは REST(SDK 未対応の場合)

当時の環境では sessions.threads が SDK に無かったため、httpx で HTTP GET しています。

import os

import httpx
from dotenv import load_dotenv

from common import load_state, setup_logger

log = setup_logger("step6")

BASE_URL = "https://api.anthropic.com/v1"
BETA_HEADER = "managed-agents-2026-04-01"

def get_headers() -> dict:
    return {
        "x-api-key": os.environ["ANTHROPIC_API_KEY"],
        "anthropic-version": "2023-06-01",
        "anthropic-beta": BETA_HEADER,
        "content-type": "application/json",
    }

def list_threads(session_id: str) -> list:
    resp = httpx.get(
        f"{BASE_URL}/sessions/{session_id}/threads",
        headers=get_headers(),
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json().get("data", [])

def list_thread_events(session_id: str, thread_id: str) -> list:
    resp = httpx.get(
        f"{BASE_URL}/sessions/{session_id}/threads/{thread_id}/events",
        headers=get_headers(),
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json().get("data", [])

def main():
    load_dotenv()

    if not os.environ.get("ANTHROPIC_API_KEY"):
        log.error("ANTHROPIC_API_KEY が設定されていません。")
        return

    state = load_state()
    session_id = state.get("session_id")

    if not session_id:
        log.error("session_id がありません。Step 3 を先に実行してください。")
        return

    log.info("=== 全スレッド一覧 ===")
    threads = list_threads(session_id)
    log.info(f"合計スレッド数: {len(threads)}")

    for thread in threads:
        agent_name = thread.get("agent", {}).get("name", "unknown")
        status = thread.get("status", "unknown")
        parent = thread.get("parent_thread_id", None)
        thread_id = thread.get("id", "")
        log.info(f"  [{agent_name}] id={thread_id} status={status} | parent={parent}")

    log.info("")
    log.info("=== 各スレッドのイベント ===")
    for thread in threads:
        agent_name = thread.get("agent", {}).get("name", "unknown")
        thread_id = thread.get("id", "")
        log.info(f"\n--- {agent_name} (id={thread_id}) ---")

        events = list_thread_events(session_id, thread_id)
        if not events:
            log.info("  (イベントなし)")
            continue

        for ev in events:
            log.info(f"  [{ev.get('type', 'unknown')}] {ev.get('processed_at', '')}")

    log.info("\n=== 完了 ===")

if __name__ == "__main__":
    main()

上記の Step 4 まででタスクが走り始め、Step 5 で Primary のストリーム、Step 6 で各スレッドの REST 取得、という流れです。

実行結果: Primary thread で見えたこと

Primary thread のストリームでは、スレッドの作成 → 実行 → 各スペシャリストからの返信プレビューまで一気通貫で追えました。

委任は並列だったか?

並列でした。 Engineering Lead は Code Reviewer と Test Writer を、だいたい 0.5秒差 でほぼ同時に起動していました。

スレッド数

3本でした(Engineering Lead 1 + Code Reviewer 1 + Test Writer 1)。

ざっくりタイムライン(体感のボトルネック)

フェーズ ざっくり時間
Engineering Lead が思考して委任 約12秒
Code Reviewer 約40秒
Test Writer(tool_use が多め) 約2分
Engineering Lead が合成 約1分
合計 約3分15秒

並列化の効き方で言うと、待ち時間の上限は遅い方のスペシャリスト(この例では Test Writer) に寄ります。直列なら各スペシャリストの時間が足し算になります。

Primary thread のログ抜粋

イベントの流れの例です(スレッド ID は省略)。

05:57:25 [DEBUG]  [event] agent.thinking
05:57:27 [DEBUG]  [event] span.model_request_end
05:57:27 [INFO]  >> 新スレッド作成: Code Reviewer
05:57:27 [INFO]  >> コーディネーターが Code Reviewer に委任
05:57:27 [INFO]  >> 新スレッド作成: Test Writer
05:57:27 [INFO]  >> コーディネーターが Test Writer に委任
05:57:27 [INFO]  >> スレッド実行中: Code Reviewer
05:57:27 [INFO]  >> スレッド実行中: Test Writer
05:58:12 [INFO]  >> Code Reviewer から結果を受信:
                    ## Code Review Report
                    ...(コード品質・セキュリティ指摘)
05:59:33 [INFO]  >> Test Writer から結果を受信:
                    Here is the complete `test_orders.py` (45 tests, all passing...)
06:00:44 [INFO]  >> スレッド待機: Engineering Lead | stop_reason={'type': 'end_turn'}
06:00:44 [DEBUG]  [event] session.status_idle

実行結果: Session thread で増える情報

Primary thread だけでも「いつ誰が動いたか」「結果の先頭プレビュー」は追えます。

一方、次のような 細かい中身 は Session thread 側に降りないと見えませんでした。

  • agent.thinking の中身
  • agent.tool_use / agent.tool_result の詳細
  • モデルリクエストの開始・終了タイミング

違いを整理すると、Primaryは「進行状況と成果のダイジェスト」、Session threadは「各スペシャリストの作業ログ」、というイメージです。

スレッド一覧で見えた親子関係(抜粋)

[Engineering Lead] id=sthr_01xxxx... status=idle | parent=None
[Code Reviewer]    id=sthr_01yyyy... status=idle | parent=sthr_01xxxx...
[Test Writer]      id=sthr_01zzzz... status=idle | parent=sthr_01xxxx...

Python SDK と API(2026-05 時点のメモ)

Managed Agents は進みが早いので、公式の HTTP 例と SDK の対応を突き合わせると詰まりにくいです。

  • anthropic Python SDK は v0.95.0 以降client.beta.agents などが揃ってくる想定です(古い版では未実装のことがあります)
  • agents.create()multiagent が SDK の型に無い場合は、extra_body={"multiagent": ...} で API に渡せる場合があります
  • スレッド一覧GET /v1/sessions/{session_id}/threads のように REST で直接呼ぶ必要がありました
  • エージェントの破棄は agents.archive()delete ではない場合があります)

コーディネーターの system prompt の効き

コーディネーターの system に 「可能なら両方に並列で委任」 と書いた条件では、Code Reviewer と Test Writer がほぼ同時に起動しました。書かないと直列寄りになる可能性は残ります(条件を変えた比較は未実施です)。

補足: always_ask とツール承認

サブエージェント側で always_ask やカスタムツール結果が必要な場合、primary thread にクロスポストされる、という説明があります。session_thread_id でスレッドを識別し、user.tool_confirmation を送るとサーバーが適切なスレッドにルーティングする、という流れです。

Claude Code の Agent Teams との違い

Claude Code の Agent Teams も「複数エージェントが協調する」のですが、エディタ/CLI 上の開発ワークフロー(Team Lead と Teammates、リポジトリや MCP と一体で動く)です。Multiagent OrchestrationManaged Agents API で、アプリから agents / sessions / 環境を組み立て、イベントストリームで追う側の話です。名前が似ているので、ドキュメントや記事を読むときは Claude CodeManaged Agents(API) のどちらの話かを切り分けると混乱しにくいです。

まとめ

分割して並列に走らせる設計は、Primary / Session のイベントで 進行と中身の両方を追いやすいです。一方で SDK と API の対応差はまだ出やすいので、実装では 公式ドキュメントの HTTP と、利用中の SDK の版・型 をセットで確認するのが安全です。

要点

  • Multiagent Orchestration は、コーディネーターがスペシャリストに委任する Public Beta(公式ブログの表記)
  • Primary threadSession thread の2系統で見える設計になっており、追跡性とデバッグのしやすさが強い
  • 委任は1階層、スレッド数や roster に上限があるので、設計はフラットに割り切るのが現実的

使えそうなユースケース

  • コードレビュー + テスト生成のように、独立した専門作業を並列化したいとき
  • スペシャリストごとに system / ツールを分けた方が品質が上がる とき(セキュリティ / パフォーマンス / ドキュメントなど)

注意点

  • 同時スレッド上限(25)roster(20) は、長時間・大量委任の設計では早めに意識した方がよさそう
  • SDKのバージョンuser.message の payload 形 は最初に確認(ここで詰まりやすい)
  • ドキュメントの表現とブログの Public Beta 表記が一致しない箇所が出る場合があります。実装時は 最新の公式ページ を優先してください

試した結果やハマりどころがあれば、SNS などで共有してもらえると助かります。

参考資料


生成AI活用はクラスメソッドにお任せ

過去に支援してきた生成AIの支援実績100+を元にホワイトペーパーを作成しました。御社が抱えている課題のうち、どれが解決できて、どのようなサービスが受けられるのか?4つのフェーズに分けてまとめています。どうぞお気軽にご覧ください。

生成AI資料イメージ

無料でダウンロードする

この記事をシェアする

関連記事