LangGraphで AIエージェントをさらに学んでいく - ストリーミング機能を使ってみる -
LangGraphのストリーミング機能 は、リアルタイムでのデータ処理やユーザーエクスペリエンスの向上を目的として使用します。
特に、LLMやエージェントワークフローの実行中に発生するデータをリアルタイムで取得したい場合に適しています。
チャットボットであればLLMの応答をトークン単位でストリームする(世の中のAIチャットぼっとは全部これかと思う)、
データ処理や外部APIへのリクエストの進捗を確認する、エージェントのデバッグにも使用されます。
ストリーミングを試す
必要なパッケージのインストール
pip install --quiet -U langgraph langchain_openai
LLMはOpenAIを使ってみます。
import getpass
import os
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
LLMのトークン
LLMからの応答をストリームさせてみます。
参考:How to stream LLM tokens from your graph
1つのノードに2つのLLMコールがある例です。
与えたトピックで、ジョークを返すものとポエムを返すLLMコールです。
from IPython.display import Image, display
from typing import TypedDict
from langgraph.graph import START, StateGraph, MessagesState
from langchain_openai import ChatOpenAI
# Note: we're adding the tags here to be able to filter the model outputs down the line
joke_model = ChatOpenAI(model="gpt-4o-mini", tags=["joke"])
poem_model = ChatOpenAI(model="gpt-4o-mini", tags=["poem"])
class State(TypedDict):
topic: str
joke: str
poem: str
async def call_model(state, config):
topic = state["topic"]
print("Writing joke...")
# Note: Passing the config through explicitly is required for python < 3.11
# Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
joke_response = await joke_model.ainvoke(
[{"role": "user", "content": f"Write a joke about {topic}"}],
config,
)
print("\n\nWriting poem...")
poem_response = await poem_model.ainvoke(
[{"role": "user", "content": f"Write a short poem about {topic}"}],
config,
)
return {"joke": joke_response.content, "poem": poem_response.content}
graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()
display(Image(graph.get_graph().draw_mermaid_png()))
グラフ呼び出し
async for msg, metadata in graph.astream(
{"topic": "cat"},
stream_mode="messages",
):
if msg.content:
print(msg.content, end="|", flush=True)
グラフの出力は、streamもしくはastream関数(同期と非同期)を使います。
これらのメソッドを呼び出す際に指定できるモードはいくつかあります。
モード | 説明 |
---|---|
values |
各ステップ後の 状態全体 をストリーム |
updates |
各ステップ後の 変更があった部分のみ をストリーム |
custom |
ノード内の カスタムデータ をストリーム |
messages |
LLM のトークンとメタデータ をストリーム |
debug |
グラフの実行中の全情報 をストリーム(デバッグ用) |
猫をトピックとして与えた場合の結果ですが、
都度返答が表示されていました。
複数のストリーミング・モードをリストとして渡すことで、同時に指定することもできます。
async for msg, metadata in graph.astream(
{"topic": "cat"},
stream_mode=["updates", "messages"],
):
if isinstance(msg, dict) and "content" in msg:
print(msg["content"], end="|", flush=True)
変更がなかったのでストリームはされないという結果になりました。
LLMトークンとイベントのストリーミング
LangGraph では、ワークフローの実行中にさまざまな イベント(処理の開始・終了・途中経過など) が発生します。
.astream_events
を使うと、ワークフロー内のイベントをリアルタイムでストリームし、各ノードや LLM の動作を調べることができます。
主な用途
- LLMのトークンストリーミング
- ワークフロー内の 処理の進行状況を監視
- 各ノードの開始・終了・途中経過の情報を取得
- デバッグ(ワークフローのどの部分で問題が発生したかをリアルタイムで確認)
サンプルコード
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END
model = ChatOpenAI(model="gpt-4o-mini")
def call_model(state: MessagesState):
response = model.invoke(state['messages'])
return {"messages": response}
workflow = StateGraph(MessagesState)
workflow.add_node(call_model)
workflow.add_edge(START, "call_model")
workflow.add_edge("call_model", END)
app = workflow.compile()
inputs = [{"role": "user", "content": "hi!"}]
async for event in app.astream_events({"messages": inputs}, version="v1"):
kind = event["event"]
print(f"{kind}: {event['name']}")
各イベントには、次の 3 つのフィールドが含まれます。
- event → イベントの種類
- どの種類の処理が実行されたかを示す(例: on_chain_start, on_chain_end)。
- name → イベントの名前
- どのノードやグラフがイベントを発生させたかを示す。
- data → イベントの詳細データ
- 実際の処理内容に関連するデータ(LLM の出力やツールのレスポンスなど)。
サンプルコードのイベントの種類には以下のものがありました。
- on_chain_start → ノードの処理が開始
- on_chain_stream → ノードの処理中(ストリーミング)
- on_chain_end → ノードの処理が終了
- on_chat_model_start → チャットモデルの呼び出しが開始
- on_chat_model_stream → チャットモデルがトークンを生成
- on_chat_model_end → チャットモデルの呼び出しが完了
LLM のストリーミング応答をリアルタイムで受け取りたい場合は.astream()
,
ワークフローのどの部分が動いているか監視したい → .astream_events()
を使うと良いみたいですね。