LiteLLMとLangGraphでストリーミングを3階層(素・ノード・SSE)で実装してみる
はじめに
データ事業本部のkobayashiです。
今回はLiteLLMとLangGraphでのストリーミングを扱います。チャットUIで「文字が次々に出てくる」体験は、LLMアプリにおける必須機能です。本記事では、LiteLLM素のストリーミング、LangGraphのstream / astream_events、そしてFastAPI+SSEによるWeb公開までを段階的に紹介します。
ストリーミングの3階層
| 階層 | API | 何が yield されるか | 用途 |
|---|---|---|---|
| LiteLLM | completion(stream=True) |
チャンクごとのdelta | 単発呼び出し |
| LangGraph | app.stream(stream_mode="updates") |
ノード単位のステート変化 | 進捗表示 |
| LangGraph 細粒度 | app.astream_events(version="v2") |
LLMトークン・ツール起動など全イベント | UIへの組み込み |
環境
Python 3.13
litellm 1.83.14
langgraph 1.1.10
langchain-litellm 0.6.4
fastapi
sse-starlette
LiteLLM 素のストリーミング
最もシンプルな形式です。プロバイダーを問わず同じインターフェースで使えます。
"""LiteLLM 素のストリーミング: stream=True でチャンクを逐次受け取る。"""
from litellm import completion
response = completion(
model="anthropic/claude-sonnet-4-6",
messages=[{"role": "user", "content": "AIエージェントを200字で説明してください"}],
stream=True,
)
print("=== ストリーミング出力 ===")
for chunk in response:
delta = chunk.choices[0].delta.content or ""
print(delta, end="", flush=True)
print()
実行結果は以下のようになります(実際にはチャンクが逐次表示される)。
$ python litellm_stream.py
=== ストリーミング出力 ===
# AIエージェント
AIエージェントとは、人間の指示を受け、自律的に考えて行動するAIシステムです。単に質問に答えるだけでなく、**目標達成に向けて計画を立て、ツールを使用し、複数のステップを自ら実行**します。例えば、ウェブ検索・コード実行・ファイル操作などを組み合わせ、複雑なタスクを自動でこなします。人間の介入を最小限に抑えながら、継続的に判断・行動できる点が特徴です。
stream=Trueを渡すとイテレータが返り、各チャンクのdelta.contentにトークン断片が入ります。OpenAI・Anthropic・Geminiすべてこの形に正規化されます。Claude Sonnet 4.6 でも同じインターフェースで動くため、プロバイダーを変更しても呼び出し側コードは修正不要です。
LangGraphのストリーミング
エージェントの場合は、各ノードの実行・ツール呼び出し・LLMトークンを階層的にストリームできます。
"""LangGraph の stream() / astream_events() によるノード単位 / トークン単位ストリーミング。"""
import asyncio
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_litellm import ChatLiteLLM
@tool
def get_population(city: str) -> str:
"""都市の人口を返します。"""
data = {"東京": "約1,400万人", "ニューヨーク": "約830万人"}
return data.get(city, "不明")
# streaming=True を渡さないと ChatLiteLLM の astream() は AIMessageChunk を
# 1チャンクで返すため on_chat_model_stream イベントが発火しない。
# また GPT-5系は reasoning モードで chunk が集約されるため、
# トークン単位を見せたいデモでは Claude を使う。
llm = ChatLiteLLM(model="anthropic/claude-sonnet-4-6", streaming=True)
agent = create_agent(model=llm, tools=[get_population])
# stream(): 各ノード実行後にステートが yield される
print("=== stream(): ノード単位 ===")
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "東京の人口は?"}]},
stream_mode="updates",
):
print(chunk)
print()
# astream_events(): トークン単位など細かいイベントが取れる
async def run_events() -> None:
print("=== astream_events(): トークン単位 ===", flush=True)
async for event in agent.astream_events(
{"messages": [{"role": "user", "content": "ニューヨークの人口は?"}]},
version="v2",
):
if event["event"] == "on_chat_model_stream":
chunk = event["data"]["chunk"]
content = chunk.content if isinstance(chunk.content, str) else ""
if content:
print(content, end="", flush=True)
elif event["event"] == "on_tool_start":
print(
f"\n[ツール開始] {event['name']}({event['data'].get('input')})",
flush=True,
)
elif event["event"] == "on_tool_end":
print(f"[ツール終了] -> {event['data'].get('output')}", flush=True)
print()
asyncio.run(run_events())
streaming=Trueが必須:ChatLiteLLM(model=...)だけだとastream()が AIMessageChunk を1チャンクで返してしまい、astream_events(v2)でもon_chat_model_streamが発火しません。コンストラクタでstreaming=Trueを明示する必要があります(デフォルトはFalse)。また IDE 実行時に出力が見えなくなるバッファリング対策としてprint(..., flush=True)を付けています。
ポイントは2つの粒度を使い分けることです。
stream(stream_mode="updates")
各ノードが完了するたびに更新ステートが yield されます。「Researcherが終わった」「Writerが終わった」などの進捗表示に向きます。
astream_events(version="v2")
LLMトークン・ツール呼び出し開始/終了など、実行中のあらゆるイベントが取れます。チャットUIの逐次表示に必須のAPIです。
主要なイベント:
event |
意味 |
|---|---|
on_chat_model_stream |
LLMがトークンを出力 |
on_tool_start |
ツール呼び出し開始 |
on_tool_end |
ツール呼び出し終了 |
on_chain_start / on_chain_end |
グラフ・ノードの開始/終了 |
実行結果は以下のようになります。
$ python langgraph_stream.py
=== stream(): ノード単位 ===
{'model': {'messages': [AIMessage(content='東京の人口を調べます!', tool_calls=[{'name': 'get_population', 'args': {'city': '東京'}, 'id': 'toolu_01FkMddoyYcHh9ythnRfBohL', 'type': 'tool_call'}], ...)]}}
{'tools': {'messages': [ToolMessage(content='約1,400万人', name='get_population', tool_call_id='toolu_01FkMddoyYcHh9ythnRfBohL')]}}
{'model': {'messages': [AIMessage(content='東京の人口は**約1,400万人**です。東京は日本の首都であり、世界有数の大都市の一つです。都市圏全体(東京圏)では3,500万人を超えるとも言われており、世界最大級の都市圏を形成しています。', ...)]}}
=== astream_events(): トークン単位 ===
ニューヨークの人口を調べます!
[ツール開始] get_population({'city': 'New York'})
[ツール終了] -> content='不明' name='get_population' tool_call_id='toolu_01PHdxTXiAjazGgX5bX9XgW5'
申し訳ありませんが、現在ニューヨークの人口データを取得することができませんでした。
一般的な情報としては、ニューヨーク市の人口は約**800万人以上**(市内)、都市圏全体では約**2,000万人以上**とされています。ただし、最新の正確な数値については、国勢調査や公式統計をご確認いただくことをお勧めします。
stream(stream_mode="updates") ではノード(model / tools)が完了するごとにステート差分が yield され、エージェントの進捗を粗い粒度で観察できます。
astream_events(version="v2") では on_chat_model_stream がトークン単位で発火し、ツール呼び出し前後の自然文が逐次表示されます。さらに on_tool_start / on_tool_end がリアルタイムに発火し、エージェントが「いまツールを呼んでいる」「ツールが返ってきた」というイベントを受け取れます。なおGPT-5系のreasoningモデルは応答チャンクが集約されるため on_chat_model_stream でトークンが流れず、トークンストリーミングの体感が出ません。本記事では Claude Sonnet 4.6 を採用しています。
FastAPI + SSE でWebに公開
実運用ではWebUIから呼ぶことが多いので、**Server-Sent Events(SSE)**でストリーミングを公開する例を見ます。
"""FastAPI + SSE でストリーミングをWebに公開する。
クライアントから /chat?q=... にアクセスすると、
LangGraphの出力が Server-Sent Events で返る。
"""
from collections.abc import AsyncIterator
from fastapi import FastAPI
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_litellm import ChatLiteLLM
from sse_starlette.sse import EventSourceResponse
@tool
def get_weather(city: str) -> str:
"""都市の天気を返します。"""
return f"{city}: 晴れ、24°C"
# on_chat_model_stream を発火させるため streaming=True を必須にし、
# トークンが集約されない Claude を採用する。
llm = ChatLiteLLM(model="anthropic/claude-sonnet-4-6", streaming=True)
agent = create_agent(model=llm, tools=[get_weather])
app = FastAPI()
async def stream_chat(query: str) -> AsyncIterator[dict]:
async for event in agent.astream_events(
{"messages": [{"role": "user", "content": query}]},
version="v2",
):
if event["event"] == "on_chat_model_stream":
chunk = event["data"]["chunk"]
content = chunk.content if isinstance(chunk.content, str) else ""
if content:
yield {"event": "token", "data": content}
elif event["event"] == "on_tool_start":
yield {
"event": "tool_start",
"data": f"{event['name']}({event['data'].get('input')})",
}
elif event["event"] == "on_tool_end":
yield {"event": "tool_end", "data": str(event["data"].get("output"))}
yield {"event": "done", "data": "[DONE]"}
@app.get("/chat")
async def chat(q: str) -> EventSourceResponse:
return EventSourceResponse(stream_chat(q))
# 起動: uvicorn fastapi_sse:app --reload
# テスト: curl -N "http://localhost:8000/chat?q=東京の天気は?"
起動方法:
$ uvicorn fastapi_sse:app --reload
別ターミナルで:
$ curl -sN --get --data-urlencode "q=東京の天気は?" "http://localhost:8000/chat"
event: token
data: 東京の天気を
event: token
data: 確認します!
event: tool_start
data: get_weather({'city': '東京'})
event: tool_end
data: content='東京: 晴れ、24°C' name='get_weather' tool_call_id='toolu_01C7BLs4nthPNZ9aCPEZTgk5'
event: token
data: 現
event: token
data: 在の東京の天気は以下の通りです:
data:
data: - **天気**: 晴れ ☀️
data: - **気温**: 24°C
event: token
data:
data:
data: 過ごしやすい陽気ですね!何かお出かけの予定はありますか?
event: token
data: 😊
event: done
data: [DONE]
event: token がツール呼び出しの前後で逐次配信され、合間に event: tool_start / event: tool_end が挟まる構造が確認できます。ブラウザのフロントエンドからは、EventSource('/chat?q=...') で SSE を購読すれば、トークンを逐次UIに反映でき、ツール呼び出し中は「いまツールを呼んでいます」のような表示も実装できます。
LiteLLMでのモデル使い分け
| ロール | スクリプト | モデル | 理由 |
|---|---|---|---|
| 単純ストリーミング | litellm_stream | anthropic/claude-sonnet-4-6 |
プロバイダー非依存性の証明(OpenAIに切替えても同じコード) |
| ReActストリーム | langgraph_stream | anthropic/claude-sonnet-4-6 (streaming=True) |
on_chat_model_stream でトークン単位を可視化したいため Claude を採用 |
| FastAPI+SSE | fastapi_sse | anthropic/claude-sonnet-4-6 (streaming=True) |
event: token を逐次配信するため Claude を採用 |
LiteLLMでmodel=を変えるだけで、Claude→GPT、GPT→Claude の切替が可能です。本記事では LiteLLM 素ストリーミングだけ Claude にして、tool_use を伴うエージェント系は GPT を使う構成にしています。
まとめ
LiteLLM × LangGraph でのストリーミングを、completion(stream=True) を直接呼ぶパターン、LangGraph の stream(stream_mode="updates") でノード進捗を、astream_events(v2) でトークン・ツール呼び出しを受け取るパターン、そしてそれを FastAPI + SSE でブラウザに配信するパターンの3階層で紹介しました。
ChatLiteLLM は streaming=True を渡さないと astream_events(v2) で on_chat_model_stream が発火しないため、トークン単位を見せたいデモではコンストラクタ引数の指定が必須でした。また GPT-5 系の reasoning モデルは応答チャンクが集約されてしまうため、ストリーミング体感を出したい場面では Claude Sonnet 4.6 のようなトークン粒度で出力するモデルを選ぶ方が効果的です。LiteLLM で model= を切り替えるだけでこの選択ができる点も、本構成の利点です。
最後まで読んでいただきありがとうございました。







