A Hands-On Guide to the RAG + Agent + MCP Architecture Pattern

Using a practical, end-to-end example, this article walks through how to combine three technologies (RAG, Agent, and MCP) into a complete, working modern AI application.

I. Project Overview: An Intelligent Enterprise Knowledge Assistant

We will build an "enterprise intelligent knowledge assistant" that can:

  1. Answer questions from the enterprise knowledge base (RAG capability)
  2. Execute complex workflows (Agent capability)
  3. Connect securely to enterprise systems (MCP capability)

System Architecture

User Interface → Agent Hub → MCP Client
                        ↓
        [MCP Server 1: RAG System]
        [MCP Server 2: Email System]
        [MCP Server 3: CRM System]
        [MCP Server 4: Data Analytics]
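
The agent hub (Stage 4 below) launches each MCP server as a subprocess and discovers its tools at startup. A minimal sketch of what that server configuration can look like; note that in the implementation below the email, CRM, and data-analytics servers from the diagram are consolidated into one enterprise-tools server (Stage 3), so only two entries appear, and the file name here is an assumption:

# mcp_servers.py -- illustrative configuration; script names follow the stages below
MCP_SERVERS = {
    "rag":        {"command": "python", "args": ["mcp_rag_server.py"]},
    "enterprise": {"command": "python", "args": ["mcp_enterprise_tools.py"]},
}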

II. Technology Stack Selection

| Component | Recommended | Alternatives |
|---|---|---|
| LLM core | OpenAI GPT-4 / GPT-3.5 | Anthropic Claude, open-source models |
| Agent framework | LangChain, AutoGen | LlamaIndex, CrewAI |
| RAG system | LangChain + Chroma | Pinecone, Weaviate |
| MCP implementation | MCP Python SDK | Custom HTTP proxy |
| Vector database | Chroma, Qdrant | Pinecone, Milvus |
| Tool calling | MCP standard protocol | Custom APIs |
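
The recommended column maps to a fairly short dependency list. A rough requirements.txt sketch (package names are the commonly published ones; pin versions to what you actually test against):

# requirements.txt (approximate; pin versions before deploying)
langchain
openai
chromadb
mcp
requests
tenacity
prometheus-client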

III. Step-by-Step Implementation

Stage 1: Build the Basic RAG System

# 1. Document processing and vectorization
# (These import paths match older LangChain releases; newer releases move them
#  into langchain_community / langchain_openai.)
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma

# Load the enterprise documents
loader = DirectoryLoader('./knowledge_base/', glob="**/*.md")
documents = loader.load()

# Split documents into chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
chunks = text_splitter.split_documents(documents)

# Create the vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

# 2. Build the retrieval chain
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(model="gpt-4", temperature=0)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    return_source_documents=True
)

# 3. Test the RAG chain
response = qa_chain("What is our company's expense reimbursement policy?")
print(f"Answer: {response['result']}")
print(f"Sources: {[doc.metadata['source'] for doc in response['source_documents']]}")

Stage 2: Wrap the RAG System as an MCP Server

# mcp_rag_server.py
import asyncio
# NOTE: the Server subclass below is a simplified illustration; the official MCP
# Python SDK registers handlers via decorators rather than handle_* overrides.
from mcp import Server
from mcp.types import Tool, TextContent

class RAGServer(Server):
    def __init__(self, qa_chain):
        super().__init__()
        self.qa_chain = qa_chain
        
    async def handle_list_tools(self) -> list[Tool]:
        """向客户端声明可用的工具"""
        return [
            Tool(
                name="query_knowledge_base",
                description="查询企业知识库,获取相关政策、流程等信息",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "要查询的问题"
                        }
                    },
                    "required": ["query"]
                }
            )
        ]
    
    async def handle_call_tool(self, name: str, arguments: dict):
        """处理工具调用请求"""
        if name == "query_knowledge_base":
            query = arguments.get("query", "")
            result = self.qa_chain(query)
            
            # Format the answer together with its sources
            answer = result['result']
            sources = [doc.metadata.get('source', 'unknown') 
                      for doc in result['source_documents']]
            
            response = f"{answer}\n\n**Sources**:\n"
            for source in set(sources):
                response += f"- {source}\n"
                
            return [TextContent(type="text", text=response)]
        
        raise ValueError(f"未知工具: {name}")

async def main():
    # Initialize the RAG system
    qa_chain = initialize_rag_system()
    
    # Create and run the MCP server
    server = RAGServer(qa_chain)
    
    # Talk to the client over stdio (HTTP is also possible)
    async with server.run_over_stdio() as (read_stream, write_stream):
        await server.serve(read_stream, write_stream)

if __name__ == "__main__":
    asyncio.run(main())
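
`initialize_rag_system()` is called above but never defined. A minimal sketch, assuming it just reopens the Stage 1 chain:

# Illustrative helper that rebuilds the Stage 1 retrieval chain
def initialize_rag_system():
    from langchain.chains import RetrievalQA
    from langchain.chat_models import ChatOpenAI
    from langchain.embeddings import OpenAIEmbeddings
    from langchain.vectorstores import Chroma

    vectorstore = Chroma(
        persist_directory="./chroma_db",
        embedding_function=OpenAIEmbeddings()
    )
    return RetrievalQA.from_chain_type(
        llm=ChatOpenAI(model="gpt-4", temperature=0),
        chain_type="stuff",
        retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
        return_source_documents=True
    )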

Stage 3: Build the Enterprise Tools MCP Server

# mcp_enterprise_tools.py
import smtplib
from email.mime.text import MIMEText
import json
import requests
from mcp import Server  # simplified, as in Stage 2
from mcp.types import Tool, TextContent

class EnterpriseToolsServer(Server):
    def __init__(self, config):
        super().__init__()
        self.config = config
        
    async def handle_list_tools(self) -> list[Tool]:
        return [
            Tool(
                name="send_email",
                description="发送邮件给指定收件人",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "to": {"type": "string"},
                        "subject": {"type": "string"},
                        "body": {"type": "string"}
                    },
                    "required": ["to", "subject", "body"]
                }
            ),
            Tool(
                name="get_crm_contact",
                description="从CRM系统获取联系人信息",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "contact_id": {"type": "string"}
                    },
                    "required": ["contact_id"]
                }
            ),
            Tool(
                name="create_jira_ticket",
                description="在Jira中创建工单",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "summary": {"type": "string"},
                        "description": {"type": "string"},
                        "project": {"type": "string"}
                    },
                    "required": ["summary", "description", "project"]
                }
            )
        ]
    
    async def handle_call_tool(self, name: str, arguments: dict):
        if name == "send_email":
            return await self._send_email(**arguments)
        elif name == "get_crm_contact":
            return await self._get_crm_contact(**arguments)
        elif name == "create_jira_ticket":
            return await self._create_jira_ticket(**arguments)
        raise ValueError(f"未知工具: {name}")
    
    async def _send_email(self, to: str, subject: str, body: str):
        # Build and send the email over SMTP
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = self.config['email']['sender']
        msg['To'] = to
        
        with smtplib.SMTP(self.config['email']['smtp_server']) as server:
            server.send_message(msg)
        
        return [TextContent(type="text", text=f"邮件已发送给 {to}")]
    
    async def _get_crm_contact(self, contact_id: str):
        # Call the CRM API
        response = requests.get(
            f"{self.config['crm']['base_url']}/contacts/{contact_id}",
            headers={"Authorization": f"Bearer {self.config['crm']['api_key']}"}
        )
        contact = response.json()
        return [TextContent(type="text", text=json.dumps(contact, indent=2))]
    
    async def _create_jira_ticket(self, summary: str, description: str, project: str):
        # Create the Jira ticket via the REST API
        ticket_data = {
            "fields": {
                "project": {"key": project},
                "summary": summary,
                "description": description,
                "issuetype": {"name": "Task"}
            }
        }
        
        response = requests.post(
            f"{self.config['jira']['base_url']}/rest/api/3/issue",
            json=ticket_data,
            auth=(self.config['jira']['username'], self.config['jira']['api_token'])
        )
        
        ticket_key = response.json()['key']
        return [TextContent(
            type="text", 
            text=f"工单 {ticket_key} 已创建: {summary}"
        )]
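
The `config` dict handed to EnterpriseToolsServer is never shown. Judging from the keys the methods read, it might look like the sketch below; hosts and addresses are placeholders, and the secrets should come from environment variables rather than source code:

# Illustrative config for EnterpriseToolsServer; keys mirror the accesses above
import os

config = {
    "email": {
        "sender": "assistant@company.com",
        "smtp_server": "smtp.company.com",
    },
    "crm": {
        "base_url": "https://crm.company.com/api",
        "api_key": os.environ.get("CRM_API_KEY", ""),
    },
    "jira": {
        "base_url": "https://company.atlassian.net",
        "username": "bot@company.com",
        "api_token": os.environ.get("JIRA_API_TOKEN", ""),
    },
}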

Stage 4: Build the Agent Hub

# agent_orchestrator.py
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import Tool
from langchain.memory import ConversationBufferMemory
from langchain.chat_models import ChatOpenAI
import mcp.client as mcp_client  # illustrative client API, as in Stage 2
import asyncio

class EnterpriseAgent:
    def __init__(self, mcp_servers_config):
        self.mcp_servers_config = mcp_servers_config
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        self.mcp_clients = {}
        self.tools = []
        
    async def initialize(self):
        """Connect to every configured MCP server and register its tools"""
        # Each server is launched as a subprocess and spoken to over stdio
        for server_name, server_cfg in self.mcp_servers_config.items():
            client = await mcp_client.connect_to_server(
                command=server_cfg["command"],
                args=server_cfg["args"],
                transport="stdio"
            )
            self.mcp_clients[server_name] = client
        
        # Collect the tool lists from all servers
        await self._discover_tools()
        
    async def _discover_tools(self):
        """Discover the tools offered by each MCP server"""
        for server_name, client in self.mcp_clients.items():
            tools_info = await client.list_tools()
            for tool_info in tools_info:
                # Wrap each MCP tool as a LangChain tool
                # (the wrapper is async, so it goes in `coroutine` rather than `func`)
                langchain_tool = Tool(
                    name=f"{server_name}_{tool_info.name}",
                    func=None,
                    coroutine=self._create_tool_wrapper(client, tool_info.name),
                    description=tool_info.description
                )
                self.tools.append(langchain_tool)
    
    def _create_tool_wrapper(self, client, tool_name):
        """Create an async wrapper that forwards the call to the MCP client"""
        async def wrapper(**kwargs):
            result = await client.call_tool(tool_name, kwargs)
            # Pull the text out of the returned content items
            texts = []
            for content in result.contents:
                if hasattr(content, 'text'):
                    texts.append(content.text)
                elif isinstance(content, dict) and 'text' in content:
                    texts.append(content['text'])
            return "\n".join(texts)
        return wrapper
    
    def create_agent(self):
        """Create the agent executor"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an enterprise assistant. You can:
1. Answer questions from the enterprise knowledge base
2. Send emails
3. Look up CRM contacts
4. Create Jira tickets
5. Handle multi-step, complex tasks

Pick the most appropriate tool for each request. Prefer the knowledge-base tool for informational questions.
For tasks that involve several steps, break the task down and execute it step by step."""),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad")
        ])
        
        agent = create_openai_functions_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt
        )
        
        return AgentExecutor(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=5
        )
    
    async def run(self, query: str):
        """Run the agent on a single query"""
        agent_executor = self.create_agent()
        
        try:
            result = await agent_executor.ainvoke({"input": query})
            return result['output']
        except Exception as e:
            return f"Execution failed: {str(e)}"
    
    async def close(self):
        """Close all MCP connections once the agent is no longer needed"""
        for client in self.mcp_clients.values():
            await client.close()

# Usage example
async def main():
    # MCP server configuration (keys match the client names used elsewhere)
    mcp_config = {
        "rag": {"command": "python", "args": ["mcp_rag_server.py"]},
        "enterprise": {"command": "python", "args": ["mcp_enterprise_tools.py"]}
    }
    
    # Create and initialize the agent
    agent = EnterpriseAgent(mcp_config)
    await agent.initialize()
    
    # Run some multi-step tasks
    queries = [
        "What is our company's project expense reimbursement process?",
        "Summarize the reimbursement process and email it to the new employee Zhang San (zhangsan@company.com)",
        "Also look up Zhang San's contact details in the CRM, then create a Jira ticket about reimbursement training"
    ]
    
    for query in queries:
        print(f"\nUser: {query}")
        response = await agent.run(query)
        print(f"Assistant: {response}")
    
    await agent.close()

if __name__ == "__main__":
    asyncio.run(main())

Stage 5: End-to-End Workflow Example

# complete_workflow.py
"""
演示完整的RAG + Agent + MCP工作流:
用户请求 → Agent规划 → MCP工具调用 → 返回结果
"""

async def demonstrate_complex_workflow():
    """Walk through a complex workflow"""
    
    # 1. The user submits a complex request
    user_request = """
    I need the details of our company's new project "星海计划",
    then a summary emailed to the marketing team (market@company.com),
    and a follow-up task created in Jira.
    """
    
    print(f"User request: {user_request}")
    
    # 2. The agent decomposes the request into tasks
    tasks = [
        "Query the knowledge base for details on '星海计划'",
        "Write a project summary from the retrieved information",
        "Email the summary to the marketing team",
        "Create a follow-up task in Jira"
    ]
    
    # 3. Execution (in practice the agent performs these steps itself)
    results = []
    
    # Step 1: query the knowledge base (RAG)
    rag_result = await query_knowledge_base("星海计划")
    results.append(f"Knowledge base result: {rag_result}")
    
    # Step 2: generate a summary (LLM)
    summary = await generate_summary(rag_result)
    results.append(f"Generated summary: {summary}")
    
    # Step 3: send the email (via MCP)
    email_result = await send_email(
        to="market@company.com",
        subject="星海计划 project summary",
        body=summary
    )
    results.append(f"Email result: {email_result}")
    
    # Step 4: create the Jira task (via MCP)
    jira_result = await create_jira_ticket(
        summary="星海计划 marketing follow-up",
        description=f"Follow up on the marketing launch of 星海计划\n\nProject info:\n{summary}",
        project="MARKET"
    )
    results.append(f"Jira ticket result: {jira_result}")
    
    # 4. Return the combined result to the user
    final_response = f"""
    All tasks completed:
    1. ✓ Retrieved the 星海计划 project details
    2. ✓ Generated the project summary
    3. ✓ Emailed the summary to market@company.com
    4. ✓ Created the follow-up ticket in Jira
    
    Execution log:
    {chr(10).join(results)}
    """
    
    return final_response
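
The helpers used above (query_knowledge_base, generate_summary, send_email, create_jira_ticket) stand in for the MCP and LLM calls from the earlier stages. As one example, generate_summary could simply prompt the Stage 1 chat model; a sketch under that assumption (the helper and its prompt wording are made up for illustration):

from langchain.chat_models import ChatOpenAI

async def generate_summary(project_info: str) -> str:
    """Hypothetical helper: have the LLM condense the retrieved project information."""
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    return await llm.apredict(
        f"Write a short project summary for the marketing team:\n\n{project_info}"
    )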

IV. Deployment and Optimization

1. Containerized Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Start the service
CMD ["python", "main.py"]

# docker-compose.yml
version: '3.8'
services:
  rag-mcp-server:
    build: .
    command: python mcp_rag_server.py
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    
  tools-mcp-server:
    build: .
    command: python mcp_enterprise_tools.py
    environment:
      - CRM_API_KEY=${CRM_API_KEY}
      - JIRA_API_TOKEN=${JIRA_API_TOKEN}
    
  agent-orchestrator:
    build: .
    command: python agent_orchestrator.py
    depends_on:
      - rag-mcp-server
      - tools-mcp-server
    ports:
      - "8000:8000"
    
  web-ui:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./ui:/usr/share/nginx/html
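
The compose file reads a few secrets from the environment. A minimal .env sketch placed next to docker-compose.yml (the variable names come from the file above; the values are placeholders):

# .env (placeholders; keep real values out of version control)
OPENAI_API_KEY=sk-...
CRM_API_KEY=...
JIRA_API_TOKEN=...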

2. Performance Optimization Tips

# optimization.py
import asyncio

class OptimizedEnterpriseAgent(EnterpriseAgent):
    """A few illustrative performance optimizations on top of the base agent."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._knowledge_cache = {}    # 1. cache for frequent knowledge-base queries
        self.tool_success_rates = {}  # 4. historical tool success rates

    # 1. Cache frequent queries
    # (functools.lru_cache does not work on coroutines, so a plain dict is used here.)
    async def cached_query_knowledge(self, query: str):
        if query not in self._knowledge_cache:
            self._knowledge_cache[query] = await self.mcp_clients['rag'].call_tool(
                "query_knowledge_base",
                {"query": query}
            )
        return self._knowledge_cache[query]

    # 2. Run independent tasks in parallel
    async def parallel_execution(self, tasks):
        """Execute several awaitables concurrently"""
        return await asyncio.gather(*tasks)

    # 3. Stream the output instead of waiting for the full answer
    async def stream_response(self, query: str):
        """Yield output chunks as the agent produces them"""
        agent_executor = self.create_agent()
        async for chunk in agent_executor.astream({"input": query}):
            yield chunk.get('output', '')

    # 4. Smart tool selection (based on historical success rates)
    async def smart_tool_selection(self, query: str):
        """Pick the tool most likely to succeed for this query"""
        # (A fuller implementation would also classify the query type first.)
        best_tool = max(
            self.tools,
            key=lambda t: self.tool_success_rates.get(t.name, 0.5)
        )
        return best_tool

3. Monitoring and Logging

# monitoring.py
import logging
from datetime import datetime
from prometheus_client import Counter, Histogram

# Define the metrics
QUERY_COUNT = Counter('agent_queries_total', 'Total queries processed')
TOOL_CALL_COUNT = Counter('tool_calls_total', 'Total tool calls', ['tool_name'])
RESPONSE_TIME = Histogram('response_time_seconds', 'Response time in seconds')

class MonitoredEnterpriseAgent(EnterpriseAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setup_logging()
    
    def setup_logging(self):
        """配置结构化日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('agent.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    @RESPONSE_TIME.time()
    async def monitored_run(self, query: str):
        """带监控的查询执行"""
        QUERY_COUNT.inc()
        self.logger.info(f"开始处理查询: {query}")
        
        start_time = datetime.now()
        
        try:
            result = await super().run(query)
            duration = (datetime.now() - start_time).total_seconds()
            
            self.logger.info(f"查询处理完成,耗时: {duration:.2f}s")
            return result
            
        except Exception as e:
            self.logger.error(f"查询处理失败: {str(e)}", exc_info=True)
            raise
    
    async def monitored_tool_call(self, client, tool_name, arguments):
        """带监控的工具调用"""
        TOOL_CALL_COUNT.labels(tool_name=tool_name).inc()
        
        self.logger.info(f"调用工具 {tool_name}: {arguments}")
        start_time = datetime.now()
        
        try:
            result = await client.call_tool(tool_name, arguments)
            duration = (datetime.now() - start_time).total_seconds()
            
            self.logger.info(f"工具 {tool_name} 调用成功,耗时: {duration:.2f}s")
            return result
            
        except Exception as e:
            self.logger.error(f"工具 {tool_name} 调用失败: {str(e)}")
            raise
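
The counters and histogram above only become useful once Prometheus can scrape them. prometheus_client ships a small exposition server for exactly that; a minimal sketch (the port is an assumption):

from prometheus_client import start_http_server

# Expose /metrics for Prometheus to scrape
start_http_server(9100)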

V. Best Practices and Caveats

1. Security Practices

# security.py
class SecureEnterpriseAgent(EnterpriseAgent):
    async def secure_initialize(self):
        """Initialize the agent with security checks"""
        
        # 1. Verify each MCP server's identity
        await self.verify_server_identity()
        
        # 2. Set up tool permissions
        self.setup_tool_permissions()
        
        # 3. Enable audit logging
        self.enable_audit_logging()
    
    def setup_tool_permissions(self):
        """RBAC-style tool permission control"""
        self.tool_permissions = {
            "query_knowledge_base": ["employee", "manager", "admin"],
            "send_email": ["manager", "admin"],
            "create_jira_ticket": ["employee", "manager", "admin"],
            "get_crm_contact": ["manager", "admin"]
        }
    
    async def check_permission(self, user_role: str, tool_name: str) -> bool:
        """检查用户是否有权限使用工具"""
        allowed_roles = self.tool_permissions.get(tool_name, [])
        return user_role in allowed_roles
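
To make the RBAC table bite, the check has to run before every tool call. A minimal sketch of such a gate, written as an extra method on SecureEnterpriseAgent (the user_role argument and the PermissionError are assumptions, not part of the original design):

    # Possible addition to SecureEnterpriseAgent: gate each tool call on the caller's role
    async def guarded_tool_call(self, client, tool_name: str, arguments: dict, user_role: str):
        if not await self.check_permission(user_role, tool_name):
            raise PermissionError(f"Role '{user_role}' may not use tool '{tool_name}'")
        return await client.call_tool(tool_name, arguments)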

2. Error Handling and Retries

# error_handling.py
from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientEnterpriseAgent(EnterpriseAgent):
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def resilient_tool_call(self, client, tool_name, arguments):
        """带重试机制的工具调用"""
        try:
            return await client.call_tool(tool_name, arguments)
        except Exception as e:
            self.logger.warning(f"工具调用失败,将重试: {str(e)}")
            raise
    
    async def fallback_strategy(self, query: str, failed_tool: str):
        """降级策略"""
        fallback_mapping = {
            "query_knowledge_base": self.fallback_knowledge_query,
            "send_email": self.fallback_notification,
            # ... fallback handlers for the other tools
        }
        
        fallback_func = fallback_mapping.get(failed_tool)
        if fallback_func:
            return await fallback_func(query)
        
        return "系统暂时无法处理此请求,请稍后重试。"

VI. Complete Example: A Customer Support Ticket System

# customer_service_agent.py
"""
完整的客服工单处理Agent示例
整合了:知识库查询 + 工单创建 + CRM查询 + 邮件通知
"""

class CustomerServiceAgent(EnterpriseAgent):
    async def handle_customer_ticket(self, customer_query: str, customer_email: str):
        """End-to-end handling of a customer ticket"""
        
        # 1. Look for a solution in the knowledge base (RAG)
        solution = await self.search_knowledge_base(customer_query)
        
        if solution:
            # 2. Solution found: reply to the customer directly
            await self.send_customer_reply(customer_email, solution)
            return {"status": "resolved", "solution": solution}
        else:
            # 3. No solution: create an internal ticket (Agent + MCP)
            ticket_id = await self.create_internal_ticket(customer_query)
            
            # 4. Fetch the customer's details from the CRM
            customer_info = await self.get_customer_info(customer_email)
            
            # 5. Notify the support team
            await self.notify_support_team(ticket_id, customer_query, customer_info)
            
            # 6. Send the customer an acknowledgement email
            await self.send_acknowledgement(customer_email, ticket_id)
            
            return {
                "status": "escalated",
                "ticket_id": ticket_id,
                "assigned_to": "support_team"
            }
    
    async def search_knowledge_base(self, query: str):
        """使用RAG搜索知识库"""
        rag_client = self.mcp_clients['rag']
        result = await rag_client.call_tool(
            "query_knowledge_base",
            {"query": query}
        )
        return result
    
    async def create_internal_ticket(self, description: str):
        """使用MCP工具创建工单"""
        tools_client = self.mcp_clients['enterprise']
        result = await tools_client.call_tool(
            "create_jira_ticket",
            {
                "summary": f"客户问题: {description[:50]}...",
                "description": description,
                "project": "SUPPORT"
            }
        )
        # Extract the ticket key from the result
        return result.split()[1]  # e.g. "Ticket SUPPORT-123 created"
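
A minimal driver for this agent, reusing the Stage 4 configuration (the customer query and email address are placeholders):

# customer_service_main.py -- illustrative driver; config keys match Stage 4
import asyncio

async def main():
    agent = CustomerServiceAgent({
        "rag": {"command": "python", "args": ["mcp_rag_server.py"]},
        "enterprise": {"command": "python", "args": ["mcp_enterprise_tools.py"]}
    })
    await agent.initialize()
    try:
        outcome = await agent.handle_customer_ticket(
            "I cannot log in to the VPN", "customer@example.com"
        )
        print(outcome)
    finally:
        await agent.close()

if __name__ == "__main__":
    asyncio.run(main())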

Summary

The walkthrough above showed how to:

  1. Build a modular system: wrap RAG and the various enterprise tools as independent MCP servers
  2. Orchestrate intelligently: use the Agent as the brain that picks tools dynamically per task
  3. Keep things secure and controllable: isolate tools from one another behind the MCP protocol
  4. Handle complex workflows: decompose multi-step tasks automatically and coordinate their execution

Key Success Factors

  • Clear tool boundaries: each MCP server has a single responsibility
  • Robust error handling: every tool call needs a fallback when it fails
  • Comprehensive monitoring: track agent decisions, tool calls, and execution time
  • Incremental rollout: start with simple scenarios and add complexity gradually

This architecture pattern gives enterprises a solid foundation for building scalable, maintainable, and secure AI applications. As the MCP ecosystem matures, additional third-party tools can be integrated with little effort, enabling ever more capable assistant systems.
