LiteLLM × LangGraphでHyDE・RAG-Fusion・Rerank・Self-RAG・Adaptive RAGなど高度なRAGパターンを実装してみる

LiteLLM × LangGraphでHyDE・RAG-Fusion・Rerank・Self-RAG・Adaptive RAGなど高度なRAGパターンを実装してみる

2026.05.05

はじめに

データ事業本部のkobayashiです。

これまでの記事ではLiteLLMの基本的な使い方と各エンドポイント、LangGraphと組み合わせたエージェント構築を紹介しました。今回はその続編として、LiteLLMを使って**RAG(Retrieval-Augmented Generation: 検索拡張生成)**を構築する方法を紹介します。

RAGは「外部のドキュメントから関連情報を検索し、その情報をもとにLLMが回答を生成する」アーキテクチャです。LLMの学習データに含まれない社内ドキュメントやFAQなどに基づいた回答を可能にします。

本記事では、LiteLLM単体でのシンプルなRAGから、HyDE・RAG-Fusion・Rerank・Self-RAG・Adaptive RAGなどの高度なRAGパターンまで、LiteLLMとLangGraphを使って段階的に実装します。

https://dev.classmethod.jp/articles/python-litellm-langgraph/

RAGの全体像

RAGパイプラインは大きく2つのフェーズに分かれます。

1. インデックス作成(事前処理)

ドキュメント → Embeddingモデルでベクトル化 → ベクトルDBに格納

2. 質問応答(実行時)

質問 → Embeddingモデルでベクトル化 → ベクトルDBで類似検索
    → 関連ドキュメント取得 → LLMにコンテキストとして渡して回答生成

LiteLLMが担うのはEmbeddingモデルと**LLM(Completion)**の2箇所です。ベクトルDBには今回ChromaDBを使用します。

環境構築

環境

今回使用した環境は以下の通りです。

Python 3.13
litellm 1.83.14
chromadb 1.5.7
langgraph 1.1.10
langchain-litellm 0.6.4
langchain-core 1.3.2
boto3 1.42.97

なお、pipでインストールします(boto3はBedrock経由のRerankで使用します)。

インストール

$ uv pip install litellm chromadb langgraph langchain-litellm langchain-core boto3

APIキーの設定

$ export OPENAI_API_KEY="sk-..."
$ export ANTHROPIC_API_KEY="sk-ant-..."
$ export GEMINI_API_KEY="..."

シンプルなRAGパイプライン

まずは基本的なRAGの実装です。社内ドキュメントをベクトル化してChromaDBに格納し、質問に対して関連ドキュメントを検索して回答を生成します。

simple_rag.py
import chromadb
from litellm import completion, embedding

# --- 1. ドキュメントの準備と埋め込み ---
documents = [
    "有給休暇は入社6ヶ月後に10日付与されます。申請は3営業日前までに上長の承認が必要です。",
    "リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。",
    "経費精算は月末締め、翌月15日払いです。領収書の原本またはスキャンデータの提出が必要です。",
    "育児休業は子が1歳になるまで取得可能です。延長の場合は最長2歳まで認められます。",
    "健康診断は年1回、会社負担で受診できます。35歳以上は人間ドックも選択可能です。",
]

# LiteLLMでドキュメントをベクトル化
response = embedding(model="openai/text-embedding-3-small", input=documents)
vectors = [item["embedding"] for item in response.data]

# --- 2. ChromaDBに格納 ---
client = chromadb.Client()
collection = client.create_collection("company_docs")
collection.add(
    ids=[f"doc_{i}" for i in range(len(documents))],
    embeddings=vectors,
    documents=documents,
)
print(f"ドキュメント {len(documents)} 件を登録しました")

# --- 3. 質問に回答 ---
queries = [
    "リモートワークは週何日までできますか?",
    "健康診断の費用は自己負担ですか?",
]

for query in queries:
    print(f"\n{'='*50}")
    print(f"質問: {query}")
    print(f"{'='*50}")

    # クエリをベクトル化して類似検索
    query_response = embedding(model="openai/text-embedding-3-small", input=[query])
    results = collection.query(
        query_embeddings=[query_response.data[0]["embedding"]],
        n_results=2,
    )

    # 検索結果を表示
    retrieved_docs = results["documents"][0]
    print(f"\n検索結果({len(retrieved_docs)} 件):")
    for i, doc in enumerate(retrieved_docs, 1):
        print(f"  {i}. {doc}")

    # 検索結果をコンテキストにして回答生成
    context = "\n".join(retrieved_docs)
    response = completion(
        model="anthropic/claude-sonnet-4-6",
        messages=[
            {
                "role": "system",
                "content": (
                    "以下の社内ドキュメントの情報のみをもとに質問に回答してください。"
                    "ドキュメントに記載がない場合は「情報が見つかりませんでした」と回答してください。\n\n"
                    f"{context}"
                ),
            },
            {"role": "user", "content": query},
        ],
    )
    print(f"\n回答: {response.choices[0].message.content}")

ポイントはEmbeddingとCompletionの両方でlitellmの関数を使っている点です。それぞれのモデル名を変えるだけでプロバイダーを切り替えられます。

また、systemプロンプトで「ドキュメントの情報のみをもとに回答」と指定することで、LLMが学習データに基づいた誤った回答(ハルシネーション)を生成するリスクを低減しています。

プロバイダーを組み合わせたRAG

LiteLLMの強みは、EmbeddingモデルとCompletionモデルを異なるプロバイダーから自由に組み合わせられる点です。コストや精度の最適な組み合わせを簡単に検証できます。

hybrid_rag.py
import chromadb
from litellm import completion, completion_cost, embedding

# --- ドキュメント ---
documents = [
    "有給休暇は入社6ヶ月後に10日付与されます。申請は3営業日前までに上長の承認が必要です。",
    "リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。",
    "経費精算は月末締め、翌月15日払いです。領収書の原本またはスキャンデータの提出が必要です。",
    "育児休業は子が1歳になるまで取得可能です。延長の場合は最長2歳まで認められます。",
    "健康診断は年1回、会社負担で受診できます。35歳以上は人間ドックも選択可能です。",
]

# --- Embeddingモデルと回答生成モデルの組み合わせを定義 ---
configurations = [
    {
        "name": "OpenAI Embedding + Claude 回答生成",
        "embedding_model": "openai/text-embedding-3-small",
        "completion_model": "anthropic/claude-sonnet-4-6",
    },
    {
        "name": "Gemini Embedding + GPT-5.5 回答生成",
        "embedding_model": "gemini/text-embedding-004",
        "completion_model": "openai/gpt-5.5",
    },
    {
        "name": "OpenAI Embedding + Gemini 回答生成",
        "embedding_model": "openai/text-embedding-3-small",
        "completion_model": "gemini/gemini-2.5-flash",
    },
]

query = "リモートワークのルールを教えてください"

for config in configurations:
    print(f"\n{'='*60}")
    print(f"構成: {config['name']}")
    print(f"  Embedding: {config['embedding_model']}")
    print(f"  Completion: {config['completion_model']}")
    print(f"{'='*60}")

    # ドキュメントをベクトル化
    embed_response = embedding(model=config["embedding_model"], input=documents)
    vectors = [item["embedding"] for item in embed_response.data]

    # ChromaDBに格納(構成ごとにコレクションを分ける)
    client = chromadb.Client()
    collection_name = config["embedding_model"].replace("/", "_")
    collection = client.get_or_create_collection(collection_name)
    collection.add(
        ids=[f"doc_{i}" for i in range(len(documents))],
        embeddings=vectors,
        documents=documents,
    )

    # クエリをベクトル化して検索
    query_response = embedding(model=config["embedding_model"], input=[query])
    results = collection.query(
        query_embeddings=[query_response.data[0]["embedding"]],
        n_results=2,
    )

    # 回答生成
    context = "\n".join(results["documents"][0])
    comp_response = completion(
        model=config["completion_model"],
        messages=[
            {
                "role": "system",
                "content": (
                    "以下の社内ドキュメントの情報のみをもとに質問に回答してください。\n\n"
                    f"{context}"
                ),
            },
            {"role": "user", "content": query},
        ],
    )

    # コスト計算
    cost = completion_cost(completion_response=comp_response)

    print(f"\n回答: {comp_response.choices[0].message.content}")
    print(f"コスト: ${cost:.6f}")

実行結果の例:

============================================================
構成: OpenAI Embedding + Claude 回答生成
  Embedding: openai/text-embedding-3-small
  Completion: anthropic/claude-sonnet-4-6
============================================================

回答: リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。
コスト: $0.000630

============================================================
構成: OpenAI Embedding + Gemini 回答生成
  Embedding: openai/text-embedding-3-small
  Completion: gemini/gemini-2.5-flash
============================================================

回答: リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。
コスト: $0.000098

このように、LiteLLMなら設定を変えるだけでEmbeddingとCompletionの組み合わせを自由に試すことができます。コストと回答品質のバランスを定量的に比較して、最適な構成を選べるのが大きな利点です。

シンプルRAGの課題

ここまでの「検索→回答」の1パスRAGには以下の課題があります。

  • 無関係なドキュメントが検索される: ベクトル検索はあくまで類似度ベースなので、的外れな文書が上位に来ることがある
  • 検索クエリの質に依存: ユーザーの質問がそのまま良い検索クエリとは限らない
  • すべての質問に検索する無駄: 挨拶や一般知識への質問にもドキュメント検索を実行してしまう

以降では、これらの課題を解決する様々なRAG改善パターンをLiteLLMとLangGraphを使って紹介します。

本記事で扱うRAGパターンの全体像

各パターンを個別に見ていく前に、一般的なRAGパイプラインのステージごとに本記事で扱う一般的な改善手法を整理しておきます。

[1] クエリ処理(Query Expansion)
                    → HyDE / Multi-Query(RAG-Fusion)

[2] 一次検索        → ベクトル検索 / BM25 / ハイブリッド
                    (複数結果リストの場合は RRF で統合)

[3] 再ランキング    → Rerank(litellm.rerank)

[4] LLMへ投入       → Corrective RAG / Self-RAG(生成前後の品質チェック)

本記事では[1]〜[4]の各ステージに対応するパターンを順に扱います。[1]のクエリ処理は「Query Expansion」と総称される領域で、HyDE(仮想回答による拡張)やMulti-Query(複数の言い換えクエリ生成、RAG-Fusionの中核)が代表的な手法です。[2]の一次検索ではChromaDBによるベクトル検索のみを使いますが、実運用ではBM25とのハイブリッド検索を組み合わせることでさらに精度を高められます。RRF(Reciprocal Rank Fusion)はRAG-Fusionなど複数の結果リストが生まれる場合に使うサブステップで、本記事ではRAG-Fusion節で扱います。

なお[1]の前段にAdaptive RAG / Agentic RAGによる「検索の要否や経路のルーティング」を挟むことで、パイプライン全体の効率をさらに高められます。

それでは、各パターンで共通して使用するドキュメントとベクトルDBのセットアップを共通モジュールとして切り出すところから始めます。

共通モジュール(common.py)

ここからはすべてのRAGパターンは同じドキュメントとベクトルDBを使用します。毎回セットアップするのは無駄なので、PersistentClientで永続化し、初回のみ埋め込みを実行する共通モジュールを作成します。

common.py
import chromadb
from litellm import embedding

# --- ドキュメント ---
documents = [
    "有給休暇は入社6ヶ月後に10日付与されます。申請は3営業日前までに上長の承認が必要です。",
    "リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。",
    "経費精算は月末締め、翌月15日払いです。領収書の原本またはスキャンデータの提出が必要です。",
    "育児休業は子が1歳になるまで取得可能です。延長の場合は最長2歳まで認められます。",
    "健康診断は年1回、会社負担で受診できます。35歳以上は人間ドックも選択可能です。",
]

EMBEDDING_MODEL = "openai/text-embedding-3-small"

# --- ChromaDBにドキュメントを格納(永続化) ---
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection("company_docs")

if collection.count() == 0:
    response = embedding(model=EMBEDDING_MODEL, input=documents)
    vectors = [item["embedding"] for item in response.data]
    collection.add(
        ids=[f"doc_{i}" for i in range(len(documents))],
        embeddings=vectors,
        documents=documents,
    )
    print(f"ドキュメント {len(documents)} 件を登録しました")
else:
    print(f"既存のコレクションを使用します({collection.count()} 件)")

def retrieve(query: str, n_results: int = 2) -> list[str]:
    """クエリに関連するドキュメントを検索する。"""
    query_response = embedding(model=EMBEDDING_MODEL, input=[query])
    results = collection.query(
        query_embeddings=[query_response.data[0]["embedding"]],
        n_results=n_results,
    )
    return results["documents"][0]

ポイントは以下の3点です。

  • PersistentClient: chromadb.Client()(インメモリ)ではなくPersistentClient(path="./chroma_db")を使い、ディスクに永続化します。2回目以降の実行ではEmbedding APIを呼ばずに既存データを再利用できます
  • get_or_create_collection: コレクションが存在すればそのまま取得、なければ新規作成します
  • retrieve()関数: クエリをベクトル化してChromaDBで類似検索する処理を共通化しています

以降のコードではfrom common import retrievefrom common import EMBEDDING_MODEL, collectionでこのモジュールをインポートして使用します。

HyDE(仮説文書埋め込み)

HyDE(Hypothetical Document Embeddings)は、検索クエリとドキュメントの語彙ギャップを解消する事前処理手法です。ユーザーの質問をそのままベクトル化するのではなく、LLMに「仮想的な理想回答」を生成させ、その回答文のベクトルで検索します。

例えば「在宅勤務は可能ですか?」という質問は、ドキュメント内の「リモートワークは週3日まで可能です」と異なる語彙を使っています。HyDEでは、LLMが生成した仮説回答のベクトルが実際のドキュメントにより近くなるため、語彙の違いを乗り越えた検索が可能になります。

フロー

質問 → LLMで仮説回答を生成 → 仮説回答をベクトル化 → ベクトルDBで類似検索
    → 関連ドキュメント取得 → LLMに渡して最終回答を生成

実装

hyde_rag.py
from litellm import completion, embedding

from common import EMBEDDING_MODEL, collection

COMPLETION_MODEL = "openai/gpt-5.5"

# --- HyDEによる検索と回答 ---
queries = [
    "在宅勤務は可能ですか?",
    "子どもが生まれたら会社を休めますか?",
]

for query in queries:
    print(f"\n{'='*60}")
    print(f"質問: {query}")
    print(f"{'='*60}")

    # Step 1: LLMで仮説回答を生成
    hyde_response = completion(
        model=COMPLETION_MODEL,
        messages=[
            {
                "role": "system",
                "content": (
                    "質問に対して、社内規定として想定される回答を1〜2文で生成してください。"
                    "正確でなくて構いません。一般的な企業の規定を想定してください。"
                ),
            },
            {"role": "user", "content": query},
        ],
    )
    hypothetical_answer = hyde_response.choices[0].message.content
    print(f"\n仮説回答: {hypothetical_answer}")

    # Step 2: 仮説回答のベクトルで検索
    hyde_embed = embedding(model=EMBEDDING_MODEL, input=[hypothetical_answer])
    results = collection.query(
        query_embeddings=[hyde_embed.data[0]["embedding"]],
        n_results=2,
    )

    retrieved_docs = results["documents"][0]
    print(f"\n検索結果({len(retrieved_docs)} 件):")
    for i, doc in enumerate(retrieved_docs, 1):
        print(f"  {i}. {doc}")

    # Step 3: 最終回答を生成
    context = "\n".join(retrieved_docs)
    final_response = completion(
        model=COMPLETION_MODEL,
        messages=[
            {
                "role": "system",
                "content": (
                    "以下の社内ドキュメントの情報のみをもとに質問に回答してください。"
                    "ドキュメントに記載がない場合は「情報が見つかりませんでした」と回答してください。\n\n"
                    f"{context}"
                ),
            },
            {"role": "user", "content": query},
        ],
    )
    print(f"\n回答: {final_response.choices[0].message.content}")

実行例

============================================================
質問: 在宅勤務は可能ですか?
============================================================

仮説回答: 在宅勤務(テレワーク)は、所定の条件を満たす社員に対して認められています。
週に数日の在宅勤務が可能で、勤怠管理システムへの記録が必要です。

検索結果(2 件):
  1. リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。
  2. 有給休暇は入社6ヶ月後に10日付与されます。申請は3営業日前までに上長の承認が必要です。

回答: リモートワーク(在宅勤務)は週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。

HyDEはLangGraphを使わずにLiteLLMのcompletion()embedding()だけで実装できるシンプルな手法です。「在宅勤務」→「リモートワーク」、「子どもが生まれたら休む」→「育児休業」のように、ユーザーの自然な表現とドキュメントの正式な用語のギャップを埋める効果があります。ただし、LLMによる仮説回答の生成コストが追加される点に注意してください。

RAG-Fusion(検索拡張RAG)

RAG-Fusionは、元の質問から**複数の類似クエリを生成(Multi-Query Generation)し、それぞれで検索を行った結果をReciprocal Rank Fusion(RRF)**で統合する手法です。単一クエリでは拾いきれない関連ドキュメントを、異なる視点からの検索で網羅的に取得します。HyDEが「検索クエリの質」を改善するのに対し、RAG-Fusionは「検索の網羅性」を高めるアプローチです。

フロー

[START] → [generate_queries] → [retrieve_all] → [fuse_results] → [generate] → [END]
  1. generate_queries: LLMが元の質問から3つの異なる視点のクエリを生成
  2. retrieve_all: 元の質問 + 生成クエリの計4件で検索
  3. fuse_results: RRFアルゴリズムで結果を統合・重複排除
  4. generate: 統合された上位ドキュメントから回答を生成

RRFの計算式は RRF_score(d) = Σ 1 / (k + rank(d)) で、各検索結果でのドキュメントの順位に基づきスコアを算出します(kは定数、通常60)。

実装

rag_fusion.py
from common import retrieve
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_litellm import ChatLiteLLM
from langgraph.graph import END, START, StateGraph
from typing_extensions import TypedDict

COMPLETION_MODEL = "openai/gpt-5.5"

def reciprocal_rank_fusion(
    results_list: list[list[str]], k: int = 60
) -> list[str]:
    """Reciprocal Rank Fusionで複数の検索結果を統合する。"""
    scores: dict[str, float] = {}
    for results in results_list:
        for rank, doc in enumerate(results):
            if doc not in scores:
                scores[doc] = 0.0
            scores[doc] += 1.0 / (k + rank + 1)
    sorted_docs = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [doc for doc, _score in sorted_docs]

# =============================================
# LLMの設定
# =============================================
llm = ChatLiteLLM(model=COMPLETION_MODEL)

query_generation_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは検索クエリ生成の専門家です。\n"
            "ユーザーの質問に対して、異なる視点や言い回しの検索クエリを3つ生成してください。\n"
            "1行に1つずつ、クエリのみを出力してください。番号や記号は付けないでください。",
        ),
        ("user", "{question}"),
    ]
)

generation_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "以下の社内ドキュメントの情報のみをもとに質問に回答してください。\n"
            "ドキュメントに記載がない情報は推測せず、「情報が見つかりませんでした」と回答してください。\n\n"
            "{context}",
        ),
        ("user", "{question}"),
    ]
)

query_chain = query_generation_prompt | llm | StrOutputParser()
generation_chain = generation_prompt | llm | StrOutputParser()

# =============================================
# ステートとグラフの定義
# =============================================
class RAGFusionState(TypedDict):
    question: str
    generated_queries: list[str]
    all_results: list[list[str]]
    fused_documents: list[str]
    generation: str

def generate_queries_node(state: RAGFusionState) -> dict:
    """元の質問から複数の検索クエリを生成する。"""
    result = query_chain.invoke({"question": state["question"]})
    queries = [q.strip() for q in result.strip().split("\n") if q.strip()]
    all_queries = [state["question"]] + queries
    print(f"  [クエリ生成] {len(all_queries)} 件:")
    for q in all_queries:
        print(f"    - {q}")
    return {"generated_queries": all_queries}

def retrieve_all_node(state: RAGFusionState) -> dict:
    """全クエリに対して検索を実行する。"""
    all_results = []
    for query in state["generated_queries"]:
        docs = retrieve(query)
        all_results.append(docs)
    print(f"  [検索] {len(all_results)} 回の検索を実行")
    return {"all_results": all_results}

def fuse_results_node(state: RAGFusionState) -> dict:
    """Reciprocal Rank Fusionで結果を統合する。"""
    fused = reciprocal_rank_fusion(state["all_results"])
    print(f"  [RRF統合] {len(fused)} 件のユニークなドキュメントを統合")
    for i, doc in enumerate(fused, 1):
        print(f"    {i}. {doc[:50]}...")
    return {"fused_documents": fused[:3]}

def generate_node(state: RAGFusionState) -> dict:
    """統合されたドキュメントをもとに回答を生成する。"""
    context = "\n\n".join(state["fused_documents"])
    answer = generation_chain.invoke(
        {"context": context, "question": state["question"]}
    )
    print(f"  [生成] 回答を生成しました")
    return {"generation": answer}

# =============================================
# グラフの構築
# =============================================
graph = StateGraph(RAGFusionState)

graph.add_node("generate_queries", generate_queries_node)
graph.add_node("retrieve_all", retrieve_all_node)
graph.add_node("fuse_results", fuse_results_node)
graph.add_node("generate", generate_node)

graph.add_edge(START, "generate_queries")
graph.add_edge("generate_queries", "retrieve_all")
graph.add_edge("retrieve_all", "fuse_results")
graph.add_edge("fuse_results", "generate")
graph.add_edge("generate", END)

app = graph.compile()

# =============================================
# 実行
# =============================================
queries = [
    "休みを取るにはどうすればいいですか?",
    "お金に関する社内ルールを教えてください",
]

for query in queries:
    print(f"\n{'='*60}")
    print(f"質問: {query}")
    print(f"{'='*60}")
    result = app.invoke({
        "question": query,
        "generated_queries": [],
        "all_results": [],
        "fused_documents": [],
        "generation": "",
    })
    print(f"\n最終回答: {result['generation']}")

実行例

「休みを取るには」という曖昧な質問に対し、複数クエリが有給休暇・育児休業など異なる角度からドキュメントを拾います。

============================================================
質問: 休みを取るにはどうすればいいですか?
============================================================
  [クエリ生成] 4 件:
    - 休みを取るにはどうすればいいですか?
    - 休みを取る 方法 有給休暇 申請 会社 手続き
    - 仕事 休む ための 伝え方 上司 相談
    - 休暇 取得 条件 労働基準法 有給 ルール 日本
  [検索] 4 回の検索を実行
  [RRF統合] 2 件のユニークなドキュメントを統合
    1. 有給休暇は入社6ヶ月後に10日付与されます。申請は3営業日前までに上長の承認が必要です。...
    2. リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。...
  [生成] 回答を生成しました

最終回答: 有給休暇を取る場合は、以下の対応が必要です。

- 有給休暇は入社6ヶ月後に10日付与されます
- 申請は3営業日前までに行う必要があります
- 上長の承認が必要です

単一クエリでは「有給休暇」のドキュメントしか見つからない場合でも、RAG-Fusionなら「育児休業」など関連する休暇制度も網羅的に取得できます。reciprocal_rank_fusion関数がPythonの純粋な計算処理なので、LLMコストは最初のクエリ生成時のみです。

Rerank(再ランキング)

Rerankは、ベクトル検索で取得した上位N件のドキュメントを、より高精度なRerankerモデルで再評価し、上位K件に絞り込む手法です。ベクトル検索は高速ですが類似度ベースのため精度に限界があり、Rerankerはクエリとドキュメントのペアをcross-encoder等で精査するため、より関連性の高いドキュメントを上位に並べ替えられます。

LiteLLMは litellm.rerank() を提供しており、AWS Bedrock・Cohere・Jina AI・Voyage AI などのRerankerを統一インターフェースで呼び出せます。本記事ではAWS Bedrock経由のCohere Rerank 3.5(cohere.rerank-v3-5:0)を使用します。Cohere Rerankは100+言語対応で日本語の精度も高く、Bedrock経由ならAWSのIAM・請求に統合されます。

フロー

質問 → ベクトル検索でtop-N取得 → Rerankerで再評価 → top-K取得 → LLMに渡して回答生成

実装

事前にAWSの認証情報とリージョンを設定しておきます。Cohere Rerank 3.5は東京リージョン(ap-northeast-1)でも利用可能です。

$ export AWS_ACCESS_KEY_ID="..."
$ export AWS_SECRET_ACCESS_KEY="..."
$ export AWS_REGION_NAME="ap-northeast-1"

なお、Bedrock RerankはモデルIDのみの指定では動作せず、フルARN形式(arn:aws:bedrock:<region>::foundation-model/<model-id>)が必須です。Completionモデルとは仕様が異なる点に注意してください。

rerank_rag.py
from litellm import completion, rerank

from common import retrieve

COMPLETION_MODEL = "anthropic/claude-sonnet-4-6"
RERANK_MODEL = "bedrock/arn:aws:bedrock:ap-northeast-1::foundation-model/cohere.rerank-v3-5:0"

queries = [
    "休みのルールを教えてください",
    "子育て関連の制度はありますか?",
]

for query in queries:
    print(f"\n{'='*60}")
    print(f"質問: {query}")
    print(f"{'='*60}")

    # Step 1: ベクトル検索でtop-N(広めに)取得
    candidates = retrieve(query, n_results=5)
    print(f"\n[一次検索] {len(candidates)} 件取得")
    for i, doc in enumerate(candidates, 1):
        print(f"  {i}. {doc[:50]}...")

    # Step 2: Bedrock Rerankerで再評価してtop-Kに絞り込む
    rerank_response = rerank(
        model=RERANK_MODEL,
        query=query,
        documents=candidates,
        top_n=2,
    )
    reranked_docs = [candidates[r["index"]] for r in rerank_response.results]
    print(f"\n[Rerank] top-{len(reranked_docs)} に絞り込み")
    for i, r in enumerate(rerank_response.results, 1):
        doc = candidates[r["index"]]
        print(f"  {i}. (score={r['relevance_score']:.3f}) {doc[:50]}...")

    # Step 3: 絞り込んだドキュメントで回答生成
    context = "\n".join(reranked_docs)
    response = completion(
        model=COMPLETION_MODEL,
        messages=[
            {
                "role": "system",
                "content": (
                    "以下の社内ドキュメントの情報のみをもとに質問に回答してください。\n\n"
                    f"{context}"
                ),
            },
            {"role": "user", "content": query},
        ],
    )
    print(f"\n回答: {response.choices[0].message.content}")

LiteLLMのrerank()はCohere互換のレスポンス形式に正規化されているため、Bedrockを使う場合でもresults[].indexresults[].relevance_scoreでアクセスできます。

実行例

============================================================
質問: 休みのルールを教えてください
============================================================

[一次検索] 5 件取得
  1. 有給休暇は入社6ヶ月後に10日付与されます。申請は3営業日前までに上長の承認が必要です。...
  2. リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。...
  3. 育児休業は子が1歳になるまで取得可能です。延長の場合は最長2歳まで認められます。...
  4. 経費精算は月末締め、翌月15日払いです。領収書の原本またはスキャンデータの提出が必要です。...
  5. 健康診断は年1回、会社負担で受診できます。35歳以上は人間ドックも選択可能です。...

[Rerank] top-2 に絞り込み
  1. (score=0.200) 有給休暇は入社6ヶ月後に10日付与されます。申請は3営業日前までに上長の承認が必要です。...
  2. (score=0.106) 育児休業は子が1歳になるまで取得可能です。延長の場合は最長2歳まで認められます。...

回答: 社内ドキュメントに記載されている休暇のルールをご説明します。

## 有給休暇
- **付与タイミング**:入社6ヶ月後に**10日**付与されます
- **申請方法**:取得希望日の**3営業日前まで**に上長の承認が必要です

## 育児休業
- **取得可能期間**:子どもが**1歳になるまで**取得可能です
- **延長の場合**:最長**2歳まで**延長が認められます

ご不明な点があれば、お気軽にご質問ください。

relevance_scoreを見ると、ベクトル検索の順位とは異なる粒度でクエリとドキュメントの関連性が定量化されているのがわかります。実運用では一次検索でtop-20〜top-50ほど広く取得し、Rerankerでtop-3〜top-5に絞り込むことで、検索精度を大幅に高められます。

なお、Rerankは本記事の他のパターンとも組み合わせ可能です。例えばRAG-Fusionで複数クエリの結果を統合した後、最終段でRerankをかけることで「網羅性 + 精度」を両立できます。

Corrective RAG(自己修正RAG:CRAG)

Corrective RAGは、検索結果の関連性をLLMが判定し、関連性が低い場合はクエリを書き換えて再検索するパターンです。

フロー

[START] → [retrieve] → [grade_documents] → 関連あり → [generate] → [END]
                                         → 関連なし → [rewrite_query] → [retrieve] → ...

実装

corrective_rag.py
from common import retrieve
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_litellm import ChatLiteLLM
from langgraph.graph import END, START, StateGraph
from typing_extensions import TypedDict

COMPLETION_MODEL = "openai/gpt-5.5"

# =============================================
# LLMの設定
# =============================================
llm = ChatLiteLLM(model=COMPLETION_MODEL)

# 関連性判定用プロンプト
grading_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたはドキュメントの関連性を判定する専門家です。\n"
            "ドキュメントがユーザーの質問に回答するための情報を含んでいるかを判定してください。\n"
            "関連がある場合は 'relevant'、関連がない場合は 'not_relevant' とだけ回答してください。",
        ),
        (
            "user",
            "ドキュメント:\n{document}\n\n質問: {question}",
        ),
    ]
)

# クエリ書き換え用プロンプト
rewrite_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは検索クエリの最適化の専門家です。\n"
            "ユーザーの質問を、ベクトル検索でより良い結果が得られるように書き換えてください。\n"
            "書き換えたクエリのみを出力してください。",
        ),
        ("user", "元の質問: {question}"),
    ]
)

# 回答生成用プロンプト
generation_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "以下の社内ドキュメントの情報のみをもとに質問に回答してください。\n"
            "ドキュメントに記載がない情報は推測せず、「情報が見つかりませんでした」と回答してください。\n\n"
            "{context}",
        ),
        ("user", "{question}"),
    ]
)

grading_chain = grading_prompt | llm | StrOutputParser()
rewrite_chain = rewrite_prompt | llm | StrOutputParser()
generation_chain = generation_prompt | llm | StrOutputParser()

# =============================================
# ステートとグラフの定義
# =============================================
class RAGState(TypedDict):
    question: str
    documents: list[str]
    generation: str
    retry_count: int

def retrieve_node(state: RAGState) -> dict:
    """ドキュメントを検索する。"""
    print(f"  [検索] クエリ: {state['question']}")
    docs = retrieve(state["question"])
    print(f"  [検索] {len(docs)} 件取得")
    return {"documents": docs}

def grade_documents_node(state: RAGState) -> dict:
    """検索結果の関連性を判定し、関連のあるドキュメントだけを残す。"""
    relevant_docs = []
    for doc in state["documents"]:
        grade = grading_chain.invoke({"document": doc, "question": state["question"]})
        if "relevant" in grade.lower() and "not_relevant" not in grade.lower():
            relevant_docs.append(doc)
            print(f"  [判定] relevant: {doc[:40]}...")
        else:
            print(f"  [判定] not_relevant: {doc[:40]}...")
    return {"documents": relevant_docs}

def rewrite_query_node(state: RAGState) -> dict:
    """クエリを書き換えて再検索の精度を上げる。"""
    new_query = rewrite_chain.invoke({"question": state["question"]})
    print(f"  [書き換え] '{state['question']}' → '{new_query}'")
    return {"question": new_query, "retry_count": state["retry_count"] + 1}

def generate_node(state: RAGState) -> dict:
    """関連ドキュメントをもとに回答を生成する。"""
    context = "\n\n".join(state["documents"])
    answer = generation_chain.invoke({"context": context, "question": state["question"]})
    print(f"  [生成] 回答を生成しました")
    return {"generation": answer}

def decide_after_grading(state: RAGState) -> str:
    """関連ドキュメントがあれば回答生成、なければクエリ書き換え。"""
    if state["documents"]:
        return "generate"
    if state["retry_count"] >= 2:
        return "generate"  # リトライ上限に達したらそのまま生成
    return "rewrite"

# =============================================
# グラフの構築
# =============================================
graph = StateGraph(RAGState)

# ノード追加
graph.add_node("retrieve", retrieve_node)
graph.add_node("grade_documents", grade_documents_node)
graph.add_node("rewrite_query", rewrite_query_node)
graph.add_node("generate", generate_node)

# エッジ定義
graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "grade_documents")
graph.add_conditional_edges(
    "grade_documents",
    decide_after_grading,
    {"generate": "generate", "rewrite": "rewrite_query"},
)
graph.add_edge("rewrite_query", "retrieve")
graph.add_edge("generate", END)

app = graph.compile()

# =============================================
# 実行
# =============================================
queries = [
    "リモートワークは週何日までできますか?",
    "社員旅行の補助金はいくらですか?",
]

for query in queries:
    print(f"\n{'='*60}")
    print(f"質問: {query}")
    print(f"{'='*60}")
    result = app.invoke({"question": query, "documents": [], "generation": "", "retry_count": 0})
    print(f"\n最終回答: {result['generation']}")

実行例

関連ドキュメントが見つかる場合は、1回の検索で回答が生成されます。

============================================================
質問: リモートワークは週何日までできますか?
============================================================
  [検索] クエリ: リモートワークは週何日までできますか?
  [検索] 2 件取得
  [判定] relevant: リモートワークは週3日まで可能です。事前申請は不要ですが...
  [判定] not_relevant: 経費精算は月末締め、翌月15日払いです。領収書の原本ま...
  [生成] 回答を生成しました

最終回答: リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が必要です。

ドキュメントに情報がない質問の場合は、クエリ書き換え→再検索を試みた上で回答します。

============================================================
質問: 社員旅行の補助金はいくらですか?
============================================================
  [検索] クエリ: 社員旅行の補助金はいくらですか?
  [検索] 2 件取得
  [判定] not_relevant: 経費精算は月末締め、翌月15日払いです。領収書の原本ま...
  [判定] not_relevant: 健康診断は年1回、会社負担で受診できます。35歳以上は...
  [書き換え] '社員旅行の補助金はいくらですか?' → '社員旅行 補助金 福利厚生 手当'
  [検索] クエリ: 社員旅行 補助金 福利厚生 手当
  [検索] 2 件取得
  [判定] not_relevant: ...
  [判定] not_relevant: ...
  [生成] 回答を生成しました

最終回答: 社員旅行の補助金に関する情報は見つかりませんでした。

条件分岐のdecide_after_gradingが、関連ドキュメントの有無に応じて「回答生成」か「クエリ書き換え→再検索」かを振り分けます。無限ループを防ぐため、retry_countでリトライ回数を制限しています。

Self-RAG(自己反省RAG)

Self-RAGは、LLMが検索結果の関連性だけでなく、生成した回答の事実性(ハルシネーション)と有用性も自己評価する手法です。Corrective RAGがドキュメントの品質を評価するのに対し、Self-RAGは回答そのものの品質もチェックします。

フロー

[START] → [retrieve] → [grade_documents] → 関連あり → [generate] → [grade_generation]
                                          → 関連なし → [rewrite_query] → [retrieve]

                                                                   合格 → [END]
                                                                   不合格 → [rewrite_query] → [retrieve]

Corrective RAGとの主な違いは、grade_generationノードで以下2点を追加で判定する点です。

  1. 事実性(Grounded): 回答がドキュメントの内容に基づいているか
  2. 有用性(Useful): 回答がユーザーの質問に対する有用な回答になっているか

実装

self_rag.py
from common import retrieve
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_litellm import ChatLiteLLM
from langgraph.graph import END, START, StateGraph
from typing_extensions import TypedDict

COMPLETION_MODEL = "openai/gpt-5.5"

# =============================================
# LLMの設定
# =============================================
llm = ChatLiteLLM(model=COMPLETION_MODEL)

grading_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたはドキュメントの関連性を判定する専門家です。\n"
            "ドキュメントがユーザーの質問に回答するための情報を含んでいるかを判定してください。\n"
            "関連がある場合は 'relevant'、関連がない場合は 'not_relevant' とだけ回答してください。",
        ),
        ("user", "ドキュメント:\n{document}\n\n質問: {question}"),
    ]
)

rewrite_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは検索クエリの最適化の専門家です。\n"
            "ユーザーの質問を、ベクトル検索でより良い結果が得られるように書き換えてください。\n"
            "書き換えたクエリのみを出力してください。",
        ),
        ("user", "元の質問: {question}"),
    ]
)

generation_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "以下の社内ドキュメントの情報のみをもとに質問に回答してください。\n"
            "ドキュメントに記載がない情報は推測せず、「情報が見つかりませんでした」と回答してください。\n\n"
            "{context}",
        ),
        ("user", "{question}"),
    ]
)

# --- Self-RAG 固有: 回答の品質を判定するプロンプト ---
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは回答の事実性を判定する専門家です。\n"
            "回答がドキュメントの内容に基づいているかを判定してください。\n"
            "基づいている場合は 'grounded'、基づいていない場合は 'not_grounded' とだけ回答してください。",
        ),
        ("user", "ドキュメント:\n{documents}\n\n回答: {generation}"),
    ]
)

usefulness_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは回答の有用性を判定する専門家です。\n"
            "回答がユーザーの質問に対して有用な回答になっているかを判定してください。\n"
            "有用な場合は 'useful'、有用でない場合は 'not_useful' とだけ回答してください。",
        ),
        ("user", "質問: {question}\n\n回答: {generation}"),
    ]
)

grading_chain = grading_prompt | llm | StrOutputParser()
rewrite_chain = rewrite_prompt | llm | StrOutputParser()
generation_chain = generation_prompt | llm | StrOutputParser()
hallucination_chain = hallucination_prompt | llm | StrOutputParser()
usefulness_chain = usefulness_prompt | llm | StrOutputParser()

# =============================================
# ステートとグラフの定義
# =============================================
class SelfRAGState(TypedDict):
    question: str
    documents: list[str]
    generation: str
    retry_count: int
    generation_grade: str  # "accepted" / "not_grounded" / "not_useful"

def retrieve_node(state: SelfRAGState) -> dict:
    """ドキュメントを検索する。"""
    print(f"  [検索] クエリ: {state['question']}")
    docs = retrieve(state["question"])
    print(f"  [検索] {len(docs)} 件取得")
    return {"documents": docs}

def grade_documents_node(state: SelfRAGState) -> dict:
    """検索結果の関連性を判定する。"""
    relevant_docs = []
    for doc in state["documents"]:
        grade = grading_chain.invoke({"document": doc, "question": state["question"]})
        if "relevant" in grade.lower() and "not_relevant" not in grade.lower():
            relevant_docs.append(doc)
            print(f"  [文書判定] relevant: {doc[:40]}...")
        else:
            print(f"  [文書判定] not_relevant: {doc[:40]}...")
    return {"documents": relevant_docs}

def rewrite_query_node(state: SelfRAGState) -> dict:
    """クエリを書き換えて再検索の精度を上げる。"""
    new_query = rewrite_chain.invoke({"question": state["question"]})
    print(f"  [書き換え] '{state['question']}' → '{new_query}'")
    return {"question": new_query, "retry_count": state["retry_count"] + 1}

def generate_node(state: SelfRAGState) -> dict:
    """関連ドキュメントをもとに回答を生成する。"""
    context = "\n\n".join(state["documents"]) if state["documents"] else "関連ドキュメントなし"
    answer = generation_chain.invoke({"context": context, "question": state["question"]})
    print(f"  [生成] 回答を生成しました")
    return {"generation": answer}

def grade_generation_node(state: SelfRAGState) -> dict:
    """生成された回答の事実性と有用性を判定する(Self-RAG固有)。"""
    docs_text = "\n\n".join(state["documents"]) if state["documents"] else ""

    # ハルシネーションチェック
    h_result = hallucination_chain.invoke({
        "documents": docs_text,
        "generation": state["generation"],
    })
    is_grounded = "grounded" in h_result.lower() and "not_grounded" not in h_result.lower()
    print(f"  [事実性] {'grounded' if is_grounded else 'not_grounded'}")

    if not is_grounded:
        return {"generation_grade": "not_grounded"}

    # 有用性チェック
    u_result = usefulness_chain.invoke({
        "question": state["question"],
        "generation": state["generation"],
    })
    is_useful = "useful" in u_result.lower() and "not_useful" not in u_result.lower()
    print(f"  [有用性] {'useful' if is_useful else 'not_useful'}")

    return {"generation_grade": "accepted" if is_useful else "not_useful"}

def decide_after_grading_docs(state: SelfRAGState) -> str:
    """関連ドキュメントがあれば回答生成、なければクエリ書き換え。"""
    if state["documents"]:
        return "generate"
    if state["retry_count"] >= 2:
        return "generate"
    return "rewrite"

def decide_after_grading_generation(state: SelfRAGState) -> str:
    """回答品質に応じて終了か再試行かを判定する。"""
    if state["generation_grade"] == "accepted":
        return "accepted"
    if state["retry_count"] >= 2:
        return "accepted"  # リトライ上限
    print(f"  [再試行] 回答品質が不十分なため再検索します")
    return "retry"

# =============================================
# グラフの構築
# =============================================
graph = StateGraph(SelfRAGState)

graph.add_node("retrieve", retrieve_node)
graph.add_node("grade_documents", grade_documents_node)
graph.add_node("rewrite_query", rewrite_query_node)
graph.add_node("generate", generate_node)
graph.add_node("grade_generation", grade_generation_node)

graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "grade_documents")
graph.add_conditional_edges(
    "grade_documents",
    decide_after_grading_docs,
    {"generate": "generate", "rewrite": "rewrite_query"},
)
graph.add_edge("rewrite_query", "retrieve")
graph.add_edge("generate", "grade_generation")
graph.add_conditional_edges(
    "grade_generation",
    decide_after_grading_generation,
    {"accepted": END, "retry": "rewrite_query"},
)

app = graph.compile()

# =============================================
# 実行
# =============================================
queries = [
    "リモートワークは週何日までできますか?",
    "副業は許可されていますか?",
]

for query in queries:
    print(f"\n{'='*60}")
    print(f"質問: {query}")
    print(f"{'='*60}")
    result = app.invoke({
        "question": query,
        "documents": [],
        "generation": "",
        "retry_count": 0,
        "generation_grade": "",
    })
    print(f"\n最終回答: {result['generation']}")

実行例

ドキュメントに情報がある質問では、回答生成後にハルシネーションチェックと有用性チェックを通過して回答が確定します。

============================================================
質問: リモートワークは週何日までできますか?
============================================================
  [検索] クエリ: リモートワークは週何日までできますか?
  [検索] 2 件取得
  [文書判定] relevant: リモートワークは週3日まで可能です。事前申請は不要ですが、勤怠システムへの記録が...
  [文書判定] not_relevant: 有給休暇は入社6ヶ月後に10日付与されます。申請は3営業日前までに上長の承認が必...
  [生成] 回答を生成しました
  [事実性] grounded
  [有用性] useful

最終回答: リモートワークは週3日まで可能です。

ドキュメントに情報がない質問では、クエリ書き換え→再検索を試みた上で、最終的にハルシネーションのない回答を生成します。

============================================================
質問: 副業は許可されていますか?
============================================================
  [検索] クエリ: 副業は許可されていますか?
  [検索] 2 件取得
  [文書判定] not_relevant: ...
  [文書判定] not_relevant: ...
  [書き換え] '副業は許可されていますか?' → '副業 兼業 許可 就業規則'
  [検索] クエリ: 副業 兼業 許可 就業規則
  [検索] 2 件取得
  [文書判定] not_relevant: ...
  [文書判定] not_relevant: ...
  [生成] 回答を生成しました
  [事実性] grounded
  [有用性] not_useful

最終回答: 情報が見つかりませんでした。

Corrective RAGではgrade_documentsまでだった品質チェックが、Self-RAGではgrade_generationとして回答生成後にも追加されています。これにより、検索結果が適切でも回答にハルシネーションが含まれるケースを検出して再試行できます。

Adaptive RAG(適応型RAG)

Adaptive RAGは、質問の複雑さに応じて処理ルートを動的に切り替える手法です。すべての質問に重いRAGパイプラインを適用するのではなく、以下の3ルートに振り分けます。

  • direct: 挨拶や一般知識の質問 → 検索なしでLLMが直接回答
  • vectorstore: 社内制度に関する質問 → ベクトルDBで検索して回答
  • web_search: 最新ニュースなどの質問 → Web検索して回答

Corrective RAGが「検索後の品質評価」、RAG-Fusionが「検索入口の拡張」であるのに対し、Adaptive RAGは「検索前のルーティング」に焦点を当てたアプローチです。

フロー

[START] → [route_query] → direct → [direct_answer] → [END]
                        → vectorstore → [retrieve] → [generate] → [END]
                        → web_search → [web_search] → [generate] → [END]

実装

adaptive_rag.py
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_litellm import ChatLiteLLM
from langgraph.graph import END, START, StateGraph
from litellm import completion as litellm_completion
from typing_extensions import TypedDict

from common import retrieve

COMPLETION_MODEL = "openai/gpt-5.5"

# =============================================
# LLMの設定
# =============================================
llm = ChatLiteLLM(model=COMPLETION_MODEL)

router_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "あなたは質問の分類の専門家です。\n"
            "質問を以下の3つのカテゴリに分類してください。\n"
            "- vectorstore: 社内制度・規定に関する質問\n"
            "- direct: 一般的な挨拶や社内制度と無関係な質問\n"
            "- web_search: 最新ニュースや時事問題に関する質問\n"
            "カテゴリ名のみを出力してください。",
        ),
        ("user", "{question}"),
    ]
)

generation_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "以下の社内ドキュメントの情報のみをもとに質問に回答してください。\n"
            "ドキュメントに記載がない情報は推測せず、「情報が見つかりませんでした」と回答してください。\n\n"
            "{context}",
        ),
        ("user", "{question}"),
    ]
)

router_chain = router_prompt | llm | StrOutputParser()
generation_chain = generation_prompt | llm | StrOutputParser()

# =============================================
# ステートとグラフの定義
# =============================================
class AdaptiveRAGState(TypedDict):
    question: str
    route: str
    documents: list[str]
    generation: str

def route_query_node(state: AdaptiveRAGState) -> dict:
    """質問を分類してルーティング先を決定する。"""
    result = router_chain.invoke({"question": state["question"]}).strip().lower()
    if "vectorstore" in result:
        route = "vectorstore"
    elif "web_search" in result or "web" in result:
        route = "web_search"
    else:
        route = "direct"
    print(f"  [ルーティング] {route}")
    return {"route": route}

def retrieve_node(state: AdaptiveRAGState) -> dict:
    """ベクトルDBからドキュメントを検索する。"""
    docs = retrieve(state["question"])
    print(f"  [検索] {len(docs)} 件取得")
    return {"documents": docs}

def web_search_node(state: AdaptiveRAGState) -> dict:
    """Web検索を実行する(※本デモではLLMの知識で代替)。"""
    # 実運用では Tavily API や Google Search API を使用
    print(f"  [Web検索] ※実運用ではWeb検索APIを使用")
    response = litellm_completion(
        model=COMPLETION_MODEL,
        messages=[
            {
                "role": "system",
                "content": "質問に対してあなたの知識をもとに回答の参考情報を提供してください。",
            },
            {"role": "user", "content": state["question"]},
        ],
    )
    return {"documents": [response.choices[0].message.content]}

def generate_node(state: AdaptiveRAGState) -> dict:
    """検索結果をもとに回答を生成する。"""
    context = "\n\n".join(state["documents"])
    answer = generation_chain.invoke({"context": context, "question": state["question"]})
    print(f"  [生成] 回答を生成しました")
    return {"generation": answer}

def direct_answer_node(state: AdaptiveRAGState) -> dict:
    """検索なしでLLMが直接回答する。"""
    response = litellm_completion(
        model=COMPLETION_MODEL,
        messages=[
            {"role": "system", "content": "質問に対して簡潔に回答してください。"},
            {"role": "user", "content": state["question"]},
        ],
    )
    print(f"  [直接回答] 検索なしで回答")
    return {"generation": response.choices[0].message.content}

def route_decision(state: AdaptiveRAGState) -> str:
    """ルーティング先に応じて次のノードを決定する。"""
    route = state["route"]
    if route == "vectorstore":
        return "retrieve"
    elif route == "web_search":
        return "web_search"
    return "direct_answer"

# =============================================
# グラフの構築
# =============================================
graph = StateGraph(AdaptiveRAGState)

graph.add_node("route_query", route_query_node)
graph.add_node("retrieve", retrieve_node)
graph.add_node("web_search", web_search_node)
graph.add_node("generate", generate_node)
graph.add_node("direct_answer", direct_answer_node)

graph.add_edge(START, "route_query")
graph.add_conditional_edges(
    "route_query",
    route_decision,
    {"retrieve": "retrieve", "web_search": "web_search", "direct_answer": "direct_answer"},
)
graph.add_edge("retrieve", "generate")
graph.add_edge("web_search", "generate")
graph.add_edge("generate", END)
graph.add_edge("direct_answer", END)

app = graph.compile()

# =============================================
# 実行
# =============================================
queries = [
    "こんにちは!",                                       # → direct
    "有給休暇の申請方法を教えてください",                 # → vectorstore
    "最近のAI業界のトレンドは?",                         # → web_search
    "経費精算の締め日はいつですか?",                     # → vectorstore
]

for query in queries:
    print(f"\n{'='*60}")
    print(f"質問: {query}")
    print(f"{'='*60}")
    result = app.invoke({
        "question": query,
        "route": "",
        "documents": [],
        "generation": "",
    })
    print(f"\n最終回答: {result['generation']}")

実行例

質問の性質に応じて異なるルートが選択されます。

============================================================
質問: こんにちは!
============================================================
  [ルーティング] direct
  [直接回答] 検索なしで回答

最終回答: こんにちは!どうしましたか?

============================================================
質問: 有給休暇の申請方法を教えてください
============================================================
  [ルーティング] vectorstore
  [検索] 2 件取得
  [生成] 回答を生成しました

最終回答: 有給休暇は、3営業日前までに上長の承認を得て申請してください。

確認できた情報は以下です。
- 有給休暇は入社6ヶ月後に10日付与
- 申請は3営業日前までに上長の承認が必要

申請手順の詳細までは、情報が見つかりませんでした。

============================================================
質問: 最近のAI業界のトレンドは?
============================================================
  [ルーティング] web_search
  [Web検索] ※実運用ではWeb検索APIを使用
  [生成] 回答を生成しました

最終回答: 最近のAI業界のトレンドは、重要どころに絞ると次の10点です。

1. **生成AIの実用化が加速**  
   対話AIだけでなく、業務自動化、社内ナレッジ検索、文章作成、要約、翻訳、コーディング支援など、実務への組み込みが進んでいます。  
....

Adaptive RAGのポイントはroute_queryノードで質問を分類し、add_conditional_edgesで処理ルートを分岐させる点です。挨拶のような質問にまでドキュメント検索を行う無駄を省き、コストとレイテンシを最適化します。実運用では、このルーティングの内部にCorrective RAGやSelf-RAGを組み込むことで、さらに堅牢なパイプラインを構築できます。

Agentic RAG(エージェント型RAG)

Agentic RAGは、LLMエージェントが検索の要否を自律的に判断するパターンです。社内制度の質問にはドキュメント検索を行い、挨拶や一般知識の質問にはツールを使わず直接回答します。

agentic_rag.py
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_litellm import ChatLiteLLM

from common import retrieve

COMPLETION_MODEL = "anthropic/claude-sonnet-4-6"

# =============================================
# ツール定義
# =============================================
@tool
def search_company_docs(query: str) -> str:
    """社内ドキュメントを検索します。社内規定や制度に関する質問に使ってください。"""
    return "\n\n".join(retrieve(query))

# =============================================
# エージェントの構築
# =============================================
system_prompt = (
    "あなたは社内ドキュメントに基づいて質問に回答するアシスタントです。\n"
    "以下のルールに従ってください:\n"
    "1. 社内制度に関する質問には、必ず search_company_docs ツールで検索してから回答してください\n"
    "2. 検索結果に情報がない場合は「社内ドキュメントに該当する情報が見つかりませんでした」と回答してください\n"
    "3. 一般的な知識で回答できる質問(挨拶など)には、ツールを使わず直接回答してください\n"
    "4. 回答は簡潔にまとめてください"
)

llm = ChatLiteLLM(model=COMPLETION_MODEL)
agent = create_agent(llm, tools=[search_company_docs], system_prompt=system_prompt)

# =============================================
# 実行
# =============================================
queries = [
    "リモートワークのルールを教えてください",           # → ツール使用
    "こんにちは!",                                       # → 直接回答
    "育児休業はいつまで取れますか?",                     # → ツール使用
    "経費精算の締め日と支払日を教えてください",           # → ツール使用
    "Pythonのリスト内包表記について教えてください",       # → 直接回答
]

for query in queries:
    print(f"\n{'='*60}")
    print(f"質問: {query}")
    print(f"{'='*60}")
    result = agent.invoke({"messages": [{"role": "user", "content": query}]})
    final = result["messages"][-1]
    print(f"回答: {final.content}")

社内制度の質問にはsearch_company_docsツールで検索してから回答し、挨拶やプログラミングの質問にはツールを使わず直接回答します。エージェントが質問の性質を判断して、検索の要否を自律的に決定しているのがポイントです。

RAGパターンの比較

観点 シンプルRAG HyDE RAG-Fusion Rerank CRAG Self-RAG Adaptive RAG Agentic RAG
LangGraph 不要 不要 使用 不要 使用 使用 使用 使用
主な改善点 - 語彙ギャップ解消 検索網羅性向上 検索精度向上 検索品質チェック 回答品質チェック 動的ルーティング 自律的な検索判断
検索の判断 常に検索 常に検索 常に検索 常に検索 常に検索 常に検索 LLMが分類 エージェントが判断
制御フロー 固定 固定 固定 固定 固定(分岐にLLM判定) 固定(分岐にLLM判定) 固定(分岐にLLM判定) LLMが動的に決定
適したケース プロトタイプ 語彙差が大きい場合 曖昧・広範な質問 検索精度重視 検索精度の最大化 ハルシネーション防止 多様な質問タイプ 汎用アシスタント

各パターンは排他的ではなく組み合わせ可能で、例えばAdaptive RAGでルーティングしつつ、検索ルート内でCorrective RAGの品質チェックを行い、最終回答にSelf-RAGのハルシネーション判定を適用する、といった統合アーキテクチャも構築できます。

まとめ

LiteLLMを使ったRAGパイプラインの構築方法を、シンプルRAGからHyDE・RAG-Fusion・Rerank・Corrective RAG・Self-RAG・Adaptive RAG・Agentic RAGまで段階的に紹介しました。

LiteLLMのembedding()completion()rerank()を使うことで、すべてのパターンをプロバイダー非依存で実装できます。LangGraphと組み合わせることで、検索結果の評価・クエリ書き換え・ルーティングといった高度な制御をグラフの条件分岐として明示的に実装できます。

最後まで読んでいただきありがとうございました。


生成AI活用はクラスメソッドにお任せ

過去に支援してきた生成AIの支援実績100+を元にホワイトペーパーを作成しました。御社が抱えている課題のうち、どれが解決できて、どのようなサービスが受けられるのか?4つのフェーズに分けてまとめています。どうぞお気軽にご覧ください。

生成AI資料イメージ

無料でダウンロードする

この記事をシェアする

関連記事