LangChain 混合检索 RAG 实战:从构建到优化全流程
本文详细介绍了构建RAG(检索增强生成)系统的全流程,重点讲解了混合检索(HybridSearch)技术的实现方法。文章从RAG核心架构入手,分析了纯向量检索和纯关键词检索的优缺点,提出混合检索方案以兼顾语义理解和精确匹配。主要内容包括:1)文档加载与智能切分策略;2)向量化方案选型(推荐BGE中文模型);3)混合检索实现细节,采用RRF(ReciprocalRankFusion)算法融合向量检索
·
LangChain 混合检索 RAG 实战:从构建到优化全流程
本文详细讲解 RAG 系统的完整构建过程,重点介绍混合检索(Hybrid Search)的原理与实现,涵盖向量化方案选型、索引构建、检索优化等核心内容。
一、RAG 核心架构
RAG(Retrieval-Augmented Generation,检索增强生成)的本质是:先检索,再生成。
┌─────────────────────────────────┐
│ 离线阶段(建库) │
│ │
原始文档 ──▶ 文档切分 ──▶ 向量化 ──▶ 向量库 │
│ │ │
└──────▶ 关键词索引 ────┘ │
└─────────────────────────────────┘
┌─────────────────────────────────┐
│ 在线阶段(检索生成) │
│ │
用户提问 ──▶ 查询处理 ──▶ 向量检索 ──┐ │
│ ├─▶ 融合排序 ──▶ LLM ──▶ 回答
│ 关键词检索 ──┘ │
└─────────────────────────────────┘
为什么需要混合检索?
单一检索方式各有缺陷:
| 检索方式 | 优势 | 劣势 |
|---|---|---|
| 纯向量检索 | 语义理解强,能找到近义表达 | 精确词匹配差,专有名词易漏召 |
| 纯关键词检索(BM25) | 精确词匹配,专有名词准确 | 无语义理解,同义词无法匹配 |
| 混合检索 | 兼顾语义 + 精确匹配 | 实现略复杂 |
举例: 查询"Redis 缓存雪崩怎么解决"
- 纯向量:可能召回"缓存击穿"的文档(语义相近但不是你要的)
- BM25:精准召回含"缓存雪崩"关键词的文档
- 混合:两者结合,效果最佳
二、向量化方案选型
2.1 Embedding 模型对比
| 模型 | 类型 | 维度 | 中文效果 | 适用场景 |
|---|---|---|---|---|
text-embedding-3-large |
API(OpenAI) | 3072 | 良好 | 不敏感数据,快速上手 |
text-embedding-3-small |
API(OpenAI) | 1536 | 一般 | 成本敏感场景 |
BAAI/bge-large-zh-v1.5 |
本地部署 | 1024 | 优秀 | 中文为主,数据安全要求高 |
BAAI/bge-m3 |
本地部署 | 1024 | 优秀 | 多语言,支持稀疏+稠密双向量 |
jinaai/jina-embeddings-v2 |
API/本地 | 768 | 良好 | 长文档(支持 8192 token) |
推荐组合(中文企业场景):
向量检索:BAAI/bge-large-zh-v1.5(本地,数据安全)
关键词检索:BM25(经典,无需额外资源)
向量库:Milvus(生产) / Chroma(开发测试)
2.2 向量相似度计算方式
# 三种常见距离/相似度
# 1. 余弦相似度(Cosine Similarity)—— 最常用
# 适合:文本语义匹配,不受向量长度影响
cosine_similarity = dot(a, b) / (norm(a) * norm(b))
# 2. 内积(Inner Product / Dot Product)
# 适合:已归一化的向量(效果等同余弦)
# 3. 欧氏距离(L2 Distance)
# 适合:图像特征等,文本场景少用
结论:文本场景统一用余弦相似度,bge 系列模型输出向量默认已归一化。
三、完整项目结构
rag_project/
├── config.py # 配置管理
├── document_processor.py # 文档加载 & 切分
├── vector_store.py # 向量库管理
├── hybrid_retriever.py # 混合检索核心
├── rag_chain.py # RAG 链组装
├── main.py # 入口
└── requirements.txt
# requirements.txt
langchain>=0.2.0
langchain-community>=0.2.0
langchain-openai>=0.1.0
chromadb>=0.5.0
sentence-transformers>=3.0.0
rank-bm25>=0.2.2
faiss-cpu>=1.8.0
tiktoken>=0.7.0
python-dotenv>=1.0.0
四、Step 1:文档加载与智能切分
4.1 文档加载
# document_processor.py
from langchain_community.document_loaders import (
PyPDFLoader,
TextLoader,
UnstructuredMarkdownLoader,
DirectoryLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from typing import List
import re
class DocumentProcessor:
"""文档处理器:加载 + 清洗 + 切分"""
def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# RecursiveCharacterTextSplitter 是最推荐的切分器
# 按优先级依次尝试:段落 → 句子 → 词 → 字符
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""],
length_function=len,
)
def load_directory(self, directory_path: str) -> List[Document]:
"""批量加载目录下的文档"""
loaders = {
"**/*.pdf": PyPDFLoader,
"**/*.txt": TextLoader,
"**/*.md": UnstructuredMarkdownLoader,
}
all_docs = []
for glob_pattern, loader_cls in loaders.items():
try:
loader = DirectoryLoader(
directory_path,
glob=glob_pattern,
loader_cls=loader_cls,
show_progress=True
)
docs = loader.load()
all_docs.extend(docs)
print(f"✅ 加载 {glob_pattern}: {len(docs)} 个文档")
except Exception as e:
print(f"⚠️ 加载 {glob_pattern} 失败: {e}")
return all_docs
def clean_text(self, text: str) -> str:
"""文本清洗:去除噪音"""
# 去除多余空白行
text = re.sub(r'\n{3,}', '\n\n', text)
# 去除页眉页脚等噪音(根据实际情况调整)
text = re.sub(r'第\s*\d+\s*页', '', text)
# 去除特殊控制字符
text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)
return text.strip()
def split_documents(self, documents: List[Document]) -> List[Document]:
"""切分文档并添加元数据"""
# 清洗文本
for doc in documents:
doc.page_content = self.clean_text(doc.page_content)
# 过滤空文档
documents = [doc for doc in documents if len(doc.page_content) > 50]
# 切分
chunks = self.text_splitter.split_documents(documents)
# 给每个 chunk 添加序号(方便溯源)
for i, chunk in enumerate(chunks):
chunk.metadata["chunk_id"] = i
chunk.metadata["chunk_size"] = len(chunk.page_content)
print(f"📄 切分完成:{len(documents)} 个文档 → {len(chunks)} 个 chunks")
return chunks
4.2 切分策略说明
原始文档(5000字)
│
▼
RecursiveCharacterTextSplitter
│
├── chunk_size = 512 # 每块最大字符数
├── chunk_overlap = 64 # 相邻块重叠字符数(保证上下文连续性)
│
▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Chunk 1 │ │ Chunk 2 │ │ Chunk 3 │
│ [0 : 512] │ │ [448 : 960] │ │ [896 : 1408] │
│ ←64→ │ │ ←64→ │ │ │
└──────────────────┘ └──────────────────┘ └──────────────────┘
重叠部分确保语义不在边界处断裂
五、Step 2:构建向量库
# vector_store.py
from langchain_community.vectorstores import Chroma, FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain.schema import Document
from typing import List, Optional
import os
class VectorStoreManager:
"""向量库管理器"""
def __init__(self, embedding_type: str = "bge", persist_dir: str = "./chroma_db"):
self.embedding_type = embedding_type
self.persist_dir = persist_dir
self.embeddings = self._build_embeddings()
self.vector_store = None
def _build_embeddings(self):
"""构建 Embedding 模型"""
if self.embedding_type == "bge":
# 推荐:本地部署,中文效果最好
# 首次运行会自动下载模型(约 1.3GB)
return HuggingFaceEmbeddings(
model_name="BAAI/bge-large-zh-v1.5",
model_kwargs={"device": "cpu"}, # GPU 改为 "cuda"
encode_kwargs={
"normalize_embeddings": True, # 归一化,使余弦相似度 = 内积
"batch_size": 32 # 批量编码,提升速度
},
# BGE 模型推荐在查询时加前缀,提升检索效果
query_instruction="为这个句子生成表示以用于检索相关文章:"
)
elif self.embedding_type == "openai":
# API 方式,无需本地 GPU
return OpenAIEmbeddings(
model="text-embedding-3-large",
dimensions=1536 # 可以降维节省存储
)
elif self.embedding_type == "bge-m3":
# 支持稠密 + 稀疏双向量,单模型实现混合检索
return HuggingFaceEmbeddings(
model_name="BAAI/bge-m3",
model_kwargs={"device": "cpu"},
encode_kwargs={"normalize_embeddings": True}
)
else:
raise ValueError(f"不支持的 embedding 类型: {self.embedding_type}")
def build_from_documents(self, documents: List[Document], use_faiss: bool = False):
"""从文档列表构建向量库"""
print(f"🔨 开始构建向量库({self.embedding_type} embeddings)...")
if use_faiss:
# FAISS:纯内存,速度极快,适合中小规模
self.vector_store = FAISS.from_documents(
documents=documents,
embedding=self.embeddings
)
# 保存到磁盘
self.vector_store.save_local(self.persist_dir)
print(f"✅ FAISS 向量库已保存到 {self.persist_dir}")
else:
# Chroma:支持持久化,适合开发阶段
self.vector_store = Chroma.from_documents(
documents=documents,
embedding=self.embeddings,
persist_directory=self.persist_dir,
collection_metadata={"hnsw:space": "cosine"} # 指定余弦距离
)
print(f"✅ Chroma 向量库已保存到 {self.persist_dir}")
return self.vector_store
def load_existing(self, use_faiss: bool = False):
"""加载已存在的向量库(避免重复构建)"""
if use_faiss:
self.vector_store = FAISS.load_local(
self.persist_dir,
self.embeddings,
allow_dangerous_deserialization=True
)
else:
self.vector_store = Chroma(
persist_directory=self.persist_dir,
embedding_function=self.embeddings
)
print(f"✅ 向量库加载成功")
return self.vector_store
def get_retriever(self, top_k: int = 5):
"""获取纯向量检索器"""
return self.vector_store.as_retriever(
search_type="similarity", # 也可以用 "mmr" 减少结果冗余
search_kwargs={"k": top_k}
)
六、Step 3:混合检索核心实现
6.1 混合检索架构
用户查询
│
├──▶ 向量检索(Chroma/FAISS)──▶ Top-K 结果 + 相似度分数
│
└──▶ BM25 检索(关键词)──────▶ Top-K 结果 + BM25 分数
│
▼
RRF 融合排序(Reciprocal Rank Fusion)
│
▼
最终 Top-K 文档
6.2 RRF 融合算法原理
RRF(Reciprocal Rank Fusion)是混合检索中最经典的排序融合算法:
RRF_score(doc) = Σ 1 / (k + rank_i)
其中:
k = 60(经验值,防止头部文档过度主导)
rank_i = 文档在第 i 路检索结果中的排名
示例: 文档 A 在向量检索中排第 2,在 BM25 中排第 1
RRF(A) = 1/(60+2) + 1/(60+1) = 0.01613 + 0.01639 = 0.03252
文档 B 只在向量检索中排第 1(BM25 未召回):
RRF(B) = 1/(60+1) + 0 = 0.01639
结果:A > B,融合后 A 排名更靠前 ✓
6.3 完整混合检索实现
# hybrid_retriever.py
from langchain.schema import Document, BaseRetriever
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from langchain.vectorstores.base import VectorStore
from typing import List, Callable
import numpy as np
class HybridRetriever:
"""
混合检索器
融合:向量检索(语义) + BM25(关键词)
排序:RRF(Reciprocal Rank Fusion)
"""
def __init__(
self,
vector_store: VectorStore,
documents: List[Document],
vector_weight: float = 0.5, # 向量检索权重
bm25_weight: float = 0.5, # BM25 检索权重
top_k: int = 5
):
self.top_k = top_k
self.vector_weight = vector_weight
self.bm25_weight = bm25_weight
# 验证权重
assert abs(vector_weight + bm25_weight - 1.0) < 1e-6, "权重之和必须为 1"
# 构建向量检索器
self.vector_retriever = vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": top_k * 2} # 多取一些,融合后再截断
)
# 构建 BM25 检索器
# BM25 需要所有文档的文本(纯离线,无需 API)
self.bm25_retriever = BM25Retriever.from_documents(
documents,
k=top_k * 2
)
# 中文分词优化(默认按字符切分,可接入 jieba)
self.bm25_retriever.preprocess_func = self._chinese_tokenize
# 用 LangChain 内置的 EnsembleRetriever 组合(底层即 RRF)
self.ensemble_retriever = EnsembleRetriever(
retrievers=[self.vector_retriever, self.bm25_retriever],
weights=[vector_weight, bm25_weight]
)
@staticmethod
def _chinese_tokenize(text: str) -> List[str]:
"""
中文分词(使用 jieba,比默认字符切分效果更好)
pip install jieba
"""
try:
import jieba
return list(jieba.cut(text))
except ImportError:
# 降级:按字符切分
return list(text)
def retrieve(self, query: str) -> List[Document]:
"""执行混合检索"""
docs = self.ensemble_retriever.invoke(query)
return docs[:self.top_k]
def retrieve_with_scores(self, query: str) -> List[tuple]:
"""
返回带分数的结果(用于调试和评估)
手动实现 RRF 以获取分数
"""
# 向量检索结果
vector_docs = self.vector_retriever.invoke(query)
# BM25 检索结果
bm25_docs = self.bm25_retriever.invoke(query)
# 计算 RRF 分数
k = 60
doc_scores = {}
for rank, doc in enumerate(vector_docs):
doc_id = doc.page_content[:50] # 用内容前50字作为唯一键
if doc_id not in doc_scores:
doc_scores[doc_id] = {"doc": doc, "score": 0, "vector_rank": None, "bm25_rank": None}
doc_scores[doc_id]["score"] += self.vector_weight * (1 / (k + rank + 1))
doc_scores[doc_id]["vector_rank"] = rank + 1
for rank, doc in enumerate(bm25_docs):
doc_id = doc.page_content[:50]
if doc_id not in doc_scores:
doc_scores[doc_id] = {"doc": doc, "score": 0, "vector_rank": None, "bm25_rank": None}
doc_scores[doc_id]["score"] += self.bm25_weight * (1 / (k + rank + 1))
doc_scores[doc_id]["bm25_rank"] = rank + 1
# 按 RRF 分数排序
sorted_results = sorted(
doc_scores.values(),
key=lambda x: x["score"],
reverse=True
)
return [
(item["doc"], item["score"], item["vector_rank"], item["bm25_rank"])
for item in sorted_results[:self.top_k]
]
七、Step 4:组装 RAG Chain
# rag_chain.py
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough, RunnableLambda
from langchain.schema.output_parser import StrOutputParser
from langchain.schema import Document
from typing import List
from hybrid_retriever import HybridRetriever
# ==========================================
# Prompt 模板(中文优化版)
# ==========================================
RAG_PROMPT = ChatPromptTemplate.from_template("""
你是一个专业的问答助手。请根据以下检索到的参考资料回答用户的问题。
## 参考资料
{context}
## 用户问题
{question}
## 回答要求
- 仅根据参考资料中的信息回答,不要编造
- 如果参考资料中没有相关信息,明确说明"根据现有资料无法回答"
- 回答要简洁、准确,必要时引用资料来源
- 使用中文回答
## 回答:
""")
def format_docs(docs: List[Document]) -> str:
"""将检索到的文档格式化为上下文字符串"""
formatted = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get("source", "未知来源")
page = doc.metadata.get("page", "")
source_info = f"{source}" + (f" 第{page}页" if page else "")
formatted.append(f"【资料{i}】({source_info})\n{doc.page_content}")
return "\n\n".join(formatted)
class RAGChain:
"""完整的 RAG 问答链"""
def __init__(self, hybrid_retriever: HybridRetriever, model_name: str = "gpt-4o-mini"):
self.retriever = hybrid_retriever
self.llm = ChatOpenAI(
model=model_name,
temperature=0, # RAG 场景设为 0,减少幻觉
streaming=True # 流式输出,提升用户体验
)
# 构建 LCEL(LangChain Expression Language)链
self.chain = (
{
"context": RunnableLambda(
lambda x: format_docs(self.retriever.retrieve(x["question"]))
),
"question": RunnablePassthrough() | RunnableLambda(lambda x: x["question"])
}
| RAG_PROMPT
| self.llm
| StrOutputParser()
)
def ask(self, question: str) -> str:
"""同步问答"""
return self.chain.invoke({"question": question})
def ask_with_sources(self, question: str) -> dict:
"""返回答案 + 来源文档 + 检索分数(调试用)"""
# 获取检索结果(带分数)
retrieved = self.retriever.retrieve_with_scores(question)
# 格式化上下文
docs = [item[0] for item in retrieved]
context = format_docs(docs)
# 生成答案
answer = (RAG_PROMPT | self.llm | StrOutputParser()).invoke({
"context": context,
"question": question
})
return {
"answer": answer,
"sources": [
{
"content": doc.page_content[:200] + "...",
"source": doc.metadata.get("source", "未知"),
"rrf_score": round(score, 4),
"vector_rank": v_rank,
"bm25_rank": b_rank,
}
for doc, score, v_rank, b_rank in retrieved
]
}
def stream_ask(self, question: str):
"""流式输出"""
for chunk in self.chain.stream({"question": question}):
print(chunk, end="", flush=True)
print() # 换行
八、Step 5:入口整合
# main.py
import os
from dotenv import load_dotenv
from document_processor import DocumentProcessor
from vector_store import VectorStoreManager
from hybrid_retriever import HybridRetriever
from rag_chain import RAGChain
import json
load_dotenv() # 加载 .env 中的 OPENAI_API_KEY
def build_rag_system(docs_dir: str, rebuild: bool = False):
"""
构建或加载 RAG 系统
rebuild=False 时直接加载已有向量库,避免重复构建
"""
persist_dir = "./chroma_db"
# Step 1:初始化向量库管理器
vs_manager = VectorStoreManager(
embedding_type="bge", # 使用 BGE 中文模型
persist_dir=persist_dir
)
if rebuild or not os.path.exists(persist_dir):
# Step 2:加载并处理文档
processor = DocumentProcessor(
chunk_size=512,
chunk_overlap=64
)
raw_docs = processor.load_directory(docs_dir)
chunks = processor.split_documents(raw_docs)
# Step 3:构建向量库
vector_store = vs_manager.build_from_documents(chunks)
else:
# 直接加载已有向量库
vector_store = vs_manager.load_existing()
# 重新加载文档用于 BM25(BM25 不持久化,需要重建)
processor = DocumentProcessor(chunk_size=512, chunk_overlap=64)
raw_docs = processor.load_directory(docs_dir)
chunks = processor.split_documents(raw_docs)
# Step 4:构建混合检索器
retriever = HybridRetriever(
vector_store=vector_store,
documents=chunks,
vector_weight=0.6, # 向量权重略高(语义理解更重要)
bm25_weight=0.4,
top_k=5
)
# Step 5:组装 RAG 链
rag = RAGChain(retriever, model_name="gpt-4o-mini")
return rag
def main():
# 构建系统(第一次运行后设 rebuild=False 加快启动)
rag = build_rag_system(docs_dir="./documents", rebuild=True)
print("=" * 60)
print("🤖 RAG 问答系统已就绪,输入 'quit' 退出")
print("=" * 60)
while True:
question = input("\n❓ 请输入问题:").strip()
if question.lower() in ["quit", "exit", "q"]:
break
if not question:
continue
print("\n🔍 检索 + 生成中...\n")
# 带来源的问答(生产环境可改为 ask() 简洁模式)
result = rag.ask_with_sources(question)
print(f"💬 回答:\n{result['answer']}")
print(f"\n📚 参考来源:")
for i, src in enumerate(result['sources'], 1):
print(f" [{i}] {src['source']} | RRF分数: {src['rrf_score']} "
f"| 向量排名: {src['vector_rank']} | BM25排名: {src['bm25_rank']}")
if __name__ == "__main__":
main()
九、运行输出示例
✅ 加载 **/*.pdf: 3 个文档
✅ 加载 **/*.md: 12 个文档
📄 切分完成:15 个文档 → 287 个 chunks
🔨 开始构建向量库(bge embeddings)...
✅ Chroma 向量库已保存到 ./chroma_db
============================================================
🤖 RAG 问答系统已就绪,输入 'quit' 退出
============================================================
❓ 请输入问题:Redis 缓存雪崩怎么处理?
🔍 检索 + 生成中...
💬 回答:
缓存雪崩是指大量缓存同时失效,导致请求全部打到数据库。主要解决方案:
1. **随机过期时间**:在基础 TTL 上加随机偏移(如 TTL + random(0, 300s)),避免集中失效
2. **缓存预热**:系统启动时提前加载热点数据
3. **互斥锁**:缓存失效时只允许一个线程重建缓存
4. **熔断降级**:数据库压力过大时返回默认值或降级响应
📚 参考来源:
[1] redis-best-practices.pdf | RRF分数: 0.0325 | 向量排名: 1 | BM25排名: 2
[2] cache-design.md | RRF分数: 0.0316 | 向量排名: 2 | BM25排名: 1
[3] distributed-cache.pdf | RRF分数: 0.0164 | 向量排名: 3 | BM25排名: None
十、进阶优化技巧
10.1 查询改写(Query Rewriting)
用户的问题往往不够准确,先用 LLM 改写再检索:
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
query_rewrite_prompt = PromptTemplate.from_template("""
将以下用户问题改写为 3 个不同角度的检索查询,有助于从知识库中找到最相关的信息。
每行一个查询,只输出查询内容,不要编号和解释。
原始问题:{question}
改写后的查询:
""")
def rewrite_query(question: str, llm) -> List[str]:
result = (query_rewrite_prompt | llm | StrOutputParser()).invoke({"question": question})
queries = [q.strip() for q in result.strip().split("\n") if q.strip()]
return queries[:3]
# 对多个查询分别检索,合并去重
def multi_query_retrieve(question: str, retriever, llm) -> List[Document]:
queries = rewrite_query(question, llm) + [question]
all_docs = []
seen_ids = set()
for q in queries:
docs = retriever.retrieve(q)
for doc in docs:
doc_id = doc.page_content[:50]
if doc_id not in seen_ids:
seen_ids.add(doc_id)
all_docs.append(doc)
return all_docs[:5]
10.2 重排序(Reranking)
检索后用 Cross-Encoder 对结果精排,效果提升显著:
# pip install sentence-transformers
from sentence_transformers import CrossEncoder
class RerankerRetriever:
def __init__(self, base_retriever, top_k=5):
self.base_retriever = base_retriever
self.top_k = top_k
# 中文重排序模型
self.reranker = CrossEncoder("BAAI/bge-reranker-large")
def retrieve(self, query: str) -> List[Document]:
# 先用混合检索召回更多候选(top_k * 3)
candidates = self.base_retriever.retrieve(query)
# 用 CrossEncoder 重新打分
pairs = [[query, doc.page_content] for doc in candidates]
scores = self.reranker.predict(pairs)
# 按重排序分数重新排序
ranked = sorted(zip(scores, candidates), key=lambda x: x[0], reverse=True)
return [doc for _, doc in ranked[:self.top_k]]
10.3 权重调优建议
| 场景 | 向量权重 | BM25 权重 |
|---|---|---|
| 通用问答(语义为主) | 0.6 | 0.4 |
| 代码 / 技术文档 | 0.4 | 0.6 |
| 法律 / 合同文档 | 0.3 | 0.7 |
| 产品说明书 | 0.5 | 0.5 |
含有大量专有名词、编号、型号的场景,BM25 权重应适当调高。
十一、生产部署注意事项
11.1 向量库选型
| 向量库 | 适用规模 | 特点 |
|---|---|---|
| Chroma | < 100 万条 | 零配置,适合开发 |
| FAISS | < 500 万条 | 纯内存,速度极快 |
| Milvus | > 百万条 | 分布式,支持动态增删 |
| Elasticsearch | 已有 ES 集群 | 自带 BM25,可直接加向量混合 |
11.2 性能优化
# FAISS 使用 IVF 索引(大规模数据加速)
import faiss
# 构建 IVF 索引(100万条以上才有明显收益)
nlist = 100 # 聚类中心数量
quantizer = faiss.IndexFlatIP(embedding_dim)
index = faiss.IndexIVFFlat(quantizer, embedding_dim, nlist, faiss.METRIC_INNER_PRODUCT)
index.train(embeddings) # 需要先训练
index.nprobe = 10 # 查询时探测的聚类数(越大越准但越慢)
11.3 增量更新
def add_new_documents(new_docs: List[Document], vs_manager, retriever):
"""新文档增量入库(无需重建整个索引)"""
chunks = processor.split_documents(new_docs)
# Chroma 支持直接 add
vs_manager.vector_store.add_documents(chunks)
# BM25 需要重建(因为是内存索引)
# 建议:维护一个文档列表文件,增量追加后重建
retriever.bm25_retriever = BM25Retriever.from_documents(all_chunks)
print(f"✅ 新增 {len(chunks)} 个 chunks 入库完成")
十二、总结
RAG 系统构建流程总览
文档处理:RecursiveCharacterTextSplitter(chunk_size=512, overlap=64)
│
向量模型:BAAI/bge-large-zh-v1.5(中文首选,本地部署)
│
向量库: Chroma(开发)/ Milvus(生产)
│
检索策略:向量检索(60%)+ BM25 关键词检索(40%)
│
融合算法:RRF(Reciprocal Rank Fusion)
│
可选增强:查询改写 → 多路召回 → Cross-Encoder 重排序
│
生成阶段:temperature=0,明确 Prompt 约束防幻觉
混合检索是目前 RAG 系统在实际生产中效果最稳定的方案。向量检索解决语义泛化问题,BM25 解决精确匹配问题,RRF 将两路结果优雅融合,工程实现简单、效果显著。
在此基础上叠加查询改写和重排序,基本能覆盖 90% 以上的企业知识库问答场景。
如有问题欢迎评论区交流 🚀
更多推荐


所有评论(0)