摘要

本文将详细介绍如何使用 LangChain 框架开发一个本地知识智能客服系统。我们将构建一个支持多轮对话、基于本地知识库检索增强生成(RAG)的智能客服,并集成千问大模型。

1. 引言

随着大语言模型技术的快速发展,基于知识库的问答系统(RAG, Retrieval-Augmented Generation)已成为解决专业领域问答的有效方案。本文将介绍如何使用 LangChain 框架构建一个本地知识智能客服系统,实现文档加载、向量存储、检索增强问答和多轮对话记忆等功能。

本项目已开源,代码仓库地址如下,欢迎 Star & Fork!✨

2. 技术栈与架构

在这里插入图片描述

2.1 核心技术栈

  • LangChain: 框架核心,提供 RAG 链构建
  • FAISS: 向量数据库,用于高效相似性搜索
  • DashScope: 千问大模型 API 接口
  • Python 3.13: 开发环境

2.2 系统架构

用户输入 → 问题向量化 → 向量库检索 → 上下文构建 → LLM 生成 → 输出答案
     ↑                                    ↓
   记忆模块 ←←←←←←←←←←←←←←←←←←←←←←←←←←←←←

3. 项目实现

3.1 项目结构

blog_project/
├── main.py                 # 主程序入口
├── rag_system.py           # 核心RAG系统类
├── document_processor.py   # 文档处理
├── qwen_embedding.py       # 千问Embedding
├── faiss_vectorstore.py    # FAISS向量存储
├── qwen_llm.py             # 千问LLM和RAG链
├── data/                   # 存放txt知识库文件
├── vector_stores/          # 存放FAISS向量库
├── .env                    # 环境变量文件
└── requirements.txt        # 依赖包

3.2 核心代码实现

3.2.1 文档处理器 (document_processor.py)
import os
from typing import List
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

class DocumentProcessor:
    """处理文档加载和切分的类"""
    
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """
        初始化文档处理器
        
        Args:
            chunk_size: 文本块大小
            chunk_overlap: 块重叠大小,保证上下文连续性
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len,
            is_separator_regex=False,
        )

    def load_documents(self, data_dir: str) -> List:
        """
        从指定目录加载所有txt文件
        
        Args:
            data_dir: 包含txt文件的目录路径
            
        Returns:
            加载的文档列表
        """
        documents = []
        for root, dirs, files in os.walk(data_dir):
            for file in files:
                if file.lower().endswith('.txt'):
                    file_path = os.path.join(root, file)
                    loader = TextLoader(file_path, encoding='utf-8')
                    documents.extend(loader.load())
        return documents

    def split_documents(self, documents: List) -> List:
        """
        切分文档
        
        Args:
            documents: 原始文档列表
            
        Returns:
            切分后的文档块列表
        """
        return self.text_splitter.split_documents(documents)
3.2.2 千问 Embedding 封装 (qwen_embedding.py)
from langchain_core.embeddings import Embeddings
from langchain_dashscope import DashScopeEmbeddings

class QwenEmbedding(Embeddings):
    """千问Embedding模型封装类"""
    
    def __init__(self, api_key: str, model: str = "text-embedding-v2"):
        """
        初始化千问Embedding
        
        Args:
            api_key: DashScope API密钥
            model: 使用的嵌入模型名称
        """
        self.embeddings = DashScopeEmbeddings(
            dashscope_api_key=api_key,
            model=model
        )

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        对文档列表进行嵌入
        
        Args:
            texts: 文档文本列表
            
        Returns:
            嵌入向量列表
        """
        return self.embeddings.embed_documents(texts)

    def embed_query(self, text: str) -> List[float]:
        """
        对単个查询进行嵌入
        
        Args:
            text: 查询文本
            
        Returns:
            嵌入向量
        """
        return self.embeddings.embed_query(text)
3.2.3 FAISS 向量存储 (faiss_vectorstore.py)
import os
from typing import List, Optional
from langchain_community.vectorstores import FAISS
from langchain_core.embeddings import Embeddings

class FAISSVectorStore:
    """FAISS向量存储和检索类"""
    
    def __init__(self, embeddings: Embeddings, vector_store_dir: str = "./vector_stores"):
        """
        初始化FAISS向量存储
        
        Args:
            embeddings: 嵌入模型实例
            vector_store_dir: 向量库保存目录
        """
        self.embeddings = embeddings
        self.vector_store_dir = vector_store_dir
        self.vector_store: Optional[FAISS] = None
        os.makedirs(vector_store_dir, exist_ok=True)
    
    def create_or_load_vector_store(self, docs_path: str = None) -> 'FAISSVectorStore':
        """
        创建或加载向量库
        
        Args:
            docs_path: 文档路径,如果向量库不存在且提供了此路径,则创建向量库
            
        Returns:
            自身实例
        """
        # 尝试加载已存在的向量库
        vector_store_path = os.path.join(self.vector_store_dir, "faiss_index")
        if os.path.exists(vector_store_path):
            print(f"Loading existing vector store from {vector_store_path}")
            self.vector_store = FAISS.load_local(
                vector_store_path, 
                self.embeddings, 
                allow_dangerous_deserialization=True
            )
        elif docs_path:
            print("No existing vector store found, creating new one...")
            # 此处需要文档processor来处理文档,将在rag_system中集成
            # 暂时返回实例,实际创建在后续方法中
        else:
            raise ValueError("No existing vector store found and no docs_path provided to create one.")
        
        return self
    
    def add_documents(self, documents: List):
        """
        添加文档到向量库
        
        Args:
            documents: 要添加的文档列表
        """
        if self.vector_store is None:
            # 首次创建
            self.vector_store = FAISS.from_documents(documents, self.embeddings)
        else:
            # 追加到现有向量库
            self.vector_store.add_documents(documents)
    
    def save_vector_store(self):
        """保存向量库到磁盘"""
        if self.vector_store:
            vector_store_path = os.path.join(self.vector_store_dir, "faiss_index")
            self.vector_store.save_local(vector_store_path)
            print(f"Vector store saved to {vector_store_path}")
    
    def similarity_search(self, query: str, k: int = 4) -> List:
        """
        相似性搜索
        
        Args:
            query: 查询文本
            k: 返回结果数量
            
        Returns:
            相似文档列表
        """
        if self.vector_store is None:
            raise ValueError("Vector store is not initialized.")
        return self.vector_store.similarity_search(query, k=k)
    
    def similarity_search_with_score(self, query: str, k: int = 4) -> List:
        """
        相似性搜索,返回相似度分数
        
        Args:
            query: 查询文本
            k: 返回结果数量
            
        Returns:
            (文档, 相似度分数) 元组列表
        """
        if self.vector_store is None:
            raise ValueError("Vector store is not initialized.")
        return self.vector_store.similarity_search_with_score(query, k=k)
3.2.4 核心 RAG 系统 (rag_system.py)
import os
from dotenv import load_dotenv
from document_processor import DocumentProcessor
from qwen_embedding import QwenEmbedding
from faiss_vectorstore import FAISSVectorStore
from qwen_llm import QwenLLM, RAGChainBuilder

# 加载环境变量
load_dotenv()

class LocalKnowledgeChatbot:
    """本地知识智能客服主类"""
    
    def __init__(self, 
                 data_dir: str = "./data",
                 vector_store_dir: str = "./vector_stores",
                 api_key: str = None,
                 embedding_model: str = "text-embedding-v2",
                 llm_model: str = "qwen-max",
                 chunk_size: int = 500,
                 chunk_overlap: int = 50):
        """
        初始化客服系统
        
        Args:
            data_dir: 知识库txt文件目录
            vector_store_dir: 向量库存储目录
            api_key: DashScope API密钥
            embedding_model: 嵌入模型名称
            llm_model: 大语言模型名称
            chunk_size: 文档切分块大小
            chunk_overlap: 文档切分重叠大小
        """
        self.data_dir = data_dir
        self.vector_store_dir = vector_store_dir
        self.api_key = api_key or os.getenv("DASHSCOPE_API_KEY")
        if not self.api_key:
            raise ValueError("API Key is required. Set DASHSCOPE_API_KEY in environment or pass as parameter.")
        
        # 初始化各组件
        self.doc_processor = DocumentProcessor(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        self.embedding_model = QwenEmbedding(api_key=self.api_key, model=embedding_model)
        self.vector_store = FAISSVectorStore(embeddings=self.embedding_model.embeddings, vector_store_dir=vector_store_dir)
        self.qwen_llm = QwenLLM(api_key=self.api_key, model=llm_model)
        
        # 加载或创建向量库
        self.vector_store.create_or_load_vector_store(docs_path=self.data_dir if not os.path.exists(os.path.join(vector_store_dir, "faiss_index")) else None)
        
        # 如果向量库是新建的,需要处理文档并添加
        if self.vector_store.vector_store is None and os.path.exists(self.data_dir):
            print("Processing documents and building vector store...")
            raw_docs = self.doc_processor.load_documents(self.data_dir)
            split_docs = self.doc_processor.split_documents(raw_docs)
            self.vector_store.add_documents(split_docs)
            self.vector_store.save_vector_store()
        elif self.vector_store.vector_store is None:
            raise ValueError(f"No vector store found and no documents found in {data_dir} to create one.")
        
        # 构建RAG链
        self.rag_chain_builder = RAGChainBuilder(
            llm=self.qwen_llm.get_llm(),
            vector_store=self.vector_store.vector_store
        )
        self.qa_chain = self.rag_chain_builder.build_qa_chain()
    
    def ask(self, query: str) -> dict:
        """
        回答用户问题
        
        Args:
            query: 用户问题
            
        Returns:
            包含答案和来源文档的字典
        """
        result = self.qa_chain({"query": query})
        return {
            "answer": result["result"],
            "source_documents": result.get("source_documents", [])
        }
    
    def reset_memory(self):
        """重置对话记忆"""
        self.rag_chain_builder.memory.clear()

3.3 系统特性

3.3.1 模块化设讣
  • 可扩展性: 各个组件独立封装,便于扩展和维护
  • 可配置性: 支持自定义文档切分参数、模型参数等
  • 容错性: 包含完善的异常处理机制
3.3.2 核心功能
  • 文档处理: 支持批量加载和智能切分
  • 向量检索: 实现高效的相似性搜索
  • 对话记忆: 支持多轮对话上下文理解
  • 知识增强: 结合本地知识生成准确回答

4. 部署与使用

4.1 环境配置

# 1. 安装依赖
pip install -r requirements.txt

# 2. 配置API密钥
echo "DASHSCOPE_API_KEY=your_api_key" > .env

# 3. 放置知识库文件到 data/ 目录

4.2 启动系统

python main.py

4.3 系统优化建议

  1. 性能优化: 使用 GPU 加速向量化过程
  2. 知识库管理: 实现动态知识库更新机制
  3. 对话优化: 集成更高级的对话管理策略

5. 总结

本文详细介绍了基于 LangChain 的本地知识智能客服系统的设计与实现。该系统具有良好的模块化结构、扩展性和实用性,能够有效结合本地知识库和大语言模型,为用户提供准确、可靠的问答服务。

通过本文的实践,我们可以看到 LangChain 框架在构建复杂 AI 应用中的强大能力,特别是在处理文档理解、向量检索和对话管理等场景下的便利性。这种 RAG 架构为专业领域的知识问答提供了一种有效的解决方案。

项目资源

6. 参考资料

  1. LangChain 官方文档
  2. FAISS 向量数据库文档
  3. 阿里云百炼平台文档
  4. RAG 技术原理解析论文
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐