© 2026 DREAMVFIA UNION

一、引言:为什么需要RAG架构

在人工智能飞速发展的今天,大语言模型(LLM)已经展现出惊人的能力,能够进行复杂的推理、创作和对话。然而,LLM本身存在一个根本性的局限性:它的知识截止到训练日期,无法直接获取实时信息或访问私有领域的专业知识。想象一下,即使是最强大的GPT-4,它也无法回答关于你们公司内部规章制度的问题,因为它从未见过这些文档。这就是检索增强生成(Retrieval-Augmented Generation,RAG)技术诞生的背景。

RAG架构通过将外部知识检索与大语言模型生成能力相结合,有效地解决了上述问题。其核心思想简单而强大:首先,根据用户问题从私有知识库中检索相关信息;然后,将检索到的上下文信息与用户问题一起发送给LLM,让模型基于这些真实可靠的信息生成答案。这种架构不仅能够突破模型的知识局限,还能大幅减少“幻觉”问题,提高回答的准确性和可信度。

LangChain作为当前最流行的LLM应用开发框架,提供了完整的RAG开发工具链。从文档加载、文本分割、嵌入向量生成,到向量数据库存储、相似性检索,再到与LLM的集成,LangChain为开发者提供了一站式解决方案。本文将深入探讨RAG技术的各个环节,通过大量实战代码,帮助读者从零构建一个功能完善的知识库问答系统。无论你是刚刚接触LLM开发的新手,还是希望深入了解RAG架构的工程师,本文都将提供有价值的参考。

二、RAG技术原理解析

2.1 RAG的工作流程

RAG系统的完整工作流程可以分为三个主要阶段:数据准备阶段、检索阶段和生成阶段。理解这三个阶段的协作机制,对于构建高效的RAG系统至关重要。

在数据准备阶段,系统首先需要将各种格式的文档(如PDF、Word、HTML等)转换为可处理的文本格式。这个过程由文档加载器(Document Loader)完成。接下来,由于大多数文档内容较长,无法直接嵌入到LLM的上下文窗口中,需要使用文本分割器(Text Splitter)将文档切分成较小的文本块(Chunk)。每个文本块随后通过嵌入模型(Embedding Model)转换为向量表示,这些向量最终存储在向量数据库(Vector Store)中,以便后续的快速检索。

检索阶段是RAG系统的核心环节。当用户提出问题时,系统首先将问题转换为向量表示,然后在向量数据库中进行相似性搜索,找到与问题最相关的TOP-K个文本块。这些检索到的文本块作为上下文信息,与原始问题一起构成完整的提示(Prompt),发送给大语言模型。

生成阶段相对简单但至关重要。LLM接收包含问题和检索上下文的提示,利用其强大的推理能力生成最终答案。一个优秀的RAG系统还会返回检索到的信息来源,方便用户验证答案的准确性。

2.2 向量检索的核心原理

向量检索是RAG系统的技术核心,其基本原理是将文本转换为高维向量,通过计算向量之间的相似度来找到最相关的内容。这种方法相比传统的关键词匹配具有显著优势:它能够理解语义相似性,即使查询词与文档中的用词不完全相同,也能找到相关内容。

向量相似度计算主要有两种方法:余弦相似度(Cosine Similarity)和欧氏距离(Euclidean Distance)。余弦相似度衡量的是两个向量方向上的相似程度,取值范围为[-1, 1],值越接近1表示越相似。欧氏距离则是空间中两点之间的直线距离,值越小表示越相似。在实际应用中,余弦相似度更为常用,因为它对向量的长度不敏感,更适合文本嵌入的场景。

向量数据库是专门为高效存储和检索向量而设计的数据库系统。与传统的关系型数据库不同,向量数据库针对高维向量进行了优化,能够在海量数据中快速找到最相似的向量。主流的向量数据库包括Chroma、FAISS、Milvus、Pinecone、Weaviate等,它们各有特点,适用于不同的应用场景。

2.3 RAG的挑战与优化策略

尽管RAG架构看似简单,但在实际应用中面临着诸多挑战。首先是语义检索的准确性问题:简单的相似性搜索可能无法找到真正相关的文档,或者返回的文档虽然相似但无法回答用户问题。其次是上下文窗口的限制:LLM的上下文窗口长度有限,无法一次性输入所有检索到的文档,需要进行选择性取舍。第三是系统延迟问题:RAG系统涉及多个组件,任何一个环节的性能瓶颈都会影响整体响应时间。

针对这些挑战,业界已经发展出多种优化策略。在检索层面,可以采用混合检索(Hybrid Search)结合关键词和语义检索,使用 reranking 模型对初步检索结果进行二次排序。在上下文处理方面,可以采用上下文压缩(Context Compression)技术提取每个文档块中的关键信息,减少输入 token 数量。在系统架构方面,可以通过缓存、异步处理、批量优化等方式提升性能。理解这些优化策略,对于构建生产级别的RAG系统至关重要。

三、LangChain与向量数据库环境搭建

3.1 Python开发环境准备

在开始RAG实战之前,我们需要准备好Python开发环境。RAG开发涉及多个库的支持,包括LangChain、各种向量数据库客户端、文档处理库等。建议使用Python 3.10或更高版本,以获得最佳的库兼容性。

# 创建虚拟环境(推荐)
python -m venv rag-env

# 激活虚拟环境
# Windows:
rag-env\Scripts\activate
# Linux/Mac:
source rag-env/bin/activate

接下来安装RAG开发所需的核心依赖库。LangChain的模块化设计使得我们可以按需安装组件,避免引入不必要的依赖。

# LangChain核心包
pip install langchain-core

# LangChain主包
pip install langchain

# OpenAI集成(如果使用OpenAI的模型)
pip install langchain-openai

# 社区维护的集成包
pip install langchain-community

# 向量数据库
pip install faiss-cpu       # Facebook AI Similarity Search
pip install chromadb         # Chroma向量数据库
pip install qdrant-client   # Qdrant向量数据库

# 文档处理
pip install pypdf           # PDF处理
pip install python-docx     # Word文档处理
pip install beautifulsoup4  # HTML处理
pip install markdown         # Markdown处理

# 文本嵌入
pip install sentence-transformers  # HuggingFace嵌入模型

# 实用工具
pip install python-dotenv  # 环境变量管理
pip install numpy          # 数值计算
pip install tqdm           # 进度条

3.2 环境变量配置

在实际开发中,我们需要管理API密钥等敏感信息。建议使用python-dotenv库来管理环境变量,创建一个.env文件来存储配置。

# 创建.env文件
# 文件内容如下:
"""
OPENAI_API_KEY=sk-your-openai-api-key-here
ANTHROPIC_API_KEY=sk-ant-your-api-key-here
LANGSMITH_API_KEY=your-langsmith-key
HUGGINGFACEHUB_API_TOKEN=your-huggingface-token
"""

# 加载环境变量
from dotenv import load_dotenv
import os

# 加载.env文件
load_dotenv()

# 验证API密钥
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    raise ValueError("请在.env文件中设置OPENAI_API_KEY")

print(f"OpenAI API Key已加载: {openai_api_key[:10]}...")

# 配置LangSmith(用于调试追踪)
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "rag-knowledge-base"

3.3 第一个RAG程序

让我们从最简单的RAG程序开始,理解整体工作流程。这个例子将演示如何加载文档、创建向量存储、执行检索并生成回答。

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.prompts import PromptTemplate
import os
from dotenv import load_dotenv

load_dotenv()

# 第一步:准备文档内容
# 创建测试文档
test_document = """
量子计算是一种遵循量子力学规律调控量子信息单元进行计算的新型计算模式。
与传统的通用计算机使用比特(0或1)不同,量子计算机使用量子比特。
量子比特具有叠加态特性,可以同时处于0和1的叠加状态。
这使得量子计算机在处理某些特定问题时具有超越经典计算机的潜力。

量子计算的主要应用领域包括:
1. 密码学:Shor算法可以在多项式时间内分解大整数
2. 量子化学:模拟分子结构和化学反应
3. 优化问题:解决复杂的组合优化问题
4. 机器学习:加速某些机器学习算法的计算

量子计算面临的主要挑战包括:
- 量子退相干:量子态容易受到环境干扰
- 量子纠错:需要复杂的纠错机制来保证计算正确性
- 可扩展性:增加量子比特数量同时保持相干性非常困难
"""

# 保存测试文档
with open("quantum_computing.txt", "w", encoding="utf-8") as f:
    f.write(test_document)

# 第二步:加载文档
loader = TextLoader("quantum_computing.txt", encoding="utf-8")
documents = loader.load()
print(f"加载文档数量: {len(documents)}")
print(f"文档内容长度: {len(documents[0].page_content)}")

# 第三步:分割文档
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    separators=["\n\n", "\n", "。", "!", "?", " ", ""]
)
splits = text_splitter.split_documents(documents)
print(f"分割后文档块数量: {len(splits)}")

# 第四步:创建嵌入和向量存储
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embeddings,
    collection_name="quantum_knowledge"
)
print("向量数据库创建完成")

# 第五步:创建检索器
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 第六步:创建问答链
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# 定义系统提示
system_prompt = """你是一个专业的技术问答助手。请根据提供的上下文信息回答用户的问题。
要求:
1. 只根据提供的上下文回答,不要编造信息
2. 如果上下文中没有相关信息,请如实告知
3. 回答要清晰准确"""

prompt = PromptTemplate.from_template("""{context}

用户问题: {input}

请根据以上上下文回答用户问题。""")

# 创建文档处理链
question_answer_chain = create_stuff_documents_chain(llm, prompt)

# 创建完整检索链
rag_chain = create_retrieval_chain(retriever, question_answer_chain)

# 第七步:测试问答
questions = [
    "量子计算与传统计算有什么区别?",
    "量子计算有哪些应用领域?",
    "量子计算面临哪些挑战?"
]

for question in questions:
    print(f"\n{'='*50}")
    print(f"问题: {question}")
    result = rag_chain.invoke({"input": question})
    print(f"回答: {result['answer']}")
    print(f"检索到 {len(result['context'])} 个相关文档")

运行上述代码,你将看到RAG系统的完整工作流程输出。这个简单的示例涵盖了RAG开发的核心要素,后续章节我们将逐步深入每个组件的细节。

四、文档加载与处理

4.1 支持多种格式的文档加载器

LangChain提供了丰富的文档加载器,支持从各种数据源加载内容。了解这些加载器的使用方法是构建RAG系统的基础。

# 4.1.1 文本文件加载
from langchain_community.document_loaders import TextLoader

# 加载纯文本文件
text_loader = TextLoader("document.txt", encoding="utf-8")
text_docs = text_loader.load()
print(f"加载文本: {len(text_docs)} 个文档")

# 4.1.2 PDF文件加载
from langchain_community.document_loaders import PyPDFLoader, PDFPlumberLoader

# PyPDFLoader - 快速但功能有限
pypdf_loader = PyPDFLoader("document.pdf")
pypdf_docs = pypdf_loader.load()

# PDFPlumberLoader - 功能更丰富,可以提取表格等
plumber_loader = PDFPlumberLoader("document.pdf")
plumber_docs = plumber_loader.load()

print(f"PDF页数: {len(pypdf_docs)}")

# 4.1.3 Word文档加载
from langchain_community.document_loaders import Docx2txtLoader, UnstructuredWordDocumentLoader

# Docx2txtLoader - 提取文本内容
docx_loader = Docx2txtLoader("document.docx")
docx_docs = docx_loader.load()

# UnstructuredWordDocumentLoader - 保留更多结构信息
unstructured_loader = UnstructuredWordDocumentLoader("document.docx")
unstructured_docs = unstructured_loader.load()

# 4.1.4 HTML网页加载
from langchain_community.document_loaders import BSHTMLLoader, UnstructuredHTMLLoader

# BeautifulSoup HTML加载器
bs_loader = BSHTMLLoader("page.html")
bs_docs = bs_loader.load()

# Unstructured HTML加载器
html_loader = UnstructuredHTMLLoader("page.html")
html_docs = html_loader.load()

# 4.1.5 Markdown加载
from langchain_community.document_loaders import UnstructuredMarkdownLoader, MarkdownLoader

# Unstructured方式
md_unstructured = UnstructuredMarkdownLoader("readme.md")
md_docs_unstructured = md_unstructured.load()

# 专用Markdown加载器
md_loader = MarkdownLoader("readme.md")
md_docs = md_loader.load()

# 4.1.6 CSV文件加载
from langchain_community.document_loaders import CSVLoader

csv_loader = CSVLoader("data.csv", encoding="utf-8")
csv_docs = csv_loader.load()

# 4.1.7 目录批量加载
from langchain_community.document_loaders import DirectoryLoader

# 加载目录下所有txt文件
dir_loader = DirectoryLoader(
    "./documents",              # 目录路径
    glob="**/*.txt",           # 文件匹配模式
    loader_cls=TextLoader,      # 使用的加载器
    show_progress=True          # 显示进度条
)
all_docs = dir_loader.load()
print(f"目录下共加载 {len(all_docs)} 个文档")

4.2 文档元数据处理

在RAG系统中,元数据(Metadata)的合理使用可以显著提升检索效果。元数据是附加在每个文档块上的额外信息,可以用于过滤、分组等高级检索操作。

from langchain_community.document_loaders import TextLoader
from langchain.schema import Document

# 4.2.1 从加载器获取元数据
loader = TextLoader("article.txt", encoding="utf-8")
docs = loader.load()

# 每个Document对象包含page_content和metadata
for doc in docs:
    print(f"内容: {doc.page_content[:100]}...")
    print(f"元数据: {doc.metadata}")

# 4.2.2 自定义元数据
class CustomDocumentLoader:
    """自定义文档加载器,添加额外元数据"""
    
    def __init__(self, file_path: str, category: str, author: str):
        self.file_path = file_path
        self.category = category
        self.author = author
    
    def load(self):
        with open(self.file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # 创建带有自定义元数据的Document
        doc = Document(
            page_content=content,
            metadata={
                "source": self.file_path,
                "category": self.category,
                "author": self.author,
                "file_type": "txt",
                "processed_date": "2026-03-05"
            }
        )
        return [doc]

# 使用自定义加载器
loader = CustomDocumentLoader(
    file_path="technical_doc.txt",
    category="技术文档",
    author="DREAMVFIA"
)
custom_docs = loader.load()
print(f"自定义元数据: {custom_docs[0].metadata}")

4.3 文档分割策略详解

文档分割是RAG系统中至关重要的环节。分割策略直接影响检索效果:块太小可能导致信息不完整,块太大可能引入噪声且超出LLM的上下文窗口限制。

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    MarkdownHeaderTextSplitter,
    PythonCodeTextSplitter,
    Language
)

# 4.3.1 递归字符分割器(推荐)
# 按照分隔符列表依次尝试分割,灵活适应不同文本结构
recursive_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,           # 目标块大小
    chunk_overlap=200,          # 块之间的重叠(保持上下文连贯)
    length_function=len,        # 计算长度的函数
    separators=[                 # 分隔符列表(按优先级)
        "\n\n",                  # 段落分隔
        "\n",                    # 换行
        "。!?",                # 中文标点
        ".!?",                   # 英文标点
        " ",                     # 空格
        ""                       # 最小单位
    ],
    keep_separator=False        # 是否保留分隔符
)

# 4.3.2 字符分割器
# 按固定分隔符简单分割
char_splitter = CharacterTextSplitter(
    separator="\n\n",
    chunk_size=500,
    chunk_overlap=50,
    length_function=len,
    is_separator_regex=False
)

# 4.3.3 Markdown标题分割器
# 按照Markdown标题结构分割,保留标题层级信息
markdown_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[
        ("#", "H1"),
        ("##","H2"),
        ("###", "H3"),
    ]
)

markdown_text = """
# 第一章 量子计算基础

## 1.1 什么是量子计算

量子计算是一种基于量子力学原理的计算方式。

## 1.2 量子比特

量子比特是量子计算的基本单位。

### 1.2.1 叠加态

量子比特可以同时处于0和1的叠加态。

### 1.2.2 纠缠态

多个量子比特可以处于纠缠状态。
"""

md_header_docs = markdown_splitter.split_text(markdown_text)
for doc in md_header_docs:
    print(f"内容: {doc.page_content[:50]}...")
    print(f"元数据: {doc.metadata}")
    print("-" * 30)

# 4.3.4 代码专用分割器
python_splitter = PythonCodeTextSplitter(
    chunk_size=500,
    chunk_overlap=50
)

python_code = """
def calculate_factorial(n):
    if n <= 1:
        return 1
    return n * calculate_factorial(n-1)

def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)
"""

code_docs = python_splitter.split_text(python_code)
print("代码分割结果:")
for doc in code_docs:
    print(doc.page_content)
    print("-" * 20)

4.4 大规模文档处理

当需要处理大量文档时,需要考虑效率和资源管理问题。以下是一些实用的处理模式。

from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from typing import List
import os
from tqdm import tqdm

def process_documents_directory(
    directory: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    extensions: List[str] = None
) -> List:
    """
    处理目录下所有文档
    
    参数:
        directory: 文档目录路径
        chunk_size: 文本块大小
        chunk_overlap: 块重叠大小
        extensions: 要处理的文件扩展名列表
    
    返回:
        处理后的文档块列表
    """
    
    if extensions is None:
        extensions = ["txt", "md", "pdf", "docx", "html"]
    
    all_documents = []
    
    # 获取所有支持的文件
    file_patterns = [f"**/*.{ext}" for ext in extensions]
    
    for pattern in file_patterns:
        loader = DirectoryLoader(
            directory,
            glob=pattern,
            loader_cls=TextLoader,
            show_progress=True
        )
        try:
            docs = loader.load()
            all_documents.extend(docs)
            print(f"从 {pattern} 加载了 {len(docs)} 个文档")
        except Exception as e:
            print(f"加载 {pattern} 时出错: {e}")
    
    print(f"\n总共加载了 {len(all_documents)} 个文档")
    
    # 分割文档
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", "。", "!", "?", " ", ""]
    )
    
    splits = text_splitter.split_documents(all_documents)
    print(f"分割后得到 {len(splits)} 个文本块")
    
    return splits

def create_vectorstore_from_documents(
    documents: List,
    collection_name: str = "knowledge_base",
    persist_directory: str = "./chroma_db"
):
    """
    从文档列表创建向量数据库
    
    参数:
        documents: 文档块列表
        collection_name: 集合名称
        persist_directory: 持久化存储路径
    """
    
    # 使用OpenAI嵌入
    embeddings = OpenAIEmbeddings(
        model="text-embedding-ada-002"
    )
    
    # 创建向量数据库(带持久化)
    vectorstore = Chroma.from_documents(
        documents=documents,
        embedding=embeddings,
        collection_name=collection_name,
        persist_directory=persist_directory
    )
    
    print(f"向量数据库已创建,包含 {vectorstore._collection.count()} 个向量")
    
    return vectorstore

# 使用示例
# splits = process_documents_directory("./knowledge_base")
# vectorstore = create_vectorstore_from_documents(splits)

五、向量数据库深度实践

5.1 Chroma向量数据库

Chroma是当前最流行的开源向量数据库之一,它简单易用,与LangChain集成紧密。Chroma采用Python原生实现,部署简单,非常适合学习和开发环境使用。

from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.schema import Document
import os

# 5.1.1 从文档创建向量存储
documents = [
    Document(
        page_content="量子计算使用量子比特进行计算,具有叠加和纠缠特性",
        metadata={"source": "quantum.txt", "category": "physics"}
    ),
    Document(
        page_content="机器学习是人工智能的一个分支,通过数据训练模型",
        metadata={"source": "ml.txt", "category": "ai"}
    ),
    Document(
        page_content="深度学习是机器学习的子领域,使用神经网络模型",
        metadata={"source": "dl.txt", "category": "ai"}
    ),
    Document(
        page_content="自然语言处理是AI处理文本和语言的技术",
        metadata={"source": "nlp.txt", "category": "ai"}
    ),
]

embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")

# 创建向量数据库
chroma = Chroma.from_documents(
    documents=documents,
    embedding=embeddings,
    collection_name="tech_knowledge"
)

print(f"向量数量: {chroma._collection.count()}")

# 5.1.2 相似性检索
query = "什么是机器学习?"
results = chroma.similarity_search_with_score(query, k=2)

print(f"\n查询: {query}")
print("检索结果:")
for doc, score in results:
    print(f"- 内容: {doc.page_content}")
    print(f"  元数据: {doc.metadata}")
    print(f"  相似度分数: {score:.4f}")

# 5.1.3 带过滤条件的检索
# 使用元数据过滤
results_filtered = chroma.similarity_search_with_score(
    query="深度学习",
    k=2,
    filter={"category": "ai"}  # 只检索AI类别
)

print("\n过滤后的结果 (category=ai):")
for doc, score in results_filtered:
    print(f"- {doc.page_content}")

# 5.1.4 MMR(最大边际相关性)检索
# 在相关性和多样性之间取得平衡
results_mmr = chroma.max_marginal_relevance_search(
    query="AI技术",
    k=3,
    fetch_k=10,        # 从前10个结果中选择
    lambda_mult=0.5    # 0=只关注多样性, 1=只关注相关性
)

print("\nMMR检索结果:")
for doc in results_mmr:
    print(f"- {doc.page_content}")

# 5.1.5 持久化和加载
# 持久化存储
chroma.persist()

# 从已有存储加载
loaded_chroma = Chroma(
    collection_name="tech_knowledge",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)

print(f"\n从持久化存储加载: {loaded_chroma._collection.count()} 个向量")

5.2 FAISS向量数据库

FAISS(Facebook AI Similarity Search)是由Facebook开发的向量相似性搜索库,以其高性能和内存效率著称。FAISS特别适合需要处理海量向量的场景。

from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
import numpy as np

# 5.2.1 创建FAISS向量存储
documents = [
    Document(page_content="量子计算使用量子比特", metadata={"id": "1"}),
    Document(page_content="机器学习训练模型", metadata={"id": "2"}),
    Document(page_content="深度学习使用神经网络", metadata={"id": "3"}),
    Document(page_content="自然语言处理文本", metadata={"id": "4"}),
    Document(page_content="计算机视觉图像", metadata={"id": "5"}),
]

embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")

# 创建FAISS向量数据库
faiss = FAISS.from_documents(documents, embeddings)

print(f"向量数量: {faiss.index.ntotal}")

# 5.2.2 相似性检索
query = "神经网络和AI"
results = faiss.similarity_search_with_score(query, k=3)

print(f"\n查询: {query}")
for doc, score in results:
    print(f"- {doc.page_content} (分数: {score:.4f})")

# 5.2.3 获取向量进行底层操作
# FAISS允许直接访问底层索引
docsearch = faiss.as_retriever()

# 通过向量进行检索
query_embedding = embeddings.embed_query("深度学习")
results_by_vector = faiss.similarity_search_by_vector(query_embedding, k=2)

print("\n通过向量检索:")
for doc in results_by_vector:
    print(f"- {doc.page_content}")

# 5.2.4 合并多个向量存储
faiss1 = FAISS.from_texts(["文档1内容"], embeddings)
faiss2 = FAISS.from_texts(["文档2内容"], embeddings)

# 合并
faiss1.merge_from(faiss2)

print(f"\n合并后向量数量: {faiss1.index.ntotal}")

# 5.2.5 保存和加载
faiss.save_local("faiss_index")
loaded_faiss = FAISS.load_local("faiss_index", embeddings)

print(f"加载后向量数量: {loaded_faiss.index.ntotal}")

5.3 Qdrant向量数据库

Qdrant是一个高性能的向量数据库,支持云端部署和本地部署,提供RESTful API和gRPC接口。

from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Qdrant
from langchain.schema import Document
import os

# 5.3.1 连接到Qdrant
# 本地模式
qdrant = Qdrant.from_documents(
    documents=[
        Document(page_content="量子计算", metadata={"type": "tech"}),
        Document(page_content="机器学习", metadata={"type": "ai"}),
    ],
    embedding=OpenAIEmbeddings(),
    location=":memory:",  # 使用内存存储,或者指定路径
    collection_name="my_collection"
)

# 云端模式(需要URL和API Key)
# qdrant = Qdrant.from_documents(
#     documents=documents,
#     embedding=embeddings,
#     url="https://your-cluster.qdrant.io",
#     api_key="your-api-key",
#     collection_name="cloud_collection"
# )

# 5.3.2 检索操作
results = qdrant.similarity_search(
    query="什么是机器学习?",
    k=2
)

print("Qdrant检索结果:")
for doc in results:
    print(f"- {doc.page_content}")

# 5.3.3 带分数的检索
results_with_score = qdrant similarity_search_with_score(
    query="深度学习技术",
    k=2
)

for doc, score in results_with_score:
    print(f"- {doc.page_content} (分数: {score:.4f})")

5.4 向量数据库对比与选型

不同的向量数据库有不同的特点和适用场景,选择合适的数据库需要综合考虑多种因素。

# 向量数据库对比分析
"""
| 特性         | Chroma       | FAISS        | Qdrant       | Pinecone     |
|-------------|--------------|--------------|--------------|--------------|
| 部署方式     | 本地/嵌入     | 本地          | 本地/云端     | 云端         |
| 性能         | 中等         | 高            | 高           | 高           |
| 扩展性       | 有限         | 中等          | 优秀          | 优秀         |
| 易用性       | 简单         | 中等          | 中等          | 简单         |
| 过滤支持     | 支持         | 有限          | 支持          | 支持          |
| 云服务       | 无           | 无            | 有            | 有           |
| LangChain集成| 完整         | 完整          | 完整          | 完整          |
"""

# 选择建议
"""
开发/学习环境: Chroma (简单易用,零配置)

中小规模生产环境: FAISS (高性能,本地部署)

大规模生产环境: Qdrant / Pinecone (云端服务,弹性扩展)

需要严格数据隐私: FAISS / Chroma (完全本地部署)
"""

六、检索增强生成(RAG)链构建

6.1 基础RAG链

RAG链是将检索器和LLM组合的核心组件。LangChain提供了多种方式来构建RAG链,从简单的直接组合到复杂的多阶段处理。

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.prompts import PromptTemplate, ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import HumanMessage, SystemMessage
import os

# 准备测试数据
test_content = """
人工智能(Artificial Intelligence,AI)是计算机科学的一个分支,致力于开发能够模拟、延伸和扩展人类智能的理论、方法、技术和应用系统。

机器学习(Machine Learning,ML)是人工智能的核心,是实现智能化的关键技术之一。机器学习使计算机能够通过数据学习和改进经验,而无需被明确编程。

深度学习(Deep Learning,DL)是机器学习的一个子领域,使用具有多个隐藏层的人工神经网络来学习数据的表征。深度学习在图像识别、语音识别和自然语言处理等领域取得了突破性进展。

自然语言处理(Natural Language Processing,NLP)是人工智能和语言学的交叉领域,致力于使计算机能够理解、解释和生成人类语言。

计算机视觉(Computer Vision,CV)是使计算机能够从图像或视频中获取信息的技术。
"""

with open("ai_knowledge.txt", "w", encoding="utf-8") as f:
    f.write(test_content)

# 加载和处理文档
loader = TextLoader("ai_knowledge.txt", encoding="utf-8")
documents = loader.load()

splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=30)
splits = splitter.split_documents(documents)

# 创建向量存储
embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
vectorstore = Chroma.from_documents(splits, embeddings, collection_name="ai_kb")

# 6.1.1 最简单的RAG链
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# 基础问答提示
qa_prompt = PromptTemplate.from_template("""
基于以下上下文回答问题。如果上下文中没有相关信息,请如实告知。

上下文:
{context}

问题:{input}

回答:""")

# 创建文档处理链(将检索到的文档填充到提示中)
qa_chain = create_stuff_documents_chain(llm, qa_prompt)

# 创建完整检索链
rag_chain = create_retrieval_chain(
    vectorstore.as_retriever(search_kwargs={"k": 2}),
    qa_chain
)

# 执行
result = rag_chain.invoke({"input": "什么是深度学习?"})
print("基础RAG链结果:")
print(result["answer"])
print("\n" + "="*50)

# 6.1.2 带历史对话的RAG链
# 需要使用带有历史消息的提示模板
from langchain.chains import create_history_aware_retriever

# 先理解上下文的检索器
contextualize_q_prompt = PromptTemplate.from_template("""
给定聊天历史和当前问题,将问题改写为不依赖历史也能理解的独立问题。

聊天历史:
{chat_history}

当前问题:{input}

改写后的问题:""")

# 创建理解上下文的检索器
history_retriever = create_history_aware_retriever(
    llm,
    vectorstore.as_retriever(),
    contextualize_q_prompt
)

# 对话式问答提示
conversational_qa_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个专业的AI知识助手。请根据提供的上下文回答问题。"),
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    ("human", "{input}")
])

# 创建对话式QA链
conversational_qa_chain = create_stuff_documents_chain(
    llm,
    conversational_qa_prompt
)

# 创建完整对话RAG链
conversational_rag = create_retrieval_chain(
    history_retriever,
    conversational_qa_chain
)

# 测试对话
result1 = conversational_rag.invoke({
    "input": "深度学习是什么?",
    "chat_history": []
})
print("对话RAG - 问题1:")
print(result1["answer"])

result2 = conversational_rag.invoke({
    "input": "它和机器学习有什么关系?",
    "chat_history": [
        HumanMessage(content="深度学习是什么?"),
        HumanMessage(content=result1["answer"])
    ]
})
print("\n对话RAG - 问题2:")
print(result2["answer"])

6.2 LCEL构建高级RAG链

LangChain Expression Language(LCEL)提供了一种声明式的方式来构建复杂的RAG链。使用LCEL,我们可以更灵活地组合各种组件。

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnableParallel, RunnablePassthrough
import os

# 准备数据
test_content = """
Python是一种广泛使用的高级编程语言,由Guido van Rossum于1991年首次发布。
Python的设计哲学强调代码可读性和简洁的语法。
相比C++或Java,Python让开发者可以用更少的代码完成同样的想法。

Java是一种面向对象的编程语言,最初由James Gosling在1995年发布。
Java的口号是"一次编写,到处运行",这得益于其虚拟机概念。
Java广泛应用于企业级应用开发和Android应用开发。

JavaScript是一种脚本语言,最初用于网页开发,现在也用于服务器端开发。
JavaScript是Web前端开发的核心技术之一。
Node.js让JavaScript可以运行在服务器端。
"""

with open("programming.txt", "w", encoding="utf-8") as f:
    f.write(test_content)

# 数据处理
loader = TextLoader("programming.txt", encoding="utf-8")
docs = loader.load()
splits = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50).split_documents(docs)

vectorstore = Chroma.from_documents(splits, OpenAIEmbeddings(), collection_name="prog")
retriever = vectorstore.as_retriever()

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# 6.2.1 使用LCEL构建RAG链
# 方式1:简单LCEL链
simple_rag = (
    {"context": retriever, "input": RunnablePassthrough()}
    | PromptTemplate.from_template("""
基于以下上下文回答问题。

上下文:{context}

问题:{input}

回答:""")
    | llm
    | StrOutputParser()
)

result = simple_rag.invoke("Python有什么特点?")
print("简单LCEL链结果:")
print(result)

# 6.2.2 带答案来源的RAG链
def format_docs(docs):
    """格式化检索到的文档"""
    formatted = []
    for i, doc in enumerate(docs, 1):
        formatted.append(f"来源{i}: {doc.page_content}")
    return "\n\n".join(formatted)

rag_with_sources = (
    {
        "context": retriever | format_docs,
        "input": RunnablePassthrough()
    }
    | PromptTemplate.from_template("""
基于以下上下文回答问题,并注明来源。

上下文:
{context}

问题:{input}

回答(注明来源):""")
    | llm
    | StrOutputParser()
)

result2 = rag_with_sources.invoke("Java的口号是什么?")
print("\n带来源的RAG链结果:")
print(result2)

# 6.2.3 多检索器RAG链
# 可以组合多个检索器的结果
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# 创建压缩检索器
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

# 使用RunnableParallel组合多个检索结果
def combine_contexts(inputs):
    """组合多个检索器的结果"""
    docs1 = inputs.get("semantic", [])
    docs2 = inputs.get("compression", [])
    # 去重并组合
    combined = {doc.page_content: doc for doc in docs1 + docs2}
    return list(combined.values())

advanced_rag = RunnableParallel(
    semantic=retriever,
    compression=compression_retriever
) | combine_contexts | format_docs | (lambda x: {"context": x, "input": "用中文回答"}) | PromptTemplate.from_template("上下文:{context}\n问题:{input}\n回答:") | llm | StrOutputParser()

print("\n高级RAG链测试完成")

6.3 检索结果优化策略

在实际应用中,基础的相似性检索往往无法满足需求,需要采用多种优化策略来提升检索质量。

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import (
    LLMChainExtractor,
    LLMChainFilter,
    EmbeddingsFilter,
    DocumentCompressorPipeline
)
from langchain_openai import ChatOpenAI
from langchain.schema import Document
import numpy as np

# 准备测试数据
docs = [
    Document(page_content="Python的创始人是谁?Python由Guido van Rossum创建。", metadata={"source": "python.txt"}),
    Document(page_content="Java由James Gosling于1995年发明。", metadata={"source": "java.txt"}),
    Document(page_content="Python是一种高级编程语言。", metadata={"source": "python.txt"}),
    Document(page_content="Java是一种面向对象的编程语言。", metadata={"source": "java.txt"}),
    Document(page_content="Guido van Rossum创建了Python。", metadata={"source": "python.txt"}),
    Document(page_content="James Gosling是Java的创始人。", metadata={"source": "java.txt"}),
]

embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
vectorstore = Chroma.from_documents(docs, embeddings, collection_name="test_kb")
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 4})

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# 6.3.1 LLMChainExtractor - 提取最相关的部分
# 使用LLM从检索到的文档中提取最相关的片段
extractor = LLMChainExtractor.from_llm(llm)
compression_retriever_extractor = ContextualCompressionRetriever(
    base_compressor=extractor,
    base_retriever=base_retriever
)

print("LLMChainExtractor结果:")
docs_extracted = compression_retriever_extriever.get_relevant_documents("Python的创始人是谁?")
for doc in docs_extracted:
    print(f"- {doc.page_content}")

# 6.3.2 EmbeddingsFilter - 基于嵌入相似度过滤
# 过滤掉与查询相似度低于阈值的文档
embeddings_filter = EmbeddingsFilter(
    embeddings=embeddings,
    similarity_threshold=0.5
)

filter_retriever = ContextualCompressionRetriever(
    base_compressor=embeddings_filter,
    base_retriever=base_retriever
)

print("\nEmbeddingsFilter结果:")
docs_filtered = filter_retriever.get_relevant_documents("Python的创始人是谁?")
for doc in docs_filtered:
    print(f"- {doc.page_content}")

# 6.3.3 文档压缩管道
# 组合多个压缩器
pipeline_compressor = DocumentCompressorPipeline(
    transformers=[
        LLMChainFilter.from_llm(llm),  # 先用LLM过滤
        # 可以添加更多转换器
    ]
)

pipeline_retriever = ContextualCompressionRetriever(
    base_compressor=pipeline_compressor,
    base_retriever=base_retriever
)

print("\n压缩管道结果:")
docs_pipeline = pipeline_retriever.get_relevant_documents("Java的创始人")
for doc in docs_pipeline:
    print(f"- {doc.page_content}")

七、实战项目:企业级知识库问答系统

7.1 系统架构设计

本节我们将构建一个完整的企业级知识库问答系统,该系统将具备文档管理、智能问答、来源追溯、多轮对话等功能。

# 完整的企业知识库问答系统

import os
import tempfile
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
from dotenv import load_dotenv

# 导入LangChain组件
from langchain_community.document_loaders import (
    TextLoader, PyPDFLoader, Docx2txtLoader, 
    CSVLoader, DirectoryLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma, FAISS
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain, create_history_aware_retriever
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.prompts import PromptTemplate, ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferMemory
from langchain.schema import Document, HumanMessage, AIMessage
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field

load_dotenv()

# ==================== 数据模型定义 ====================

class DocumentType(Enum):
    """支持的文档类型"""
    TEXT = "txt"
    PDF = "pdf"
    WORD = "docx"
    MARKDOWN = "md"
    CSV = "csv"

@dataclass
class KnowledgeConfig:
    """知识库配置"""
    collection_name: str = "enterprise_knowledge"
    embedding_model: str = "text-embedding-ada-002"
    chunk_size: int = 1000
    chunk_overlap: int = 200
    vectorstore_type: str = "chroma"  # chroma, faiss
    persist_directory: str = "./vectorstore"

@dataclass  
class QAPair:
    """问答对"""
    question: str
    answer: str
    sources: List[Dict[str, Any]] = field(default_factory=list)
    conversation_id: Optional[str] = None

# ==================== 文档处理器 ====================

class DocumentProcessor:
    """文档处理器 - 统一处理各种格式的文档"""
    
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=[
                "\n\n", "\n", "。", "!", "?", 
                ".", "!", "?", " ", ""
            ],
            keep_separator=False
        )
    
    def load_document(self, file_path: str) -> List[Document]:
        """根据文件类型选择合适的加载器"""
        
        file_ext = os.path.splitext(file_path)[1].lower().replace(".", "")
        
        loaders = {
            "txt": TextLoader,
            "md": TextLoader,
            "pdf": PyPDFLoader,
            "docx": Docx2txtLoader,
            "csv": CSVLoader,
        }
        
        loader_class = loaders.get(file_ext)
        if not loader_class:
            raise ValueError(f"不支持的文件类型: {file_ext}")
        
        loader = loader_class(file_path, encoding="utf-8")
        return loader.load()
    
    def process_file(self, file_path: str, metadata: Dict = None) -> List[Document]:
        """处理单个文件"""
        docs = self.load_document(file_path)
        
        # 添加元数据
        for doc in docs:
            if metadata:
                doc.metadata.update(metadata)
            doc.metadata["source_file"] = os.path.basename(file_path)
        
        # 分割文档
        splits = self.text_splitter.split_documents(docs)
        
        return splits
    
    def process_directory(self, directory: str) -> List[Document]:
        """处理目录下所有支持的文件"""
        all_splits = []
        
        supported_extensions = ["txt", "md", "pdf", "docx", "csv"]
        
        for ext in supported_extensions:
            loader = DirectoryLoader(
                directory,
                glob=f"**/*.{ext}",
                loader_cls=TextLoader,
                show_progress=True
            )
            
            try:
                docs = loader.load()
                splits = self.text_splitter.split_documents(docs)
                all_splits.extend(splits)
            except Exception as e:
                print(f"处理 {ext} 文件时出错: {e}")
        
        return all_splits

# ==================== 向量存储管理器 ====================

class VectorStoreManager:
    """向量存储管理器"""
    
    def __init__(self, config: KnowledgeConfig):
        self.config = config
        self.embeddings = self._create_embeddings()
        self.vectorstore = None
    
    def _create_embeddings(self):
        """创建嵌入模型"""
        if self.config.embedding_model.startswith("text-embedding"):
            return OpenAIEmbeddings(
                model=self.config.embedding_model
            )
        else:
            # 使用HuggingFace本地模型
            return HuggingFaceEmbeddings(
                model_name=self.config.embedding_model
            )
    
    def create_from_documents(self, documents: List[Document]):
        """从文档创建向量存储"""
        
        if self.config.vectorstore_type == "chroma":
            self.vectorstore = Chroma.from_documents(
                documents=documents,
                embedding=self.embeddings,
                collection_name=self.config.collection_name,
                persist_directory=self.config.persist_directory
            )
        elif self.config.vectorstore_type == "faiss":
            self.vectorstore = FAISS.from_documents(
                documents=documents,
                embedding=self.embeddings
            )
        
        return self.vectorstore
    
    def get_retriever(self, search_kwargs: Dict = None):
        """获取检索器"""
        if not self.vectorstore:
            raise ValueError("向量存储未初始化")
        
        default_kwargs = {"k": 5}
        if search_kwargs:
            default_kwargs.update(search_kwargs)
        
        return self.vectorstore.as_retriever(
            search_type="similarity",
            search_kwargs=default_kwargs
        )
    
    def similarity_search(self, query: str, k: int = 5):
        """执行相似性搜索"""
        return self.vectorstore.similarity_search_with_score(query, k=k)
    
    def save(self):
        """保存向量存储"""
        if self.vectorstore and hasattr(self.vectorstore, 'persist'):
            self.vectorstore.persist()

# ==================== RAG链构建器 ====================

class RAGChainBuilder:
    """RAG链构建器"""
    
    def __init__(self, llm, retriever):
        self.llm = llm
        self.retriever = retriever
    
    def build_basic_chain(self):
        """构建基础RAG链"""
        
        system_prompt = """你是一个专业的企业知识助手。请根据提供的上下文信息准确回答用户的问题。

要求:
1. 只根据提供的上下文回答,不要编造信息
2. 回答要准确、简洁、清晰
3. 如果上下文中没有相关信息,请如实告知用户

上下文信息:
{context}"""
        
        prompt = PromptTemplate.from_template(system_prompt + "\n\n问题:{input}\n回答:")
        
        qa_chain = create_stuff_documents_chain(self.llm, prompt)
        rag_chain = create_retrieval_chain(self.retriever, qa_chain)
        
        return rag_chain
    
    def build_conversational_chain(self):
        """构建对话式RAG链(带记忆)"""
        
        # 上下文理解检索器
        contextualize_prompt = PromptTemplate.from_template(
            """给定聊天历史和当前问题,将问题改写为独立的问题,
            使其不依赖聊天历史也能被理解。
            
            聊天历史:
            {chat_history}
            
            当前问题:{input}
            
            改写后的问题:"""
        )
        
        history_retriever = create_history_aware_retriever(
            self.llm,
            self.retriever,
            contextualize_prompt
        )
        
        # 对话式问答提示
        system_prompt = """你是一个专业的企业知识助手。请根据提供的上下文和聊天历史回答用户的问题。

要求:
1. 只根据提供的上下文回答,不要编造信息
2. 如果上下文中没有相关信息,请如实告知"""
        
        qa_prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="chat_history", optional=True),
            ("human", "{input}")
        ])
        
        qa_chain = create_stuff_documents_chain(self.llm, qa_prompt)
        
        conversational_rag = create_retrieval_chain(
            history_retriever,
            qa_chain
        )
        
        return conversational_rag

# ==================== 主应用类 ====================

class EnterpriseKnowledgeBase:
    """企业知识库主类"""
    
    def __init__(
        self,
        collection_name: str = "enterprise_kb",
        llm_model: str = "gpt-3.5-turbo",
        embedding_model: str = "text-embedding-ada-002"
    ):
        # 初始化LLM
        self.llm = ChatOpenAI(
            model=llm_model,
            temperature=0.3,
            max_tokens=2000
        )
        
        # 配置
        self.config = KnowledgeConfig(
            collection_name=collection_name,
            embedding_model=embedding_model
        )
        
        # 初始化组件
        self.doc_processor = DocumentProcessor(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap
        )
        
        self.vector_manager = VectorStoreManager(self.config)
        
        # 对话历史
        self.conversation_history: Dict[str, List] = {}
        
        # RAG链
        self.basic_rag_chain = None
        self.conversational_rag_chain = None
    
    def ingest_document(self, file_path: str, metadata: Dict = None):
        """摄取文档到知识库"""
        
        # 处理文档
        splits = self.doc_processor.process_file(file_path, metadata)
        print(f"文档已分割为 {len(splits)} 个块")
        
        # 创建向量存储
        self.vector_manager.create_from_documents(splits)
        print("向量存储已创建")
        
        # 构建RAG链
        self._build_rag_chains()
        
        return len(splits)
    
    def ingest_directory(self, directory: str):
        """摄取整个目录"""
        
        splits = self.doc_processor.process_directory(directory)
        print(f"目录已处理,共 {len(splits)} 个文档块")
        
        self.vector_manager.create_from_documents(splits)
        self._build_rag_chains()
        
        return len(splits)
    
    def _build_rag_chains(self):
        """构建RAG链"""
        retriever = self.vector_manager.get_retriever()
        
        builder = RAGChainBuilder(self.llm, retriever)
        
        self.basic_rag_chain = builder.build_basic_chain()
        self.conversational_rag_chain = builder.build_conversational_chain()
    
    def ask(self, question: str) -> QAPair:
        """基础问答"""
        
        if not self.basic_rag_chain:
            raise ValueError("知识库未初始化,请先摄取文档")
        
        result = self.basic_rag_chain.invoke({"input": question})
        
        # 提取来源
        sources = []
        for doc in result.get("context", []):
            sources.append({
                "content": doc.page_content[:200] + "...",
                "metadata": doc.metadata
            })
        
        return QAPair(
            question=question,
            answer=result["answer"],
            sources=sources
        )
    
    def conversational_ask(self, question: str, conversation_id: str = "default") -> QAPair:
        """对话式问答"""
        
        if not self.conversational_rag_chain:
            raise ValueError("知识库未初始化")
        
        # 获取历史
        history = self.conversation_history.get(conversation_id, [])
        
        # 执行对话
        result = self.conversational_rag_chain.invoke({
            "input": question,
            "chat_history": history
        })
        
        # 更新历史
        history.extend([
            HumanMessage(content=question),
            AIMessage(content=result["answer"])
        ])
        self.conversation_history[conversation_id] = history[-10:]  # 保留最近10轮
        
        # 提取来源
        sources = []
        for doc in result.get("context", []):
            sources.append({
                "content": doc.page_content[:200] + "...",
                "metadata": doc.metadata
            })
        
        return QAPair(
            question=question,
            answer=result["answer"],
            sources=sources,
            conversation_id=conversation_id
        )
    
    def search_similar(self, query: str, k: int = 5):
        """相似性搜索"""
        return self.vector_manager.similarity_search(query, k=k)

# ==================== 使用示例 ====================

def main():
    """使用示例"""
    
    # 创建知识库实例
    kb = EnterpriseKnowledgeBase(
        collection_name="company_knowledge",
        llm_model="gpt-3.5-turbo"
    )
    
    # 准备测试文档
    test_content = """
    DREAMVFIA UNION 企业技术文档
    
    公司简介
    DREAMVFIA UNION是一家专注于人工智能技术研发和应用的科技公司。
    公司成立于2023年,总部位于北京。
    
    核心技术
    公司主要技术方向包括:
    1. 大语言模型应用开发
    2. 量子计算技术研究
    3. 机器学习平台开发
    4. 智能对话系统
    
    产品服务
    主要产品包括:
    - AI开发工具链(DREAMVFIA Tools)
    - 企业级知识管理系统
    - 智能客服平台
    - 量子计算模拟器
    
    联系方式
    邮箱: contact@dreamvfia.com
    电话: 400-888-8888
    地址: 北京市海淀区
    """
    
    # 保存测试文档
    with open("company_doc.txt", "w", encoding="utf-8") as f:
        f.write(test_content)
    
    # 摄取文档
    print("正在摄取文档...")
    kb.ingest_document("company_doc.txt", {"category": "公司介绍"})
    
    # 测试问答
    print("\n" + "="*50)
    print("基础问答测试")
    print("="*50)
    
    questions = [
        "DREAMVFIA UNION的主营业务是什么?",
        "公司有哪些产品?",
        "联系方式是什么?"
    ]
    
    for q in questions:
        result = kb.ask(q)
        print(f"\n问题: {q}")
        print(f"回答: {result.answer}")
        print(f"来源数量: {len(result.sources)}")
    
    # 测试对话
    print("\n" + "="*50)
    print("对话式问答测试")
    print("="*50)
    
    result1 = kb.conversational_ask("你们公司是做什么的?", "user_001")
    print(f"\n问题1: 你们公司是做什么的?")
    print(f"回答: {result1.answer}")
    
    result2 = kb.conversational_ask("刚才说的产品有哪些?", "user_001")
    print(f"\n问题2: 刚才说的产品有哪些?")
    print(f"回答: {result2.answer}")
    
    result3 = kb.conversational_ask("联系邮箱是什么?", "user_001")
    print(f"\n问题3: 联系邮箱是什么?")
    print(f"回答: {result3.answer}")

if __name__ == "__main__":
    main()

7.2 Streamlit Web界面

为了让知识库系统更易于使用,我们可以使用Streamlit快速构建Web界面。

# app.py - Streamlit Web界面

import streamlit as st
import os
from typing import List
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain, create_history_aware_retriever
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.prompts import PromptTemplate, ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferMemory

load_dotenv()

# 页面配置
st.set_page_config(
    page_title="企业知识库助手",
    page_icon="📚",
    layout="wide"
)

# 初始化会话状态
if "vectorstore" not in st.session_state:
    st.session_state.vectorstore = None

if "messages" not in st.session_state:
    st.session_state.messages = []

if "memory" not in st.session_state:
    st.session_state.memory = ConversationBufferMemory(
        return_messages=True,
        output_key="answer",
        input_key="input"
    )

# 侧边栏 - 知识库管理
with st.sidebar:
    st.title("📚 知识库管理")
    
    # 文件上传
    uploaded_file = st.file_uploader(
        "上传知识文档",
        type=["txt", "pdf", "md"],
        help="支持TXT、PDF、Markdown格式"
    )
    
    if uploaded_file:
        if st.button("处理文档", type="primary"):
            with st.spinner("正在处理文档..."):
                # 保存上传文件
                with tempfile.NamedTemporaryFile(
                    delete=False, 
                    suffix=os.path.splitext(uploaded_file.name)[1]
                ) as tmp:
                    tmp.write(uploaded_file.getvalue())
                    tmp_path = tmp.name
                
                # 加载文档
                if uploaded_file.name.endswith('.pdf'):
                    loader = PyPDFLoader(tmp_path)
                else:
                    loader = TextLoader(tmp_path, encoding='utf-8')
                
                docs = loader.load()
                
                # 分割
                splitter = RecursiveCharacterTextSplitter(
                    chunk_size=1000,
                    chunk_overlap=200
                )
                splits = splitter.split_documents(docs)
                
                # 创建向量存储
                embeddings = OpenAIEmbeddings()
                st.session_state.vectorstore = Chroma.from_documents(
                    splits,
                    embeddings,
                    collection_name="knowledge_base"
                )
                
                st.success(f"文档处理完成!共 {len(splits)} 个知识块")
    
    st.divider()
    
    # 显示知识库状态
    if st.session_state.vectorstore:
        count = st.session_state.vectorstore._collection.count()
        st.success(f"✅ 知识库已加载 ({count} 个向量)")
    else:
        st.warning("⚠️ 请先上传文档")
    
    # 清空对话
    if st.button("清空对话历史"):
        st.session_state.messages = []
        st.session_state.memory.clear()
        st.rerun()

# 主界面
st.title("💬 企业知识库助手")

# 显示聊天历史
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
        
        # 如果是助手消息,显示来源
        if message.get("sources") and message["role"] == "assistant":
            with st.expander("查看信息来源"):
                for i, source in enumerate(message["sources"], 1):
                    st.markdown(f"**来源{i}:** {source}")

# 用户输入
if prompt := st.chat_input("请输入您的问题..."):
    # 添加用户消息
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)
    
    # 生成回答
    with st.chat_message("assistant"):
        with st.spinner("思考中..."):
            if st.session_state.vectorstore is None:
                response = "请先在侧边栏上传文档来构建知识库!"
                sources = []
            else:
                # 获取检索器
                retriever = st.session_state.vectorstore.as_retriever(k=3)
                
                # 创建LLM
                llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3)
                
                # 获取历史
                history = st.session_state.memory.load_memory_variables({}).get("history", [])
                
                # 构建对话链
                contextualize_prompt = PromptTemplate.from_template(
                    """给定聊天历史,生成独立的问题。
                    
                    历史: {chat_history}
                    问题: {input}
                    
                    独立问题:"""
                )
                
                history_retriever = create_history_aware_retriever(
                    llm, retriever, contextualize_prompt
                )
                
                system_prompt = """你是一个企业知识助手。根据知识库中的文档回答用户问题。"""
                
                qa_prompt = ChatPromptTemplate.from_messages([
                    ("system", system_prompt),
                    MessagesPlaceholder(variable_name="chat_history", optional=True),
                    ("human", "{input}")
                ])
                
                qa_chain = create_stuff_documents_chain(llm, qa_prompt)
                rag_chain = create_retrieval_chain(history_retriever, qa_chain)
                
                # 执行
                result = rag_chain.invoke({
                    "input": prompt,
                    "chat_history": history
                })
                
                response = result["answer"]
                sources = [
                    {"content": doc.page_content[:300] + "..."}
                    for doc in result.get("context", [])
                ]
                
                # 保存到记忆
                st.session_state.memory.save_context(
                    {"input": prompt},
                    {"answer": response}
                )
            
            st.markdown(response)
            
            assistant_message = {
                "role": "assistant",
                "content": response,
                "sources": sources
            }
            st.session_state.messages.append(assistant_message)

# 导入必要的库
import tempfile

运行应用:

pip install streamlit
streamlit run app.py

八、性能优化与生产部署

8.1 检索结果后处理

在实际应用中,简单的相似性检索往往不能直接满足需求,需要进行后处理以提高答案质量。

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.prompts import PromptTemplate
from langchain.schema import Document
import numpy as np

# 8.1.1 MMR(最大边际相关性)优化
# 在相关性和多样性之间取得平衡

def mmr_retrieval(vectorstore, query: str, k: int = 5, fetch_k: int = 20, lambda_mult: float = 0.5):
    """
    MMR检索 - 平衡相关性和多样性
    
    参数:
        lambda_mult: 0=只关注多样性, 1=只关注相关性
    """
    return vectorstore.max_marginal_relevance_search(
        query,
        k=k,
        fetch_k=fetch_k,
        lambda_mult=lambda_mult
    )

# 8.1.2 相似度阈值过滤
def threshold_filtered_retrieval(vectorstore, query: str, k: int = 5, threshold: float = 0.7):
    """带阈值的检索"""
    results_with_score = vectorstore.similarity_search_with_score(query, k=k*2)
    
    filtered_results = [
        (doc, score) for doc, score in results_with_score
        if score < threshold  # Chroma使用距离作为分数,越小越相似
    ]
    
    return filtered_results[:k]

# 8.1.3 混合检索 - 结合关键词和向量检索
from langchain.retrievers import EnsembleRetriever
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import EmbeddingsFilter

def create_hybrid_retriever(vectorstore, keyword_index=None):
    """创建混合检索器"""
    
    # 向量检索器
    vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    
    # 关键词检索器(如果可用)
    # keyword_retriever = BM25Retriever.from_documents(...)
    
    # 组合检索器
    ensemble = EnsembleRetrievers(
        retrievers=[vector_retriever],
        weights=[1.0]  # 可以调整权重
    )
    
    return ensemble

# 8.1.4 重新排序(Reranking)
# 使用更精确的模型对初步检索结果进行排序
from langchain.schema import BaseRetriever, Document

class RerankRetriever(BaseRetriever):
    """带重新排序的检索器"""
    
    def __init__(self, base_retriever, reranker_llm, top_k: int = 3):
        self.base_retriever = base_retriever
        self.reranker = reranker_llm
        self.top_k = top_k
    
    def get_relevant_documents(self, query: str) -> List[Document]:
        # 获取更多候选
        candidates = self.base_retriever.get_relevant_documents(query)
        
        if len(candidates) <= self.top_k:
            return candidates
        
        # 使用LLM重新排序
        rerank_prompt = f"""请根据与问题的相关性对以下文档进行排序,只返回排序后的索引(从0开始),用逗号分隔。

问题: {query}

文档:
{chr(10).join([f"{i}. {doc.page_content[:200]}" for i, doc in enumerate(candidates)])}

排序结果:"""
        
        response = self.reranker.invoke(rerank_prompt)
        
        # 解析响应获取排序
        # ...解析逻辑
        
        return candidates[:self.top_k]
    
    async def aget_relevant_documents(self, query: str):
        return self.get_relevant_documents(query)

8.2 缓存策略与性能优化

from langchain.cache import InMemoryCache, SQLiteCache
from langchain.globals import set_llm_cache
import hashlib
import json
from datetime import timedelta
from typing import Optional

# 8.2.1 LLM响应缓存
# 内存缓存
set_llm_cache(InMemoryCache())

# SQLite缓存(持久化)
set_llm_cache(SQLiteCache(database_path=".llm_cache.db"))

# 8.2.2 向量检索结果缓存
class VectorCache:
    """向量检索结果缓存"""
    
    def __init__(self, ttl_seconds: int = 3600):
        self.cache = {}
        self.ttl = ttl_seconds
        import time
        self.time = time
    
    def _make_key(self, query: str, k: int) -> str:
        return hashlib.md5(f"{query}:{k}".encode()).hexdigest()
    
    def get(self, query: str, k: int):
        key = self._make_key(query, k)
        
        if key in self.cache:
            result, timestamp = self.cache[key]
            if self.time.time() - timestamp < self.ttl:
                return result
            else:
                del self.cache[key]
        
        return None
    
    def set(self, query: str, k: int, result):
        key = self._make_key(query, k)
        self.cache[key] = (result, self.time.time())

# 8.2.3 异步处理优化
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncRAGSystem:
    """异步RAG系统"""
    
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=4)
    
    async def aquery(self, question: str, vectorstore, llm, chain):
        # 异步执行检索和生成
        loop = asyncio.get_event_loop()
        
        # 并行执行检索
        docs = await loop.run_in_executor(
            self.executor,
            vectorstore.similarity_search,
            question,
            5
        )
        
        # 异步生成回答
        result = await loop.run_in_executor(
            self.executor,
            chain.invoke,
            {"input": question, "context": docs}
        )
        
        return result
    
    def batch_query(self, questions: list):
        """批量查询"""
        return asyncio.run(
            asyncio.gather(*[
                self aquery(q, self.vectorstore, self.llm, self.chain)
                for q in questions
            ])
        )

8.3 生产环境部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 设置环境变量
ENV PYTHONUNBUFFERED=1
ENV OPENAI_API_KEY=${OPENAI_API_KEY}

# 暴露端口
EXPOSE 8501

# 启动应用
CMD ["streamlit", "run", "app.py", "--server.address", "0.0.0.0"]
# docker-compose.yml
version: '3.8'

services:
  rag-app:
    build: .
    ports:
      - "8501:8501"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./data:/app/data
      - ./vectorstore:/app/vectorstore
    deploy:
      resources:
        limits:
          memory: 4G
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
# requirements.txt
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.10
chromadb>=0.4.0
faiss-cpu>=1.7.0
streamlit>=1.28.0
python-dotenv>=1.0.0
pypdf>=3.0.0
python-docx>=1.0.0
pydantic>=2.0.0

九、总结与展望

本文系统地介绍了使用LangChain和向量数据库构建RAG知识库的全流程技术。从RAG技术的基本原理出发,我们深入探讨了文档加载与处理、向量数据库的选择与使用、检索增强生成链的构建,以及完整的企业级知识库问答系统实现。

RAG架构作为当前LLM应用的主流方案,其重要性不言而喻。通过将私有知识与强大的语言模型能力相结合,RAG能够为企业提供安全、准确、可追溯的智能问答服务。随着技术的不断发展,RAG系统将会在更多场景得到应用,包括企业内部知识管理、客户服务自动化、专业领域问答等。

未来,RAG技术将沿着几个方向继续演进:首先是检索精度的持续提升,通过更先进的嵌入模型和检索算法来提高相关性;其次是多模态内容的支持,使系统能够处理图像、音频、视频等多种形式的数据;最后是与Agent的深度融合,使RAG系统具备更强大的推理和工具使用能力。


版权声明:© 2026 DREAMVFIA UNION

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐