LangChain 混合检索 RAG 实战:从构建到优化全流程

本文详细讲解 RAG 系统的完整构建过程,重点介绍混合检索(Hybrid Search)的原理与实现,涵盖向量化方案选型、索引构建、检索优化等核心内容。


一、RAG 核心架构

RAG(Retrieval-Augmented Generation,检索增强生成)的本质是:先检索,再生成

                    ┌─────────────────────────────────┐
                    │         离线阶段(建库)           │
                    │                                  │
  原始文档 ──▶ 文档切分 ──▶ 向量化 ──▶ 向量库          │
               │                      │               │
               └──────▶ 关键词索引 ────┘               │
                    └─────────────────────────────────┘

                    ┌─────────────────────────────────┐
                    │         在线阶段(检索生成)       │
                    │                                  │
  用户提问 ──▶ 查询处理 ──▶ 向量检索 ──┐              │
                    │                  ├─▶ 融合排序 ──▶ LLM ──▶ 回答
                    │      关键词检索 ──┘              │
                    └─────────────────────────────────┘

为什么需要混合检索?

单一检索方式各有缺陷:

检索方式 优势 劣势
纯向量检索 语义理解强,能找到近义表达 精确词匹配差,专有名词易漏召
纯关键词检索(BM25) 精确词匹配,专有名词准确 无语义理解,同义词无法匹配
混合检索 兼顾语义 + 精确匹配 实现略复杂

举例: 查询"Redis 缓存雪崩怎么解决"

  • 纯向量:可能召回"缓存击穿"的文档(语义相近但不是你要的)
  • BM25:精准召回含"缓存雪崩"关键词的文档
  • 混合:两者结合,效果最佳

二、向量化方案选型

2.1 Embedding 模型对比

模型 类型 维度 中文效果 适用场景
text-embedding-3-large API(OpenAI) 3072 良好 不敏感数据,快速上手
text-embedding-3-small API(OpenAI) 1536 一般 成本敏感场景
BAAI/bge-large-zh-v1.5 本地部署 1024 优秀 中文为主,数据安全要求高
BAAI/bge-m3 本地部署 1024 优秀 多语言,支持稀疏+稠密双向量
jinaai/jina-embeddings-v2 API/本地 768 良好 长文档(支持 8192 token)

推荐组合(中文企业场景):

向量检索:BAAI/bge-large-zh-v1.5(本地,数据安全)
关键词检索:BM25(经典,无需额外资源)
向量库:Milvus(生产) / Chroma(开发测试)

2.2 向量相似度计算方式

# 三种常见距离/相似度
# 1. 余弦相似度(Cosine Similarity)—— 最常用
#    适合:文本语义匹配,不受向量长度影响
cosine_similarity = dot(a, b) / (norm(a) * norm(b))

# 2. 内积(Inner Product / Dot Product)
#    适合:已归一化的向量(效果等同余弦)

# 3. 欧氏距离(L2 Distance)
#    适合:图像特征等,文本场景少用

结论:文本场景统一用余弦相似度,bge 系列模型输出向量默认已归一化。


三、完整项目结构

rag_project/
├── config.py              # 配置管理
├── document_processor.py  # 文档加载 & 切分
├── vector_store.py        # 向量库管理
├── hybrid_retriever.py    # 混合检索核心
├── rag_chain.py           # RAG 链组装
├── main.py                # 入口
└── requirements.txt
# requirements.txt
langchain>=0.2.0
langchain-community>=0.2.0
langchain-openai>=0.1.0
chromadb>=0.5.0
sentence-transformers>=3.0.0
rank-bm25>=0.2.2
faiss-cpu>=1.8.0
tiktoken>=0.7.0
python-dotenv>=1.0.0

四、Step 1:文档加载与智能切分

4.1 文档加载

# document_processor.py
from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    UnstructuredMarkdownLoader,
    DirectoryLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from typing import List
import re


class DocumentProcessor:
    """文档处理器:加载 + 清洗 + 切分"""

    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # RecursiveCharacterTextSplitter 是最推荐的切分器
        # 按优先级依次尝试:段落 → 句子 → 词 → 字符
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""],
            length_function=len,
        )

    def load_directory(self, directory_path: str) -> List[Document]:
        """批量加载目录下的文档"""
        loaders = {
            "**/*.pdf": PyPDFLoader,
            "**/*.txt": TextLoader,
            "**/*.md": UnstructuredMarkdownLoader,
        }

        all_docs = []
        for glob_pattern, loader_cls in loaders.items():
            try:
                loader = DirectoryLoader(
                    directory_path,
                    glob=glob_pattern,
                    loader_cls=loader_cls,
                    show_progress=True
                )
                docs = loader.load()
                all_docs.extend(docs)
                print(f"✅ 加载 {glob_pattern}: {len(docs)} 个文档")
            except Exception as e:
                print(f"⚠️  加载 {glob_pattern} 失败: {e}")

        return all_docs

    def clean_text(self, text: str) -> str:
        """文本清洗:去除噪音"""
        # 去除多余空白行
        text = re.sub(r'\n{3,}', '\n\n', text)
        # 去除页眉页脚等噪音(根据实际情况调整)
        text = re.sub(r'第\s*\d+\s*页', '', text)
        # 去除特殊控制字符
        text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)
        return text.strip()

    def split_documents(self, documents: List[Document]) -> List[Document]:
        """切分文档并添加元数据"""
        # 清洗文本
        for doc in documents:
            doc.page_content = self.clean_text(doc.page_content)

        # 过滤空文档
        documents = [doc for doc in documents if len(doc.page_content) > 50]

        # 切分
        chunks = self.text_splitter.split_documents(documents)

        # 给每个 chunk 添加序号(方便溯源)
        for i, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = i
            chunk.metadata["chunk_size"] = len(chunk.page_content)

        print(f"📄 切分完成:{len(documents)} 个文档 → {len(chunks)} 个 chunks")
        return chunks

4.2 切分策略说明

原始文档(5000字)
       │
       ▼
RecursiveCharacterTextSplitter
       │
       ├── chunk_size = 512     # 每块最大字符数
       ├── chunk_overlap = 64   # 相邻块重叠字符数(保证上下文连续性)
       │
       ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│  Chunk 1         │ │  Chunk 2         │ │  Chunk 3         │
│  [0 : 512]       │ │  [448 : 960]     │ │  [896 : 1408]    │
│        ←64→      │ │  ←64→            │ │                  │
└──────────────────┘ └──────────────────┘ └──────────────────┘
        重叠部分确保语义不在边界处断裂

五、Step 2:构建向量库

# vector_store.py
from langchain_community.vectorstores import Chroma, FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain.schema import Document
from typing import List, Optional
import os


class VectorStoreManager:
    """向量库管理器"""

    def __init__(self, embedding_type: str = "bge", persist_dir: str = "./chroma_db"):
        self.embedding_type = embedding_type
        self.persist_dir = persist_dir
        self.embeddings = self._build_embeddings()
        self.vector_store = None

    def _build_embeddings(self):
        """构建 Embedding 模型"""

        if self.embedding_type == "bge":
            # 推荐:本地部署,中文效果最好
            # 首次运行会自动下载模型(约 1.3GB)
            return HuggingFaceEmbeddings(
                model_name="BAAI/bge-large-zh-v1.5",
                model_kwargs={"device": "cpu"},  # GPU 改为 "cuda"
                encode_kwargs={
                    "normalize_embeddings": True,  # 归一化,使余弦相似度 = 内积
                    "batch_size": 32               # 批量编码,提升速度
                },
                # BGE 模型推荐在查询时加前缀,提升检索效果
                query_instruction="为这个句子生成表示以用于检索相关文章:"
            )

        elif self.embedding_type == "openai":
            # API 方式,无需本地 GPU
            return OpenAIEmbeddings(
                model="text-embedding-3-large",
                dimensions=1536  # 可以降维节省存储
            )

        elif self.embedding_type == "bge-m3":
            # 支持稠密 + 稀疏双向量,单模型实现混合检索
            return HuggingFaceEmbeddings(
                model_name="BAAI/bge-m3",
                model_kwargs={"device": "cpu"},
                encode_kwargs={"normalize_embeddings": True}
            )

        else:
            raise ValueError(f"不支持的 embedding 类型: {self.embedding_type}")

    def build_from_documents(self, documents: List[Document], use_faiss: bool = False):
        """从文档列表构建向量库"""
        print(f"🔨 开始构建向量库({self.embedding_type} embeddings)...")

        if use_faiss:
            # FAISS:纯内存,速度极快,适合中小规模
            self.vector_store = FAISS.from_documents(
                documents=documents,
                embedding=self.embeddings
            )
            # 保存到磁盘
            self.vector_store.save_local(self.persist_dir)
            print(f"✅ FAISS 向量库已保存到 {self.persist_dir}")
        else:
            # Chroma:支持持久化,适合开发阶段
            self.vector_store = Chroma.from_documents(
                documents=documents,
                embedding=self.embeddings,
                persist_directory=self.persist_dir,
                collection_metadata={"hnsw:space": "cosine"}  # 指定余弦距离
            )
            print(f"✅ Chroma 向量库已保存到 {self.persist_dir}")

        return self.vector_store

    def load_existing(self, use_faiss: bool = False):
        """加载已存在的向量库(避免重复构建)"""
        if use_faiss:
            self.vector_store = FAISS.load_local(
                self.persist_dir,
                self.embeddings,
                allow_dangerous_deserialization=True
            )
        else:
            self.vector_store = Chroma(
                persist_directory=self.persist_dir,
                embedding_function=self.embeddings
            )
        print(f"✅ 向量库加载成功")
        return self.vector_store

    def get_retriever(self, top_k: int = 5):
        """获取纯向量检索器"""
        return self.vector_store.as_retriever(
            search_type="similarity",    # 也可以用 "mmr" 减少结果冗余
            search_kwargs={"k": top_k}
        )

六、Step 3:混合检索核心实现

6.1 混合检索架构

用户查询
   │
   ├──▶ 向量检索(Chroma/FAISS)──▶ Top-K 结果 + 相似度分数
   │
   └──▶ BM25 检索(关键词)──────▶ Top-K 结果 + BM25 分数
               │
               ▼
        RRF 融合排序(Reciprocal Rank Fusion)
               │
               ▼
        最终 Top-K 文档

6.2 RRF 融合算法原理

RRF(Reciprocal Rank Fusion)是混合检索中最经典的排序融合算法:

RRF_score(doc) = Σ 1 / (k + rank_i)

其中:
  k = 60(经验值,防止头部文档过度主导)
  rank_i = 文档在第 i 路检索结果中的排名

示例: 文档 A 在向量检索中排第 2,在 BM25 中排第 1

RRF(A) = 1/(60+2) + 1/(60+1) = 0.01613 + 0.01639 = 0.03252

文档 B 只在向量检索中排第 1(BM25 未召回):
RRF(B) = 1/(60+1) + 0 = 0.01639

结果:A > B,融合后 A 排名更靠前 ✓

6.3 完整混合检索实现

# hybrid_retriever.py
from langchain.schema import Document, BaseRetriever
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from langchain.vectorstores.base import VectorStore
from typing import List, Callable
import numpy as np


class HybridRetriever:
    """
    混合检索器
    融合:向量检索(语义) + BM25(关键词)
    排序:RRF(Reciprocal Rank Fusion)
    """

    def __init__(
        self,
        vector_store: VectorStore,
        documents: List[Document],
        vector_weight: float = 0.5,   # 向量检索权重
        bm25_weight: float = 0.5,     # BM25 检索权重
        top_k: int = 5
    ):
        self.top_k = top_k
        self.vector_weight = vector_weight
        self.bm25_weight = bm25_weight

        # 验证权重
        assert abs(vector_weight + bm25_weight - 1.0) < 1e-6, "权重之和必须为 1"

        # 构建向量检索器
        self.vector_retriever = vector_store.as_retriever(
            search_type="similarity",
            search_kwargs={"k": top_k * 2}  # 多取一些,融合后再截断
        )

        # 构建 BM25 检索器
        # BM25 需要所有文档的文本(纯离线,无需 API)
        self.bm25_retriever = BM25Retriever.from_documents(
            documents,
            k=top_k * 2
        )
        # 中文分词优化(默认按字符切分,可接入 jieba)
        self.bm25_retriever.preprocess_func = self._chinese_tokenize

        # 用 LangChain 内置的 EnsembleRetriever 组合(底层即 RRF)
        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[self.vector_retriever, self.bm25_retriever],
            weights=[vector_weight, bm25_weight]
        )

    @staticmethod
    def _chinese_tokenize(text: str) -> List[str]:
        """
        中文分词(使用 jieba,比默认字符切分效果更好)
        pip install jieba
        """
        try:
            import jieba
            return list(jieba.cut(text))
        except ImportError:
            # 降级:按字符切分
            return list(text)

    def retrieve(self, query: str) -> List[Document]:
        """执行混合检索"""
        docs = self.ensemble_retriever.invoke(query)
        return docs[:self.top_k]

    def retrieve_with_scores(self, query: str) -> List[tuple]:
        """
        返回带分数的结果(用于调试和评估)
        手动实现 RRF 以获取分数
        """
        # 向量检索结果
        vector_docs = self.vector_retriever.invoke(query)
        # BM25 检索结果
        bm25_docs = self.bm25_retriever.invoke(query)

        # 计算 RRF 分数
        k = 60
        doc_scores = {}

        for rank, doc in enumerate(vector_docs):
            doc_id = doc.page_content[:50]  # 用内容前50字作为唯一键
            if doc_id not in doc_scores:
                doc_scores[doc_id] = {"doc": doc, "score": 0, "vector_rank": None, "bm25_rank": None}
            doc_scores[doc_id]["score"] += self.vector_weight * (1 / (k + rank + 1))
            doc_scores[doc_id]["vector_rank"] = rank + 1

        for rank, doc in enumerate(bm25_docs):
            doc_id = doc.page_content[:50]
            if doc_id not in doc_scores:
                doc_scores[doc_id] = {"doc": doc, "score": 0, "vector_rank": None, "bm25_rank": None}
            doc_scores[doc_id]["score"] += self.bm25_weight * (1 / (k + rank + 1))
            doc_scores[doc_id]["bm25_rank"] = rank + 1

        # 按 RRF 分数排序
        sorted_results = sorted(
            doc_scores.values(),
            key=lambda x: x["score"],
            reverse=True
        )

        return [
            (item["doc"], item["score"], item["vector_rank"], item["bm25_rank"])
            for item in sorted_results[:self.top_k]
        ]

七、Step 4:组装 RAG Chain

# rag_chain.py
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough, RunnableLambda
from langchain.schema.output_parser import StrOutputParser
from langchain.schema import Document
from typing import List
from hybrid_retriever import HybridRetriever


# ==========================================
# Prompt 模板(中文优化版)
# ==========================================
RAG_PROMPT = ChatPromptTemplate.from_template("""
你是一个专业的问答助手。请根据以下检索到的参考资料回答用户的问题。

## 参考资料
{context}

## 用户问题
{question}

## 回答要求
- 仅根据参考资料中的信息回答,不要编造
- 如果参考资料中没有相关信息,明确说明"根据现有资料无法回答"
- 回答要简洁、准确,必要时引用资料来源
- 使用中文回答

## 回答:
""")


def format_docs(docs: List[Document]) -> str:
    """将检索到的文档格式化为上下文字符串"""
    formatted = []
    for i, doc in enumerate(docs, 1):
        source = doc.metadata.get("source", "未知来源")
        page = doc.metadata.get("page", "")
        source_info = f"{source}" + (f" 第{page}页" if page else "")
        formatted.append(f"【资料{i}】({source_info})\n{doc.page_content}")

    return "\n\n".join(formatted)


class RAGChain:
    """完整的 RAG 问答链"""

    def __init__(self, hybrid_retriever: HybridRetriever, model_name: str = "gpt-4o-mini"):
        self.retriever = hybrid_retriever

        self.llm = ChatOpenAI(
            model=model_name,
            temperature=0,      # RAG 场景设为 0,减少幻觉
            streaming=True      # 流式输出,提升用户体验
        )

        # 构建 LCEL(LangChain Expression Language)链
        self.chain = (
            {
                "context": RunnableLambda(
                    lambda x: format_docs(self.retriever.retrieve(x["question"]))
                ),
                "question": RunnablePassthrough() | RunnableLambda(lambda x: x["question"])
            }
            | RAG_PROMPT
            | self.llm
            | StrOutputParser()
        )

    def ask(self, question: str) -> str:
        """同步问答"""
        return self.chain.invoke({"question": question})

    def ask_with_sources(self, question: str) -> dict:
        """返回答案 + 来源文档 + 检索分数(调试用)"""
        # 获取检索结果(带分数)
        retrieved = self.retriever.retrieve_with_scores(question)

        # 格式化上下文
        docs = [item[0] for item in retrieved]
        context = format_docs(docs)

        # 生成答案
        answer = (RAG_PROMPT | self.llm | StrOutputParser()).invoke({
            "context": context,
            "question": question
        })

        return {
            "answer": answer,
            "sources": [
                {
                    "content": doc.page_content[:200] + "...",
                    "source": doc.metadata.get("source", "未知"),
                    "rrf_score": round(score, 4),
                    "vector_rank": v_rank,
                    "bm25_rank": b_rank,
                }
                for doc, score, v_rank, b_rank in retrieved
            ]
        }

    def stream_ask(self, question: str):
        """流式输出"""
        for chunk in self.chain.stream({"question": question}):
            print(chunk, end="", flush=True)
        print()  # 换行

八、Step 5:入口整合

# main.py
import os
from dotenv import load_dotenv
from document_processor import DocumentProcessor
from vector_store import VectorStoreManager
from hybrid_retriever import HybridRetriever
from rag_chain import RAGChain
import json

load_dotenv()  # 加载 .env 中的 OPENAI_API_KEY


def build_rag_system(docs_dir: str, rebuild: bool = False):
    """
    构建或加载 RAG 系统
    rebuild=False 时直接加载已有向量库,避免重复构建
    """
    persist_dir = "./chroma_db"

    # Step 1:初始化向量库管理器
    vs_manager = VectorStoreManager(
        embedding_type="bge",      # 使用 BGE 中文模型
        persist_dir=persist_dir
    )

    if rebuild or not os.path.exists(persist_dir):
        # Step 2:加载并处理文档
        processor = DocumentProcessor(
            chunk_size=512,
            chunk_overlap=64
        )
        raw_docs = processor.load_directory(docs_dir)
        chunks = processor.split_documents(raw_docs)

        # Step 3:构建向量库
        vector_store = vs_manager.build_from_documents(chunks)
    else:
        # 直接加载已有向量库
        vector_store = vs_manager.load_existing()
        # 重新加载文档用于 BM25(BM25 不持久化,需要重建)
        processor = DocumentProcessor(chunk_size=512, chunk_overlap=64)
        raw_docs = processor.load_directory(docs_dir)
        chunks = processor.split_documents(raw_docs)

    # Step 4:构建混合检索器
    retriever = HybridRetriever(
        vector_store=vector_store,
        documents=chunks,
        vector_weight=0.6,   # 向量权重略高(语义理解更重要)
        bm25_weight=0.4,
        top_k=5
    )

    # Step 5:组装 RAG 链
    rag = RAGChain(retriever, model_name="gpt-4o-mini")

    return rag


def main():
    # 构建系统(第一次运行后设 rebuild=False 加快启动)
    rag = build_rag_system(docs_dir="./documents", rebuild=True)

    print("=" * 60)
    print("🤖 RAG 问答系统已就绪,输入 'quit' 退出")
    print("=" * 60)

    while True:
        question = input("\n❓ 请输入问题:").strip()
        if question.lower() in ["quit", "exit", "q"]:
            break
        if not question:
            continue

        print("\n🔍 检索 + 生成中...\n")

        # 带来源的问答(生产环境可改为 ask() 简洁模式)
        result = rag.ask_with_sources(question)

        print(f"💬 回答:\n{result['answer']}")
        print(f"\n📚 参考来源:")
        for i, src in enumerate(result['sources'], 1):
            print(f"  [{i}] {src['source']} | RRF分数: {src['rrf_score']} "
                  f"| 向量排名: {src['vector_rank']} | BM25排名: {src['bm25_rank']}")


if __name__ == "__main__":
    main()

九、运行输出示例

✅ 加载 **/*.pdf: 3 个文档
✅ 加载 **/*.md: 12 个文档
📄 切分完成:15 个文档 → 287 个 chunks
🔨 开始构建向量库(bge embeddings)...
✅ Chroma 向量库已保存到 ./chroma_db

============================================================
🤖 RAG 问答系统已就绪,输入 'quit' 退出
============================================================

❓ 请输入问题:Redis 缓存雪崩怎么处理?

🔍 检索 + 生成中...

💬 回答:
缓存雪崩是指大量缓存同时失效,导致请求全部打到数据库。主要解决方案:
1. **随机过期时间**:在基础 TTL 上加随机偏移(如 TTL + random(0, 300s)),避免集中失效
2. **缓存预热**:系统启动时提前加载热点数据
3. **互斥锁**:缓存失效时只允许一个线程重建缓存
4. **熔断降级**:数据库压力过大时返回默认值或降级响应

📚 参考来源:
  [1] redis-best-practices.pdf | RRF分数: 0.0325 | 向量排名: 1 | BM25排名: 2
  [2] cache-design.md         | RRF分数: 0.0316 | 向量排名: 2 | BM25排名: 1
  [3] distributed-cache.pdf   | RRF分数: 0.0164 | 向量排名: 3 | BM25排名: None

十、进阶优化技巧

10.1 查询改写(Query Rewriting)

用户的问题往往不够准确,先用 LLM 改写再检索:

from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

query_rewrite_prompt = PromptTemplate.from_template("""
将以下用户问题改写为 3 个不同角度的检索查询,有助于从知识库中找到最相关的信息。
每行一个查询,只输出查询内容,不要编号和解释。

原始问题:{question}

改写后的查询:
""")

def rewrite_query(question: str, llm) -> List[str]:
    result = (query_rewrite_prompt | llm | StrOutputParser()).invoke({"question": question})
    queries = [q.strip() for q in result.strip().split("\n") if q.strip()]
    return queries[:3]

# 对多个查询分别检索,合并去重
def multi_query_retrieve(question: str, retriever, llm) -> List[Document]:
    queries = rewrite_query(question, llm) + [question]
    all_docs = []
    seen_ids = set()

    for q in queries:
        docs = retriever.retrieve(q)
        for doc in docs:
            doc_id = doc.page_content[:50]
            if doc_id not in seen_ids:
                seen_ids.add(doc_id)
                all_docs.append(doc)

    return all_docs[:5]

10.2 重排序(Reranking)

检索后用 Cross-Encoder 对结果精排,效果提升显著:

# pip install sentence-transformers
from sentence_transformers import CrossEncoder

class RerankerRetriever:
    def __init__(self, base_retriever, top_k=5):
        self.base_retriever = base_retriever
        self.top_k = top_k
        # 中文重排序模型
        self.reranker = CrossEncoder("BAAI/bge-reranker-large")

    def retrieve(self, query: str) -> List[Document]:
        # 先用混合检索召回更多候选(top_k * 3)
        candidates = self.base_retriever.retrieve(query)

        # 用 CrossEncoder 重新打分
        pairs = [[query, doc.page_content] for doc in candidates]
        scores = self.reranker.predict(pairs)

        # 按重排序分数重新排序
        ranked = sorted(zip(scores, candidates), key=lambda x: x[0], reverse=True)
        return [doc for _, doc in ranked[:self.top_k]]

10.3 权重调优建议

场景 向量权重 BM25 权重
通用问答(语义为主) 0.6 0.4
代码 / 技术文档 0.4 0.6
法律 / 合同文档 0.3 0.7
产品说明书 0.5 0.5

含有大量专有名词、编号、型号的场景,BM25 权重应适当调高。


十一、生产部署注意事项

11.1 向量库选型

向量库 适用规模 特点
Chroma < 100 万条 零配置,适合开发
FAISS < 500 万条 纯内存,速度极快
Milvus > 百万条 分布式,支持动态增删
Elasticsearch 已有 ES 集群 自带 BM25,可直接加向量混合

11.2 性能优化

# FAISS 使用 IVF 索引(大规模数据加速)
import faiss

# 构建 IVF 索引(100万条以上才有明显收益)
nlist = 100   # 聚类中心数量
quantizer = faiss.IndexFlatIP(embedding_dim)
index = faiss.IndexIVFFlat(quantizer, embedding_dim, nlist, faiss.METRIC_INNER_PRODUCT)
index.train(embeddings)   # 需要先训练
index.nprobe = 10         # 查询时探测的聚类数(越大越准但越慢)

11.3 增量更新

def add_new_documents(new_docs: List[Document], vs_manager, retriever):
    """新文档增量入库(无需重建整个索引)"""
    chunks = processor.split_documents(new_docs)

    # Chroma 支持直接 add
    vs_manager.vector_store.add_documents(chunks)

    # BM25 需要重建(因为是内存索引)
    # 建议:维护一个文档列表文件,增量追加后重建
    retriever.bm25_retriever = BM25Retriever.from_documents(all_chunks)

    print(f"✅ 新增 {len(chunks)} 个 chunks 入库完成")

十二、总结

RAG 系统构建流程总览

文档处理:RecursiveCharacterTextSplitter(chunk_size=512, overlap=64)
    │
向量模型:BAAI/bge-large-zh-v1.5(中文首选,本地部署)
    │
向量库:  Chroma(开发)/ Milvus(生产)
    │
检索策略:向量检索(60%)+ BM25 关键词检索(40%)
    │
融合算法:RRF(Reciprocal Rank Fusion)
    │
可选增强:查询改写 → 多路召回 → Cross-Encoder 重排序
    │
生成阶段:temperature=0,明确 Prompt 约束防幻觉

混合检索是目前 RAG 系统在实际生产中效果最稳定的方案。向量检索解决语义泛化问题,BM25 解决精确匹配问题,RRF 将两路结果优雅融合,工程实现简单、效果显著。

在此基础上叠加查询改写和重排序,基本能覆盖 90% 以上的企业知识库问答场景。


如有问题欢迎评论区交流 🚀

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐