在这里插入图片描述

  【个人主页:玄同765

大语言模型(LLM)开发工程师中国传媒大学·数字媒体技术(智能交互与游戏设计)

深耕领域:大语言模型开发 / RAG知识库 / AI Agent落地 / 模型微调

技术栈:Python / LangChain/RAG(Dify+Redis+Milvus)| SQL/NumPy | FastAPI+Docker ️

工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案 

     

「让AI交互更智能,让技术落地更高效」

欢迎技术探讨/项目合作! 关注我,解锁大模型与智能交互的无限可能!


版本要求:本文基于 LangChain v1.0+ 编写,所有代码示例在 Python 3.10+ 和 LangChain 最新版本中测试通过。

核心内容:本文将深入介绍 LangChain Retrieval 模块的核心概念、v1.0 重大更新、以及如何使用最新的 create_retrieval_chain API 构建生产级 RAG 应用。


一、LangChain v1.0 Retrieval 模块概述

1.1 什么是 Retrieval 模块

Retrieval(检索)模块是 LangChain 框架中负责从外部数据源检索相关信息的核心组件。它是构建 RAG(Retrieval-Augmented Generation,检索增强生成)应用的基础,让大语言模型能够基于私有数据或实时信息回答问题。

1.2 Retrieval 的核心工作流程

┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Document  │ -> │   Splitting │ -> │   Storage   │ -> │  Retrieval  │ -> │   Output    │
│   Loading   │    │  (Chunks)   │    │ (Vector DB) │    │  (Search)   │    │  (Answer)   │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘

五个核心阶段

  1. Document Loading(文档加载):从各种数据源加载文档
  2. Splitting(文本分割):将长文档切分为合适的块
  3. Storage(存储):使用嵌入模型将文本转换为向量并存储
  4. Retrieval(检索):根据查询检索相关文档
  5. Output(输出):将检索结果与LLM结合生成答案

1.3 v1.0 版本重大更新

LangChain v1.0 对 Retrieval 模块进行了重大重构,主要变化包括:

特性 v0.x 版本 v1.0+ 版本
链式构建 RetrievalQAConversationalRetrievalChain create_retrieval_chain
消息类型 字符串处理为主 BaseMessage 对象为核心
文本提取 .text() 方法 .text 属性
类型安全 较弱 强类型,更好的IDE支持
Python版本 3.8+ 3.10+
API稳定性 频繁变更 稳定的企业级API

关键变化详解

  1. 全新的 create_retrieval_chain API:统一的RAG链构建方式,替代了旧版的多个类
  2. 消息对象标准化:所有输入输出使用 BaseMessage 对象,.text() 方法变为 .text 属性
  3. 强类型支持:更好的类型提示和IDE自动补全
  4. Python 3.10+ 要求:利用新版Python的类型系统特性

二、核心组件详解

2.1 Document Loaders(文档加载器)

LangChain 支持从多种数据源加载文档:

from langchain_community.document_loaders import (
    TextLoader,
    PyPDFLoader,
    CSVLoader,
    UnstructuredHTMLLoader,
    DirectoryLoader
)

# 加载文本文件
loader = TextLoader("document.txt", encoding="utf-8")
docs = loader.load()

# 加载PDF文件
pdf_loader = PyPDFLoader("document.pdf")
pdf_docs = pdf_loader.load()

# 加载整个目录
 dir_loader = DirectoryLoader(
    "./data",
    glob="**/*.txt",
    loader_cls=TextLoader
)
all_docs = dir_loader.load()

常用加载器

加载器 用途 安装依赖
TextLoader 纯文本文件 内置
PyPDFLoader PDF文档 pypdf
CSVLoader CSV表格 内置
UnstructuredHTMLLoader HTML网页 unstructured
WebBaseLoader 网页URL beautifulsoup4
JSONLoader JSON数据 内置

2.2 Text Splitters(文本分割器)

将长文档分割成适合检索的块是RAG的关键步骤。

from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter
)

# 递归字符分割器(推荐)
recursive_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,        # 每个块的最大字符数
    chunk_overlap=200,      # 块之间的重叠字符数
    length_function=len,    # 长度计算函数
    separators=["\n\n", "\n", "。", " ", ""]  # 分割符优先级
)

chunks = recursive_splitter.split_documents(docs)

# 字符分割器
char_splitter = CharacterTextSplitter(
    separator="\n",
    chunk_size=1000,
    chunk_overlap=200
)

# Token分割器(基于模型token)
token_splitter = TokenTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    model_name="gpt-3.5-turbo"
)

分割器选择指南

分割器 适用场景 特点
RecursiveCharacterTextSplitter 通用场景 智能递归分割,保持语义完整性
CharacterTextSplitter 简单文本 按固定字符分割
TokenTextSplitter 精确控制token 基于模型的token计算
MarkdownHeaderTextSplitter Markdown文档 按标题层级分割
CodeSplitter 代码文件 按代码结构分割

2.3 Embedding Models(嵌入模型)

嵌入模型将文本转换为向量表示,是语义检索的基础。

from langchain_openai import OpenAIEmbeddings
from langchain_ollama import OllamaEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings

# OpenAI嵌入模型
openai_embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
    api_key="your-api-key"
)

# Ollama本地嵌入模型
ollama_embeddings = OllamaEmbeddings(
    model="nomic-embed-text"
)

# HuggingFace开源嵌入模型
hf_embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-zh-v1.5"
)

# 嵌入文本
texts = ["Hello world", "LangChain is great"]
embeddings = openai_embeddings.embed_documents(texts)

# 嵌入查询
query_embedding = openai_embeddings.embed_query("What is LangChain?")

推荐嵌入模型

模型 维度 适用场景 语言支持
text-embedding-3-large 3072 高质量检索 多语言
text-embedding-3-small 1536 平衡性能成本 多语言
BAAI/bge-large-zh-v1.5 1024 中文场景 中文优化
nomic-embed-text 768 本地部署 多语言

2.4 Vector Stores(向量数据库)

向量数据库负责存储和检索向量化的文档。

from langchain_chroma import Chroma
from langchain_community.vectorstores import FAISS
from langchain_community.vectorstores import Qdrant

# Chroma(推荐,轻量级本地数据库)
chroma_db = Chroma.from_documents(
    documents=chunks,
    embedding=openai_embeddings,
    persist_directory="./chroma_db"
)

# FAISS(Facebook开源,高性能)
faiss_db = FAISS.from_documents(
    documents=chunks,
    embedding=openai_embeddings
)
faiss_db.save_local("faiss_index")

# Qdrant(生产级,支持分布式)
qdrant_db = Qdrant.from_documents(
    documents=chunks,
    embedding=openai_embeddings,
    location=":memory:"  # 或 url="http://localhost:6333"
)

向量数据库对比

数据库 类型 特点 适用场景
Chroma 嵌入式 轻量、易用、本地优先 开发测试、小型应用
FAISS 嵌入式 Facebook开源、高性能 大规模本地检索
Qdrant 服务端 生产级、分布式 企业级应用
Pinecone 云服务 全托管、高可用 生产环境
Weaviate 混合 支持向量+语义搜索 复杂查询场景

2.5 Retrievers(检索器)

检索器负责从向量数据库中检索相关文档。

# 基础相似度检索
retriever = chroma_db.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 4}  # 返回前4个结果
)

# MMR(最大边际相关性)检索
mmr_retriever = chroma_db.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": 4,
        "fetch_k": 20,      # 候选池大小
        "lambda_mult": 0.5   # 多样性权重
    }
)

# 相似度阈值检索
threshold_retriever = chroma_db.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.8}
)

# 执行检索
results = retriever.invoke("What is LangChain?")

检索策略对比

策略 特点 适用场景
similarity 纯向量相似度 标准检索场景
mmr 平衡相关性和多样性 需要多样化结果
similarity_score_threshold 质量过滤 需要高置信度结果

三、v1.0 新特性:create_retrieval_chain

3.1 旧版 API vs 新版 API

v0.x 旧方式(已废弃)

# 不推荐
from langchain.chains import RetrievalQA

qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever
)

v1.0+ 新方式(推荐)

from langchain_classic.chains.combine_documents import create_stuff_documents_chain
from langchain_classic.chains.retrieval import create_retrieval_chain

# 创建文档处理链
document_chain = create_stuff_documents_chain(llm, prompt)

# 创建检索链
retrieval_chain = create_retrieval_chain(retriever, document_chain)

# 执行
response = retrieval_chain.invoke({"input": "What is LangChain?"})

3.2 create_retrieval_chain 详解

create_retrieval_chain 是 v1.0 引入的统一RAG链构建API,它将检索和生成两个步骤解耦:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains.retrieval import create_retrieval_chain

# 1. 初始化LLM
llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0.7,
    api_key="your-api-key"
)

# 2. 创建提示模板
prompt = ChatPromptTemplate.from_messages([
    ("system", """You are a helpful assistant. Use the following context to answer the question.
    If you don't know the answer, say you don't know.

    Context: {context}"""),
    ("human", "{input}")
])

# 3. 创建文档处理链
document_chain = create_stuff_documents_chain(
    llm=llm,
    prompt=prompt,
    document_separator="\n\n"  # 文档分隔符
)

# 4. 创建检索链
retrieval_chain = create_retrieval_chain(
    retriever=retriever,
    combine_docs_chain=document_chain
)

# 5. 执行查询
response = retrieval_chain.invoke({
    "input": "What is LangChain and what are its main features?"
})

# 6. 解析响应
print(f"Answer: {response['answer']}")
print(f"\nRetrieved documents: {len(response['context'])}")

响应结构

{
    "input": "用户原始问题",
    "context": [Document(...), Document(...)],  # 检索到的文档
    "answer": "生成的答案"
}

3.3 关键变化:消息对象处理

v1.0 中,所有文本内容都通过 BaseMessage 对象处理:

from langchain_core.messages import HumanMessage, AIMessage

# v0.x 方式(已废弃)
# result = chain.run("question")

# v1.0+ 方式
result = chain.invoke({"input": "question"})

# 提取文本(注意:.text() 方法变为 .text 属性)
# v0.x: answer = result.text()
# v1.0+: 
answer = result["answer"]

# 如果是消息对象
if hasattr(answer, 'text'):
    text = answer.text  # 属性,不是方法

四、完整实战:构建生产级RAG应用

4.1 完整代码示例

#!/usr/bin/env python3
"""
LangChain v1.0+ Retrieval 完整实战示例
构建一个基于私有文档的问答系统
"""

import os
from typing import List

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_classic.chains.combine_documents import create_stuff_documents_chain
from langchain_classic.chains.retrieval import create_retrieval_chain


class RAGApplication:
    """RAG应用类"""

    def __init__(self, openai_api_key: str):
        self.openai_api_key = openai_api_key
        self.vectorstore = None
        self.retriever = None
        self.chain = None

        # 初始化组件
        self._init_embeddings()
        self._init_llm()

    def _init_embeddings(self):
        """初始化嵌入模型"""
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-large",
            api_key=self.openai_api_key
        )

    def _init_llm(self):
        """初始化LLM"""
        self.llm = ChatOpenAI(
            model="gpt-4o-mini",
            temperature=0.7,
            api_key=self.openai_api_key
        )

    def load_documents(self, file_path: str) -> List:
        """加载文档"""
        loader = TextLoader(file_path, encoding="utf-8")
        documents = loader.load()
        print(f"Loaded {len(documents)} documents")
        return documents

    def split_documents(self, documents: List) -> List:
        """分割文档"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", "。", " ", ""]
        )
        chunks = splitter.split_documents(documents)
        print(f"Split into {len(chunks)} chunks")
        return chunks

    def create_vectorstore(self, chunks: List, persist_dir: str = "./chroma_db"):
        """创建向量数据库"""
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=persist_dir
        )
        print(f"Vectorstore created at {persist_dir}")

    def load_vectorstore(self, persist_dir: str = "./chroma_db"):
        """加载已有向量数据库"""
        self.vectorstore = Chroma(
            persist_directory=persist_dir,
            embedding_function=self.embeddings
        )
        print(f"Vectorstore loaded from {persist_dir}")

    def setup_retriever(self, k: int = 4):
        """设置检索器"""
        self.retriever = self.vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs={"k": k}
        )
        print(f"Retriever setup with k={k}")

    def create_chain(self):
        """创建检索链"""
        # 创建提示模板
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a helpful assistant specialized in answering questions based on provided context.

            Guidelines:
            - Answer based only on the provided context
            - If the answer is not in the context, say "I don't have enough information to answer this question"
            - Be concise but complete
            - Cite specific parts of the context when possible

            Context:
            {context}"""),
            ("human", "{input}")
        ])

        # 创建文档处理链
        document_chain = create_stuff_documents_chain(
            llm=self.llm,
            prompt=prompt,
            document_separator="\n\n---\n\n"
        )

        # 创建检索链
        self.chain = create_retrieval_chain(
            retriever=self.retriever,
            combine_docs_chain=document_chain
        )
        print("RAG chain created successfully")

    def query(self, question: str) -> dict:
        """执行查询"""
        if not self.chain:
            raise ValueError("Chain not initialized. Call create_chain() first.")

        response = self.chain.invoke({"input": question})
        return response

    def chat_loop(self):
        """交互式问答循环"""
        print("\n" + "="*50)
        print("RAG Application Ready! Type 'exit' to quit.")
        print("="*50 + "\n")

        while True:
            question = input("Question: ").strip()

            if question.lower() in ["exit", "quit", "q"]:
                print("Goodbye!")
                break

            if not question:
                continue

            try:
                response = self.query(question)

                print(f"\nAnswer: {response['answer']}")
                print(f"\nSources ({len(response['context'])} documents retrieved):")
                for i, doc in enumerate(response['context'], 1):
                    print(f"  {i}. {doc.page_content[:100]}...")
                print()

            except Exception as e:
                print(f"Error: {e}")


def main():
    """主函数"""
    # 设置API密钥
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        api_key = input("Enter your OpenAI API key: ")

    # 创建RAG应用
    rag = RAGApplication(api_key)

    # 检查是否已有向量数据库
    if os.path.exists("./chroma_db"):
        print("Loading existing vectorstore...")
        rag.load_vectorstore()
    else:
        print("Creating new vectorstore...")
        # 加载文档(请确保有sample.txt文件)
        docs = rag.load_documents("sample.txt")
        chunks = rag.split_documents(docs)
        rag.create_vectorstore(chunks)

    # 设置检索器和链
    rag.setup_retriever(k=4)
    rag.create_chain()

    # 启动交互式问答
    rag.chat_loop()


if __name__ == "__main__":
    main()

4.2 使用LCEL构建自定义链

LangChain Expression Language (LCEL) 提供了一种更灵活的方式来构建RAG链:

from langchain_core.runnables import RunnablePassthrough, RunnableParallel

# 格式化文档的辅助函数
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 使用LCEL构建RAG链
rag_chain = (
    RunnableParallel(
        {
            "context": retriever | format_docs,
            "input": RunnablePassthrough()
        }
    )
    | prompt
    | llm
    | StrOutputParser()
)

# 执行
answer = rag_chain.invoke("What is LangChain?")

4.3 带历史记录的对话式RAG

from langchain_core.messages import HumanMessage, AIMessage
from langchain_classic.chains.history_aware_retriever import create_history_aware_retriever

# 历史感知检索器提示
history_prompt = ChatPromptTemplate.from_messages([
    ("system", "Given the chat history and the latest user question, formulate a standalone question that can be understood without the chat history."),
    ("human", "{input}")
])

# 创建历史感知检索器
history_aware_retriever = create_history_aware_retriever(
    llm=llm,
    retriever=retriever,
    prompt=history_prompt
)

# 创建对话链
conversation_chain = create_retrieval_chain(
    retriever=history_aware_retriever,
    combine_docs_chain=document_chain
)

# 带历史记录的查询
chat_history = []
response = conversation_chain.invoke({
    "input": "What are its main features?",  # 指代前文的LangChain
    "chat_history": chat_history
})

# 更新历史
chat_history.extend([
    HumanMessage(content="What is LangChain?"),
    AIMessage(content=response["answer"])
])

五、高级技巧与最佳实践

5.1 检索优化技巧

1. 混合检索(向量 + 关键词)

from langchain_community.retrievers import BM25Retriever, EnsembleRetriever

# 创建BM25检索器
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 2

# 创建向量检索器
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 2})

# 组合检索器
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.5, 0.5]
)

2. 多查询检索

from langchain_community.retrievers.multi_query import MultiQueryRetriever

multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(),
    llm=llm
)

3. 重排序(Reranking)

from langchain_community.retrievers import ContextualCompressionRetriever
from langchain_community.document_transformers import EmbeddingsRedundantFilter

# 创建压缩检索器
compressor = EmbeddingsRedundantFilter(embeddings=embeddings)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

5.2 性能优化

1. 缓存嵌入

from langchain.embeddings import CacheBackedEmbeddings
from langchain.storage import LocalFileStore

# 创建本地缓存
store = LocalFileStore("./cache/")

# 包装嵌入模型
cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
    underlying_embeddings=embeddings,
    document_embedding_cache=store,
    namespace="openai_embeddings"
)

2. 批处理

# 批量嵌入
texts = ["text1", "text2", "text3"]
embeddings_list = embeddings.embed_documents(texts, chunk_size=100)

5.3 评估RAG系统

from langchain.evaluation import (
    QAEvalChain,
    ContextQAEvalChain
)

# 创建评估链
eval_chain = QAEvalChain.from_llm(llm)

# 评估预测
examples = [
    {"query": "What is LangChain?", "answer": "A framework for LLM applications"}
]
predictions = [{"result": "LangChain is a Python framework"}]

graded_outputs = eval_chain.evaluate(examples, predictions)

六、常见问题与解决方案

6.1 检索质量不佳

问题:检索到的文档与问题不相关

解决方案
1. 调整 chunk_size 和 chunk_overlap
2. 尝试不同的 search_type(MMR vs Similarity)
3. 使用混合检索(向量 + BM25)
4. 添加查询重写步骤

6.2 上下文窗口超限

问题:检索到的文档太长,超出LLM上下文限制

解决方案
1. 减小 chunk_size
2. 减少检索数量 k
3. 使用Map-Reduce或Refine链
4. 使用支持长上下文的模型(如GPT-4 Turbo)

6.3 重复信息

问题:多个检索结果包含重复信息

解决方案
1. 使用 EmbeddingsRedundantFilter
2. 增加 chunk_overlap 的差异化
3. 使用MMR检索策略


七、总结

7.1 核心要点回顾

  1. Retrieval模块五阶段:Loading → Splitting → Storage → Retrieval → Output
  2. v1.0重大变化
  3. 使用 create_retrieval_chain 替代旧版Chain类
  4. 消息对象标准化,.text() 变为 .text 属性
  5. Python 3.10+ 要求
  6. 最佳实践
  7. 使用 RecursiveCharacterTextSplitter 进行智能分割
  8. 选择合适的嵌入模型和向量数据库
  9. 使用MMR或混合检索优化检索质量

7.2 版本选择建议

  • 新项目:直接使用 LangChain v1.0+
  • 现有项目:参考迁移指南逐步升级
  • 生产环境:v1.0 提供更稳定的API和更好的性能

7.3 学习资源


提示:本文所有代码示例均基于 LangChain v1.0+ 和 Python 3.10+。在实际使用前,请确保已安装最新版本的LangChain和相关依赖。

安装依赖

pip install langchain langchain-openai langchain-chroma langchain-community
pip install chromadb pypdf unstructured
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐