LiteLLMとLangGraphのFunctional API(@entrypoint / @task)でワークフローを素のPython関数として書いてみる
はじめに
データ事業本部のkobayashiです。
LiteLLM × LangGraph シリーズ全20回 + Recap でひと区切りつけたあと、Recap で「シリーズで扱わなかったトピック」として挙げた LangGraph Functional API を試したのでまとめます。
シリーズ本編は StateGraph ベースで統一していたため、@entrypoint / @task で宣言的にワークフローを書く Functional API には触れていませんでした。本記事では Functional API の基本から、StateGraph との書き比べ・並列実行・Checkpointer + HITL までをサンプルコード付きで紹介します。
LangGraph Functional API とは
Functional API は、LangGraph のワークフローを @entrypoint でエントリーポイントを宣言し、@task でステップを切り出す という関数デコレータベースで記述する API です。
主な特徴:
- 宣言的ではあるが「素の Python 関数」のまま書ける:
add_node/add_edgeのような明示的なグラフ宣言が不要で、if/for/whileといった通常の制御構文がそのまま使える @taskは future を返す:task()を呼び出すと future が返り、複数タスクを起動してresult()で集約することで自然に並列化できる- Checkpointer・HITL・previous パラメータと統合済み:
@entrypoint(checkpointer=...)で永続化を有効化、interrupt()で人間レビュー、previousパラメータで同じスレッドの前回結果を参照できる
StateGraph との違い
同じ LangGraph でも記述スタイルが大きく異なります。
| 観点 | StateGraph | Functional API |
|---|---|---|
| 制御フロー | add_edge / add_conditional_edges でグラフ宣言 |
if / for / while の Python 構文で記述 |
| ステート管理 | TypedDict などのステートスキーマで明示 |
関数のローカル変数 + 戻り値で表現 |
| 並列実行 | Send API、ファンアウトノード |
@task の future を複数生成して result() で集約 |
| Checkpointer | graph.compile(checkpointer=...) |
@entrypoint(checkpointer=...) |
| HITL | ノード関数内で interrupt() を呼ぶ + Command(resume=...) |
エントリーポイント関数内で interrupt() を呼ぶ + Command(resume=...)(呼び方は同じ) |
| 学習コスト | グラフ思考が必要 | Python 関数が書ければ書ける |
| 可視化・デバッグ | LangGraph Studio などのグラフ視覚化と相性が良い | 通常の関数として読めるが、構造の俯瞰には弱い |
「フローが複雑で分岐が多いなら StateGraph、線形 + 並列 + 軽い分岐なら Functional API」というのが現状の使い分けの目安です。
環境
今回使用した環境は以下の通りです。
Python 3.13
litellm 1.83.14
langgraph 1.1.10
langchain-litellm 0.6.4
langchain-core 1.3.2
$ uv pip install litellm langgraph langchain-litellm langchain-core
$ export ANTHROPIC_API_KEY="sk-ant-..."
なお本記事のサンプルでは冒頭に litellm.modify_params = True を入れています。これは Anthropic 経由でツール呼び出しを多用する際に、LiteLLM が空メッセージや tool_calls の形式を Anthropic API に合わせて自動補正するためのフラグで、今回の HITL や ReAct ループの動作を安定させるための保険です。
基本: @entrypoint と @task
最小サンプルとして、テキストを LLM に要約させるだけのワークフローを書いてみます。LLM 呼び出しを @task に切り出し、@entrypoint で外側の関数を宣言します。
"""@entrypoint と @task の最小サンプル。
LLM 呼び出しを @task に切り出し、@entrypoint から呼び出すワークフロー。
StateGraph を使わず、関数定義だけでワークフローを記述できる。
"""
import litellm
from langchain_litellm import ChatLiteLLM
from langgraph.func import entrypoint, task
litellm.modify_params = True
llm = ChatLiteLLM(model="anthropic/claude-sonnet-4-6")
@task
def summarize(text: str) -> str:
"""与えられたテキストを 80 字程度で要約する。"""
response = llm.invoke(
[
{"role": "system", "content": "与えられた文章を80字程度の日本語で要約してください。前置きは不要です。"},
{"role": "user", "content": text},
]
)
return str(response.content)
@entrypoint()
def summarize_workflow(text: str) -> dict:
"""要約タスクを呼び出し、結果を辞書で返すエントリーポイント。"""
summary_future = summarize(text)
summary = summary_future.result()
return {"original_length": len(text), "summary": summary}
if __name__ == "__main__":
sample_text = (
"LiteLLMはOpenAI互換のインターフェースで100以上のLLMプロバイダーを統一的に呼び出せるPythonライブラリです。"
"OpenAI・Anthropic・Gemini・Bedrock・Vertex AI・Ollamaなどをモデル名の文字列を変えるだけで切り替えられ、"
"プロバイダー固有のSDKやAPIフォーマットの差異を吸収します。"
"ストリーミング・非同期呼び出し・フォールバック・コスト追跡といった本番運用で必要な機能も標準で備えており、"
"アプリケーションロジックとモデル選定を疎結合に保ちながら開発を進められます。"
)
result = summarize_workflow.invoke(sample_text)
print("=== 要約ワークフローの実行結果 ===")
print(f"原文の長さ: {result['original_length']}文字")
print(f"要約: {result['summary']}")
ポイントは以下の3点です。
@taskでラップした関数summarizeは、呼び出すと future を返す。.result()を呼ぶと処理結果が取れる@entrypoint()でラップした関数が ワークフローのエントリーポイント になり、.invoke(...)で実行する- ワークフロー本体は普通の Python 関数として書ける。
add_nodeもadd_edgeもない
実行してみると要約が返ってきます。
$ python basic_functional_api.py
=== 要約ワークフローの実行結果 ===
原文の長さ: 261文字
要約: LiteLLMは100以上のLLMプロバイダーをOpenAI互換インターフェースで統一的に扱えるPythonライブラリ。モデル名を変えるだけで切り替えでき、ストリーミングやフォールバックなど本番運用機能も標準搭載。
タスクの並列実行
@task は呼び出すと future を返すため、複数タスクを連続で生成してから .result() でまとめて回収する だけで並列実行になります。asyncio.gather のような同期コードのまま並列化できる点が Functional API の利点です。
"""@task を future として並列実行するサンプル。
複数の @task を呼び出すと future が返り、result() を呼び出すまで実行が並走する。
3都市の天気コメントを LLM に並列で生成させ、最後にまとめる。
"""
import time
import litellm
from langchain_litellm import ChatLiteLLM
from langgraph.func import entrypoint, task
litellm.modify_params = True
llm = ChatLiteLLM(model="anthropic/claude-sonnet-4-6")
WEATHER_DATA = {
"東京": "晴れ、気温25°C、湿度60%",
"大阪": "曇り、気温23°C、湿度70%",
"札幌": "雪、気温-2°C、湿度80%",
}
@task
def comment_on_weather(city: str) -> str:
"""都市名を受け取り、天気データを踏まえた一言コメントを LLM に生成させる。"""
weather = WEATHER_DATA[city]
response = llm.invoke(
[
{"role": "system", "content": "天気データを踏まえて、その都市を訪れる人向けの一言アドバイスを40字以内で書いてください。前置きは不要です。"},
{"role": "user", "content": f"{city}の天気: {weather}"},
]
)
return str(response.content).strip()
@entrypoint()
def parallel_weather_comments(cities: list[str]) -> dict:
"""複数都市のコメント生成を並列起動して集約する。"""
started_at = time.time()
futures = [comment_on_weather(city) for city in cities]
comments = {city: f.result() for city, f in zip(cities, futures, strict=True)}
elapsed = time.time() - started_at
return {"comments": comments, "elapsed_sec": round(elapsed, 2)}
if __name__ == "__main__":
cities = ["東京", "大阪", "札幌"]
result = parallel_weather_comments.invoke(cities)
print("=== 並列コメント生成 ===")
for city, comment in result["comments"].items():
print(f"[{city}] {comment}")
print(f"\n所要時間: {result['elapsed_sec']}秒({len(cities)}都市の LLM 呼び出しを並列実行)")
futures = [comment_on_weather(city) for city in cities] の段階で 3 都市分の LLM 呼び出しが起動済みになります。次の行の f.result() で結果を回収するときに既に並列処理が走っているため、3 回分を直列で呼ぶより速く終わります。
実行結果は以下のようになりました。3 都市の LLM 呼び出しを並列で走らせて 1.8 秒程度で完了しています。
$ python parallel_tasks.py
=== 並列コメント生成 ===
[東京] 日差しが強めなので、帽子や日焼け止めを忘れずに!過ごしやすい気候を満喫して。
[大阪] 蒸し暑さを感じやすいので、こまめな水分補給と薄手の上着をお忘れなく!
[札幌] 防寒対策を万全に!路面凍結に注意しながら、ゆっくり歩きましょう。
所要時間: 1.8秒(3都市の LLM 呼び出しを並列実行)
StateGraph で同じことをやろうとすると Send API でファンアウトする必要があり、ノード関数 + 集約ノードを書く必要が出てきます。Functional API ではリスト内包表記 1 行で済むのが大きな差です。
StateGraph と Functional API の書き比べ
LiteLLMとLangGraphでプロバイダー非依存のAIエージェントを構築してみるの custom_graph_agent.py(商品検索→在庫確認の ReAct エージェント)と同じフローを Functional API で書き直します。
"""StateGraph と Functional API の書き比べサンプル。
商品検索 → 在庫確認 という同一フローを Functional API で書き直し、
StateGraph 版(litellm-langgraph/custom_graph_agent.py)と比較する。
"""
import litellm
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_litellm import ChatLiteLLM
from langgraph.func import entrypoint, task
from langgraph.prebuilt import ToolNode
litellm.modify_params = True
llm = ChatLiteLLM(model="anthropic/claude-sonnet-4-6")
@tool
def search_product(query: str) -> str:
"""商品を検索します。"""
products = {
"ノートPC": "商品A: 15.6インチ、メモリ16GB、SSD 512GB — ¥120,000",
"キーボード": "商品B: メカニカル、日本語配列、Cherry MX赤軸 — ¥15,000",
"モニター": "商品C: 27インチ、4K、IPS液晶 — ¥45,000",
}
for key, value in products.items():
if key in query:
return value
return f"'{query}'に一致する商品は見つかりませんでした"
@tool
def check_stock(product_name: str) -> str:
"""商品の在庫状況を確認します。商品IDまたはカテゴリ名(例: 商品A、ノートPC)を受け付けます。"""
stock = {
"商品A": "在庫あり(残り3台)",
"商品B": "在庫あり(残り12個)",
"商品C": "在庫なし(入荷予定: 1週間後)",
"ノートPC": "在庫あり(残り3台)",
"キーボード": "在庫あり(残り12個)",
"モニター": "在庫なし(入荷予定: 1週間後)",
}
for key, value in stock.items():
if key in product_name:
return value
return "在庫情報が見つかりません"
tools = [search_product, check_stock]
llm_with_tools = llm.bind_tools(tools)
tool_node = ToolNode(tools)
SYSTEM_PROMPT = (
"あなたは商品検索アシスタントです。"
"ユーザーの質問に対して、必ずツールを使って情報を取得してから回答してください。"
"推測で回答せず、ツールの結果に基づいて回答してください。"
)
@task
def call_model(messages: list) -> AIMessage:
"""LLM を呼び出して AIMessage を返す。"""
response = llm_with_tools.invoke(messages)
return response
@task
def call_tools(message: AIMessage) -> list:
"""tool_calls を実行して ToolMessage のリストを返す。"""
result = tool_node.invoke({"messages": [message]})
return result["messages"]
@entrypoint()
def product_agent(user_query: str) -> dict:
"""ReAct ループを Functional API で記述する。
StateGraph で書く場合は add_node / add_conditional_edges でループを定義するが、
Functional API では普通の while ループで書ける。
"""
messages: list = [
SystemMessage(content=SYSTEM_PROMPT),
HumanMessage(content=user_query),
]
max_steps = 5
for step in range(max_steps):
ai_message = call_model(messages).result()
messages.append(ai_message)
if not ai_message.tool_calls:
return {"messages": messages, "final_answer": ai_message.content}
if step == max_steps - 1:
raise RuntimeError(f"ReAct ループが上限 {max_steps} ステップに達しました。最終回答が得られていません。")
tool_messages = call_tools(ai_message).result()
messages.extend(tool_messages)
raise RuntimeError("unreachable")
if __name__ == "__main__":
result = product_agent.invoke("ノートPCを探しています。在庫はありますか?")
print("=== Functional API 版エージェントの実行結果 ===")
for msg in result["messages"]:
role = msg.type
content = getattr(msg, "content", "")
if content:
print(f"\n[{role}] {content}")
print("\n=== 最終回答 ===")
print(result["final_answer"])
StateGraph 版(前述記事の custom_graph_agent.py)と比較すると、以下の違いがあります。
| StateGraph 版で書いていたもの | Functional API 版での書き方 |
|---|---|
class State(TypedDict): messages: ...(ステートスキーマ定義) |
関数のローカル変数 messages: list = [...] |
add_node("agent", agent_node) / add_node("tools", ToolNode(tools)) |
@task def call_model(...) / @task def call_tools(...) |
add_edge(START, "agent") / add_edge("tools", "agent") |
関数内の for ループ |
add_conditional_edges("agent", should_continue, ...) |
関数内の if not ai_message.tool_calls: return ...(上限到達時は RuntimeError) |
graph.compile() |
@entrypoint() デコレータ |
ReAct ループを「グラフ + 条件分岐エッジ」ではなく「素の for + if + return」で書ける点が、Functional API の読みやすさです(上限ステップ到達時は RuntimeError で fail-fast にしています)。一方、ノードの依存関係を明示的に図示したい場合や Studio で可視化したい場合は StateGraph の方が向きます。
実行結果は以下のとおりです。
$ python compare_with_stategraph.py
=== Functional API 版エージェントの実行結果 ===
[system] あなたは商品検索アシスタントです。ユーザーの質問に対して、必ずツールを使って情報を取得してから回答してください。推測で回答せず、ツールの結果に基づいて回答してください。
[human] ノートPCを探しています。在庫はありますか?
[ai] 承知しました!ノートPCの検索と在庫確認を同時に行います。少々お待ちください。
[tool] 商品A: 15.6インチ、メモリ16GB、SSD 512GB — ¥120,000
[tool] 在庫あり(残り3台)
[ai] 検索・在庫確認の結果をお知らせします!
🖥️ 見つかった商品: 商品A(15.6インチ、メモリ16GB、SSD 512GB、¥120,000)
📦 在庫状況: あり(残り3台)
Checkpointer と HITL
@entrypoint(checkpointer=...) を指定すると、thread_id ごとに状態がスナップショット保存され、interrupt() での中断・Command(resume=...) での再開が使えるようになります。
さらに previous パラメータをエントリーポイント関数に追加すると、同じ thread_id での前回 invoke の戻り値を参照できます。
"""@entrypoint(checkpointer=...) と interrupt() を組み合わせた HITL サンプル。
エッセイを生成 → 人間レビューを interrupt で待つ → 再開して結果を返す
というワークフローを Functional API で書く。
previous パラメータを使うと、同じ thread_id での前回返り値を参照できる。
"""
import litellm
from langchain_litellm import ChatLiteLLM
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt
litellm.modify_params = True
llm = ChatLiteLLM(model="anthropic/claude-sonnet-4-6")
@task
def compose_essay(topic: str) -> str:
"""与えられたトピックで 3 文程度のミニエッセイを生成する。"""
response = llm.invoke(
[
{"role": "system", "content": "与えられたトピックで3文程度の短いエッセイを日本語で書いてください。"},
{"role": "user", "content": topic},
]
)
return str(response.content)
@entrypoint(checkpointer=InMemorySaver())
def review_workflow(topic: str, *, previous: dict | None = None) -> dict:
"""エッセイ生成 → 人間レビュー → 結果返却のワークフロー。
@task でラップしたエッセイ生成は checkpoint にキャッシュされるため、
interrupt から resume された際に再生成は走らない。
previous には同じ thread_id で前回 invoke が返した値が入る。
"""
essay = compose_essay(topic).result()
review = interrupt({"question": "このエッセイを承認しますか?", "essay": essay})
return {
"topic": topic,
"essay": essay,
"review": review,
"previous_review": previous.get("review") if previous else None,
}
def run_once(topic: str, thread_id: str, mock_review: str) -> dict:
"""1セッション = 初回呼び出し + interrupt + resume を1回ぶん実行する。"""
config = {"configurable": {"thread_id": thread_id}}
print(f"\n--- thread_id={thread_id} / topic={topic!r} ---")
first = review_workflow.invoke(topic, config)
interrupt_payload = first["__interrupt__"][0].value
print("[中断] 人間に判断を仰ぎます")
print(f" question: {interrupt_payload['question']}")
print(f" essay: {interrupt_payload['essay']}")
print(f" (擬似レビュー: {mock_review!r})")
resumed = review_workflow.invoke(Command(resume=mock_review), config)
print("[再開] ワークフロー完了")
print(f" review: {resumed['review']}")
print(f" previous_review: {resumed['previous_review']}")
return resumed
if __name__ == "__main__":
# 1回目: previous は None
run_once("LiteLLM の魅力", thread_id="thread-A", mock_review="approved")
# 2回目: 同じ thread_id で別トピックを実行 → previous に1回目の結果が入る
run_once("LangGraph の魅力", thread_id="thread-A", mock_review="needs revision")
実装上のポイントは3つあります。
@entrypoint(checkpointer=InMemorySaver())で永続化を有効化。本番ではSqliteSaver/PostgresSaverに差し替えるinterrupt({...})を呼ぶとワークフローがそこで停止し、invoke()の戻り値の__interrupt__キーから interrupt のペイロードを取り出せる。再開はCommand(resume=値)をinvoke()に渡すentrypoint関数のシグネチャにpreviousパラメータを足すと、前回 invoke の戻り値が自動的に注入される。スレッド継続性のあるワークフローを書くときに便利
@task でラップした compose_essay は checkpoint にキャッシュされるため、interrupt から resume されたときに LLM が再呼び出しされないのも嬉しいポイントです(StateGraph で同じことをするとノード単位での再実行制御を意識する必要があります)。
実行結果は以下のとおりです。2回目の previous_review に 1 回目の review (approved)が引き継がれています。
$ python with_checkpointer_hitl.py
--- thread_id=thread-A / topic='LiteLLM の魅力' ---
[中断] 人間に判断を仰ぎます
question: このエッセイを承認しますか?
essay: # LiteLLMの魅力
LiteLLMは、OpenAIやAnthropic、Geminiなど、数多くのLLMプロバイダーを統一されたAPIインターフェースで扱える点が最大の魅力です。...
(擬似レビュー: 'approved')
[再開] ワークフロー完了
review: approved
previous_review: None
--- thread_id=thread-A / topic='LangGraph の魅力' ---
[中断] 人間に判断を仰ぎます
question: このエッセイを承認しますか?
essay: # LangGraphの魅力
LangGraphは、LLMを活用したアプリケーションを**グラフ構造**で設計できるフレームワークであり、...
(擬似レビュー: 'needs revision')
[再開] ワークフロー完了
review: needs revision
previous_review: approved
どちらを選ぶか
シリーズ全体を経たうえでの個人的な使い分けは以下です。
Functional API が向く場面
- LLM 呼び出しを 線形に並べる ワークフロー(要約・翻訳・抽出パイプラインなど)
- 並列ファンアウト が中心(複数都市・複数言語・複数候補生成)
- ReAct 風のループ を
for+returnで完結に書きたい - LangChain の Runnable や
@taskをミックスしてシンプルに組みたい
StateGraph が向く場面
- ノードが多く 複雑な条件分岐 が頻発する(Plan-and-Execute、Reflexion、Adaptive RAG など)
- LangGraph Studio や
mermaid出力で グラフを可視化 したい - ステートスキーマを型で固めたい(チームでステートの責務を共有する大規模アプリ)
add_messagesなどの reducer をフル活用したい
両者は排他ではなく、シリーズ第6回 Deep Research の Send のように、StateGraph の中の一部だけを Functional API スタイルで書くこともできるため、ワークフローのパートごとに使い分ける運用が現実的です。
まとめ
LangGraph Functional API(@entrypoint / @task)を使うと、ワークフローを素の Python 関数として書きつつ、Checkpointer・HITL・previous といった StateGraph と同等の機能をそのまま利用できます。線形 + 並列が中心なら Functional API、複雑な分岐や可視化が必要なら StateGraph、と使い分けるのがおすすめです。
最後まで読んでいただきありがとうございました。







