RAG + Agent + MCP: A Hands-On Guide to the Architecture Pattern
Based on a practical case study, this article walks through how to combine three technologies (RAG, Agent, and MCP) into a complete, working modern AI application.
I. Project Overview: An Intelligent Enterprise Knowledge Assistant
We will build an "enterprise intelligent knowledge assistant" that can:
- Answer questions from the corporate knowledge base (RAG capability)
- Execute complex workflows (Agent capability)
- Connect securely to enterprise systems (MCP capability)
System architecture:
```
User interface → Agent hub → MCP client
                                  ↓
              [MCP Server 1: RAG system]
              [MCP Server 2: Email system]
              [MCP Server 3: CRM system]
              [MCP Server 4: Data analytics]
```
II. Technology Stack
| Component | Recommended | Alternatives |
|---|---|---|
| Core LLM | OpenAI GPT-4 / GPT-3.5 | Anthropic Claude, open-source models |
| Agent framework | LangChain, AutoGen | LlamaIndex, CrewAI |
| RAG stack | LangChain + Chroma | Pinecone, Weaviate |
| MCP implementation | MCP Python SDK | Custom HTTP proxy |
| Vector database | Chroma, Qdrant | Pinecone, Milvus |
| Tool invocation | MCP standard protocol | Custom APIs |
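To make the table concrete, here is a sketch of a `requirements.txt` for the code in this article. Package names are assumptions to verify against the LangChain and MCP SDK releases you actually use; the Dockerfile in Part IV installs from this file.

```text
# requirements.txt (a sketch; exact packages and versions depend on your releases)
langchain
chromadb
openai
tiktoken
mcp                  # official MCP Python SDK
unstructured         # used by DirectoryLoader for markdown parsing
requests
tenacity
prometheus-client
```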
III. Step-by-Step Implementation
Stage 1: Build the base RAG system
```python
# 1. Document processing and vectorization
# NOTE: these imports follow the classic (pre-0.1) LangChain package layout.
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma

# Load the company documents (markdown files in the knowledge-base directory)
loader = DirectoryLoader('./knowledge_base/', glob="**/*.md")
documents = loader.load()

# Split documents into overlapping chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
chunks = text_splitter.split_documents(documents)

# Create the vector store and persist it to disk
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)

# 2. Build the retrieval chain
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(model="gpt-4", temperature=0)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    return_source_documents=True
)

# 3. Test the RAG pipeline
response = qa_chain({"query": "What is our company's expense reimbursement policy?"})
print(f"Answer: {response['result']}")
print(f"Sources: {[doc.metadata['source'] for doc in response['source_documents']]}")
```
Stage 2: Wrap the RAG system as an MCP server
```python
# mcp_rag_server.py
# NOTE: the Server subclass below is this article's simplified, illustrative wrapper.
# At the time of writing, the official MCP Python SDK registers handlers via
# decorators (@server.list_tools(), @server.call_tool()) rather than method
# overrides; adapt this sketch to the SDK version you are using.
import asyncio

from mcp import Server                    # illustrative import; see note above
from mcp.types import Tool, TextContent


class RAGServer(Server):
    def __init__(self, qa_chain):
        super().__init__()
        self.qa_chain = qa_chain

    async def handle_list_tools(self) -> list[Tool]:
        """Advertise the available tools to the client."""
        return [
            Tool(
                name="query_knowledge_base",
                description="Query the company knowledge base for policies, processes, and other information",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The question to look up"
                        }
                    },
                    "required": ["query"]
                }
            )
        ]

    async def handle_call_tool(self, name: str, arguments: dict):
        """Handle a tool-call request."""
        if name == "query_knowledge_base":
            query = arguments.get("query", "")
            result = self.qa_chain({"query": query})
            # Format the answer together with its sources
            answer = result['result']
            sources = [doc.metadata.get('source', 'unknown')
                       for doc in result['source_documents']]
            response = f"{answer}\n\n**Sources**:\n"
            for source in set(sources):
                response += f"- {source}\n"
            return [TextContent(type="text", text=response)]
        raise ValueError(f"Unknown tool: {name}")


async def main():
    # Initialize the RAG pipeline (one possible implementation is sketched below)
    qa_chain = initialize_rag_system()
    # Create and run the MCP server
    server = RAGServer(qa_chain)
    # Communicate with the client over stdio (HTTP transports are also possible)
    async with server.run_over_stdio() as (read_stream, write_stream):
        await server.serve(read_stream, write_stream)

if __name__ == "__main__":
    asyncio.run(main())
```
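`initialize_rag_system()` is left undefined above. A minimal sketch, reusing the persisted Chroma index from Stage 1 and the same legacy LangChain API (the function name and paths are this article's assumptions):

```python
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma


def initialize_rag_system(persist_directory: str = "./chroma_db") -> RetrievalQA:
    """Reload the persisted vector store from Stage 1 and rebuild the QA chain."""
    embeddings = OpenAIEmbeddings()
    # Reopen the Chroma collection that Stage 1 wrote to disk
    vectorstore = Chroma(
        persist_directory=persist_directory,
        embedding_function=embeddings
    )
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    return RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
        return_source_documents=True
    )
```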
Stage 3: Build the enterprise-tools MCP server
```python
# mcp_enterprise_tools.py
import smtplib
from email.mime.text import MIMEText
import json

import requests

from mcp import Server                    # illustrative import, as in Stage 2
from mcp.types import Tool, TextContent


class EnterpriseToolsServer(Server):
    def __init__(self, config):
        super().__init__()
        self.config = config   # SMTP / CRM / Jira settings; see the example config below

    async def handle_list_tools(self) -> list[Tool]:
        return [
            Tool(
                name="send_email",
                description="Send an email to the given recipient",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "to": {"type": "string"},
                        "subject": {"type": "string"},
                        "body": {"type": "string"}
                    },
                    "required": ["to", "subject", "body"]
                }
            ),
            Tool(
                name="get_crm_contact",
                description="Fetch contact details from the CRM system",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "contact_id": {"type": "string"}
                    },
                    "required": ["contact_id"]
                }
            ),
            Tool(
                name="create_jira_ticket",
                description="Create a ticket in Jira",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "summary": {"type": "string"},
                        "description": {"type": "string"},
                        "project": {"type": "string"}
                    },
                    "required": ["summary", "description", "project"]
                }
            )
        ]

    async def handle_call_tool(self, name: str, arguments: dict):
        if name == "send_email":
            return await self._send_email(**arguments)
        elif name == "get_crm_contact":
            return await self._get_crm_contact(**arguments)
        elif name == "create_jira_ticket":
            return await self._create_jira_ticket(**arguments)
        raise ValueError(f"Unknown tool: {name}")

    async def _send_email(self, to: str, subject: str, body: str):
        # Actual email-sending logic.
        # NOTE: smtplib is blocking; in production run it in a thread
        # (e.g. asyncio.to_thread) or use an async SMTP client.
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = self.config['email']['sender']
        msg['To'] = to
        with smtplib.SMTP(self.config['email']['smtp_server']) as server:
            server.send_message(msg)
        return [TextContent(type="text", text=f"Email sent to {to}")]

    async def _get_crm_contact(self, contact_id: str):
        # Call the CRM API (requests is also blocking; same caveat as above)
        response = requests.get(
            f"{self.config['crm']['base_url']}/contacts/{contact_id}",
            headers={"Authorization": f"Bearer {self.config['crm']['api_key']}"}
        )
        contact = response.json()
        return [TextContent(type="text", text=json.dumps(contact, indent=2))]

    async def _create_jira_ticket(self, summary: str, description: str, project: str):
        # Create a Jira ticket.
        # NOTE: the v2 REST endpoint accepts a plain-text description;
        # Jira Cloud's v3 API expects Atlassian Document Format instead.
        ticket_data = {
            "fields": {
                "project": {"key": project},
                "summary": summary,
                "description": description,
                "issuetype": {"name": "Task"}
            }
        }
        response = requests.post(
            f"{self.config['jira']['base_url']}/rest/api/2/issue",
            json=ticket_data,
            auth=(self.config['jira']['username'], self.config['jira']['api_token'])
        )
        ticket_key = response.json()['key']
        return [TextContent(
            type="text",
            text=f"Ticket {ticket_key} created: {summary}"
        )]
```
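The `config` dictionary that `EnterpriseToolsServer` reads is never shown. A minimal sketch of the expected shape, derived from the key lookups in the code above (all hostnames are made up; the env-var names match the docker-compose file in Part IV):

```python
# config.py (illustrative values only)
import os

ENTERPRISE_TOOLS_CONFIG = {
    "email": {
        "smtp_server": "smtp.example.internal",
        "sender": "assistant@example.com",
    },
    "crm": {
        "base_url": "https://crm.example.internal/api",
        "api_key": os.environ["CRM_API_KEY"],
    },
    "jira": {
        "base_url": "https://example.atlassian.net",
        "username": "assistant@example.com",
        "api_token": os.environ["JIRA_API_TOKEN"],
    },
}
```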
Stage 4: Build the Agent hub
```python
# agent_orchestrator.py
import asyncio

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import Tool
from langchain.memory import ConversationBufferMemory
from langchain.chat_models import ChatOpenAI

# Illustrative MCP client helper, mirroring the server sketches above;
# the official SDK's client API (ClientSession + stdio transport) differs in detail.
import mcp.client as mcp_client


class EnterpriseAgent:
    def __init__(self, mcp_servers_config):
        # Kept for reference; initialize() below spawns the same servers explicitly
        self.mcp_servers_config = mcp_servers_config
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        self.mcp_clients = {}
        self.tools = []

    async def initialize(self):
        """Connect to all MCP servers and register their tools."""
        # Connect to the RAG MCP server
        rag_client = await mcp_client.connect_to_server(
            command="python",
            args=["mcp_rag_server.py"],
            transport="stdio"
        )
        self.mcp_clients['rag'] = rag_client
        # Connect to the enterprise-tools MCP server
        enterprise_client = await mcp_client.connect_to_server(
            command="python",
            args=["mcp_enterprise_tools.py"],
            transport="stdio"
        )
        self.mcp_clients['enterprise'] = enterprise_client
        # Discover the tools exposed by every server
        await self._discover_tools()

    async def _discover_tools(self):
        """Discover the tools offered by each connected MCP server."""
        for server_name, client in self.mcp_clients.items():
            tools_info = await client.list_tools()
            for tool_info in tools_info:
                # Wrap each MCP tool as a LangChain tool. The async wrapper is
                # passed via `coroutine` so AgentExecutor.ainvoke can await it.
                langchain_tool = Tool(
                    name=f"{server_name}_{tool_info.name}",
                    func=None,
                    coroutine=self._create_tool_wrapper(client, tool_info.name),
                    description=tool_info.description
                )
                self.tools.append(langchain_tool)

    def _create_tool_wrapper(self, client, tool_name):
        """Create a wrapper that forwards a call to the MCP server."""
        # NOTE: a plain LangChain Tool passes its input as a single string; mapping
        # that onto multi-field MCP inputSchemas is cleaner with a StructuredTool.
        # This keeps the article's simplified keyword-argument wrapper.
        async def wrapper(**kwargs):
            result = await client.call_tool(tool_name, kwargs)
            # Extract the text content from the MCP response
            texts = []
            for content in result.contents:
                if hasattr(content, 'text'):
                    texts.append(content.text)
                elif isinstance(content, dict) and 'text' in content:
                    texts.append(content['text'])
            return "\n".join(texts)
        return wrapper

    def create_agent(self):
        """Create the Agent executor."""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an enterprise assistant. You can:
1. Answer questions from the company knowledge base
2. Send emails
3. Look up CRM contacts
4. Create Jira tickets
5. Handle multi-step tasks

Pick the most suitable tool for each request. For knowledge questions, prefer the knowledge-base tool.
For tasks that involve several steps, break the task down and execute it step by step."""),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad")
        ])
        agent = create_openai_functions_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt
        )
        return AgentExecutor(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=5
        )

    async def run(self, query: str):
        """Run the Agent on a single query."""
        agent_executor = self.create_agent()
        try:
            result = await agent_executor.ainvoke({"input": query})
            return result['output']
        except Exception as e:
            return f"Execution failed: {str(e)}"

    async def close(self):
        """Shut down all MCP connections once the session is over."""
        for client in self.mcp_clients.values():
            await client.close()


# Usage example
async def main():
    # MCP server configuration
    mcp_config = {
        "rag_server": {"command": "python", "args": ["mcp_rag_server.py"]},
        "enterprise_server": {"command": "python", "args": ["mcp_enterprise_tools.py"]}
    }
    # Create and initialize the Agent
    agent = EnterpriseAgent(mcp_config)
    await agent.initialize()
    # Execute increasingly complex tasks
    queries = [
        "What is our company's project expense reimbursement process?",
        "Summarize the reimbursement process and email it to the new hire Zhang San (zhangsan@company.com)",
        "Also look up Zhang San's contact details in the CRM, then create a Jira ticket about reimbursement training"
    ]
    for query in queries:
        print(f"\nUser: {query}")
        response = await agent.run(query)
        print(f"Assistant: {response}")
    await agent.close()

if __name__ == "__main__":
    asyncio.run(main())
```
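The architecture diagram puts a user interface in front of the Agent hub, and the docker-compose file in Part IV exposes the orchestrator on port 8000. As a minimal sketch of that glue (FastAPI and uvicorn are assumptions, not part of the original stack; any ASGI framework works), the hub could be served like this:

```python
# api_server.py - hypothetical HTTP front end for the Agent hub
from contextlib import asynccontextmanager

from fastapi import FastAPI
from pydantic import BaseModel

from agent_orchestrator import EnterpriseAgent

agent = EnterpriseAgent(mcp_servers_config={})   # config as in main() above


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Connect to the MCP servers once at startup, close them at shutdown
    await agent.initialize()
    yield
    await agent.close()


app = FastAPI(lifespan=lifespan)


class Query(BaseModel):
    input: str


@app.post("/chat")
async def chat(query: Query):
    answer = await agent.run(query.input)
    return {"output": answer}

# Run with: uvicorn api_server:app --host 0.0.0.0 --port 8000
```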
Stage 5: A complete workflow example
```python
# complete_workflow.py
"""
Demonstrates the full RAG + Agent + MCP workflow:
user request → Agent planning → MCP tool calls → final result

The helper functions used below (query_knowledge_base, generate_summary,
send_email, create_jira_ticket) stand in for the MCP tool calls built in
the earlier stages.
"""

async def demonstrate_complex_workflow():
    """Walk through a complex workflow end to end."""
    # 1. The user makes a compound request
    user_request = """
    I need the details of our new project "Project Xinghai",
    then a summary emailed to the marketing team (market@company.com),
    and a follow-up task created in Jira.
    """
    print(f"User request: {user_request}")

    # 2. The plan the Agent would produce for this request
    tasks = [
        "Use the knowledge-base tool to look up details on 'Project Xinghai'",
        "Write a project summary from the retrieved material",
        "Use the email tool to send the summary to the marketing team",
        "Use the Jira tool to create a follow-up task"
    ]

    # 3. Execution (in practice the Agent performs these steps automatically)
    results = []

    # Step 1: query the knowledge base (RAG)
    rag_result = await query_knowledge_base("Project Xinghai")
    results.append(f"Knowledge-base result: {rag_result}")

    # Step 2: generate a summary (LLM)
    summary = await generate_summary(rag_result)
    results.append(f"Generated summary: {summary}")

    # Step 3: send the email (via MCP)
    email_result = await send_email(
        to="market@company.com",
        subject="Project Xinghai summary",
        body=summary
    )
    results.append(f"Email result: {email_result}")

    # Step 4: create the Jira task (via MCP)
    jira_result = await create_jira_ticket(
        summary="Project Xinghai marketing follow-up",
        description=f"Follow up on marketing for Project Xinghai\n\nProject details:\n{summary}",
        project="MARKET"
    )
    results.append(f"Jira ticket result: {jira_result}")

    # 4. Return the consolidated result to the user
    final_response = f"""
    All tasks completed:
    1. ✓ Retrieved the Project Xinghai details
    2. ✓ Generated the project summary
    3. ✓ Sent the summary to market@company.com
    4. ✓ Created the follow-up ticket in Jira

    Execution log:
    {chr(10).join(results)}
    """
    return final_response
```
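Of those helpers, `generate_summary` is the only one not backed by an MCP tool from the earlier stages. A minimal sketch (a hypothetical helper, assuming the legacy LangChain chat API used elsewhere in this article):

```python
from langchain.chat_models import ChatOpenAI


async def generate_summary(source_text: str) -> str:
    """Condense retrieved project material into a short, email-ready summary."""
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    prompt = (
        "Summarize the following project information in a few short paragraphs "
        "suitable for an internal email:\n\n" + source_text
    )
    # apredict is the legacy async text-in/text-out helper on chat models
    return await llm.apredict(prompt)
```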
IV. Deployment and Optimization
1. Containerized deployment
```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Start the service
CMD ["python", "main.py"]
```
```yaml
# docker-compose.yml
version: '3.8'
services:
  rag-mcp-server:
    build: .
    command: python mcp_rag_server.py
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
  tools-mcp-server:
    build: .
    command: python mcp_enterprise_tools.py
    environment:
      - CRM_API_KEY=${CRM_API_KEY}
      - JIRA_API_TOKEN=${JIRA_API_TOKEN}
  agent-orchestrator:
    build: .
    command: python agent_orchestrator.py
    depends_on:
      - rag-mcp-server
      - tools-mcp-server
    ports:
      - "8000:8000"
  web-ui:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./ui:/usr/share/nginx/html
```
2. Performance optimization tips
```python
# optimization.py
import asyncio


class OptimizedEnterpriseAgent(EnterpriseAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 1. Cache frequent knowledge-base queries.
        #    (functools.lru_cache does not work on async functions: it would cache
        #    coroutine objects, so a plain dict is used here instead.)
        self._kb_cache = {}
        # 4. Track per-tool success rates for smarter tool selection
        self.tool_success_rates = {}

    async def cached_query_knowledge(self, query: str):
        """Return a cached knowledge-base answer when available."""
        if query not in self._kb_cache:
            self._kb_cache[query] = await self.mcp_clients['rag'].call_tool(
                "query_knowledge_base",
                {"query": query}
            )
        return self._kb_cache[query]

    # 2. Run independent tasks in parallel
    async def parallel_execution(self, tasks):
        """Execute several awaitable tasks concurrently."""
        return await asyncio.gather(*tasks)

    # 3. Stream output back to the user
    async def stream_response(self, query: str):
        """Yield partial output as the agent produces it.
        (Assumes the executor from create_agent() was stored as self.agent_executor.)"""
        async for chunk in self.agent_executor.astream({"input": query}):
            yield chunk.get('output', '')

    # 4. Smarter tool selection based on historical success rates
    async def smart_tool_selection(self, query: str):
        """Choose the tool most likely to succeed for this kind of query."""
        # Classify the query (implementation left to the reader)
        query_type = self.classify_query(query)
        # Pick the tool with the best track record (default 0.5 when unknown)
        best_tool = max(
            self.tools,
            key=lambda t: self.tool_success_rates.get(t.name, 0.5)
        )
        return best_tool
```
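`smart_tool_selection` reads `self.tool_success_rates`, but nothing above updates it. One minimal way to feed it, as a sketch (a hypothetical helper; the exponential moving average is an arbitrary choice):

```python
    # Method to add to OptimizedEnterpriseAgent
    def record_tool_result(self, tool_name: str, succeeded: bool, alpha: float = 0.2):
        """Update the running success rate for a tool after each call."""
        previous = self.tool_success_rates.get(tool_name, 0.5)
        observation = 1.0 if succeeded else 0.0
        # Exponential moving average weights recent behaviour more heavily
        self.tool_success_rates[tool_name] = (1 - alpha) * previous + alpha * observation
```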
3. Monitoring and logging
```python
# monitoring.py
import logging
from datetime import datetime

from prometheus_client import Counter, Histogram

# Metric definitions
QUERY_COUNT = Counter('agent_queries_total', 'Total queries processed')
TOOL_CALL_COUNT = Counter('tool_calls_total', 'Total tool calls', ['tool_name'])
RESPONSE_TIME = Histogram('response_time_seconds', 'Response time in seconds')


class MonitoredEnterpriseAgent(EnterpriseAgent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setup_logging()

    def setup_logging(self):
        """Configure structured logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('agent.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    async def monitored_run(self, query: str):
        """Execute a query with metrics and logging.
        The response time is recorded explicitly with observe() rather than the
        @RESPONSE_TIME.time() decorator, whose async support depends on the
        prometheus_client version."""
        QUERY_COUNT.inc()
        self.logger.info(f"Processing query: {query}")
        start_time = datetime.now()
        try:
            result = await super().run(query)
            duration = (datetime.now() - start_time).total_seconds()
            RESPONSE_TIME.observe(duration)
            self.logger.info(f"Query finished in {duration:.2f}s")
            return result
        except Exception as e:
            self.logger.error(f"Query failed: {str(e)}", exc_info=True)
            raise

    async def monitored_tool_call(self, client, tool_name, arguments):
        """Invoke a tool with metrics and logging."""
        TOOL_CALL_COUNT.labels(tool_name=tool_name).inc()
        self.logger.info(f"Calling tool {tool_name}: {arguments}")
        start_time = datetime.now()
        try:
            result = await client.call_tool(tool_name, arguments)
            duration = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"Tool {tool_name} succeeded in {duration:.2f}s")
            return result
        except Exception as e:
            self.logger.error(f"Tool {tool_name} failed: {str(e)}")
            raise
```
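The metrics above still need to be exposed for Prometheus to scrape. The client library ships a small built-in HTTP exporter; a minimal sketch (the port number is arbitrary):

```python
from prometheus_client import start_http_server

# Serve the default metrics registry on http://0.0.0.0:9100/metrics
start_http_server(9100)
```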
V. Best Practices and Caveats
1. Security practices
```python
# security.py
class SecureEnterpriseAgent(EnterpriseAgent):
    async def secure_initialize(self):
        """Security-hardened initialization.
        (verify_server_identity and enable_audit_logging are left as placeholders.)"""
        # 1. Verify the identity of each MCP server
        await self.verify_server_identity()
        # 2. Set up tool permissions
        self.setup_tool_permissions()
        # 3. Enable audit logging
        self.enable_audit_logging()

    def setup_tool_permissions(self):
        """RBAC-style permission control over tools."""
        self.tool_permissions = {
            "query_knowledge_base": ["employee", "manager", "admin"],
            "send_email": ["manager", "admin"],
            "create_jira_ticket": ["employee", "manager", "admin"],
            "get_crm_contact": ["manager", "admin"]
        }

    async def check_permission(self, user_role: str, tool_name: str) -> bool:
        """Check whether the user's role may use the given tool."""
        allowed_roles = self.tool_permissions.get(tool_name, [])
        return user_role in allowed_roles
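```

A sketch of how `check_permission` could gate calls before they reach an MCP server, assuming the caller's role is known at request time (the `user_role` plumbing here is hypothetical, not part of the original design):

```python
    # Method to add to SecureEnterpriseAgent
    async def authorized_tool_call(self, user_role: str, client, tool_name: str, arguments: dict):
        """Refuse the call outright when the role is not on the tool's allow-list."""
        if not await self.check_permission(user_role, tool_name):
            return f"Permission denied: role '{user_role}' may not use '{tool_name}'."
        return await client.call_tool(tool_name, arguments)
```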
2. Error handling and retries
```python
# error_handling.py
from tenacity import retry, stop_after_attempt, wait_exponential


class ResilientEnterpriseAgent(EnterpriseAgent):
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def resilient_tool_call(self, client, tool_name, arguments):
        """Tool call with automatic retries (tenacity supports async functions)."""
        try:
            return await client.call_tool(tool_name, arguments)
        except Exception as e:
            # self.logger comes from MonitoredEnterpriseAgent.setup_logging()
            self.logger.warning(f"Tool call failed, retrying: {str(e)}")
            raise

    async def fallback_strategy(self, query: str, failed_tool: str):
        """Degradation strategy when a tool keeps failing."""
        fallback_mapping = {
            "query_knowledge_base": self.fallback_knowledge_query,
            "send_email": self.fallback_notification,
            # ... fallbacks for other tools
        }
        fallback_func = fallback_mapping.get(failed_tool)
        if fallback_func:
            return await fallback_func(query)
        return "The system cannot handle this request right now; please try again later."
```
VI. A Complete Example: Customer-Support Ticket Handling
```python
# customer_service_agent.py
"""
A complete customer-support Agent example that combines
knowledge-base lookup + ticket creation + CRM lookup + email notification.
"""

class CustomerServiceAgent(EnterpriseAgent):
    async def handle_customer_ticket(self, customer_query: str, customer_email: str):
        """End-to-end handling of a customer ticket."""
        # 1. Look for a solution in the knowledge base (RAG)
        solution = await self.search_knowledge_base(customer_query)
        if solution:
            # 2. If a solution exists, reply to the customer directly
            await self.send_customer_reply(customer_email, solution)
            return {"status": "resolved", "solution": solution}
        else:
            # 3. No solution found: open an internal ticket (Agent + MCP)
            ticket_id = await self.create_internal_ticket(customer_query)
            # 4. Pull the customer's details from the CRM
            customer_info = await self.get_customer_info(customer_email)
            # 5. Notify the support team
            await self.notify_support_team(ticket_id, customer_query, customer_info)
            # 6. Send the customer an acknowledgement email
            await self.send_acknowledgement(customer_email, ticket_id)
            return {
                "status": "escalated",
                "ticket_id": ticket_id,
                "assigned_to": "support_team"
            }

    # send_customer_reply, get_customer_info, notify_support_team and
    # send_acknowledgement (used above) are thin wrappers over the
    # send_email / get_crm_contact tools from Stage 3.

    async def search_knowledge_base(self, query: str):
        """Search the knowledge base through the RAG MCP server."""
        rag_client = self.mcp_clients['rag']
        result = await rag_client.call_tool(
            "query_knowledge_base",
            {"query": query}
        )
        return result

    async def create_internal_ticket(self, description: str):
        """Create a ticket through the enterprise-tools MCP server."""
        tools_client = self.mcp_clients['enterprise']
        result = await tools_client.call_tool(
            "create_jira_ticket",
            {
                "summary": f"Customer issue: {description[:50]}...",
                "description": description,
                "project": "SUPPORT"
            }
        )
        # Extract the ticket key from the text reply,
        # e.g. "Ticket SUPPORT-123 created: ..." -> "SUPPORT-123"
        return result.split()[1]
```
Summary
Through the hands-on examples above, we have shown how to:
- Build a modular system: wrap RAG and the various enterprise tools as independent MCP servers
- Orchestrate intelligently: use the Agent as the brain that dynamically picks tools based on the task
- Stay secure and controllable: isolate tools from one another behind the MCP protocol
- Handle complex workflows: automatically decompose multi-step tasks and coordinate their execution
Key success factors:
- Clear tool boundaries: each MCP server has a single responsibility
- Robust error handling: every tool call needs a fallback when it fails
- Comprehensive monitoring: track Agent decisions, tool calls, and execution time
- Incremental rollout: start with simple scenarios and add complexity gradually
This architecture pattern gives enterprises a solid foundation for building scalable, maintainable, and secure AI applications. As the MCP ecosystem matures, it will become straightforward to plug in additional third-party tools and build even more capable assistant systems.