LiteLLMとLangGraphでasyncioベースの社内エージェント・バスを自作してpub/subとイベントソーシングを実装してみる

LiteLLMとLangGraphでasyncioベースの社内エージェント・バスを自作してpub/subとイベントソーシングを実装してみる

2026.05.27

はじめに

データ事業本部のkobayashiです。

LiteLLM × LangGraph シリーズ20回 + Recap で挙げた「シリーズで扱わなかったトピック」の続編シリーズ4本目として、マルチエージェント間のメッセージング詳細(社内エージェント・バス) をまとめます。1本目(LangGraph Functional API)・2本目(Ragas)・3本目(オンライン学習・自己改善)と合わせてご覧ください。

https://dev.classmethod.jp/articles/python-litellm-functional-api/

https://dev.classmethod.jp/articles/python-litellm-ragas/

https://dev.classmethod.jp/articles/python-litellm-online-learning/

シリーズ第20回で取り上げた A2A プロトコル は「外部公開/HTTP/JSON-RPC」によるエージェント協調でしたが、本記事は対比として「社内向け/非同期メッセージング/イベント駆動」によるエージェント・バスの実装パターンを取り上げます。外部インフラ(Redis / NATS / Kafka 等)への依存を避けるため、デモは Python 標準ライブラリの asyncio でインメモリのイベントバスを 200 行弱で自作し、その上に複数の LiteLLM エージェントを subscriber として接続していきます。

https://dev.classmethod.jp/articles/python-litellm-a2a/

https://dev.classmethod.jp/articles/python-litellm-multiagent/

A2A との位置づけ

「エージェント間連携」という言葉は同じでも、外部公開の RPC社内のメッセージバス は設計軸がまったく異なります。本記事の立ち位置を明確にするため、最初に対比を整理します。

観点 A2A プロトコル 社内エージェント・バス(本記事)
想定境界 組織・会社をまたぐ外部公開 同一サービス・同一プロセス内 / 社内ネットワーク
トランスポート HTTP + JSON-RPC(message/send プロセス内 asyncio.Queue / メッセージブローカー
通信モデル タスク指向の 1 対 1 通信(同期返答・タスク管理・ストリーミングを包含) pub/sub(非同期・ファンアウト)
エージェントの結合 AgentCard で公開されたスキルを呼び出す トピック名で疎結合(subscriber は publisher を知らない)
状態管理 サーバー側が Task ライフサイクルを保持 イベントログ(履歴)でステートレスに再構成可能
適した粒度 タスク委譲(粗粒度) 細粒度のイベント駆動連携
障害分離 HTTP タイムアウト・リトライ 本番ブローカーで提供されるバックプレッシャー / DLQ(本記事のインメモリ実装には未搭載)

A2A は「外部に公開してエージェント単位で呼び出してもらう」用途、エージェント・バスは「社内で複数エージェントを並べてイベント駆動に動かす」用途、と棲み分けています。両者は排他ではなく、A2A サーバーの内側でエージェント・バスが動いている という入れ子構成も成り立ちます。

通信トポロジの違いをイメージで掴むために、それぞれを図で並べて示します。

A2A プロトコル(外部公開 RPC・直接呼び出し)

クライアントは AgentCard (/.well-known/agent-card.json) で公開されたスキルを呼び出します。返答は同期レスポンスだけでなく、長時間処理の場合は Task オブジェクトを返して tasks/get でのポーリング、message/stream (SSE) でのストリーミング、Push Notification など、タスク指向の 1 対 1 通信 が可能で、サーバー側に Task ライフサイクルが残るのが特徴です。

社内エージェント・バス(pub/sub・疎結合ファンアウト)

publisher は subscriber を知らず、トピック名だけで結合します。1 対 N のファンアウトに加えて、subscriber は処理結果を直接 publisher に返すのではなく 別のトピックに publish して下流へ流します。A2A のタスク指向の返答と違って fire-and-forget なので、subscriber を増減してもパブリッシャ側のコードに変更は要りません。

pub/sub の概念

エージェント・バスのコアは publisher / subscriber / topic という 3 役と、それらをつなぐ イベント です。

  • Topic: イベントを区別するチャンネル名(例: news.raw / news.summarized / news.translated
  • Publisher: トピックにイベントを publish する側(送信元のエージェント)
  • Subscriber: トピックを subscribe してイベントを受信する側(受信側のエージェント)
  • Event: トピックに流れる 1 件のメッセージ。payload(任意 JSON 風辞書)+ event_id + timestamp で構成

publisher は subscriber を知らず、subscriber も publisher を知りません。両者は トピック名だけ で繋がっており、新しいエージェントを追加・削除したいときは subscribe を 1 行増減するだけで済みます。これが pub/sub の「疎結合」となります。

加えて本記事のバスは イベントログ(history) を持っており、後から増やした subscriber が過去イベントを再消費できる イベントソーシング にも対応しています。

環境

今回使用した環境は以下の通りです。

Python 3.13
litellm 1.83.14
langchain-litellm 0.6.4
langchain-core 1.3.2
$ uv pip install litellm langchain-litellm langchain-core
$ export ANTHROPIC_API_KEY="sk-ant-..."

イベントバスのコア実装

最初に、4 サンプルから共通で利用するイベントバス本体を実装します。asyncio.Queue を subscriber 単位で配り、publish で全 subscriber キューに Event を流すだけのシンプルな構造です。なおこの実装は 同一プロセス・同一 event loop で動かす想定 で、複数スレッド / 複数 event loop からの同時アクセスは保護していないので必要であれば asyncio.Lock を使います。

event_bus.py
"""asyncio ベースのインメモリ・イベントバス(pub/sub + イベントログ)。

外部インフラ(Redis / NATS / Kafka 等)に依存せず、Python 標準ライブラリだけで
社内エージェント・バスのコア機能を最小実装する。

サポートする機能:
    * トピック単位の pub/sub(複数 subscriber へのファンアウト)
    * 全イベントの履歴ログ(イベントソーシング向けの再生に利用)
    * subscriber ごとの独立した asyncio.Queue(バックプレッシャー無しの単純実装)

前提:
    * **同一プロセス・同一 event loop で動かす想定**。複数スレッドや複数 event loop
      からの同時アクセスは保護していない(必要なら `asyncio.Lock` で守る)。
    * `subscribe()` は **subscribe 開始以降のイベントだけ** を受け取る live-only API。
      過去イベントを後から消費したい場合は `replay()` を使う。

本番運用ではこのモジュールを下記に置き換える想定:
    * Redis Streams / NATS JetStream / Apache Kafka
    * AWS EventBridge / Google Cloud Pub/Sub
"""

from __future__ import annotations

import asyncio
import copy
import time
import uuid
from collections.abc import AsyncIterator, Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any

@dataclass(frozen=True)
class Event:
    """1 件の publish イベントを表すデータクラス。

    `topic` / `event_id` / `timestamp` は `frozen=True` により不変だが、
    `payload` 自体は dict なので **中身は可変**。`EventBus.publish()` 側で
    `payload` を deepcopy したものを Event に載せて配るため、subscriber が
    受け取った Event の payload を加工しても history や他 subscriber の
    見え方は壊れない。
    """

    topic: str
    payload: dict[str, Any]
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    timestamp: float = field(default_factory=time.time)

# subscriber が受け取るハンドラの型: イベントを受け取って何らかの非同期処理を行う
Handler = Callable[[Event], Awaitable[None]]

class EventBus:
    """asyncio.Queue を subscriber 単位に配るインメモリ pub/sub。

    publish() はトピックに紐付く全 subscriber キューへ Event を配送し、
    subscribe() は新しい Queue を 1 つ作って返す。run_subscriber() はその
    Queue を 1 件ずつハンドラに流す常駐タスクを起動する。
    """

    def __init__(self) -> None:
        # トピック -> subscriber キューのリスト
        self._subscribers: dict[str, list[asyncio.Queue[Event]]] = {}
        # 全イベントの履歴(イベントソーシング向け)
        self._history: list[Event] = []

    async def publish(self, topic: str, payload: dict[str, Any]) -> Event:
        """トピックに 1 件の Event を流し、履歴にも追記する。

        payload は deepcopy したものを Event に載せるため、呼び出し側の元 dict と
        各 subscriber が受け取る dict は独立。subscriber が破壊的に書き換えても
        history や他 subscriber には波及しない。
        """
        event = Event(topic=topic, payload=copy.deepcopy(payload))
        self._history.append(event)
        for queue in list(self._subscribers.get(topic, [])):
            await queue.put(event)
        return event

    def subscribe(self, topic: str) -> asyncio.Queue[Event]:
        """新しい subscriber キューを発行する。

        live-only: subscribe 完了後に publish されたイベントだけが届く。
        過去イベントを処理したい場合は `replay()` を併用する。
        """
        queue: asyncio.Queue[Event] = asyncio.Queue()
        self._subscribers.setdefault(topic, []).append(queue)
        return queue

    def unsubscribe(self, topic: str, queue: asyncio.Queue[Event]) -> None:
        """subscribe 済みの queue をトピックから外す。

        `run_subscriber()` の停止時に finally で呼ぶことで、停止後の publish が
        死んだ queue にイベントを積み続けるリークを防ぐ。
        """
        queues = self._subscribers.get(topic)
        if queues is None:
            return
        try:
            queues.remove(queue)
        except ValueError:
            pass
        if not queues:
            self._subscribers.pop(topic, None)

    async def run_subscriber(
        self,
        topic: str,
        handler: Handler,
        *,
        name: str = "subscriber",
        stop_event: asyncio.Event | None = None,
    ) -> None:
        """常駐 subscriber: subscribe したキューからイベントを取り出してハンドラに流す。

        停止挙動:
            * stop_event が set され、かつキューが空になった時点でループを抜ける
              (drain semantics: 残っているイベントは処理してから終了する)。
            * 0.1 秒の poll で停止判定を行うので、最大 0.1 秒の停止遅延が出る。
              低レイテンシ要件があるなら asyncio.CancelledError ベースの cancel に
              切り替える方が素直。

        終了時に必ず unsubscribe するため、停止後の publish が死んだ queue に
        積まれ続けるリークは発生しない。
        """
        queue = self.subscribe(topic)
        try:
            while True:
                if stop_event is not None and stop_event.is_set() and queue.empty():
                    return
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=0.1)
                except asyncio.TimeoutError:
                    continue
                try:
                    await handler(event)
                except Exception as exc:  # noqa: BLE001 - subscriber の例外で他を止めない
                    print(f"[bus] subscriber={name!r} がエラーを返しました: {exc!r}")
        finally:
            self.unsubscribe(topic, queue)

    @property
    def history(self) -> list[Event]:
        """publish された全イベントの履歴(時系列)。"""
        return list(self._history)

    async def replay(self, topic: str, handler: Handler) -> None:
        """履歴に残っている指定トピックのイベントを順番にハンドラに流す。

        スナップショット方式: replay 開始時点の history をコピーしてから反復するため、
        replay 中に他 coroutine が publish した新規イベントは対象に含まれない。

        新しい subscriber を後から追加して過去イベントを再消費する用途。
        実プロダクトでは Kafka の `--from-beginning` や Redis Streams の XREAD で実現する。
        """
        snapshot = list(self._history)
        for event in snapshot:
            if event.topic == topic:
                await handler(event)

    async def drain(self, topic: str) -> AsyncIterator[Event]:
        """テスト/デモ向け: subscribe したキューを 1 件ずつ yield する。

        非同期ジェネレータなので `async for` で扱う。stop は呼び出し側で break する。
        ジェネレータが GC される際に自動 unsubscribe するため、長時間放置しても
        リークしない。
        """
        queue = self.subscribe(topic)
        try:
            while True:
                event = await queue.get()
                yield event
        finally:
            self.unsubscribe(topic, queue)

ポイントを整理します。

  • publish() は payload を deepcopy して Event を作る: subscriber が受け取った payload を破壊的に書き換えても history や他 subscriber に波及しない。Event 自体は frozen=Truetopic / event_id / timestamp は不変
  • subscribe() は live-only: subscribe 完了後に publish されたイベントしか届かない。過去イベントを処理したいときは replay() を併用する
  • run_subscriber() は drain semantics: stop_event が set されてもキューに残っているイベントを処理してから終了する。finally で必ず unsubscribe() するので、停止後 publish が死んだキューに積まれ続けるリークは発生しない
  • replay() はスナップショット方式: 開始時点の history をコピーしてから反復するので、replay 中に他 coroutine が publish しても新規分は対象外

トレードオフ:

  • 0.1 秒 poll は実装が単純な代わりに idle 時の wake-up コストと最大 0.1 秒の停止遅延を伴う。低レイテンシ要件があれば asyncio.CancelledError ベースの cancel + sentinel に切り替えるほうが素直
  • asyncio.Queue() はデフォルト無制限なので publisher は止まりにくい一方、遅い subscriber がいると その subscriber のキュー backlog が無制限に伸び続ける。本番ブローカーに置き換えると最大長・バックプレッシャー・デッドレターキュー(DLQ)が標準で得られる

サンプル1: pub/sub の最小動作確認(LLM なし)

LLM を絡める前に、EventBus 単体の挙動を確認します。同じトピックに 2 つの subscriber を登録するとファンアウトされる、別トピックの subscriber には届かない、publish 順に history が積み上がる、という 3 点を確認します。

basic_event_bus.py
"""EventBus の最小動作確認サンプル(LLM 呼び出しなし)。

* 同一トピックに 2 つの subscriber を登録するとファンアウトされる
* 別トピックの subscriber には届かない
* publish 順に history が積み上がる

本サンプルは LLM 呼び出しを含まないため、API キー無しで実行可能。
"""

from __future__ import annotations

import asyncio

from event_bus import Event, EventBus

async def main() -> None:
    bus = EventBus()
    stop_event = asyncio.Event()

    received: dict[str, list[str]] = {"sub-a": [], "sub-b": [], "sub-other": []}

    async def make_handler(name: str):
        async def handler(event: Event) -> None:
            received[name].append(f"{event.topic}/{event.payload['msg']}")
            print(f"[{name}] received topic={event.topic} payload={event.payload}")

        return handler

    # news.published に 2 つの subscriber、other.topic に 1 つの subscriber を起動
    tasks = [
        asyncio.create_task(
            bus.run_subscriber(
                "news.published",
                await make_handler("sub-a"),
                name="sub-a",
                stop_event=stop_event,
            )
        ),
        asyncio.create_task(
            bus.run_subscriber(
                "news.published",
                await make_handler("sub-b"),
                name="sub-b",
                stop_event=stop_event,
            )
        ),
        asyncio.create_task(
            bus.run_subscriber(
                "other.topic",
                await make_handler("sub-other"),
                name="sub-other",
                stop_event=stop_event,
            )
        ),
    ]

    # subscriber が subscribe を完了するまで 1 tick 待つ(起動バリア)
    await asyncio.sleep(0)

    print("=== publish 3 件 ===")
    await bus.publish("news.published", {"msg": "first news"})
    await bus.publish("news.published", {"msg": "second news"})
    await bus.publish("other.topic", {"msg": "different topic"})

    # キューに溜まった分の処理が終わるのを待ってから停止
    await asyncio.sleep(0.3)
    stop_event.set()
    await asyncio.gather(*tasks)

    print("\n=== 結果 ===")
    for name, msgs in received.items():
        print(f"  {name}: {msgs}")

    print(f"\n=== history ({len(bus.history)} events) ===")
    for event in bus.history:
        print(f"  {event.topic} -> {event.payload}")

if __name__ == "__main__":
    asyncio.run(main())

await asyncio.sleep(0) は subscribe を完了させるための起動バリアです。subscribe() は live-only(subscribe 完了後の publish しか届かない)なので、publish より前に subscriber が subscribe() を呼び終わっている必要があります。

実行結果は以下のとおりです。sub-a / sub-b の両方に news.published が届き、sub-other には別トピック分しか届いていないことが確認できます。

$ python basic_event_bus.py
=== publish 3 ===
[sub-a] received topic=news.published payload={'msg': 'first news'}
[sub-a] received topic=news.published payload={'msg': 'second news'}
[sub-b] received topic=news.published payload={'msg': 'first news'}
[sub-b] received topic=news.published payload={'msg': 'second news'}
[sub-other] received topic=other.topic payload={'msg': 'different topic'}

=== 結果 ===
  sub-a: ['news.published/first news', 'news.published/second news']
  sub-b: ['news.published/first news', 'news.published/second news']
  sub-other: ['other.topic/different topic']

=== history (3 events) ===
  news.published -> {'msg': 'first news'}
  news.published -> {'msg': 'second news'}
  other.topic -> {'msg': 'different topic'}

history に publish 順で全イベントが残っているのも確認できます。

サンプル2: 3 段パイプライン(収集 → 要約 → 翻訳)

ここから LLM エージェントを subscriber として接続していきます。最初は 直列パイプライン の例で、news.raw → 要約エージェント → news.summarized → 翻訳エージェント → news.translated → 配信エージェント、という 3 段構成です。

pipeline_agents.py
"""3 段パイプライン: ニュース収集 → 要約 → 翻訳。

* `news.raw` トピックにニュース原文が publish される
* 要約エージェントが subscribe して `news.summarized` に publish
* 翻訳エージェントが `news.summarized` を subscribe して `news.translated` に publish
* 配信エージェントが `news.translated` を subscribe して画面に出力

各エージェントはトピック越しに疎結合で繋がっており、`add_edge` のようなグラフ宣言は無い。
新しいエージェントを増やしたいときは `subscribe()` を 1 行追加するだけで挿し込める。
"""

from __future__ import annotations

import asyncio
import warnings

warnings.filterwarnings("ignore", category=DeprecationWarning)
# LangChain ↔ LiteLLM 間で Pydantic シリアライザの UserWarning が大量に出るため抑制
warnings.filterwarnings("ignore", category=UserWarning, module=r"pydantic\..*")

import litellm
from langchain_litellm import ChatLiteLLM

from event_bus import Event, EventBus

litellm.modify_params = True
litellm.drop_params = True

# 翻訳・要約は Claude Sonnet 4.6(既存シリーズと同じ戦略)
llm = ChatLiteLLM(model="anthropic/claude-sonnet-4-6")

# --- ニュース原文(実プロダクトでは RSS / API から取得する想定) ---
NEWS_ITEMS = [
    {
        "id": "n-001",
        "title": "LiteLLM が複数プロバイダー横断のルーティング機能を強化",
        "body": (
            "LiteLLM は OpenAI / Anthropic / Gemini などのプロバイダーを統一インターフェースで扱える Python ライブラリで、"
            "今回のリリースではフォールバック・コスト追跡・Prompt Caching の連携が改善された。"
            "本番運用で API 障害時の自動切替や、テナント別のコスト集計を最小コードで実現できる。"
        ),
    },
    {
        "id": "n-002",
        "title": "LangGraph がマルチエージェント協調パターンの拡充を発表",
        "body": (
            "LangGraph は Python のグラフベース制御フローで LLM アプリケーションを構築するフレームワーク。"
            "Supervisor / Hierarchical / Swarm の協調パターンに加えて、Send API による並列ファンアウトや"
            "Functional API による宣言的記述が整備されている。"
        ),
    },
]

async def summarizer(bus: EventBus, event: Event) -> None:
    """`news.raw` を受け取り、80 字程度に要約して `news.summarized` に publish する。"""
    item = event.payload
    print(f"[summarizer] received {item['id']}: {item['title']}")
    response = await llm.ainvoke(
        [
            {
                "role": "system",
                "content": "次のニュース本文を 80 字程度の日本語で要約してください。前置きは不要です。",
            },
            {"role": "user", "content": item["body"]},
        ]
    )
    summary = str(response.content).strip()
    await bus.publish(
        "news.summarized",
        {"id": item["id"], "title": item["title"], "summary": summary},
    )

async def translator(bus: EventBus, event: Event) -> None:
    """`news.summarized` を受け取り、英訳して `news.translated` に publish する。"""
    item = event.payload
    print(f"[translator] received {item['id']}")
    response = await llm.ainvoke(
        [
            {
                "role": "system",
                "content": "次の日本語の文を、報道向けの自然な英語 1 文に翻訳してください。前置きは不要です。",
            },
            {"role": "user", "content": item["summary"]},
        ]
    )
    translation = str(response.content).strip()
    await bus.publish(
        "news.translated",
        {
            "id": item["id"],
            "title": item["title"],
            "summary": item["summary"],
            "translation": translation,
        },
    )

async def deliverer(bus: EventBus, event: Event) -> None:  # noqa: ARG001
    """`news.translated` を受け取り、最終形を画面に表示する終端 subscriber。"""
    item = event.payload
    print(f"\n[deliverer] {item['id']} 配信")
    print(f"  title:       {item['title']}")
    print(f"  summary(ja): {item['summary']}")
    print(f"  translation: {item['translation']}")

async def main() -> None:
    bus = EventBus()
    stop_event = asyncio.Event()

    # 各エージェントを subscriber として登録
    tasks = [
        asyncio.create_task(
            bus.run_subscriber(
                "news.raw",
                lambda e: summarizer(bus, e),
                name="summarizer",
                stop_event=stop_event,
            )
        ),
        asyncio.create_task(
            bus.run_subscriber(
                "news.summarized",
                lambda e: translator(bus, e),
                name="translator",
                stop_event=stop_event,
            )
        ),
        asyncio.create_task(
            bus.run_subscriber(
                "news.translated",
                lambda e: deliverer(bus, e),
                name="deliverer",
                stop_event=stop_event,
            )
        ),
    ]

    await asyncio.sleep(0)

    print("=== ニュースを news.raw に publish ===")
    for item in NEWS_ITEMS:
        await bus.publish("news.raw", item)

    # パイプラインが流れ切るまで待つ: ニュース件数 × 3 ステップ分の余裕を持たせる
    expected_translated = len(NEWS_ITEMS)
    for _ in range(60):
        translated = sum(1 for e in bus.history if e.topic == "news.translated")
        if translated >= expected_translated:
            break
        await asyncio.sleep(0.5)

    stop_event.set()
    await asyncio.gather(*tasks)

    print("\n=== 全イベント履歴 ===")
    for event in bus.history:
        print(f"  {event.topic:20s} id={event.payload['id']}")

if __name__ == "__main__":
    asyncio.run(main())

ポイントは以下のとおりです。

  • 3 つのエージェントは 互いを直接知らず、トピック名(news.raw / news.summarized / news.translated)だけ で繋がっている。LangGraph の add_edge のようなグラフ宣言は不要
  • summarizer / translator は subscribe したトピックでイベントを受け取り、加工結果を 次のトピック に publish する。出力先のトピック名さえ守れば、次の subscriber が誰かは知らなくてよい
  • deliverer は終端 subscriber で何も publish しない。Slack 通知 / DB 保存 / メトリクス送信などをここに置くイメージ

実行結果は以下のようになりました。news.raw の 2 件が並走で要約 → 翻訳 → 配信を経て、history には合計 6 件のイベントが時系列で並んでいます。

$ python pipeline_agents.py
=== ニュースを news.raw publish ===
[summarizer] received n-001: LiteLLM が複数プロバイダー横断のルーティング機能を強化
[summarizer] received n-002: LangGraph がマルチエージェント協調パターンの拡充を発表
[translator] received n-001
[translator] received n-002

[deliverer] n-001 配信
  title:       LiteLLM が複数プロバイダー横断のルーティング機能を強化
  summary(ja): LiteLLMが複数AIプロバイダーを統一APIで扱えるよう改善され、障害時の自動切替やテナント別コスト集計を少ないコードで実現可能になった。
  translation: LiteLLM has been enhanced to handle multiple AI providers through a unified API, making it possible to implement automatic failover and per-tenant cost aggregation with minimal code.

[deliverer] n-002 配信
  title:       LangGraph がマルチエージェント協調パターンの拡充を発表
  summary(ja): LangGraphはグラフベースのPythonフレームワークで、複数のLLM協調パターンや並列処理、宣言的APIを備えたLLMアプリ構築基盤。
  translation: LangGraph is a graph-based Python framework that serves as a foundation for building LLM applications, offering multiple LLM collaboration patterns, parallel processing capabilities, and a declarative API.

=== 全イベント履歴 ===
  news.raw             id=n-001
  news.raw             id=n-002
  news.summarized      id=n-001
  news.summarized      id=n-002
  news.translated      id=n-001
  news.translated      id=n-002

新しいエージェント(例: 校正・タグ抽出・SNS 投稿)を後から追加したいときは、対応するトピックに subscribe するだけで挿し込めます。既存の summarizer や translator のコードに手を入れる必要はありません。

サンプル3: ファンアウト(1 publish → 複数 subscriber 並列受信)

直列パイプラインに対して、同じイベントを複数 subscriber が並列に受信 するパターンも pub/sub の典型的な使い方です。news.published 1 件に対して、要約 / 英訳 / タグ抽出の 3 エージェントが同時に動き、それぞれ別トピックに結果を publish します。

fanout_subscribers.py
"""ファンアウト: 1 つの publish を複数 subscriber が並列に受信する。

`news.published` トピックに 1 件 publish すると、要約 / 英訳 / タグ抽出の
3 エージェントが**同じイベント**を**並列に**受け取り、それぞれ独立した
出力トピックに publish する。

直列パイプライン(pipeline_agents.py)と異なり、3 つの処理が同時に走るため、
LLM の往復が重なる場合に総レイテンシが直列比 1/3 程度に縮む。
"""

from __future__ import annotations

import asyncio
import time
import warnings

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning, module=r"pydantic\..*")

import litellm
from langchain_litellm import ChatLiteLLM

from event_bus import Event, EventBus

litellm.modify_params = True
litellm.drop_params = True

llm = ChatLiteLLM(model="anthropic/claude-sonnet-4-6")

NEWS_BODY = (
    "LiteLLM と LangGraph を組み合わせると、プロバイダー非依存な LLM アプリケーションを"
    "グラフベースの制御フローで構築できる。"
    "OpenAI / Anthropic / Gemini をモデル名の文字列だけで切り替えられ、"
    "Worker と Judge を別プロバイダーに割り当てるクロスチェックや、"
    "コスト最適化のための役割別モデル選定が容易になる。"
)

async def summarize_subscriber(bus: EventBus, event: Event) -> None:
    item = event.payload
    print(f"[summarize] {item['id']} 開始")
    response = await llm.ainvoke(
        [
            {"role": "system", "content": "次の文を 60 字以内で日本語要約してください。前置きは不要です。"},
            {"role": "user", "content": item["body"]},
        ]
    )
    await bus.publish(
        "news.summary", {"id": item["id"], "summary": str(response.content).strip()}
    )
    print(f"[summarize] {item['id']} 完了")

async def translate_subscriber(bus: EventBus, event: Event) -> None:
    item = event.payload
    print(f"[translate] {item['id']} 開始")
    response = await llm.ainvoke(
        [
            {"role": "system", "content": "次の日本語を自然な英語 1〜2 文に翻訳してください。前置きは不要です。"},
            {"role": "user", "content": item["body"]},
        ]
    )
    await bus.publish(
        "news.translation", {"id": item["id"], "translation": str(response.content).strip()}
    )
    print(f"[translate] {item['id']} 完了")

async def tag_subscriber(bus: EventBus, event: Event) -> None:
    item = event.payload
    print(f"[tag      ] {item['id']} 開始")
    response = await llm.ainvoke(
        [
            {
                "role": "system",
                "content": (
                    "次の文から技術タグを 3 件抽出し、カンマ区切りだけで出力してください。"
                    "前置き・改行・記号装飾は不要です。"
                ),
            },
            {"role": "user", "content": item["body"]},
        ]
    )
    tags = [t.strip() for t in str(response.content).split(",") if t.strip()]
    await bus.publish("news.tags", {"id": item["id"], "tags": tags})
    print(f"[tag      ] {item['id']} 完了")

async def main() -> None:
    bus = EventBus()
    stop_event = asyncio.Event()

    tasks = [
        asyncio.create_task(
            bus.run_subscriber(
                "news.published",
                lambda e: summarize_subscriber(bus, e),
                name="summarize",
                stop_event=stop_event,
            )
        ),
        asyncio.create_task(
            bus.run_subscriber(
                "news.published",
                lambda e: translate_subscriber(bus, e),
                name="translate",
                stop_event=stop_event,
            )
        ),
        asyncio.create_task(
            bus.run_subscriber(
                "news.published",
                lambda e: tag_subscriber(bus, e),
                name="tag",
                stop_event=stop_event,
            )
        ),
    ]
    await asyncio.sleep(0)

    started_at = time.time()
    print("=== news.published に 1 件 publish(3 subscriber が並列受信) ===")
    await bus.publish("news.published", {"id": "n-001", "body": NEWS_BODY})

    # 3 subscriber すべてが下流トピックに publish するまで待つ
    expected_topics = {"news.summary", "news.translation", "news.tags"}
    for _ in range(60):
        topics_emitted = {e.topic for e in bus.history if e.topic in expected_topics}
        if topics_emitted == expected_topics:
            break
        await asyncio.sleep(0.5)

    elapsed = time.time() - started_at
    stop_event.set()
    await asyncio.gather(*tasks)

    print(f"\n=== 並列処理完了: {elapsed:.2f} 秒 ===")
    for topic in ("news.summary", "news.translation", "news.tags"):
        for event in bus.history:
            if event.topic == topic:
                print(f"  {topic:20s} -> {event.payload}")

if __name__ == "__main__":
    asyncio.run(main())

ポイントは以下のとおりです。

  • 3 つの subscriber は 同じ news.published を購読 しているだけで、互いを知らない。新しい解析エージェントを足したくなったら 4 つ目の subscribe を追加するだけで増やせる
  • 各 subscriber は別トピック(news.summary / news.translation / news.tags)に結果を publish するので、下流の集約エージェントは必要なトピックだけ subscribe して使える
  • LLM の往復が並列化されるため、直列で順に呼ぶより総レイテンシが短くなる

実行結果は以下のとおりです。3 subscriber が同時に開始され、2.5 秒程度で 3 つのトピックすべてに結果が流れています。

$ python fanout_subscribers.py
=== news.published 1 publish(3 subscriber が並列受信)===
[summarize] n-001 開始
[translate] n-001 開始
[tag      ] n-001 開始
[tag      ] n-001 完了
[summarize] n-001 完了
[translate] n-001 完了

=== 並列処理完了: 2.51 ===
  news.summary         -> {'id': 'n-001', 'summary': 'LiteLLMとLangGraphを組み合わせることで、複数LLMプロバイダーをモデル名だけで切り替えつつ、グラフ制御によるクロスチェックやコスト最適化が実現できる。'}
  news.translation     -> {'id': 'n-001', 'translation': 'Combining LiteLLM with LangGraph allows you to build provider-agnostic LLM applications with graph-based control flow, switching seamlessly between OpenAI, Anthropic, and Gemini simply by changing a model name string. This makes it easy to assign Worker and Judge roles to different providers for cross-checking, as well as to select cost-optimized models for each role.'}
  news.tags            -> {'id': 'n-001', 'tags': ['LiteLLM', 'LangGraph', 'LLM']}

直列に書いていたら 3 倍近い時間が掛かるところを、subscribe を 3 行並べるだけでファンアウト並列化できる、というのが pub/sub の旨味です。

サンプル4: イベントソーシング(履歴からのリプレイ)

最後はイベントソーシングの例です。先に走らせておいたパイプラインの履歴に対して、後から増やした subscriber が過去イベントを再消費する という構成を見ます。

event_sourcing_replay.py
"""イベントソーシング風: 履歴ログから過去イベントを再生する。

* フェーズ 1: パイプラインを稼働させて `news.summarized` に複数件のイベントを publish
* フェーズ 2: 後から「ハッシュタグ生成」エージェントを追加し、過去の summarized イベントを
  bus.replay() で再生して同じ subscriber インターフェースで処理する

実プロダクトでは Apache Kafka の `--from-beginning`、Redis Streams の `XREAD COUNT N STREAMS topic 0`、
NATS JetStream の `DeliverPolicy.All` がこれに相当する。「**新しい subscriber が後から
過去イベントを再消費できる**」のがイベントソーシングの肝で、ステートレスにエージェントを
追加・差し替えできるアーキテクチャの基盤になる。
"""

from __future__ import annotations

import asyncio
import warnings

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning, module=r"pydantic\..*")

import litellm
from langchain_litellm import ChatLiteLLM

from event_bus import Event, EventBus

litellm.modify_params = True
litellm.drop_params = True

llm = ChatLiteLLM(model="anthropic/claude-sonnet-4-6")

SOURCE_NEWS = [
    {
        "id": "n-001",
        "body": (
            "LiteLLM はマルチプロバイダー対応の Python ライブラリで、"
            "今回のリリースでフォールバック・コスト追跡が改善された。"
        ),
    },
    {
        "id": "n-002",
        "body": (
            "LangGraph の Functional API は @entrypoint と @task で"
            "ワークフローを宣言的に記述できる。"
        ),
    },
]

async def summarizer(bus: EventBus, event: Event) -> None:
    """`news.raw` を要約して `news.summarized` に publish する初期エージェント。"""
    item = event.payload
    print(f"[summarizer] received {item['id']}")
    response = await llm.ainvoke(
        [
            {"role": "system", "content": "次の文を 50 字以内の日本語で要約してください。前置きは不要です。"},
            {"role": "user", "content": item["body"]},
        ]
    )
    await bus.publish(
        "news.summarized",
        {"id": item["id"], "summary": str(response.content).strip()},
    )

async def hashtag_generator(event: Event) -> None:
    """**後から追加された** subscriber: 要約からハッシュタグを 1 件生成する。

    フェーズ 1 では存在しなかったエージェントだが、bus.replay() に渡せば
    過去イベントを再消費して結果を出力できる(イベントソーシング)。
    """
    item = event.payload
    response = await llm.ainvoke(
        [
            {
                "role": "system",
                "content": (
                    "次の日本語要約からハッシュタグを 1 件だけ生成してください。"
                    "出力は #タグ の形式 1 件のみ、前置き不要。"
                ),
            },
            {"role": "user", "content": item["summary"]},
        ]
    )
    print(f"[hashtag(replay)] {item['id']} -> {str(response.content).strip()}")

async def main() -> None:
    bus = EventBus()
    stop_event = asyncio.Event()

    # === フェーズ 1: 初期パイプラインで news.summarized を蓄積 ===
    print("=== フェーズ 1: 初期パイプライン稼働 ===")
    summarizer_task = asyncio.create_task(
        bus.run_subscriber(
            "news.raw",
            lambda e: summarizer(bus, e),
            name="summarizer",
            stop_event=stop_event,
        )
    )
    await asyncio.sleep(0)

    for item in SOURCE_NEWS:
        await bus.publish("news.raw", item)

    # 全件が news.summarized に降りるまで待つ
    for _ in range(60):
        summarized = sum(1 for e in bus.history if e.topic == "news.summarized")
        if summarized >= len(SOURCE_NEWS):
            break
        await asyncio.sleep(0.5)

    stop_event.set()
    await summarizer_task

    print(f"\n--- 蓄積された history ({len(bus.history)} events) ---")
    for event in bus.history:
        summary = event.payload.get("summary", "")
        print(f"  {event.topic:20s} id={event.payload['id']} {summary}")

    # === フェーズ 2: 後から hashtag_generator を追加し、過去イベントを再消費 ===
    print("\n=== フェーズ 2: hashtag_generator を後から追加して replay ===")
    await bus.replay("news.summarized", hashtag_generator)

if __name__ == "__main__":
    asyncio.run(main())

ポイントは以下のとおりです。

  • フェーズ 1 はサンプル2と同じ pub/sub 駆動。news.raw → 要約 → news.summarized の 1 段だけを動かして history に蓄積する
  • フェーズ 2hashtag_generator後から追加する。フェーズ 1 の時点では存在しないエージェントなので live のイベントは受け取れないが、bus.replay("news.summarized", hashtag_generator) を呼ぶと history に残っている過去イベントを順に処理 してくれる
  • subscribe 用のハンドラ関数を そのまま replay にも渡せる のがポイント。実プロダクトでも「新しい解析機能を出す前に、過去 1 ヶ月分のイベントで再評価する」という運用が同じインターフェースで成立する

実行結果は以下のとおりです。フェーズ 1 で蓄積された 2 件の news.summarized を、後付けの hashtag_generator が再消費してハッシュタグを生成しています。

$ python event_sourcing_replay.py
=== フェーズ 1: 初期パイプライン稼働 ===
[summarizer] received n-001
[summarizer] received n-002

--- 蓄積された history (4 events) ---
  news.raw             id=n-001 
  news.raw             id=n-002 
  news.summarized      id=n-001 LiteLLMはマルチプロバイダー対応Pythonライブラリで、今回フォールバックとコスト追跡が強化された。
  news.summarized      id=n-002 `@entrypoint``@task`デコレータを使い、LangGraphのFunctional APIでワークフローを宣言的に定義できる。

=== フェーズ 2: hashtag_generator を後から追加して replay ===
[hashtag(replay)] n-001 -> #LiteLLM
[hashtag(replay)] n-002 -> #LangGraph

実プロダクトでは Apache Kafka の --from-beginning、Redis Streams の XREAD COUNT N STREAMS topic 0、NATS JetStream の DeliverPolicy.All がこの「後から再消費」の役割を担います。イベントログを真実の記録として状態を再構成する という設計思想を Event Sourcing と呼び、command 側と query 側の責務を分ける CQRS(Command Query Responsibility Segregation)と組み合わせて使われることが多いパターンですが、エージェント・バスにも自然に適合する考え方です。

本番運用への橋渡し

ここまでのサンプルは Python 標準ライブラリだけで動くインメモリ実装ですが、本番運用では永続化・スケールアウト・障害耐性のためにメッセージブローカーを使うのが一般的です。本記事のインターフェース(publish / subscribe / replay)はそのまま、内側を以下のいずれかに置き換える形で移行できます。

選択肢 想定用途 持続性 リプレイ スケール 運用負荷
インメモリ(本記事) デモ・単体プロセス・PoC なし プロセス内のみ 単一プロセス ほぼゼロ
Redis Streams 中規模・社内・既に Redis を使っている あり XREAD で履歴再生 中(クラスタで拡張) 低(Redis 1 台から始められる)
NATS JetStream クラウドネイティブ・軽量 MQ・マルチテナント あり DeliverPolicy.All 高(クラスタで水平分割) 低〜中
Apache Kafka 大規模・高スループット・長期保管 あり(数日〜恒久) --from-beginning / コンシューマグループ 非常に高 高(運用専門知識が必要)
AWS EventBridge(+ SQS / SNS) AWS 主体・サーバーレス連携 あり(SQS / DLQ・EventBridge Archive) EventBridge Archive で再生(SNS 単体には無し) 自動 低(マネージド)
Google Cloud Pub/Sub GCP 主体・サーバーレス・他リージョン跨ぎ あり seek でタイムスタンプ巻き戻し 自動 低(マネージド)

選び方の目安は以下です。

  • 小規模・PoC: 本記事のインメモリ実装で十分。プロセスを跨ぐ必要が出てから検討すれば良い
  • すでに Redis がある社内インフラ: Redis Streams が最短距離。コンシューマグループや XACK で本格運用にも耐える
  • クラウドネイティブで軽量さ重視: NATS JetStream。Helm chart 1 個で動かせて、JetStream はリプレイ・永続化も付いてくる
  • 大規模・長期保管・複数チームで共有: Kafka。運用負荷は高いが、データ基盤の共通バスとして使うなら無難
  • AWS / GCP 主体: マネージドサービス(EventBridge + SQS / SNS、または Pub/Sub)が運用の手離れがよい。EventBridge Archive / Pub/Sub seek でリプレイにも対応

実装の置き換えは、EventBus.publish / subscribe / replay の中身をブローカーの SDK 呼び出しに差し替えるだけで済みます。エージェント側のコード(subscriber ハンドラ・publish の呼び方)は変えずに済むため、小さく始めて段階的に本番化する戦略が取りやすいのが pub/sub アーキテクチャの強みです。

どんなときに使うか

A2A プロトコルと本記事のエージェント・バスは、選択軸が違うので両方使い分けるのが現実的です。

エージェント・バスが向く場面

  • 同一サービス内 で複数エージェントを協調させたい(要約・翻訳・タグ抽出を別チームのエージェントとして分業)
  • イベント駆動 で新しいエージェントを差し込みやすくしたい(既存のコードを触らずに subscriber を増やす)
  • ファンアウト が中心(1 イベントを複数 subscriber が並列処理)
  • イベントログを残して再生 したい(過去データへの再評価・新エージェントのバックフィル)
  • 失敗時のリトライ・デッドレターキュー をブローカー側に任せたい

A2A プロトコルが向く場面

  • 組織・会社をまたぐ 外部公開 のエージェント連携
  • 同期 RPC で「依頼 → 結果」を往復したい(タスク委譲)
  • 認証・認可・Card による能力宣言など、プロトコル仕様としての標準化を活用したい
  • ストリーミングを タスク進捗 として段階的に通知したい

両者を組み合わせると、A2A サーバーが受けたタスクを内部のエージェント・バスに publish し、複数エージェントが並列に処理して結果を集約してから A2A レスポンスとして返す、という入れ子構成が自然に実装できます。

まとめ

社内エージェント・バスを asyncio のインメモリ pub/sub で実装し、その上に LiteLLM × LangChain のエージェントを subscriber として接続するパターンを紹介しました。
`A2A プロトコルが「外部公開・同期 RPC」を担うのに対して、本記事のエージェント・バスは「社内・非同期・イベント駆動」を担う相補関係にあります。本番では Redis Streams / NATS JetStream / Apache Kafka / AWS EventBridge / Google Cloud Pub/Sub のいずれかにバックエンドを差し替えることで、永続化・スケール・障害耐性を獲得できます。

最後まで読んでいただきありがとうございました。


生成AI活用はクラスメソッドにお任せ

過去に支援してきた生成AIの支援実績100+を元にホワイトペーパーを作成しました。御社が抱えている課題のうち、どれが解決できて、どのようなサービスが受けられるのか?4つのフェーズに分けてまとめています。どうぞお気軽にご覧ください。

生成AI資料イメージ

無料でダウンロードする

この記事をシェアする

関連記事