A Fast-Track Guide to LangChain in Practice: A New Paradigm for Python AI Applications

A Deep Dive into the LangChain Architecture

The core value of LangChain lies in its modular design philosophy and clean architectural layering. Understanding that architecture is the foundation for building complex AI applications.

1. Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│                  Application Layer                           │
│  - Assistants │ Document Q&A │ Data Analysis │ Code Gen      │
├─────────────────────────────────────────────────────────────┤
│                  Chains Layer                                │
│  - LLMChain │ SequentialChain │ RouterChain                  │
├─────────────────────────────────────────────────────────────┤
│                  Abstraction Layer                           │
│  - Prompt Templates │ Memory │ Retrievers │ Output Parsers   │
├─────────────────────────────────────────────────────────────┤
│                  Model Layer                                 │
│  - LLMs │ ChatModels │ Embeddings                            │
├─────────────────────────────────────────────────────────────┤
│                  Integrations & Tools Layer                  │
│  - Vector DBs │ API Tools │ Search Engines │ File Systems    │
└─────────────────────────────────────────────────────────────┘
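
These layers are wired together by composition. In current LangChain releases the idiomatic glue is the LangChain Expression Language (LCEL), where `|` pipes one layer's output into the next. A minimal sketch touching the abstraction, model, and chain layers (assuming `langchain-openai` is installed and `OPENAI_API_KEY` is set; the model name is an assumption):

# lcel_demo.py - the layers above, composed via LCEL
from langchain_core.prompts import ChatPromptTemplate      # abstraction layer
from langchain_core.output_parsers import StrOutputParser  # abstraction layer
from langchain_openai import ChatOpenAI                    # model layer

prompt = ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)       # assumed model name
chain = prompt | llm | StrOutputParser()                   # chain layer

print(chain.invoke({"text": "LangChain separates models, prompts, and chains."}))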

2. Design Patterns and Modularity

LangChain applications typically follow the design patterns below:

# Full application skeleton: config.py
"""
Configuration module - centralizes all settings
"""
import os
from typing import Dict, Any
from dotenv import load_dotenv

load_dotenv()

class Config:
    """Configuration manager"""
    def __init__(self):
        self.openai_api_key = os.getenv("OPENAI_API_KEY")
        self.model_name = os.getenv("MODEL_NAME", "gpt-4")
        self.temperature = float(os.getenv("TEMPERATURE", "0.7"))
        self.vector_db_path = os.getenv("VECTOR_DB_PATH", "./chroma_db")

    def get_llm_config(self) -> Dict[str, Any]:
        """Return the LLM configuration as a dict"""
        return {
            "model": self.model_name,
            "temperature": self.temperature,
            "api_key": self.openai_api_key
        }

# Singleton configuration instance
config = Config()

Core Components in Detail, with Working Code

1. Model Abstraction Layer: A Unified Multi-Model Interface

# models_manager.py
"""
Multi-model manager - supports dynamic switching and fallback
"""
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from typing import Optional, Union, List
import logging

from config import Config  # was missing; Config is referenced below

logger = logging.getLogger(__name__)

class ModelManager:
    """Unified model manager"""

    def __init__(self, config: Config):
        self.config = config
        self._models = {}
        self._embeddings = None

    def get_chat_model(
        self,
        provider: str = "openai",
        **kwargs
    ) -> Union[ChatOpenAI, ChatAnthropic, ChatGoogleGenerativeAI]:
        """Return a chat model instance for the given provider"""

        # Merge the default config with per-call overrides
        model_kwargs = {
            "temperature": self.config.temperature,
            **kwargs
        }

        if provider == "openai":
            if not self.config.openai_api_key:
                raise ValueError("OpenAI API key is required")
            model = ChatOpenAI(
                api_key=self.config.openai_api_key,
                model=self.config.model_name,  # `model` is the current parameter name
                **model_kwargs
            )

        elif provider == "anthropic":
            model = ChatAnthropic(**model_kwargs)

        elif provider == "google":
            model = ChatGoogleGenerativeAI(**model_kwargs)

        else:
            raise ValueError(f"Unsupported provider: {provider}")

        self._models[provider] = model
        return model

    def get_embeddings(self) -> OpenAIEmbeddings:
        """Return the embedding model (lazily created)"""
        if self._embeddings is None:
            self._embeddings = OpenAIEmbeddings(
                api_key=self.config.openai_api_key
            )
        return self._embeddings

    def get_available_models(self) -> List[str]:
        """List the providers instantiated so far"""
        return list(self._models.keys())

# Usage example
if __name__ == "__main__":
    from config import config

    manager = ModelManager(config)

    # Grab an OpenAI model
    openai_llm = manager.get_chat_model(
        provider="openai",
        temperature=0.5,
        max_tokens=1000
    )

    # Smoke-test the model
    response = openai_llm.invoke("Hello, please introduce yourself")
    print(f"Model response: {response.content}")

2. Advanced Prompt Engineering and the Template System

# prompt_system.py
"""
Prompt system - dynamic templates with context awareness
"""
from langchain.prompts import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
    MessagesPlaceholder
)
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from typing import Dict, List, Any
import json

class PromptSystem:
    """Advanced prompt management system"""

    def __init__(self):
        self.templates = {}
        self._register_default_templates()

    def _register_default_templates(self):
        """Register the built-in templates"""

        # 1. Data analyst template
        data_analyst_template = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(
                """You are a senior data analyst with ten years of industry experience.
                Your task is to deliver professional, accurate analysis based on the
                data and requirements the user provides.

                Analysis principles:
                1. Data-driven: every conclusion must be backed by data
                2. Business-oriented: the analysis must serve a business goal
                3. Actionable: recommendations must be concrete and executable

                Current data context:
                {data_context}

                Begin your analysis."""
            ),
            MessagesPlaceholder(variable_name="chat_history"),
            HumanMessagePromptTemplate.from_template("{query}")
        ])

        self.templates["data_analyst"] = data_analyst_template

        # 2. Code generation template
        code_gen_template = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(
                """You are an experienced software engineer who specializes in {language}.

                Code generation rules:
                1. Follow PEP 8 / {language} best practices
                2. Include the necessary comments and docstrings
                3. Handle errors and edge cases
                4. Keep the code modular and reusable

                Requirements: {requirements}

                Generate high-quality code."""
            ),
            HumanMessagePromptTemplate.from_template("{task}")
        ])

        self.templates["code_generator"] = code_gen_template

        # 3. Document summary template
        summary_template = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(
                """You are a professional summarization assistant who extracts the key
                information from complex documents.

                Summary requirements:
                1. Extract the core points (3-5 of them)
                2. Preserve key figures and facts
                3. Call out recommended actions
                4. Stay within {word_limit} words

                Document type: {doc_type}
                Target audience: {audience}

                Begin the summary."""
            ),
            HumanMessagePromptTemplate.from_template("{content}")
        ])

        self.templates["summarizer"] = summary_template

    def create_dynamic_prompt(
        self,
        template_name: str,
        **kwargs
    ) -> ChatPromptTemplate:
        """Create a prompt template, pre-filling any given variables"""
        if template_name not in self.templates:
            raise ValueError(f"Unknown template: {template_name}")

        template = self.templates[template_name]
        return template.partial(**kwargs) if kwargs else template

    def format_with_context(
        self,
        template_name: str,
        query: str,
        context: Dict[str, Any],
        chat_history: List[BaseMessage] = None
    ) -> List[BaseMessage]:
        """Format a prompt with context and optional chat history"""
        prompt = self.create_dynamic_prompt(template_name)

        format_kwargs = {"query": query, **context}
        # MessagesPlaceholder requires a list, even when there is no history yet
        format_kwargs["chat_history"] = chat_history or []

        return prompt.format_messages(**format_kwargs)

# Usage example
if __name__ == "__main__":
    prompt_system = PromptSystem()

    # Use the data analyst template
    data_context = {
        "data_source": "sales database",
        "time_range": "2024 Q1",
        "metrics": ["revenue", "profit margin", "customer growth"]
    }

    messages = prompt_system.format_with_context(
        template_name="data_analyst",
        query="Analyze the quarterly sales trend and suggest improvements",
        context={"data_context": json.dumps(data_context, ensure_ascii=False)},
        chat_history=[
            HumanMessage(content="Here is our sales data"),
            AIMessage(content="I have reviewed the data overview")
        ]
    )

    print("Generated prompt messages:")
    for msg in messages:
        print(f"{msg.type}: {msg.content[:100]}...")

3. An Advanced Memory System

# memory_system.py
"""
Memory system - short-term and long-term memory with retrieval
"""
from langchain.memory import (
    ConversationBufferMemory,
    ConversationSummaryMemory
)
from langchain.schema import BaseMemory
from langchain_openai import ChatOpenAI  # was missing; needed for the default summary LLM
from typing import Dict, Any, List, Optional
from datetime import datetime
import json
import pickle

class AdvancedMemorySystem:
    """Advanced memory manager"""

    def __init__(
        self,
        short_term_memory: BaseMemory = None,
        long_term_memory: BaseMemory = None,
        max_short_term_items: int = 20
    ):
        # Short-term memory (recent turns, kept verbatim)
        self.short_term_memory = short_term_memory or ConversationBufferMemory(
            return_messages=True,
            memory_key="chat_history",
            output_key="output"
        )

        # Long-term memory (rolling summary)
        self.long_term_memory = long_term_memory or ConversationSummaryMemory(
            llm=ChatOpenAI(temperature=0),
            memory_key="long_term_history"
        )

        self.max_short_term_items = max_short_term_items
        self.memory_file = "memory_store.pkl"
        self._load_memory()
    
    def _load_memory(self):
        """Load persisted memory from disk"""
        try:
            with open(self.memory_file, 'rb') as f:
                saved_memory = pickle.load(f)
                self.short_term_memory = saved_memory.get('short_term', self.short_term_memory)
                self.long_term_memory = saved_memory.get('long_term', self.long_term_memory)
        except FileNotFoundError:
            print("No memory store found; starting with a fresh memory system")

    def save_memory(self):
        """Persist memory to disk"""
        # Note: pickling memory objects that hold a live LLM client can fail;
        # for production, serialize the messages themselves (e.g. as JSON).
        memory_data = {
            'short_term': self.short_term_memory,
            'long_term': self.long_term_memory,
            'saved_at': datetime.now().isoformat()
        }
        with open(self.memory_file, 'wb') as f:
            pickle.dump(memory_data, f)
    
    def add_interaction(
        self,
        user_input: str,
        ai_response: str,
        metadata: Dict[str, Any] = None
    ):
        """Record one interaction in the memory system"""

        # Short-term memory
        self.short_term_memory.save_context(
            {"input": user_input},
            {"output": ai_response}
        )

        # Periodically roll up into long-term memory
        if self._should_summarize():
            self._update_long_term_memory()

        # Optional metadata
        if metadata:
            self._add_metadata(metadata)

    def _should_summarize(self) -> bool:
        """Decide whether the short-term buffer is due for summarization"""
        chat_history = self.short_term_memory.load_memory_variables({})
        messages = chat_history.get("chat_history", [])
        return len(messages) >= self.max_short_term_items

    def _update_long_term_memory(self):
        """Roll the short-term buffer up into long-term memory"""
        chat_history = self.short_term_memory.load_memory_variables({})
        summary_text = self._create_summary(chat_history)

        self.long_term_memory.save_context(
            {"input": "Summary of earlier conversation"},
            {"output": summary_text}
        )

        # Reset the short-term buffer
        self.short_term_memory.clear()

    def _create_summary(self, chat_history: Dict) -> str:
        """Summarize the recent conversation"""
        messages = chat_history.get("chat_history", [])
        conversation_text = "\n".join([
            f"{msg.type}: {msg.content}"
            for msg in messages[-10:]  # last 10 messages
        ])

        summary_prompt = f"""Summarize the core content and key information of this conversation:

        {conversation_text}

        Requirements:
        1. Extract the main topics discussed
        2. Record important decisions and conclusions
        3. Identify open action items
        4. Stay under 200 words"""

        # Generate the summary with the LLM attached to the long-term memory,
        # falling back to the raw prompt text if no LLM is available
        llm = getattr(self.long_term_memory, "llm", None)
        if llm is None:
            return summary_prompt
        return llm.invoke(summary_prompt).content
    
    def _add_metadata(self, metadata: Dict[str, Any]):
        """Append metadata to a JSON sidecar file"""
        metadata_file = "metadata.json"
        try:
            with open(metadata_file, 'r') as f:
                existing_metadata = json.load(f)
        except FileNotFoundError:
            existing_metadata = []

        existing_metadata.append({
            **metadata,
            "timestamp": datetime.now().isoformat()
        })

        with open(metadata_file, 'w') as f:
            json.dump(existing_metadata, f, indent=2)

    def get_memory_context(self) -> Dict[str, Any]:
        """Return the combined memory context"""
        short_term = self.short_term_memory.load_memory_variables({})
        long_term = self.long_term_memory.load_memory_variables({})

        return {
            "short_term_memory": short_term,
            "long_term_memory": long_term,
            "memory_summary": self.get_memory_summary()
        }

    def get_memory_summary(self) -> str:
        """Return a one-line memory summary"""
        short_term = self.short_term_memory.load_memory_variables({})
        messages = short_term.get("chat_history", [])

        return f"Conversation memory: {len(messages)} messages"
    
    def search_memory(self, query: str) -> List[Dict[str, Any]]:
        """Keyword search over stored memory (swap in vector search for production)"""
        results = []
        q = query.lower()

        # Search the raw short-term messages
        short_term = self.short_term_memory.load_memory_variables({})
        for msg in short_term.get("chat_history", []):
            if q in msg.content.lower():
                results.append({"source": "short_term", "content": msg.content})

        # Search the long-term summary text
        long_term = self.long_term_memory.load_memory_variables({})
        for key, value in long_term.items():
            if isinstance(value, str) and q in value.lower():
                results.append({
                    "source": key,
                    "content": value[:200] + "..." if len(value) > 200 else value
                })

        return results

# Usage example
if __name__ == "__main__":
    memory_system = AdvancedMemorySystem()

    # Simulated conversation
    conversations = [
        ("Hi, my name is Zhang San", "Hello Zhang San! Nice to meet you."),
        ("I'm from Beijing", "Beijing is a beautiful city!"),
        ("I enjoy programming", "Programming is a very useful skill!"),
        ("I'd like to learn Python", "Python is a great first language.")
    ]

    for user_input, ai_response in conversations:
        memory_system.add_interaction(
            user_input=user_input,
            ai_response=ai_response,
            metadata={"topic": "introductions"}
        )

    # Inspect the memory context
    context = memory_system.get_memory_context()
    print("Memory context:", context["memory_summary"])

    # Search the memory
    results = memory_system.search_memory("programming")
    print("Search results:", results)

    # Persist
    memory_system.save_memory()
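
`search_memory` falls back to keyword matching; for semantic lookup, LangChain's `VectorStoreRetrieverMemory` is the usual replacement. A sketch, assuming `chromadb`, `langchain-community`, and an OpenAI key are available:

# Sketch: semantic memory retrieval instead of keyword search
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

vectorstore = Chroma(collection_name="agent_memory", embedding_function=OpenAIEmbeddings())
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
vector_memory = VectorStoreRetrieverMemory(retriever=retriever, memory_key="history")

vector_memory.save_context({"input": "I want to learn Python"},
                           {"output": "Python is a great first language."})
# Returns the stored snippets most similar to the query
print(vector_memory.load_memory_variables({"input": "programming languages"}))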

4. An Intelligent Agent Architecture

# agent_system.py
"""
Agent system - tool calling and task decomposition
"""
from langchain.agents import (
    AgentExecutor,
    create_openai_functions_agent,
    create_react_agent
)
from langchain.tools import BaseTool, tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from typing import List, Dict, Any, Optional, Callable
from concurrent.futures import ThreadPoolExecutor
import json

class ToolRegistry:
    """Tool registry"""

    def __init__(self):
        self.tools = {}
        self._register_builtin_tools()

    def _register_builtin_tools(self):
        """Register the built-in tools"""

        @tool
        def calculator(expression: str) -> str:
            """Evaluate an arithmetic expression; supports + - * / and a few builtins."""
            try:
                # Whitelisted names the expression may reference
                allowed_names = {
                    'abs': abs, 'round': round, 'min': min, 'max': max,
                    'sum': sum, 'len': len
                }

                # Parse with ast so nothing is executed blindly
                import ast
                node = ast.parse(expression, mode='eval')

                def _eval(node):
                    if isinstance(node, ast.Constant):  # numeric literal (ast.Num is deprecated)
                        return node.value
                    elif isinstance(node, ast.BinOp):
                        left = _eval(node.left)
                        right = _eval(node.right)
                        if isinstance(node.op, ast.Add):
                            return left + right
                        elif isinstance(node.op, ast.Sub):
                            return left - right
                        elif isinstance(node.op, ast.Mult):
                            return left * right
                        elif isinstance(node.op, ast.Div):
                            return left / right
                    elif isinstance(node, ast.Name):
                        if node.id in allowed_names:
                            return allowed_names[node.id]
                    elif isinstance(node, ast.Call):
                        func = _eval(node.func)
                        args = [_eval(arg) for arg in node.args]
                        return func(*args)
                    raise ValueError(f"Unsupported expression: {node}")

                result = _eval(node.body)
                return str(result)

            except Exception as e:
                return f"Calculation error: {str(e)}"

        self.register_tool("calculator", calculator)
        
        @tool
        def web_search(query: str) -> str:
            """Search the web for information."""
            # Wire up a real search API in production
            return f"Web search results (mock): {query}"

        self.register_tool("web_search", web_search)

        @tool
        def data_analyzer(data_json: str, analysis_type: str) -> str:
            """Analyze a JSON array of records."""
            try:
                data = json.loads(data_json)
                if analysis_type == "statistics":
                    stats = {
                        "count": len(data),
                        "keys": list(data[0].keys()) if data else []
                    }
                    return json.dumps(stats, ensure_ascii=False)
                else:
                    return "Unsupported analysis type"
            except Exception as e:
                return f"Data analysis error: {str(e)}"

        self.register_tool("data_analyzer", data_analyzer)

    def register_tool(self, name: str, tool_func: Callable):
        """Register a new tool"""
        self.tools[name] = tool_func

    def get_tool(self, name: str) -> Optional[BaseTool]:
        """Look up a tool by name"""
        return self.tools.get(name)

    def get_all_tools(self) -> List[BaseTool]:
        """Return all registered tools"""
        return list(self.tools.values())

class IntelligentAgent:
    """Intelligent agent"""

    def __init__(
        self,
        llm,
        tool_registry: ToolRegistry,
        agent_type: str = "react"
    ):
        self.llm = llm
        self.tool_registry = tool_registry
        self.agent_type = agent_type

        # Build the agent
        if agent_type == "react":
            self.agent = self._create_react_agent()
        elif agent_type == "openai_functions":
            self.agent = self._create_openai_functions_agent()
        else:
            raise ValueError(f"Unsupported agent type: {agent_type}")

        # Build the executor
        self.executor = AgentExecutor(
            agent=self.agent,
            tools=tool_registry.get_all_tools(),
            verbose=True,
            handle_parsing_errors=True,
            return_intermediate_steps=True,  # so callers can inspect the steps
            max_iterations=5
        )
    
    def _create_react_agent(self):
        """Create a ReAct agent"""
        # create_react_agent formats {agent_scratchpad} as plain text, so the
        # prompt must expose {tools}, {tool_names}, and a string scratchpad
        # while keeping the Thought/Action/Observation contract intact.
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an assistant that can call tools to solve problems.
You have access to the following tools:

{tools}

Use exactly this format:

Question: the question to answer
Thought: reason about what to do next
Action: the tool to use, one of [{tool_names}]
Action Input: the input to the tool
Observation: the tool's result
... (Thought/Action/Action Input/Observation may repeat)
Thought: I now know the final answer
Final Answer: the answer to the original question"""),
            MessagesPlaceholder(variable_name="chat_history", optional=True),
            ("human", "Question: {input}\nThought: {agent_scratchpad}")
        ])

        return create_react_agent(
            llm=self.llm,
            tools=self.tool_registry.get_all_tools(),
            prompt=prompt
        )
    
    def _create_openai_functions_agent(self):
        """Create an OpenAI function-calling agent"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant"),
            MessagesPlaceholder(variable_name="chat_history"),
            ("user", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad")
        ])

        return create_openai_functions_agent(
            llm=self.llm,
            prompt=prompt,
            tools=self.tool_registry.get_all_tools()
        )
    
    async def ainvoke(self, input_text: str, **kwargs) -> Dict[str, Any]:
        """Invoke the agent asynchronously"""
        try:
            result = await self.executor.ainvoke({
                "input": input_text,
                "chat_history": kwargs.get("chat_history", [])
            })
            return {
                "success": True,
                "output": result.get("output", ""),
                "intermediate_steps": result.get("intermediate_steps", [])
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "output": ""
            }

    def invoke(self, input_text: str, **kwargs) -> Dict[str, Any]:
        """Invoke the agent synchronously"""
        try:
            result = self.executor.invoke({
                "input": input_text,
                "chat_history": kwargs.get("chat_history", [])
            })
            return {
                "success": True,
                "output": result.get("output", ""),
                "intermediate_steps": result.get("intermediate_steps", [])
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "output": ""
            }

    def batch_process(self, inputs: List[str]) -> List[Dict[str, Any]]:
        """Process several inputs concurrently"""
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(self.invoke, input_text)
                for input_text in inputs
            ]
            return [future.result() for future in futures]

# Usage example
if __name__ == "__main__":
    from config import config
    from models_manager import ModelManager

    # Set up
    model_manager = ModelManager(config)
    llm = model_manager.get_chat_model("openai")

    # Tool registry and agent
    tool_registry = ToolRegistry()

    # Register a custom tool
    @tool
    def weather_checker(city: str) -> str:
        """Look up the weather for a city."""
        # Mock weather data
        weather_data = {
            "Beijing": "Sunny, 15-25°C",
            "Shanghai": "Cloudy, 18-28°C",
            "Guangzhou": "Rain, 22-30°C"
        }
        return weather_data.get(city, "No weather data for that city")

    tool_registry.register_tool("weather_checker", weather_checker)

    # Create the agent
    agent = IntelligentAgent(
        llm=llm,
        tool_registry=tool_registry,
        agent_type="react"
    )

    # Exercise the agent
    test_queries = [
        "Calculate (15 + 23) * 2",
        "What is the weather in Beijing?",
        'Analyze this data: [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]'
    ]

    for query in test_queries:
        print(f"\nQuery: {query}")
        result = agent.invoke(query)

        if result["success"]:
            print(f"Result: {result['output']}")
            if result.get("intermediate_steps"):
                print(f"Intermediate steps: {len(result['intermediate_steps'])}")
        else:
            print(f"Error: {result['error']}")

5. An Enterprise-Grade RAG System

# rag_system.py
"""
Enterprise RAG system - retrieval-augmented generation
"""
from typing import List, Dict, Any, Optional
from langchain_community.document_loaders import (
    TextLoader,
    PyPDFLoader,  # the PDF loader is PyPDFLoader (there is no PDFLoader)
    WebBaseLoader,
    DirectoryLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma, FAISS
from langchain.retrievers import (
    ContextualCompressionRetriever,
    EnsembleRetriever
)
from langchain.retrievers.document_compressors import (
    LLMChainExtractor,
    EmbeddingsFilter,
    DocumentCompressorPipeline
)
from langchain.chains import RetrievalQA
from langchain.schema import Document
import pickle

class EnterpriseRAGSystem:
    """Enterprise-grade RAG system"""

    def __init__(
        self,
        embedding_model=None,
        llm=None,
        vector_store_type: str = "chroma",
        persist_directory: str = "./vector_store"
    ):
        self.embedding_model = embedding_model or OpenAIEmbeddings()
        self.llm = llm
        self.vector_store_type = vector_store_type
        self.persist_directory = persist_directory
        self.vector_store = None
        self.retriever = None
        self.qa_chain = None

    def load_documents(
        self,
        source_path: str,
        source_type: str = "directory",
        **kwargs
    ) -> List[Document]:
        """Load documents from a file, directory, or URL"""

        loader_map = {
            "directory": DirectoryLoader,
            "pdf": PyPDFLoader,
            "text": TextLoader,
            "web": WebBaseLoader
        }

        if source_type not in loader_map:
            raise ValueError(f"Unsupported document type: {source_type}")

        loader_class = loader_map[source_type]

        if source_type == "directory":
            loader = loader_class(
                source_path,
                glob="**/*.*",
                loader_kwargs=kwargs.get("loader_kwargs", {})
            )
        else:
            loader = loader_class(source_path, **kwargs)

        return loader.load()
    
    def process_documents(
        self,
        documents: List[Document],
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ) -> List[Document]:
        """Split documents into chunks and attach metadata"""

        # Text splitting (the separators also handle Chinese punctuation)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", "。", "!", "?", ";", ",", " "]
        )

        chunks = text_splitter.split_documents(documents)

        # Attach metadata
        for i, chunk in enumerate(chunks):
            chunk.metadata.update({
                "chunk_id": i,
                "total_chunks": len(chunks),
                # hash() is process-local; use hashlib for a stable fingerprint
                "source_hash": hash(chunk.page_content[:100])
            })

        return chunks

    def create_vector_store(self, documents: List[Document]):
        """Build the vector store"""

        if self.vector_store_type == "chroma":
            self.vector_store = Chroma.from_documents(
                documents=documents,
                embedding=self.embedding_model,
                persist_directory=self.persist_directory
            )
            self.vector_store.persist()

        elif self.vector_store_type == "faiss":
            self.vector_store = FAISS.from_documents(
                documents=documents,
                embedding=self.embedding_model
            )
            # Save the FAISS index to disk
            self.vector_store.save_local(self.persist_directory)

        else:
            raise ValueError(f"Unsupported vector store type: {self.vector_store_type}")

        print(f"Vector store created with {len(documents)} chunks")
    
    def create_advanced_retriever(
        self,
        search_type: str = "similarity",
        k: int = 4,
        score_threshold: float = 0.5
    ):
        """Build a retriever with contextual compression"""

        if not self.vector_store:
            raise ValueError("Create the vector store first")

        # Base retriever; score_threshold is only valid for the
        # similarity_score_threshold search type
        search_kwargs = {"k": k * 2}  # over-fetch; compression trims the rest
        if search_type == "similarity_score_threshold":
            search_kwargs["score_threshold"] = score_threshold

        base_retriever = self.vector_store.as_retriever(
            search_type=search_type,
            search_kwargs=search_kwargs
        )

        # Compression pipeline: drop chunks below the similarity threshold,
        # then have the LLM extract only the relevant passages
        embeddings_filter = EmbeddingsFilter(
            embeddings=self.embedding_model,
            similarity_threshold=score_threshold
        )
        compressor = LLMChainExtractor.from_llm(self.llm)
        pipeline = DocumentCompressorPipeline(
            transformers=[embeddings_filter, compressor]
        )

        self.retriever = ContextualCompressionRetriever(
            base_compressor=pipeline,
            base_retriever=base_retriever
        )

        return self.retriever
    
    def create_qa_chain(self, chain_type: str = "stuff"):
        """Build the question-answering chain"""

        if not self.retriever:
            raise ValueError("Create the retriever first")

        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type=chain_type,
            retriever=self.retriever,
            return_source_documents=True,
            verbose=True
        )

        return self.qa_chain
    
    def query(
        self,
        question: str,
        **kwargs
    ) -> Dict[str, Any]:
        """Run a query through the QA chain"""

        if not self.qa_chain:
            raise ValueError("Create the QA chain first")

        try:
            result = self.qa_chain.invoke({"query": question})

            return {
                "success": True,
                "answer": result["result"],
                "source_documents": [
                    {
                        "content": doc.page_content[:200] + "...",
                        "metadata": doc.metadata
                    }
                    for doc in result.get("source_documents", [])
                ],
                "source_count": len(result.get("source_documents", []))
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "answer": ""
            }
    
    def save_system(self, filepath: str = "rag_system.pkl"):
        """Persist the system configuration"""

        state = {
            "vector_store_type": self.vector_store_type,
            "persist_directory": self.persist_directory,
            "config": {
                "embedding_model": type(self.embedding_model).__name__,
                "llm": type(self.llm).__name__ if self.llm else None
            }
        }

        with open(filepath, 'wb') as f:
            pickle.dump(state, f)

        print(f"System state saved to {filepath}")

    @classmethod
    def load_system(
        cls,
        filepath: str = "rag_system.pkl",
        embedding_model=None,
        llm=None
    ):
        """Restore a system from saved state"""

        with open(filepath, 'rb') as f:
            state = pickle.load(f)

        system = cls(
            embedding_model=embedding_model,
            llm=llm,
            vector_store_type=state["vector_store_type"],
            persist_directory=state["persist_directory"]
        )

        # Reload the vector store
        if state["vector_store_type"] == "chroma":
            system.vector_store = Chroma(
                persist_directory=state["persist_directory"],
                embedding_function=embedding_model
            )
            system.retriever = system.vector_store.as_retriever()

        elif state["vector_store_type"] == "faiss":
            system.vector_store = FAISS.load_local(
                state["persist_directory"],
                embedding_model,
                allow_dangerous_deserialization=True
            )
            system.retriever = system.vector_store.as_retriever()

        # Rebuild the QA chain
        if llm:
            system.create_qa_chain()

        return system

# Usage example
if __name__ == "__main__":
    from config import config
    from models_manager import ModelManager

    # Models
    model_manager = ModelManager(config)
    llm = model_manager.get_chat_model("openai", temperature=0.1)
    embeddings = model_manager.get_embeddings()

    # RAG system
    rag_system = EnterpriseRAGSystem(
        embedding_model=embeddings,
        llm=llm,
        vector_store_type="chroma",
        persist_directory="./data/vector_store"
    )

    # 1. Load documents
    print("Loading documents...")
    documents = rag_system.load_documents(
        source_path="./data/documents",
        source_type="directory"
    )
    print(f"Loaded {len(documents)} documents")

    # 2. Process documents
    print("Processing documents...")
    processed_docs = rag_system.process_documents(
        documents,
        chunk_size=800,
        chunk_overlap=100
    )
    print(f"Split into {len(processed_docs)} chunks")

    # 3. Build the vector store
    print("Building the vector store...")
    rag_system.create_vector_store(processed_docs)

    # 4. Build the retriever
    print("Building the retriever...")
    retriever = rag_system.create_advanced_retriever(
        search_type="similarity_score_threshold",
        k=3,
        score_threshold=0.7
    )

    # 5. Build the QA chain
    print("Building the QA chain...")
    qa_chain = rag_system.create_qa_chain(chain_type="map_reduce")

    # 6. Test queries
    test_questions = [
        "What is the main content of the documents?",
        "What are the key technical points?",
        "What recommendations does the author make?"
    ]

    for question in test_questions:
        print(f"\nQuestion: {question}")
        result = rag_system.query(question)

        if result["success"]:
            print(f"Answer: {result['answer'][:200]}...")
            print(f"Source documents: {result['source_count']}")
        else:
            print(f"Error: {result['error']}")

    # 7. Persist system state
    rag_system.save_system("./data/rag_system_state.pkl")
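
`EnsembleRetriever` is imported in the module header but never used. A typical pattern blends BM25 keyword retrieval with the vector retriever; a sketch continuing the example above, assuming the `rank_bm25` package is installed:

# Sketch: hybrid retrieval - BM25 keyword scores blended with vector similarity
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever

bm25 = BM25Retriever.from_documents(processed_docs)
bm25.k = 3
vector_retriever = rag_system.vector_store.as_retriever(search_kwargs={"k": 3})

hybrid = EnsembleRetriever(
    retrievers=[bm25, vector_retriever],
    weights=[0.4, 0.6]  # relative trust in keyword vs. semantic matches
)
print(hybrid.invoke("key technical points")[:2])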

Advanced Topic: A Complete AI Application Architecture

# ai_application.py
"""
A complete AI application architecture example
"""
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
import logging
from dataclasses import dataclass, asdict
import json

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

@dataclass
class AIConfig:
    """Application configuration"""
    model_provider: str = "openai"
    model_name: str = "gpt-4"
    temperature: float = 0.7
    max_tokens: int = 2000
    enable_memory: bool = True
    enable_tools: bool = True
    cache_responses: bool = True

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

class AIApplication:
    """The assembled AI application"""

    def __init__(self, config: AIConfig):
        self.config = config
        self.components = {}
        self._initialize_components()

    def _initialize_components(self):
        """Initialize every component"""
        logger.info("Initializing AI application components...")

        # 1. Model manager
        from models_manager import ModelManager
        from config import Config as AppConfig

        app_config = AppConfig()
        self.components['model_manager'] = ModelManager(app_config)

        # 2. LLM
        self.components['llm'] = self.components['model_manager'].get_chat_model(
            provider=self.config.model_provider,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )

        # 3. Memory system
        if self.config.enable_memory:
            from memory_system import AdvancedMemorySystem
            self.components['memory'] = AdvancedMemorySystem()

        # 4. Agent system
        if self.config.enable_tools:
            from agent_system import ToolRegistry, IntelligentAgent

            tool_registry = ToolRegistry()
            self.components['agent'] = IntelligentAgent(
                llm=self.components['llm'],
                tool_registry=tool_registry,
                agent_type="react"
            )

        # 5. Response cache
        if self.config.cache_responses:
            self.components['cache'] = ResponseCache()

        logger.info("AI application initialized")
    
    async def process_request(
        self,
        user_input: str,
        user_id: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Handle one user request"""

        start_time = datetime.now()
        request_id = f"req_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        logger.info(f"Handling request {request_id}: {user_input[:50]}...")

        try:
            # Cache lookup
            if self.config.cache_responses:
                cached_response = self.components['cache'].get(user_input)
                if cached_response:
                    logger.info(f"Request {request_id} served from cache")
                    return {
                        "request_id": request_id,
                        "response": cached_response,
                        "from_cache": True,
                        "processing_time": 0
                    }

            # Memory context
            memory_context = {}
            if self.config.enable_memory and user_id:
                memory_context = self.components['memory'].get_memory_context()

            # Route through the agent
            if self.config.enable_tools:
                # The chat history lives under short_term_memory -> chat_history
                chat_history = memory_context.get(
                    "short_term_memory", {}
                ).get("chat_history", [])
                result = await self.components['agent'].ainvoke(
                    user_input,
                    chat_history=chat_history
                )

                response = result["output"]
                tool_used = len(result.get("intermediate_steps", [])) > 0
            else:
                # Call the LLM directly
                response_obj = await self.components['llm'].ainvoke(user_input)
                response = response_obj.content
                tool_used = False

            # Update memory
            if self.config.enable_memory and user_id:
                self.components['memory'].add_interaction(
                    user_input=user_input,
                    ai_response=response,
                    metadata={
                        "user_id": user_id,
                        "request_id": request_id,
                        "used_tools": tool_used
                    }
                )

            # Cache the response
            if self.config.cache_responses:
                self.components['cache'].set(user_input, response)

            processing_time = (datetime.now() - start_time).total_seconds()

            return {
                "request_id": request_id,
                "response": response,
                "from_cache": False,
                "tool_used": tool_used,
                "processing_time": processing_time,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            logger.error(f"Error handling request {request_id}: {str(e)}")

            return {
                "request_id": request_id,
                "error": str(e),
                "response": "Sorry, something went wrong while handling your request.",
                "success": False,
                "timestamp": datetime.now().isoformat()
            }
    
    def batch_process(
        self,
        requests: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Process several requests concurrently.

        Note: asyncio.run() starts a fresh event loop, so this helper is for
        synchronous callers only. From async code, gather process_request directly.
        """

        async def process_all():
            tasks = []
            for req in requests:
                task = self.process_request(
                    user_input=req.get("input", ""),
                    user_id=req.get("user_id"),
                    context=req.get("context", {})
                )
                tasks.append(task)

            return await asyncio.gather(*tasks)

        return asyncio.run(process_all())

    def get_system_status(self) -> Dict[str, Any]:
        """Report configuration and component status"""

        status = {
            "config": self.config.to_dict(),
            "components": {
                name: type(comp).__name__
                for name, comp in self.components.items()
            },
            "memory_stats": {}
        }

        if self.config.enable_memory:
            memory = self.components.get('memory')
            if memory:
                status["memory_stats"] = memory.get_memory_summary()

        return status

class ResponseCache:
    """In-memory response cache with TTL and LRU eviction"""

    def __init__(self, max_size: int = 1000, ttl: int = 3600):
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl  # time to live, in seconds
        self.access_times = {}

    def get(self, key: str) -> Optional[str]:
        """Fetch a cached response, expiring stale entries"""
        if key in self.cache:
            last_access = self.access_times.get(key)
            # total_seconds(), not .seconds: .seconds wraps around per day
            if last_access and (datetime.now() - last_access).total_seconds() > self.ttl:
                self.delete(key)
                return None

            self.access_times[key] = datetime.now()
            return self.cache[key]
        return None

    def set(self, key: str, value: str):
        """Store a response, evicting the least-recently-used entry if full"""
        if len(self.cache) >= self.max_size:
            oldest_key = min(self.access_times, key=self.access_times.get)
            self.delete(oldest_key)

        self.cache[key] = value
        self.access_times[key] = datetime.now()

    def delete(self, key: str):
        """Remove an entry"""
        if key in self.cache:
            del self.cache[key]
        if key in self.access_times:
            del self.access_times[key]

    def clear(self):
        """Drop everything"""
        self.cache.clear()
        self.access_times.clear()

    def stats(self) -> Dict[str, Any]:
        """Cache statistics"""
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "ttl": self.ttl
        }
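
`ResponseCache` evicts the least-recently-used entry once `max_size` is reached, so a two-slot cache is the smallest useful demonstration:

# Quick check of the LRU + TTL behaviour
cache = ResponseCache(max_size=2, ttl=3600)
cache.set("q1", "a1")
cache.set("q2", "a2")
cache.get("q1")         # refresh q1, making q2 the oldest entry
cache.set("q3", "a3")   # evicts q2
print(cache.get("q2"))  # None
print(cache.stats())    # {'size': 2, 'max_size': 2, 'ttl': 3600}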

# Usage example
async def main():
    """Entry-point example"""

    # Configuration
    config = AIConfig(
        model_provider="openai",
        model_name="gpt-4",
        temperature=0.7,
        enable_memory=True,
        enable_tools=True,
        cache_responses=True
    )

    # Build the application
    app = AIApplication(config)

    # System status
    status = app.get_system_status()
    print("System status:")
    print(json.dumps(status, indent=2, ensure_ascii=False))

    # Single request
    print("\nHandling a single request...")
    result = await app.process_request(
        user_input="Calculate (45 + 78) * 2 divided by 3",
        user_id="user_123"
    )

    print(f"Response: {result.get('response', '')}")
    print(f"Tools used: {result.get('tool_used', False)}")
    print(f"Processing time: {result.get('processing_time', 0):.2f}s")

    # Batch requests; batch_process() calls asyncio.run() and is for sync
    # callers only, so inside this coroutine we gather directly
    print("\nHandling a batch of requests...")
    batch_requests = [
        {"input": "Hi, give me a quick intro to Python", "user_id": "user_123"},
        {"input": "What's the weather in Beijing today?", "user_id": "user_456"},
        {"input": "Explain machine learning", "user_id": "user_789"}
    ]

    batch_results = await asyncio.gather(*[
        app.process_request(user_input=req["input"], user_id=req["user_id"])
        for req in batch_requests
    ])

    for i, result in enumerate(batch_results):
        print(f"\nRequest {i+1}:")
        print(f"Input: {batch_requests[i]['input']}")
        print(f"Response: {result.get('response', '')[:100]}...")
        print(f"From cache: {result.get('from_cache', False)}")

Deployment and Monitoring

# deployment.py
"""
Production deployment and monitoring
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional
import uvicorn
from datetime import datetime
import asyncio
from contextlib import asynccontextmanager
import prometheus_client
from prometheus_fastapi_instrumentator import Instrumentator

# Request/response models
class ChatRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=1000)
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    context: Optional[dict] = {}

class ChatResponse(BaseModel):
    message_id: str
    response: str
    processing_time: float
    model_used: str
    timestamp: str

# Application lifecycle management
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize on startup, clean up on shutdown"""
    # Startup
    print("Starting the AI application...")

    # Build the AI application
    from ai_application import AIApplication, AIConfig

    config = AIConfig(
        model_provider="openai",
        model_name="gpt-4",
        temperature=0.7
    )

    app.state.ai_app = AIApplication(config)
    app.state.request_counter = prometheus_client.Counter(
        'chat_requests_total',
        'Total chat requests',
        ['status']
    )

    yield

    # Shutdown
    print("Shutting down the AI application...")
    # Release resources here

# FastAPI app
app = FastAPI(
    title="AI Chat API",
    description="A LangChain-powered conversational API",
    version="1.0.0",
    lifespan=lifespan
)

# Monitoring; this also exposes Prometheus metrics at /metrics
Instrumentator().instrument(app).expose(app)

# Middleware
@app.middleware("http")
async def add_process_time_header(request, call_next):
    start_time = datetime.now()
    response = await call_next(request)
    processing_time = (datetime.now() - start_time).total_seconds()
    response.headers["X-Processing-Time"] = str(processing_time)
    return response

# Routes
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Chat endpoint"""

    try:
        # Count the request
        app.state.request_counter.labels(status="received").inc()

        # Handle it
        result = await app.state.ai_app.process_request(
            user_input=request.message,
            user_id=request.user_id,
            context=request.context or {}
        )

        if result.get("success", True):
            app.state.request_counter.labels(status="success").inc()

            return ChatResponse(
                message_id=result["request_id"],
                response=result["response"],
                processing_time=result.get("processing_time", 0),
                model_used=app.state.ai_app.config.model_name,
                timestamp=result.get("timestamp", datetime.now().isoformat())
            )
        else:
            app.state.request_counter.labels(status="error").inc()
            raise HTTPException(status_code=500, detail=result.get("error", "Processing failed"))

    except Exception as e:
        app.state.request_counter.labels(status="error").inc()
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """健康检查"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "service": "AI Chat API"
    }

@app.get("/metrics")
async def metrics():
    """Prometheus指标"""
    return prometheus_client.generate_latest()

@app.get("/system/status")
async def system_status():
    """系统状态"""
    if hasattr(app.state, 'ai_app'):
        return app.state.ai_app.get_system_status()
    return {"status": "initializing"}

# Batch endpoint
@app.post("/batch_chat")
async def batch_chat(requests: List[ChatRequest]):
    """Batch chat endpoint"""
    try:
        # Gather the coroutines directly: calling the synchronous
        # batch_process() here would invoke asyncio.run() inside the
        # already-running event loop and fail.
        results = await asyncio.gather(*[
            app.state.ai_app.process_request(
                user_input=req.message,
                user_id=req.user_id,
                context=req.context or {}
            )
            for req in requests
        ])

        return [
            {
                "message_id": result.get("request_id", f"batch_{i}"),
                "response": result.get("response", ""),
                "success": result.get("success", True)
            }
            for i, result in enumerate(results)
        ]

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(
        "deployment:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )
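
The API can be smoke-tested in-process with FastAPI's `TestClient` (it needs the `httpx` package and drives the `lifespan` startup as well):

# smoke_test.py - exercise /health and /chat without a running server
from fastapi.testclient import TestClient
from deployment import app

with TestClient(app) as client:  # the context manager triggers lifespan startup/shutdown
    print(client.get("/health").json())
    r = client.post("/chat", json={"message": "Hello, who are you?"})
    print(r.status_code, r.json())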

Project Structure and Best Practices

ai-application/
├── README.md
├── requirements.txt
├── .env.example
├── config/
│   ├── __init__.py
│   └── settings.py
├── src/
│   ├── core/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── prompts.py
│   │   ├── memory.py
│   │   └── agents.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── chat_service.py
│   │   └── rag_service.py
│   └── api/
│       ├── __init__.py
│       ├── routes.py
│       └── middleware.py
├── tests/
│   ├── __init__.py
│   ├── test_models.py
│   └── test_services.py
├── scripts/
│   ├── setup.py
│   └── deploy.sh
└── docker/
    ├── Dockerfile
    └── docker-compose.yml

Best-Practice Summary

  1. Modular design: each component has a single responsibility, which keeps testing and maintenance easy
  2. Configuration management: use environment variables and config files instead of hard-coding
  3. Error handling: thorough exception capture and recovery
  4. Monitoring: integrate metrics and logging so problems are easy to trace
  5. Caching: cache judiciously to cut response latency
  6. Security: validate inputs, manage API keys carefully, sandbox anything executable
  7. Versioning: manage API versions and stay backward compatible

Summary

LangChain provides a powerful, flexible framework that makes building complex AI applications straightforward. With the complete examples and best practices in this article, you can quickly stand up an AI application that meets enterprise standards. The key points:

  1. Clear architecture: layered design with decoupled components
  2. Complete code: runnable end-to-end examples
  3. Production-ready: deployment, monitoring, and caching are all covered
  4. Extensible: new features and tools are easy to bolt on

As the LangChain ecosystem continues to mature, developers can focus on business logic rather than low-level plumbing, making rapid AI development and deployment a reality.
