LangChain RAG核心链:从基础检索到对话式问答的全流程封装
和:基础RAG链,实现"用户查询→文档检索→答案生成"的标准闭环:对话式RAG链,在基础RAG基础上融入对话历史管理,支持多轮上下文感知问答这两个核心链的本质是将RAG技术从"研究原型"转化为"工程模板",让开发者无需从零构建复杂的检索-生成协同逻辑。LangChain的RAG核心链(RetrievalQA和ConversationalRetrievalChain)通过系统性的封装和优化,将复杂的
LangChain RAG核心链:从基础检索到对话式问答的全流程封装
目录
- 核心定义与价值:RAG核心链解决什么问题?
- 底层实现逻辑:RAG核心链如何实现检索-生成协同?
- 代码实践:从基础RAG到对话式问答如何落地?
- 设计考量:为什么LangChain要封装RAG核心链?
- 替代方案与优化空间
1. 核心定义与价值:RAG核心链解决什么问题?
1.1 本质定位:即插即用的RAG模板
RetrievalQA 和 ConversationalRetrievalChain 是LangChain框架中两个核心的RAG(检索增强生成)链,它们将复杂的"检索+生成"流程封装为开箱即用的组件:
- RetrievalQA:基础RAG链,实现"用户查询→文档检索→答案生成"的标准闭环
- ConversationalRetrievalChain:对话式RAG链,在基础RAG基础上融入对话历史管理,支持多轮上下文感知问答
这两个核心链的本质是将RAG技术从"研究原型"转化为"工程模板",让开发者无需从零构建复杂的检索-生成协同逻辑。
1.2 拆解核心痛点:手动拼接的开发困境
在没有核心链封装的情况下,开发者需要手动处理以下复杂逻辑:
痛点1:检索结果与提示词融合逻辑重复开发
# 手动拼接方式 - 每个项目都需要重复实现
def manual_rag_query(question: str, retriever, llm):
# 1. 检索相关文档
docs = retriever.get_relevant_documents(question)
# 2. 手动构建上下文 - 容易出错且不一致
context = "\n\n".join([doc.page_content for doc in docs])
# 3. 手动拼接提示词 - 格式不统一
prompt = f"""基于以下上下文回答问题:
上下文:{context}
问题:{question}
答案:"""
# 4. 调用LLM生成答案
return llm.invoke(prompt)
痛点2:多轮对话上下文丢失
# 手动处理对话历史 - 复杂且易错
def manual_conversational_rag(question: str, chat_history: list, retriever, llm):
# 需要手动管理对话历史格式转换
history_str = ""
for human, ai in chat_history:
history_str += f"Human: {human}\nAI: {ai}\n"
# 需要手动生成独立问题
standalone_question = llm.invoke(f"""
根据对话历史和新问题,生成一个独立的问题:
对话历史:{history_str}
新问题:{question}
独立问题:
""")
# 后续检索和生成逻辑...
痛点3:检索相关性不足和参数调优困难
- 缺乏标准化的检索策略(相似度搜索、MMR等)
- 没有统一的文档数量控制和token限制机制
- 缺少检索结果质量评估和降级处理
1.3 链路可视化:完整流程展示
RetrievalQA基础流程
ConversationalRetrievalChain对话流程
1.4 核心能力:关键特性价值分析
特性1:检索结果自动注入提示词
- 自动处理文档格式化和上下文构建
- 支持多种文档合并策略(stuff、map_reduce、refine等)
- 内置token限制和文档截断机制
特性2:对话历史压缩与管理
- 智能的对话历史格式化(支持tuple和BaseMessage格式)
- 自动生成独立问题,避免上下文依赖
- 可配置的历史信息保留策略
特性3:多轮上下文关联
- 问题重写机制,将依赖历史的问题转化为独立问题
- 上下文感知的检索,提高多轮对话的连贯性
- 灵活的问题传递策略(原问题vs重写问题)
特性4:统一的错误处理和降级策略
- 内置的无文档响应机制
- 异步执行支持
- 完整的回调和监控集成
2. 底层实现逻辑:RAG核心链如何实现检索-生成协同?
2.1 RetrievalQA核心构成:Retriever+LLMChain协同
核心架构分析
# RetrievalQA的核心组件构成
class RetrievalQA(BaseRetrievalQA):
retriever: BaseRetriever # 检索器
combine_documents_chain: BaseCombineDocumentsChain # 文档合并链
def _call(self, inputs: dict[str, Any], run_manager=None) -> dict[str, Any]:
# 1. 提取用户问题
question = inputs[self.input_key] # 默认为"query"
# 2. 执行文档检索
docs = self._get_docs(question, run_manager=run_manager)
# 3. 通过combine_documents_chain处理文档和生成答案
answer = self.combine_documents_chain.run(
input_documents=docs,
question=question,
callbacks=run_manager.get_child(),
)
# 4. 构建返回结果
result = {self.output_key: answer} # 默认output_key为"result"
if self.return_source_documents:
result["source_documents"] = docs
return result
文档检索逻辑
def _get_docs(self, question: str, *, run_manager: CallbackManagerForChainRun) -> list[Document]:
"""RetrievalQA的文档检索实现"""
return self.retriever.invoke(
question,
config={"callbacks": run_manager.get_child()},
)
链式构建方法
@classmethod
def from_llm(cls, llm: BaseLanguageModel, prompt: Optional[PromptTemplate] = None, **kwargs):
"""从LLM构建RetrievalQA链"""
# 1. 选择或使用默认提示词模板
_prompt = prompt or PROMPT_SELECTOR.get_prompt(llm)
# 2. 创建LLM链
llm_chain = LLMChain(llm=llm, prompt=_prompt)
# 3. 创建文档提示词模板
document_prompt = PromptTemplate(
input_variables=["page_content"],
template="Context:\n{page_content}",
)
# 4. 创建文档合并链
combine_documents_chain = StuffDocumentsChain(
llm_chain=llm_chain,
document_variable_name="context", # 文档在提示词中的变量名
document_prompt=document_prompt,
)
return cls(combine_documents_chain=combine_documents_chain, **kwargs)
2.2 ConversationalRetrievalChain扩展机制:对话记忆+检索增强
核心组件扩展
class ConversationalRetrievalChain(BaseConversationalRetrievalChain):
retriever: BaseRetriever
combine_docs_chain: BaseCombineDocumentsChain # 文档合并链
question_generator: LLMChain # 问题生成链 - 新增核心组件
# 对话管理配置
rephrase_question: bool = True # 是否重写问题
return_generated_question: bool = False # 是否返回生成的问题
get_chat_history: Optional[Callable] = None # 自定义历史格式化函数
对话历史处理机制
def _get_chat_history(chat_history: list[CHAT_TURN_TYPE]) -> str:
"""标准的对话历史格式化函数"""
buffer = ""
for dialogue_turn in chat_history:
if isinstance(dialogue_turn, BaseMessage):
# 处理BaseMessage格式
role_prefix = _ROLE_MAP.get(dialogue_turn.type, f"{dialogue_turn.type}: ")
buffer += f"\n{role_prefix}{dialogue_turn.content}"
elif isinstance(dialogue_turn, tuple):
# 处理tuple格式 (human_message, ai_message)
human = "Human: " + dialogue_turn[0]
ai = "Assistant: " + dialogue_turn[1]
buffer += f"\n{human}\n{ai}"
return buffer
独立问题生成逻辑
def _call(self, inputs: dict[str, Any], run_manager=None) -> dict[str, Any]:
question = inputs["question"]
chat_history = inputs["chat_history"]
# 1. 格式化对话历史
get_chat_history = self.get_chat_history or _get_chat_history
chat_history_str = get_chat_history(chat_history)
# 2. 生成独立问题(如果有历史对话)
if chat_history_str:
new_question = self.question_generator.run(
question=question,
chat_history=chat_history_str,
callbacks=run_manager.get_child(),
)
else:
new_question = question
# 3. 使用独立问题进行检索
docs = self._get_docs(new_question, inputs, run_manager=run_manager)
# 4. 决定传递给生成链的问题
new_inputs = inputs.copy()
if self.rephrase_question:
new_inputs["question"] = new_question # 使用重写的问题
new_inputs["chat_history"] = chat_history_str
# 5. 生成最终答案
answer = self.combine_docs_chain.run(
input_documents=docs,
callbacks=run_manager.get_child(),
**new_inputs,
)
return {self.output_key: answer}
2.3 检索-生成协同策略:优化融合方式
文档合并策略对比
# 1. Stuff策略 - 直接拼接所有文档
class StuffDocumentsChain(BaseCombineDocumentsChain):
"""将所有文档内容直接拼接到提示词中"""
def combine_docs(self, docs: List[Document], **kwargs) -> Tuple[str, dict]:
# 简单拼接所有文档内容
inputs = {k: v for k, v in kwargs.items()}
inputs[self.document_variable_name] = self.document_separator.join([
format_document(doc, self.document_prompt) for doc in docs
])
return self.llm_chain.predict(**inputs), {}
# 2. Map-Reduce策略 - 分别处理后合并
# 适用于文档数量多、内容长的场景
# 3. Refine策略 - 迭代优化答案
# 适用于需要逐步完善答案的场景
Token限制和文档截断
def _reduce_tokens_below_limit(self, docs: list[Document]) -> list[Document]:
"""ConversationalRetrievalChain的token控制机制"""
num_docs = len(docs)
if self.max_tokens_limit and isinstance(self.combine_docs_chain, StuffDocumentsChain):
# 计算每个文档的token数
tokens = [
self.combine_docs_chain.llm_chain._get_num_tokens(doc.page_content)
for doc in docs
]
# 从前往后累加,超出限制时截断
token_count = sum(tokens[:num_docs])
while token_count > self.max_tokens_limit:
num_docs -= 1
token_count -= tokens[num_docs]
return docs[:num_docs]
检索策略配置
# VectorDBQA支持的检索策略
class VectorDBQA(BaseRetrievalQA):
search_type: str = "similarity" # "similarity" 或 "mmr"
k: int = 4 # 检索文档数量
search_kwargs: dict = Field(default_factory=dict) # 额外搜索参数
def _get_docs(self, question: str, **kwargs) -> list[Document]:
if self.search_type == "similarity":
# 余弦相似度搜索
docs = self.vectorstore.similarity_search(
question, k=self.k, **self.search_kwargs
)
elif self.search_type == "mmr":
# 最大边际相关性搜索 - 平衡相关性和多样性
docs = self.vectorstore.max_marginal_relevance_search(
question, k=self.k, **self.search_kwargs
)
return docs
3. 代码实践:从基础RAG到对话式问答如何落地?
3.1 基础实践1:RetrievalQA构建标准RAG流程
完整环境设置
# 依赖安装
pip install langchain langchain-openai langchain-community
pip install chromadb # 向量数据库
pip install pypdf # PDF文档处理
pip install tiktoken # Token计算
标准RAG实现
import os
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
# 1. 环境配置
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
class StandardRAGSystem:
def __init__(self, persist_directory="./chroma_db"):
"""初始化标准RAG系统"""
# 初始化嵌入模型
self.embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
chunk_size=1000 # 批处理大小
)
# 初始化LLM
self.llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0, # 确保答案一致性
max_tokens=500
)
# 初始化向量存储
self.vectorstore = Chroma(
persist_directory=persist_directory,
embedding_function=self.embeddings,
collection_name="rag_documents"
)
# 文本分割器
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200, # 保持上下文连续性
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
def ingest_documents(self, file_paths: list[str]):
"""摄取文档到向量存储"""
all_documents = []
for file_path in file_paths:
print(f"处理文件: {file_path}")
# 根据文件类型选择加载器
if file_path.endswith('.pdf'):
loader = PyPDFLoader(file_path)
elif file_path.endswith('.txt'):
loader = TextLoader(file_path, encoding='utf-8')
else:
print(f"不支持的文件类型: {file_path}")
continue
# 加载文档
documents = loader.load()
# 文本分割
splits = self.text_splitter.split_documents(documents)
# 添加元数据
for split in splits:
split.metadata.update({
"source_file": os.path.basename(file_path),
"file_type": file_path.split('.')[-1]
})
all_documents.extend(splits)
# 批量添加到向量存储
if all_documents:
self.vectorstore.add_documents(all_documents)
self.vectorstore.persist() # 持久化存储
print(f"成功摄取 {len(all_documents)} 个文档片段")
def create_retrieval_qa(self, chain_type="stuff", k=4, return_source_documents=True):
"""创建RetrievalQA链"""
# 创建检索器
retriever = self.vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": k}
)
# 自定义提示词模板
custom_prompt = PromptTemplate(
template="""使用以下上下文信息来回答问题。如果你不知道答案,请说"我不知道",不要编造答案。
请用中文回答,并保持答案简洁明了。
上下文信息:
{context}
问题: {question}
详细答案:""",
input_variables=["context", "question"]
)
# 创建RetrievalQA链
qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type=chain_type, # "stuff", "map_reduce", "refine"
retriever=retriever,
return_source_documents=return_source_documents,
chain_type_kwargs={
"prompt": custom_prompt,
"verbose": True # 显示中间步骤
}
)
return qa_chain
def query(self, question: str, qa_chain=None) -> dict:
"""执行查询"""
if qa_chain is None:
qa_chain = self.create_retrieval_qa()
# 执行查询
result = qa_chain.invoke({"query": question})
# 格式化输出
response = {
"question": question,
"answer": result["result"],
"source_documents": []
}
# 添加源文档信息
if "source_documents" in result:
for doc in result["source_documents"]:
response["source_documents"].append({
"content": doc.page_content[:200] + "...", # 截断显示
"source": doc.metadata.get("source_file", "未知"),
"page": doc.metadata.get("page", "未知")
})
return response
# 使用示例
def demo_basic_rag():
"""基础RAG演示"""
# 创建RAG系统
rag_system = StandardRAGSystem()
# 摄取文档(请替换为实际文件路径)
document_files = [
"docs/langchain_guide.pdf",
"docs/rag_tutorial.txt"
]
rag_system.ingest_documents(document_files)
# 创建QA链
qa_chain = rag_system.create_retrieval_qa(
chain_type="stuff",
k=3, # 检索3个最相关文档
return_source_documents=True
)
# 测试查询
questions = [
"什么是RAG技术?",
"LangChain的主要特性有哪些?",
"如何优化检索效果?"
]
for question in questions:
print(f"\n问题: {question}")
result = rag_system.query(question, qa_chain)
print(f"答案: {result['answer']}")
print(f"来源: {len(result['source_documents'])} 个文档")
print("-" * 50)
if __name__ == "__main__":
demo_basic_rag()
3.2 基础实践2:ConversationalRetrievalChain实现多轮对话
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory, ConversationSummaryBufferMemory
from langchain.prompts import PromptTemplate
from langchain.schema import BaseMessage, HumanMessage, AIMessage
class ConversationalRAGSystem:
def __init__(self, vectorstore, llm):
"""初始化对话式RAG系统"""
self.vectorstore = vectorstore
self.llm = llm
self.conversation_chain = None
self.chat_history = [] # 手动管理对话历史
def create_conversational_chain(self, memory_type="buffer", max_token_limit=2000):
"""创建对话式检索链"""
# 创建检索器
retriever = self.vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 4}
)
# 自定义问题压缩提示词
condense_question_prompt = PromptTemplate(
template="""根据以下对话历史和后续问题,将后续问题重写为一个独立的问题,使其可以在没有对话历史的情况下被理解。
对话历史:
{chat_history}
后续问题: {question}
独立问题:""",
input_variables=["chat_history", "question"]
)
# 自定义QA提示词
qa_prompt = PromptTemplate(
template="""使用以下上下文信息回答问题。如果上下文中没有相关信息,请说"根据提供的信息,我无法回答这个问题"。
上下文:
{context}
问题: {question}
请提供详细且准确的答案:""",
input_variables=["context", "question"]
)
# 创建对话检索链
self.conversation_chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
retriever=retriever,
condense_question_prompt=condense_question_prompt,
return_source_documents=True,
return_generated_question=True, # 返回生成的独立问题
max_tokens_limit=max_token_limit, # Token限制
combine_docs_chain_kwargs={
"prompt": qa_prompt
}
)
return self.conversation_chain
def chat(self, question: str) -> dict:
"""进行对话"""
if self.conversation_chain is None:
self.create_conversational_chain()
# 执行对话
result = self.conversation_chain.invoke({
"question": question,
"chat_history": self.chat_history
})
# 更新对话历史
self.chat_history.append((question, result["answer"]))
# 限制历史长度(避免token过多)
if len(self.chat_history) > 10:
self.chat_history = self.chat_history[-10:]
return {
"question": question,
"answer": result["answer"],
"generated_question": result.get("generated_question", question),
"source_documents": [
{
"content": doc.page_content[:150] + "...",
"source": doc.metadata.get("source_file", "未知")
}
for doc in result.get("source_documents", [])
],
"chat_history_length": len(self.chat_history)
}
def reset_conversation(self):
"""重置对话历史"""
self.chat_history = []
print("对话历史已重置")
def get_conversation_summary(self) -> str:
"""获取对话摘要"""
if not self.chat_history:
return "暂无对话历史"
summary = f"对话轮数: {len(self.chat_history)}\n"
summary += "最近3轮对话:\n"
for i, (human, ai) in enumerate(self.chat_history[-3:], 1):
summary += f"{i}. 用户: {human[:50]}...\n"
summary += f" AI: {ai[:50]}...\n"
return summary
# 使用示例
def demo_conversational_rag():
"""对话式RAG演示"""
# 假设已有vectorstore和llm
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
# 初始化组件
embeddings = OpenAIEmbeddings()
llm = ChatOpenAI(temperature=0)
vectorstore = Chroma(embedding_function=embeddings)
# 创建对话系统
conv_rag = ConversationalRAGSystem(vectorstore, llm)
conv_rag.create_conversational_chain()
# 模拟多轮对话
conversation_flow = [
"什么是机器学习?",
"它和深度学习有什么区别?",
"深度学习在图像识别中是如何应用的?",
"刚才提到的CNN是什么?",
"能给我一个具体的例子吗?"
]
print("=== 对话式RAG演示 ===")
for i, question in enumerate(conversation_flow, 1):
print(f"\n第{i}轮对话:")
print(f"用户: {question}")
result = conv_rag.chat(question)
print(f"AI: {result['answer']}")
print(f"生成的独立问题: {result['generated_question']}")
print(f"参考文档数: {len(result['source_documents'])}")
print(f"对话历史长度: {result['chat_history_length']}")
print("-" * 60)
# 显示对话摘要
print("\n=== 对话摘要 ===")
print(conv_rag.get_conversation_summary())
if __name__ == "__main__":
demo_conversational_rag()
4. 设计考量:为什么LangChain要封装RAG核心链?
4.1 流程标准化价值:降低RAG技术使用门槛
复杂度封装对比
# 手动实现RAG - 需要处理大量细节
class ManualRAGImplementation:
def __init__(self):
self.retriever = None
self.llm = None
self.prompt_template = None
def query(self, question: str):
# 1. 手动检索文档
docs = self.retriever.get_relevant_documents(question)
# 2. 手动处理文档格式
context = ""
for i, doc in enumerate(docs):
context += f"文档{i+1}: {doc.page_content}\n\n"
# 3. 手动构建提示词
prompt = f"""基于以下文档回答问题:
{context}
问题:{question}
答案:"""
# 4. 手动调用LLM
response = self.llm.invoke(prompt)
# 5. 手动处理返回结果
return {
"answer": response,
"sources": [doc.metadata for doc in docs]
}
# LangChain核心链 - 一行代码解决
retrieval_qa = RetrievalQA.from_llm(
llm=llm,
retriever=retriever,
return_source_documents=True
)
result = retrieval_qa.invoke({"query": question})
标准化带来的优势
- 一致性保证:所有RAG应用使用相同的处理逻辑
- 最佳实践内置:集成了经过验证的提示词模板和处理策略
- 错误处理统一:标准化的异常处理和降级机制
- 性能优化:内置的token管理和批处理优化
4.2 组件协同优化:深度整合的必要性
RAG核心链通过深度整合解决了以下关键问题:
- 回调链传递:确保监控和日志在整个流程中保持连续
- 错误传播:统一的错误处理和恢复机制
- 资源管理:自动的内存和计算资源优化
- 类型安全:组件间数据传递的类型检查和转换
4.3 灵活性与易用性平衡:参数化配置策略
LangChain通过多层次配置支持实现了灵活性与易用性的平衡:
# 1. 简单配置 - 开箱即用
simple_qa = RetrievalQA.from_llm(llm=llm, retriever=retriever)
# 2. 中等配置 - 常用参数调整
medium_qa = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
chain_type_kwargs={"prompt": custom_prompt}
)
# 3. 高级配置 - 完全自定义
advanced_combine_chain = StuffDocumentsChain(
llm_chain=LLMChain(llm=llm, prompt=advanced_prompt),
document_variable_name="context",
document_prompt=custom_doc_prompt
)
advanced_qa = RetrievalQA(
combine_documents_chain=advanced_combine_chain,
retriever=retriever,
return_source_documents=True
)
5. 替代方案与优化空间
5.1 替代实现方案:多种RAG实现路径对比
方案1:手动串联Retriever与LLMChain
class ManualRAGChain:
"""手动实现的RAG链"""
def __init__(self, retriever, llm, prompt_template):
self.retriever = retriever
self.llm = llm
self.prompt_template = prompt_template
def invoke(self, query: str) -> dict:
# 1. 手动检索
docs = self.retriever.get_relevant_documents(query)
# 2. 手动构建上下文
context = "\n\n".join([doc.page_content for doc in docs])
# 3. 手动生成提示词
prompt = self.prompt_template.format(context=context, question=query)
# 4. 手动调用LLM
answer = self.llm.invoke(prompt)
return {"result": answer, "source_documents": docs}
# 优劣势对比
manual_pros_cons = {
"优势": [
"完全控制每个步骤",
"可以实现高度定制化逻辑",
"没有框架依赖",
"性能开销更小"
],
"劣势": [
"需要重复实现基础逻辑",
"缺乏标准化的错误处理",
"没有内置的优化机制",
"维护成本高",
"缺乏生态系统支持"
]
}
方案2:使用其他框架的RAG组件
# LlamaIndex RAG实现
from llama_index import VectorStoreIndex, SimpleDirectoryReader
from llama_index.query_engine import RetrieverQueryEngine
class LlamaIndexRAG:
"""基于LlamaIndex的RAG实现"""
def __init__(self, documents_path):
# 加载文档
documents = SimpleDirectoryReader(documents_path).load_data()
# 创建索引
self.index = VectorStoreIndex.from_documents(documents)
# 创建查询引擎
self.query_engine = self.index.as_query_engine()
def query(self, question: str):
return self.query_engine.query(question)
# 框架对比
framework_comparison = {
"LangChain": {
"优势": ["生态丰富", "组件化设计", "灵活配置"],
"劣势": ["学习曲线陡峭", "版本更新频繁"]
},
"LlamaIndex": {
"优势": ["专注RAG", "简单易用", "性能优化"],
"劣势": ["功能相对单一", "扩展性有限"]
},
"Haystack": {
"优势": ["企业级特性", "生产就绪", "多语言支持"],
"劣势": ["配置复杂", "资源消耗大"]
}
}
5.2 优化方向:三维度提升策略
检索质量优化
class EnhancedRetrieval:
"""检索质量优化策略"""
def __init__(self, vectorstore, embeddings):
self.vectorstore = vectorstore
self.embeddings = embeddings
def hybrid_search(self, query: str, k: int = 5) -> List[Document]:
"""混合检索:向量搜索 + 关键词搜索"""
# 1. 向量相似度搜索
vector_docs = self.vectorstore.similarity_search(query, k=k)
# 2. 关键词搜索(简化实现)
keyword_docs = self._keyword_search(query, k=k)
# 3. 结果融合和重排序
combined_docs = self._merge_and_rerank(vector_docs, keyword_docs, query)
return combined_docs[:k]
def _keyword_search(self, query: str, k: int) -> List[Document]:
"""基于关键词的搜索"""
# 实际实现中可以使用BM25或其他关键词搜索算法
keywords = query.lower().split()
# 简化实现...
return []
def _merge_and_rerank(self, vector_docs: List[Document],
keyword_docs: List[Document], query: str) -> List[Document]:
"""结果融合和重排序"""
# 使用RRF (Reciprocal Rank Fusion) 算法
doc_scores = {}
# 向量搜索结果评分
for i, doc in enumerate(vector_docs):
doc_id = id(doc)
doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1 / (i + 1)
# 关键词搜索结果评分
for i, doc in enumerate(keyword_docs):
doc_id = id(doc)
doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1 / (i + 1)
# 按分数排序
all_docs = {id(doc): doc for doc in vector_docs + keyword_docs}
sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
return [all_docs[doc_id] for doc_id, score in sorted_docs]
def query_expansion(self, query: str) -> str:
"""查询扩展:丰富查询词汇"""
# 使用同义词、相关词扩展查询
expanded_terms = []
# 1. 添加同义词
synonyms = self._get_synonyms(query)
expanded_terms.extend(synonyms)
# 2. 添加相关概念
related_concepts = self._get_related_concepts(query)
expanded_terms.extend(related_concepts)
# 3. 构建扩展查询
expanded_query = f"{query} {' '.join(expanded_terms)}"
return expanded_query
def _get_synonyms(self, query: str) -> List[str]:
"""获取同义词(简化实现)"""
synonym_dict = {
"机器学习": ["ML", "人工智能", "AI"],
"深度学习": ["神经网络", "DL"],
# 更多同义词映射...
}
synonyms = []
for word in query.split():
if word in synonym_dict:
synonyms.extend(synonym_dict[word])
return synonyms
def _get_related_concepts(self, query: str) -> List[str]:
"""获取相关概念"""
# 可以使用词嵌入模型找到相关概念
# 这里提供简化实现
concept_dict = {
"机器学习": ["监督学习", "无监督学习", "强化学习"],
"深度学习": ["卷积神经网络", "循环神经网络", "Transformer"],
}
concepts = []
for word in query.split():
if word in concept_dict:
concepts.extend(concept_dict[word])
return concepts
生成效果提升
class EnhancedGeneration:
"""生成效果提升策略"""
def __init__(self, llm):
self.llm = llm
def dynamic_prompt_template(self, query: str, docs: List[Document]) -> str:
"""动态提示词模板生成"""
# 根据查询类型和文档内容动态调整提示词
query_type = self._classify_query(query)
doc_types = self._analyze_document_types(docs)
if query_type == "definition":
template = """基于以下资料,请提供准确的定义和解释:
资料:
{context}
问题:{question}
请按以下格式回答:
1. 核心定义:[简洁的定义]
2. 详细解释:[详细说明]
3. 关键特点:[列出主要特点]
4. 应用场景:[实际应用]"""
elif query_type == "how-to":
template = """基于以下资料,请提供详细的操作指南:
资料:
{context}
问题:{question}
请按以下格式回答:
1. 准备工作:[需要的准备]
2. 具体步骤:[详细步骤]
3. 注意事项:[重要提醒]
4. 常见问题:[可能遇到的问题]"""
else: # general
template = """基于以下资料回答问题:
资料:
{context}
问题:{question}
详细答案:"""
return template
def _classify_query(self, query: str) -> str:
"""查询类型分类"""
if any(word in query for word in ["是什么", "定义", "概念"]):
return "definition"
elif any(word in query for word in ["如何", "怎么", "步骤"]):
return "how-to"
elif any(word in query for word in ["为什么", "原因", "原理"]):
return "explanation"
else:
return "general"
def _analyze_document_types(self, docs: List[Document]) -> List[str]:
"""分析文档类型"""
doc_types = []
for doc in docs:
# 根据文档内容或元数据判断类型
content = doc.page_content.lower()
if "步骤" in content or "方法" in content:
doc_types.append("tutorial")
elif "定义" in content or "概念" in content:
doc_types.append("definition")
else:
doc_types.append("general")
return doc_types
def context_compression(self, docs: List[Document], query: str, max_tokens: int = 2000) -> str:
"""上下文压缩:提取最相关信息"""
# 1. 计算每个文档片段与查询的相关性
doc_relevance = []
for doc in docs:
relevance_score = self._calculate_relevance(doc.page_content, query)
doc_relevance.append((doc, relevance_score))
# 2. 按相关性排序
doc_relevance.sort(key=lambda x: x[1], reverse=True)
# 3. 逐步添加文档,直到达到token限制
compressed_context = ""
current_tokens = 0
for doc, score in doc_relevance:
doc_tokens = len(doc.page_content.split()) # 简化的token计算
if current_tokens + doc_tokens <= max_tokens:
compressed_context += f"\n\n{doc.page_content}"
current_tokens += doc_tokens
else:
# 如果剩余空间不足,截取部分内容
remaining_tokens = max_tokens - current_tokens
if remaining_tokens > 50: # 至少保留50个token的空间
partial_content = " ".join(doc.page_content.split()[:remaining_tokens])
compressed_context += f"\n\n{partial_content}..."
break
return compressed_context.strip()
def _calculate_relevance(self, content: str, query: str) -> float:
"""计算内容与查询的相关性"""
query_words = set(query.lower().split())
content_words = set(content.lower().split())
# 计算词汇重叠率
intersection = query_words.intersection(content_words)
union = query_words.union(content_words)
jaccard_similarity = len(intersection) / len(union) if union else 0
return jaccard_similarity
性能与成本控制
class PerformanceOptimization:
"""性能与成本控制策略"""
def __init__(self):
self.cache = {} # 简单的内存缓存
self.request_count = 0
self.cost_tracker = {"embedding_calls": 0, "llm_calls": 0}
def cached_retrieval(self, query: str, retriever, cache_ttl: int = 3600) -> List[Document]:
"""缓存检索结果"""
import hashlib
import time
# 生成查询的哈希值作为缓存键
query_hash = hashlib.md5(query.encode()).hexdigest()
cache_key = f"retrieval_{query_hash}"
# 检查缓存
if cache_key in self.cache:
cached_result, timestamp = self.cache[cache_key]
if time.time() - timestamp < cache_ttl:
return cached_result
# 执行检索
docs = retriever.get_relevant_documents(query)
# 缓存结果
self.cache[cache_key] = (docs, time.time())
return docs
def batch_embedding(self, texts: List[str], embeddings_model, batch_size: int = 100) -> List[List[float]]:
"""批量嵌入处理"""
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
batch_embeddings = embeddings_model.embed_documents(batch_texts)
all_embeddings.extend(batch_embeddings)
# 更新成本跟踪
self.cost_tracker["embedding_calls"] += len(batch_texts)
return all_embeddings
def smart_chunking(self, documents: List[Document], target_chunk_size: int = 1000) -> List[Document]:
"""智能分块:基于语义边界"""
chunked_docs = []
for doc in documents:
content = doc.page_content
# 1. 按段落分割
paragraphs = content.split('\n\n')
# 2. 智能合并段落
current_chunk = ""
current_size = 0
for paragraph in paragraphs:
paragraph_size = len(paragraph.split())
if current_size + paragraph_size <= target_chunk_size:
current_chunk += f"\n\n{paragraph}" if current_chunk else paragraph
current_size += paragraph_size
else:
# 保存当前块
if current_chunk:
chunk_doc = Document(
page_content=current_chunk,
metadata={**doc.metadata, "chunk_id": len(chunked_docs)}
)
chunked_docs.append(chunk_doc)
# 开始新块
current_chunk = paragraph
current_size = paragraph_size
# 保存最后一个块
if current_chunk:
chunk_doc = Document(
page_content=current_chunk,
metadata={**doc.metadata, "chunk_id": len(chunked_docs)}
)
chunked_docs.append(chunk_doc)
return chunked_docs
def cost_aware_generation(self, query: str, docs: List[Document], llm, max_cost: float = 0.1) -> str:
"""成本感知的生成"""
# 估算成本
estimated_tokens = sum(len(doc.page_content.split()) for doc in docs) + len(query.split())
estimated_cost = estimated_tokens * 0.00002 # 假设每token成本
if estimated_cost > max_cost:
# 如果成本过高,减少文档数量或压缩内容
max_docs = int(max_cost / (estimated_cost / len(docs)))
docs = docs[:max_docs]
# 执行生成
context = "\n\n".join([doc.page_content for doc in docs])
prompt = f"基于以下上下文回答问题:\n\n{context}\n\n问题:{query}\n\n答案:"
response = llm.invoke(prompt)
# 更新成本跟踪
self.cost_tracker["llm_calls"] += 1
return response
def get_performance_stats(self) -> dict:
"""获取性能统计"""
return {
"cache_size": len(self.cache),
"total_requests": self.request_count,
"embedding_calls": self.cost_tracker["embedding_calls"],
"llm_calls": self.cost_tracker["llm_calls"],
"cache_hit_rate": self._calculate_cache_hit_rate()
}
def _calculate_cache_hit_rate(self) -> float:
"""计算缓存命中率"""
# 简化实现
return 0.75 # 假设75%的缓存命中率
总结
LangChain的RAG核心链(RetrievalQA和ConversationalRetrievalChain)通过系统性的封装和优化,将复杂的检索增强生成技术转化为开箱即用的工程组件。这种设计不仅大幅降低了RAG技术的使用门槛,还通过标准化的流程、深度的组件集成和灵活的配置选项,为开发者提供了从原型到生产的完整解决方案。
核心价值总结
- 技术门槛降低:将复杂的RAG流程封装为简单的API调用
- 最佳实践内置:集成了经过验证的提示词模板和处理策略
- 生态系统协同:与LangChain生态无缝集成,支持多种组件替换
- 扩展性保证:提供多层次的配置和自定义选项
应用建议
- 初学者:直接使用
RetrievalQA.from_llm()
快速构建RAG应用 - 进阶用户:通过
ConversationalRetrievalChain
实现多轮对话场景 - 高级开发者:基于核心链进行定制化扩展和优化
发展趋势
随着RAG技术的不断发展,LangChain的核心链也在持续演进,从传统的链式调用向更现代的Runnable接口迁移,提供更好的性能和更灵活的组合能力。开发者应关注新版本的迁移指南,逐步采用新的API设计。
通过深入理解和合理运用这些RAG核心链,开发者可以快速构建高质量的知识问答系统,为用户提供准确、相关且具有上下文感知能力的智能问答体验。
更多推荐
所有评论(0)