LangGraphで AIエージェントをさらに学んでいく - ストリーミング機能を使ってみる -

LangGraphで AIエージェントをさらに学んでいく - ストリーミング機能を使ってみる -

Clock Icon2025.02.04

LangGraphのストリーミング機能 は、リアルタイムでのデータ処理やユーザーエクスペリエンスの向上を目的として使用します。
特に、LLMやエージェントワークフローの実行中に発生するデータをリアルタイムで取得したい場合に適しています。

チャットボットであればLLMの応答をトークン単位でストリームする(世の中のAIチャットぼっとは全部これかと思う)、

データ処理や外部APIへのリクエストの進捗を確認する、エージェントのデバッグにも使用されます。

Streamingについて

ストリーミングを試す

必要なパッケージのインストール

pip install --quiet -U langgraph langchain_openai

LLMはOpenAIを使ってみます。

import getpass
import os

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")

LLMのトークン

LLMからの応答をストリームさせてみます。

参考:How to stream LLM tokens from your graph

1つのノードに2つのLLMコールがある例です。
与えたトピックで、ジョークを返すものとポエムを返すLLMコールです。

from IPython.display import Image, display
from typing import TypedDict
from langgraph.graph import START, StateGraph, MessagesState
from langchain_openai import ChatOpenAI

# Note: we're adding the tags here to be able to filter the model outputs down the line
joke_model = ChatOpenAI(model="gpt-4o-mini", tags=["joke"])
poem_model = ChatOpenAI(model="gpt-4o-mini", tags=["poem"])

class State(TypedDict):
    topic: str
    joke: str
    poem: str

async def call_model(state, config):
    topic = state["topic"]
    print("Writing joke...")
    # Note: Passing the config through explicitly is required for python < 3.11
    # Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    joke_response = await joke_model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    print("\n\nWriting poem...")
    poem_response = await poem_model.ainvoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}],
        config,
    )
    return {"joke": joke_response.content, "poem": poem_response.content}

graph = StateGraph(State).add_node(call_model).add_edge(START, "call_model").compile()

display(Image(graph.get_graph().draw_mermaid_png()))

グラフ呼び出し

async for msg, metadata in graph.astream(
    {"topic": "cat"},
    stream_mode="messages",
):
    if msg.content:
        print(msg.content, end="|", flush=True)

グラフの出力は、streamもしくはastream関数(同期と非同期)を使います。

これらのメソッドを呼び出す際に指定できるモードはいくつかあります。

モード 説明
values 各ステップ後の 状態全体 をストリーム
updates 各ステップ後の 変更があった部分のみ をストリーム
custom ノード内の カスタムデータ をストリーム
messages LLM のトークンとメタデータ をストリーム
debug グラフの実行中の全情報 をストリーム(デバッグ用)

猫をトピックとして与えた場合の結果ですが、

https://youtu.be/4-a4hyqHSmk

都度返答が表示されていました。

複数のストリーミング・モードをリストとして渡すことで、同時に指定することもできます。

async for msg, metadata in graph.astream(
    {"topic": "cat"},
    stream_mode=["updates", "messages"],
):
    if isinstance(msg, dict) and "content" in msg:
        print(msg["content"], end="|", flush=True)

スクリーンショット 2025-02-04 13.31.51

変更がなかったのでストリームはされないという結果になりました。

LLMトークンとイベントのストリーミング

LangGraph では、ワークフローの実行中にさまざまな イベント(処理の開始・終了・途中経過など) が発生します。
.astream_events を使うと、ワークフロー内のイベントをリアルタイムでストリームし、各ノードや LLM の動作を調べることができます。

主な用途

  • LLMのトークンストリーミング
  • ワークフロー内の 処理の進行状況を監視
  • 各ノードの開始・終了・途中経過の情報を取得
  • デバッグ(ワークフローのどの部分で問題が発生したかをリアルタイムで確認)

サンプルコード

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START, END

model = ChatOpenAI(model="gpt-4o-mini")

def call_model(state: MessagesState):
    response = model.invoke(state['messages'])
    return {"messages": response}

workflow = StateGraph(MessagesState)
workflow.add_node(call_model)
workflow.add_edge(START, "call_model")
workflow.add_edge("call_model", END)
app = workflow.compile()

inputs = [{"role": "user", "content": "hi!"}]
async for event in app.astream_events({"messages": inputs}, version="v1"):
    kind = event["event"]
    print(f"{kind}: {event['name']}")

スクリーンショット 2025-02-04 13.59.52

各イベントには、次の 3 つのフィールドが含まれます。

  • event → イベントの種類
    • どの種類の処理が実行されたかを示す(例: on_chain_start, on_chain_end)。
  • name → イベントの名前
    • どのノードやグラフがイベントを発生させたかを示す。
  • data → イベントの詳細データ
    • 実際の処理内容に関連するデータ(LLM の出力やツールのレスポンスなど)。

サンプルコードのイベントの種類には以下のものがありました。

  • on_chain_start → ノードの処理が開始
  • on_chain_stream → ノードの処理中(ストリーミング)
  • on_chain_end → ノードの処理が終了
  • on_chat_model_start → チャットモデルの呼び出しが開始
  • on_chat_model_stream → チャットモデルがトークンを生成
  • on_chat_model_end → チャットモデルの呼び出しが完了

LLM のストリーミング応答をリアルタイムで受け取りたい場合は.astream(),

ワークフローのどの部分が動いているか監視したい → .astream_events()

を使うと良いみたいですね。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.