多智能体协作和MCP服务
多智能体系统(MAS, Multi-Agent System) 是由多个自主的智能体组成的系统,这些智能体通过相互协作来完成单个智能体难以完成的复杂任务。本文首先介绍多智能体协作的核心概念与工作机制(协调、通信、角色分配),然后通过一个智能研究报告生成系统的完整实例展示智能体间的交互流程,并讨论这种架构的优势与挑战;最后深入解析 MCP(Model Context Protocol)服务的设计原则、协议格式与服务器实现。
多智能体协作与MCP服务:深度解析
1. 多智能体协作(Multi-Agent Collaboration)
1.1 核心概念
多智能体系统(MAS, Multi-Agent System) 是由多个自主的智能体组成的系统,这些智能体通过相互协作来完成单个智能体难以完成的复杂任务。
1.1.1 智能体定义
- 自主性(Autonomy):能够在没有外部干预的情况下执行操作
- 社交性(Social Ability):能够与其他智能体通信和交互
- 反应性(Reactivity):能够感知环境并做出响应
- 主动性(Pro-activeness):能够主动采取行动实现目标
1.2 核心工作机制
1.2.1 协调机制(Coordination)
# Example: contract-based negotiation (FIPA contract-net style)
class ContractNetProtocol:
    """Contract-net negotiation: a manager announces a task, contractors
    submit bids, and the best-scoring bid wins the contract."""

    def __init__(self):
        self.manager = None      # agent acting as manager (set externally)
        self.contractors = []    # contractor agents that receive announcements
        self.bids = {}           # contractor_id -> bid info for the current task

    async def announce_task(self, task_description, task_requirements):
        """Broadcast a task announcement to all contractors and collect bids.

        Contractors that answer ``can_perform`` have their bid recorded in
        ``self.bids``.  Returns the generated task id.
        """
        announcement = {
            "task_id": str(uuid.uuid4()),
            "description": task_description,
            "requirements": task_requirements,
            # Bidding window closes 5 minutes after the announcement.
            "deadline": datetime.now() + timedelta(minutes=5),
        }
        for contractor in self.contractors:
            response = await contractor.receive_announcement(announcement)
            if response["can_perform"]:
                self.bids[contractor.id] = {
                    "bid": response["bid"],
                    "capability_score": response["capability_score"],
                    "estimated_time": response["estimated_time"],
                }
        return announcement["task_id"]

    def evaluate_bids(self):
        """Pick the best contractor from the collected bids.

        Score = 60% capability + 40% speed (inverse of estimated time).
        Returns the winning contractor id, or None when there are no bids.
        """
        if not self.bids:
            return None

        def _score(bid_info):
            est = bid_info["estimated_time"]
            # FIX: the original computed 1 / estimated_time unconditionally,
            # raising ZeroDivisionError for a zero time estimate; such a bid
            # now simply earns no speed bonus.
            speed = (1 / est) if est else 0
            return bid_info["capability_score"] * 0.6 + speed * 0.4

        # max() keeps the first-seen bid on ties, matching the original loop.
        return max(self.bids, key=lambda cid: _score(self.bids[cid]))
1.2.2 通信机制(Communication)
# Example: message passing based on ACL (Agent Communication Language)
class ACLMessage:
    """A FIPA-ACL style message exchanged between agents."""

    # Recognised speech-act (performative) types.
    PERFORMATIVES = [
        "REQUEST",          # ask an agent to perform an action
        "INFORM",           # share information
        "QUERY_REF",        # query for information
        "CONFIRM",          # confirm
        "DISCONFIRM",       # deny
        "PROPOSE",          # make a proposal
        "ACCEPT_PROPOSAL",  # accept a proposal
        "REJECT_PROPOSAL",  # reject a proposal
        "CFP",              # call for proposals
        "SUBSCRIBE",        # subscribe
        "CANCEL"            # cancel
    ]

    def __init__(self, performative, sender, receiver, content):
        self.message_id = str(uuid.uuid4())
        self.performative = performative             # speech-act type
        self.sender = sender
        self.receiver = receiver
        self.content = content                       # payload (usually a serialisable object)
        self.language = "JSON"                       # content language
        self.ontology = "task-ontology"              # shared ontology
        self.protocol = "fipa-contract-net"          # interaction protocol
        self.conversation_id = str(uuid.uuid4())     # groups related messages
        self.reply_with = None                       # token a reply should echo back
        self.reply_by = datetime.now() + timedelta(minutes=10)  # reply deadline
        self.in_reply_to = None                      # id of the message being answered

    def to_dict(self):
        """Serialise the transport-relevant fields, stamping the current time."""
        payload = {
            "message_id": self.message_id,
            "performative": self.performative,
            "sender": self.sender,
            "receiver": self.receiver,
            "content": self.content,
        }
        payload["timestamp"] = datetime.now().isoformat()
        payload["conversation_id"] = self.conversation_id
        return payload
class AgentCommunicationLayer:
    """Routes ACL messages between registered agents and tracks conversations."""

    def __init__(self):
        self.message_queue = asyncio.Queue()
        self.agent_registry = {}       # agent_id -> agent object
        self.conversation_states = {}  # conversation_id -> bookkeeping dict

    def register_agent(self, agent_id, agent):
        """Make *agent* addressable under *agent_id*."""
        self.agent_registry[agent_id] = agent

    async def send_message(self, message: ACLMessage):
        """Deliver *message* to its receiver.

        Returns True after delivery and conversation bookkeeping; logs an
        error and returns False when the receiver is not registered.
        """
        serialized_msg = message.to_dict()
        target = self.agent_registry.get(message.receiver)
        if target is None:
            logging.error(f"Receiver {message.receiver} not found")
            return False
        await target.receive_message(serialized_msg)
        # Create conversation state on first contact, then append the message.
        state = self.conversation_states.setdefault(
            message.conversation_id,
            {
                "initiator": message.sender,
                "state": "active",
                "messages": [],
                "start_time": datetime.now(),
            },
        )
        state["messages"].append(serialized_msg)
        return True
1.2.3 角色分配(Role Allocation)
# Example: capability-based role-allocation algorithm
class RoleBasedCoordination:
    """Assigns agents to roles by matching skills and experience.

    NOTE(review): agents are assumed to expose ``skills``, ``experience``,
    ``available_capacity`` and ``assign_role(role_name)`` -- confirm against
    the Agent implementation, which is not visible here.
    """
    def __init__(self):
        self.agents = {}  # agent_id -> Agent
        self.roles = {}  # role_name -> requirements
        self.role_assignments = {}  # agent_id -> role_name
    def define_role(self, role_name, requirements):
        """Define a role and its requirements (skills, experience, limits)."""
        self.roles[role_name] = {
            "required_skills": requirements.get("skills", []),
            "required_experience": requirements.get("experience", 0),
            "priority": requirements.get("priority", 1),
            "max_agents": requirements.get("max_agents", 1)
        }
    def evaluate_agent_for_role(self, agent_id, role_name):
        """Score how well one agent fits one role; returns an evaluation dict."""
        agent = self.agents[agent_id]
        role_req = self.roles[role_name]
        # Skill match: fraction of the required skills the agent possesses.
        skill_match = 0
        agent_skills = set(agent.skills)
        required_skills = set(role_req["required_skills"])
        if required_skills:
            skill_match = len(agent_skills & required_skills) / len(required_skills)
        # Experience match: full credit at/above the requirement, else pro-rated.
        exp_match = 1 if agent.experience >= role_req["required_experience"] else \
            agent.experience / role_req["required_experience"]
        # Weighted overall score: skills dominate (70/30 split).
        score = skill_match * 0.7 + exp_match * 0.3
        return {
            "agent_id": agent_id,
            "role": role_name,
            "score": score,
            "skill_match": skill_match,
            "exp_match": exp_match,
            "available_capacity": agent.available_capacity
        }
    def assign_roles(self, task_requirements):
        """Greedily assign agents to the roles a task requires.

        Candidates scoring above 0.5 are sorted best-first, then accepted
        subject to each role's ``max_agents`` cap and one role per agent.
        Returns and stores the resulting agent_id -> role_name mapping.
        """
        # Collect every admissible (agent, role) pairing.
        candidate_assignments = []
        for agent_id, agent in self.agents.items():
            for role_name in task_requirements["required_roles"]:
                if agent.available_capacity > 0:
                    evaluation = self.evaluate_agent_for_role(agent_id, role_name)
                    if evaluation["score"] > 0.5:  # admission threshold
                        candidate_assignments.append(evaluation)
        # Sort and pick the best assignments (a knapsack-style greedy pass).
        candidate_assignments.sort(key=lambda x: x["score"], reverse=True)
        assignments = {}
        role_counts = {}
        for assignment in candidate_assignments:
            role_name = assignment["role"]
            agent_id = assignment["agent_id"]
            # Respect the per-role agent cap.
            current_count = role_counts.get(role_name, 0)
            max_agents = self.roles[role_name]["max_agents"]
            if current_count < max_agents:
                if agent_id not in assignments:
                    # Let the agent itself confirm it can take the role.
                    agent = self.agents[agent_id]
                    if agent.assign_role(role_name):
                        assignments[agent_id] = role_name
                        role_counts[role_name] = current_count + 1
        self.role_assignments = assignments
        return assignments
1.3 实际应用实例:智能研究报告生成系统
1.3.1 系统架构
┌─────────────────────────────────────────────────────┐
│ 智能研究报告生成系统 │
├─────────────────────────────────────────────────────┤
│ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ 研究智能体 │ │ 分析智能体 │ │ 写作智能体 │ │
│ │ (Research)│ │ (Analysis)│ │ (Writing) │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │
│ ┌─────▼───────────────▼───────────────▼─────┐ │
│ │ 协调器 (Orchestrator) │ │
│ └─────────────────────┬─────────────────────┘ │
│ │ │
│ ┌─────────────────────▼─────────────────────┐ │
│ │ 知识库 (Knowledge Base) │ │
│ │ • 文献资料 • 研究数据 • 模板库 │ │
│ └───────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘
1.3.2 完整工作流程
class ResearchReportSystem:
    """Multi-agent research-report generation system.

    Orchestrates research, analysis, writing and review agents through a
    ten-stage pipeline: plan -> research -> analyse -> outline -> write ->
    compile -> review -> revise -> format -> archive.

    NOTE(review): ResearchAgent, AnalysisAgent, WritingAgent, ReviewAgent,
    Orchestrator, KnowledgeBase and DEFAULT_CRITERIA are defined elsewhere;
    their contracts below are inferred from the call sites only.
    """
    def __init__(self):
        # Worker agents, one per specialisation.
        self.research_agent = ResearchAgent()
        self.analysis_agent = AnalysisAgent()
        self.writing_agent = WritingAgent()
        self.review_agent = ReviewAgent()
        # Central coordinator that plans and sequences the work.
        self.orchestrator = Orchestrator()
        # Shared knowledge base (literature, research data, templates).
        self.knowledge_base = KnowledgeBase()
        # Message-passing layer between agents.
        self.communication = AgentCommunicationLayer()
        # Make every agent addressable by id.
        self._register_agents()
    def _register_agents(self):
        """Register all agents with the communication layer and tag their ids."""
        agents = {
            "research": self.research_agent,
            "analysis": self.analysis_agent,
            "writing": self.writing_agent,
            "review": self.review_agent,
            "orchestrator": self.orchestrator
        }
        for agent_id, agent in agents.items():
            agent.id = agent_id
            self.communication.register_agent(agent_id, agent)
    async def generate_report(self, topic, requirements):
        """Generate a research report on *topic*.

        *requirements* may supply: report_type, writing_style,
        review_criteria, output_format.  Returns a dict with the formatted
        report, generation metadata and intermediate results.
        """
        # Stage 1: task decomposition and planning.
        task_plan = await self.orchestrator.plan_task(
            topic=topic,
            requirements=requirements
        )
        # Stage 2: parallel research and data collection.
        research_tasks = task_plan["research_tasks"]
        research_results = []
        # Fan the research tasks out concurrently.
        research_coros = []
        for task in research_tasks:
            coro = self.research_agent.execute_research_task(task)
            research_coros.append(coro)
        research_results = await asyncio.gather(*research_coros)
        # Stage 3: data analysis and consolidation (failed tasks are skipped).
        analysis_prompts = []
        for result in research_results:
            if result["success"]:
                prompt = self.analysis_agent.create_analysis_prompt(result["data"])
                analysis_prompts.append(prompt)
        analysis_results = await self.analysis_agent.analyze_data_batch(analysis_prompts)
        # Stage 4: report outline planning.
        outline = await self.writing_agent.create_outline(
            topic=topic,
            analysis_results=analysis_results,
            report_type=requirements.get("report_type", "academic")
        )
        # Stage 5: write chapters in parallel.
        chapters = outline["chapters"]
        writing_tasks = []
        for chapter in chapters:
            task = {
                "chapter_title": chapter["title"],
                "key_points": chapter["key_points"],
                "data_sources": analysis_results,
                "writing_style": requirements.get("writing_style", "formal")
            }
            writing_tasks.append(task)
        # Generate all chapters concurrently.
        writing_coros = []
        for task in writing_tasks:
            coro = self.writing_agent.write_chapter(task)
            writing_coros.append(coro)
        chapter_contents = await asyncio.gather(*writing_coros)
        # Stage 6: compile and polish the draft.
        draft = await self.writing_agent.compile_report(
            outline=outline,
            chapters=chapter_contents,
            metadata={
                "topic": topic,
                "author": "AI Research System",
                "date": datetime.now().strftime("%Y-%m-%d")
            }
        )
        # Stage 7: quality review.
        review_results = await self.review_agent.review_report(
            report=draft,
            criteria=requirements.get("review_criteria", DEFAULT_CRITERIA)
        )
        # Stage 8: feedback loop -- revise once, re-review, and keep the
        # revision only if it passes; otherwise fall back to the draft.
        if review_results["needs_revision"]:
            revision_plan = review_results["revision_plan"]
            revised_draft = await self._apply_revisions(draft, revision_plan)
            # Second review of the revised draft.
            final_review = await self.review_agent.review_report(
                report=revised_draft,
                criteria=requirements.get("review_criteria", DEFAULT_CRITERIA)
            )
            final_report = revised_draft if final_review["passed"] else draft
        else:
            final_report = draft
        # Stage 9: convert to the requested output format.
        formatted_report = await self.writing_agent.format_report(
            report=final_report,
            format_type=requirements.get("output_format", "markdown")
        )
        # Stage 10: archive in the knowledge base.
        await self.knowledge_base.archive_report(
            report=formatted_report,
            metadata={
                "topic": topic,
                "generation_date": datetime.now(),
                "agents_involved": ["research", "analysis", "writing", "review"],
                "quality_score": review_results.get("quality_score", 0)
            }
        )
        return {
            "success": True,
            "report": formatted_report,
            "metadata": {
                "topic": topic,
                "generation_time": datetime.now(),
                "research_sources": len(research_results),
                "analysis_insights": len(analysis_results),
                "review_feedback": review_results.get("feedback", []),
                "final_quality_score": review_results.get("quality_score", 0)
            },
            "intermediate_results": {
                "research": research_results,
                "analysis": analysis_results,
                "outline": outline,
                "review": review_results
            }
        }
    async def _apply_revisions(self, draft, revision_plan):
        """Apply the reviewer's revision plan to *draft* and return the result.

        Handles three revision types: content_addition (extra quick research
        integrated into a section), restructuring, and style_improvement.
        """
        for revision in revision_plan["revisions"]:
            if revision["type"] == "content_addition":
                # Trigger a quick follow-up research task for the gap.
                research_task = {
                    "query": revision["suggested_content"],
                    "depth": "quick"
                }
                new_research = await self.research_agent.execute_research_task(research_task)
                if new_research["success"]:
                    # Merge the newly found content into the named section.
                    integration_result = await self.writing_agent.integrate_content(
                        draft=draft,
                        new_content=new_research["data"],
                        section=revision["section"]
                    )
                    draft = integration_result["revised_draft"]
            elif revision["type"] == "restructuring":
                # Reorganise the draft along the suggested structure.
                draft = await self.writing_agent.reorganize_content(
                    draft=draft,
                    new_structure=revision["suggested_structure"]
                )
            elif revision["type"] == "style_improvement":
                # Improve the writing style per the reviewer's guidelines.
                draft = await self.writing_agent.improve_style(
                    draft=draft,
                    style_guidelines=revision["guidelines"]
                )
        return draft
1.3.3 智能体间的详细交互示例
# Walkthrough of how agents collaborate through message passing
async def demonstrate_agent_interaction():
    """Demonstrate a full contract-net exchange between orchestrator and
    research agent: CFP -> PROPOSE -> ACCEPT_PROPOSAL -> INFORM (progress)
    -> INFORM (result) -> CONFIRM."""
    system = ResearchReportSystem()
    # 1. Orchestrator sends a call-for-proposals to the research agent.
    cfp_message = ACLMessage(
        performative="CFP",  # call for proposals
        sender="orchestrator",
        receiver="research",
        content={
            "task_type": "literature_review",
            "topic": "机器学习在医疗影像诊断中的应用",
            "scope": {
                "time_range": "2018-2023",
                "sources": ["academic_papers", "clinical_studies", "industry_reports"],
                "languages": ["中文", "英文"]
            },
            "requirements": {
                "min_sources": 10,
                "include_statistics": True,
                "format": "structured_summary"
            },
            "deadline": "30分钟",
            "reward": {"priority": "high", "resources": "full_access"}
        }
    )
    await system.communication.send_message(cfp_message)
    # 2. Research agent responds with a proposal (capabilities + cost).
    proposal_message = ACLMessage(
        performative="PROPOSE",
        sender="research",
        receiver="orchestrator",
        content={
            "proposal_id": "prop_001",
            "capabilities": ["web_search", "academic_db", "data_extraction"],
            "estimated_time": "25分钟",
            "confidence": 0.85,
            "resource_requirements": {
                "api_calls": 50,
                "compute_time": "15分钟",
                "data_storage": "100MB"
            }
        }
    )
    await system.communication.send_message(proposal_message)
    # 3. Orchestrator accepts the proposal and sets contract terms.
    accept_message = ACLMessage(
        performative="ACCEPT_PROPOSAL",
        sender="orchestrator",
        receiver="research",
        content={
            "accepted_proposal": "prop_001",
            "contract_terms": {
                "quality_standards": {"accuracy": 0.9, "completeness": 0.85},
                "validation_required": True,
                "intermediate_checkpoints": ["5分钟", "15分钟"]
            }
        }
    )
    await system.communication.send_message(accept_message)
    # 4. Research agent starts working and streams a progress update.
    progress_message = ACLMessage(
        performative="INFORM",
        sender="research",
        receiver="orchestrator",
        content={
            "progress_update": {
                "percentage": 40,
                "sources_found": 6,
                "key_findings": [
                    "深度学习在CT扫描分析中达到95%准确率",
                    "迁移学习减少了对标注数据的需求",
                    "联邦学习保护了患者隐私"
                ],
                "estimated_completion": "15分钟后",
                "issues": ["某些论文需要付费访问"]
            }
        }
    )
    await system.communication.send_message(progress_message)
    # 5. On completion the research agent reports the full result set.
    result_message = ACLMessage(
        performative="INFORM",
        sender="research",
        receiver="orchestrator",
        content={
            "task_completion": {
                "status": "success",
                "data": {
                    "sources_analyzed": 12,
                    "total_papers": 45,
                    "key_themes": [
                        {"theme": "卷积神经网络", "count": 25},
                        {"theme": "生成对抗网络", "count": 15},
                        {"theme": "Transformer", "count": 5}
                    ],
                    "statistical_summary": {
                        "average_accuracy": 0.92,
                        "common_datasets": ["ImageNet", "CheXpert", "MIMIC-CXR"],
                        "trends": ["小样本学习", "自监督学习", "可解释性AI"]
                    },
                    "raw_data": "..."  # elided for brevity
                },
                "metadata": {
                    "time_spent": "22分钟",
                    "resources_used": {"api_calls": 42, "data_downloaded": "85MB"},
                    "confidence_score": 0.88
                }
            }
        }
    )
    await system.communication.send_message(result_message)
    # 6. Orchestrator confirms receipt and will forward to the analysis agent.
    confirm_message = ACLMessage(
        performative="CONFIRM",
        sender="orchestrator",
        receiver="research",
        content={
            "confirmation": {
                "result_received": True,
                "quality_assessment": {
                    "completeness": 0.9,
                    "relevance": 0.95,
                    "timeliness": 1.0
                },
                "next_step": "forwarding_to_analysis"
            }
        }
    )
    await system.communication.send_message(confirm_message)
1.4 优势与挑战
1.4.1 优势
- 复杂性处理:能处理单个智能体无法完成的复杂任务
- 鲁棒性:一个智能体失败不影响整个系统
- 灵活性:智能体可动态加入/离开系统
- 专业化:每个智能体可专注于特定领域
- 可扩展性:容易增加新智能体或新功能
1.4.2 挑战
- 协调开销:智能体间通信和协调需要额外资源
- 冲突解决:不同智能体可能有目标冲突
- 知识共享:确保智能体间有效共享知识和上下文
- 系统稳定性:避免死锁和活锁情况
- 安全隐私:保护智能体间的敏感信息
2. MCP服务(Model Context Protocol)
2.1 核心概念
MCP(Model Context Protocol) 是一种标准化的协议,允许大型语言模型(LLM)与外部工具、数据源和服务进行安全、高效的交互。它类似于操作系统的设备驱动程序,为LLM提供了访问外部能力的标准化接口。
2.1.1 MCP的核心设计原则
- 标准化接口:统一的API规范
- 上下文管理:智能管理模型上下文窗口
- 安全沙箱:限制外部服务的访问权限
- 可扩展性:容易集成新工具和服务
- 互操作性:跨不同模型和平台工作
2.2 核心工作机制
2.2.1 MCP架构
┌─────────────────────────────────────────────────────┐
│ LLM 应用程序 │
│ (如: ChatGPT, Claude, 自定义AI助手) │
└─────────────────────────┬───────────────────────────┘
│ HTTP/WebSocket
│ JSON-RPC 2.0
┌─────────────────────────▼───────────────────────────┐
│ MCP 服务器 (Host) │
│ • 协议实现 • 会话管理 • 安全性控制 │
└─────────────────────────┬───────────────────────────┘
│ MCP 协议
│ (工具注册/调用)
┌─────────────────────────▼───────────────────────────┐
│ MCP 客户端/服务端 │
│ • 工具实现 • 资源提供 • 数据源连接 │
└─────────────────────────────────────────────────────┘
2.2.2 MCP协议消息格式
class MCPMessage:
    """Factory helpers for the JSON-RPC 2.0 envelopes used by MCP."""

    @staticmethod
    def create_request(method: str, params: dict = None, id: int = 1):
        """Build a JSON-RPC request; a missing *params* becomes an empty dict."""
        return {
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params or {}
        }

    @staticmethod
    def create_response(result: Any = None, id: int = 1):
        """Build a JSON-RPC success response.

        FIX: the annotation was ``result: any`` -- the builtin ``any()``
        function, not a type -- corrected to ``typing.Any`` (already imported
        at module level).
        """
        return {
            "jsonrpc": "2.0",
            "id": id,
            "result": result
        }

    @staticmethod
    def create_error(error_code: int, error_message: str, id: int = 1):
        """Build a JSON-RPC error response with the given code and message."""
        return {
            "jsonrpc": "2.0",
            "id": id,
            "error": {
                "code": error_code,
                "message": error_message
            }
        }
# Standard MCP method names mapped to their (Chinese) descriptions.
MCP_METHODS = {
    # Tool-related
    "tools/list": "列出可用工具",
    "tools/call": "调用工具",
    # Resource-related
    "resources/list": "列出可用资源",
    "resources/read": "读取资源内容",
    "resources/subscribe": "订阅资源更新",
    # Prompt-related
    "prompts/list": "列出可用提示词",
    "prompts/get": "获取提示词模板",
    # Logging and monitoring
    "logging/set_level": "设置日志级别",
    "metrics/report": "报告性能指标"
}
2.2.3 MCP服务器实现
import json
import asyncio
from typing import Dict, Any, List, Callable
from enum import Enum
class MCPErrorCode(Enum):
    """MCP error codes: the reserved JSON-RPC 2.0 codes plus
    MCP-specific codes in the server-error range (-32000..-32099)."""
    PARSE_ERROR = -32700            # invalid JSON received
    INVALID_REQUEST = -32600        # not a valid JSON-RPC request object
    METHOD_NOT_FOUND = -32601       # method does not exist
    INVALID_PARAMS = -32602         # invalid method parameters
    INTERNAL_ERROR = -32603         # internal JSON-RPC error
    SERVER_ERROR = -32000           # generic server-side error
    TOOL_EXECUTION_ERROR = -32001   # a registered tool raised while running
    RESOURCE_NOT_FOUND = -32002     # requested resource is not registered
    PERMISSION_DENIED = -32003      # caller lacks permission
class MCPServer:
    """MCP server: a registry of tools, resources and prompts behind a
    JSON-RPC 2.0 dispatcher.

    NOTE(review): handle_resources_list/read and handle_prompts_list/get are
    dispatched in handle_request but not defined in this class as shown --
    presumably implemented elsewhere or in a subclass; confirm.
    """
    def __init__(self, host: str = "localhost", port: int = 8000):
        self.host = host
        self.port = port
        # name -> {"function", "description", "parameters"}; see register_tool.
        # NOTE(review): annotated Callable but actually stores dicts.
        self.tools: Dict[str, Callable] = {}
        # name -> {"data", "metadata", "last_updated"}; see register_resource.
        self.resources: Dict[str, Any] = {}
        self.prompts: Dict[str, str] = {}
        self.subscriptions: Dict[str, List[str]] = {}
        self.sessions: Dict[str, Dict] = {}
        # Register the always-available built-in tools.
        self._register_builtin_tools()
    def _register_builtin_tools(self):
        """Register the built-in utility tools (ping/help/list_tools)."""
        self.register_tool("ping", self._tool_ping)
        self.register_tool("help", self._tool_help)
        self.register_tool("list_tools", self._tool_list_tools)
    def register_tool(self, name: str, function: Callable, description: str = ""):
        """Register *function* as a callable tool, with parameters derived
        from its signature."""
        self.tools[name] = {
            "function": function,
            "description": description,
            "parameters": self._extract_parameters(function)
        }
    def register_resource(self, name: str, resource: Any, metadata: Dict = None):
        """Register a data resource with optional metadata and a timestamp."""
        self.resources[name] = {
            "data": resource,
            "metadata": metadata or {},
            "last_updated": asyncio.get_event_loop().time()
        }
    def _extract_parameters(self, function: Callable) -> List[Dict]:
        """Derive a parameter-description list from the function signature.

        ``self`` is skipped; annotation and default are recorded when present.
        """
        import inspect
        params = []
        sig = inspect.signature(function)
        for param_name, param in sig.parameters.items():
            if param_name == "self":
                continue
            param_info = {
                "name": param_name,
                "type": str(param.annotation) if param.annotation != inspect.Parameter.empty else "any",
                "required": param.default == inspect.Parameter.empty,
                "default": param.default if param.default != inspect.Parameter.empty else None
            }
            params.append(param_info)
        return params
    async def _tool_ping(self, message: str = "pong") -> Dict:
        """Ping tool (connectivity test); echoes *message* with a timestamp."""
        return {"status": "ok", "message": message, "timestamp": asyncio.get_event_loop().time()}
    async def _tool_help(self, tool_name: str = None) -> Dict:
        """Help tool: describe one tool by name, or summarise the server."""
        if tool_name:
            if tool_name in self.tools:
                tool = self.tools[tool_name]
                return {
                    "name": tool_name,
                    "description": tool["description"],
                    "parameters": tool["parameters"]
                }
            else:
                raise ValueError(f"Tool '{tool_name}' not found")
        else:
            return {
                "available_tools": list(self.tools.keys()),
                "total_tools": len(self.tools),
                "server_info": {
                    "host": self.host,
                    "port": self.port,
                    "protocol_version": "1.0"
                }
            }
    async def _tool_list_tools(self) -> Dict:
        """List every registered tool with a short summary."""
        tools_info = []
        for name, tool in self.tools.items():
            tools_info.append({
                "name": name,
                "description": tool["description"],
                "parameter_count": len(tool["parameters"])
            })
        return {"tools": tools_info, "count": len(tools_info)}
    async def handle_request(self, request_data: Dict) -> Dict:
        """Dispatch one JSON-RPC 2.0 request and return a response dict.

        Known methods route to their handlers; an unknown ``tools/<name>``
        falls back to a direct tool call; anything else is METHOD_NOT_FOUND.
        Handler exceptions become INTERNAL_ERROR responses.
        """
        try:
            # Validate the JSON-RPC 2.0 envelope.
            if request_data.get("jsonrpc") != "2.0":
                return MCPMessage.create_error(
                    MCPErrorCode.INVALID_REQUEST.value,
                    "Invalid JSON-RPC version"
                )
            method = request_data.get("method")
            params = request_data.get("params", {})
            request_id = request_data.get("id", 1)
            if not method:
                return MCPMessage.create_error(
                    MCPErrorCode.INVALID_REQUEST.value,
                    "Method is required"
                )
            # Route to the matching handler.
            if method == "tools/list":
                result = await self.handle_tools_list(params)
            elif method == "tools/call":
                result = await self.handle_tools_call(params)
            elif method == "resources/list":
                result = await self.handle_resources_list(params)
            elif method == "resources/read":
                result = await self.handle_resources_read(params)
            elif method == "prompts/list":
                result = await self.handle_prompts_list(params)
            elif method == "prompts/get":
                result = await self.handle_prompts_get(params)
            else:
                # Fallback: treat "tools/<name>" as a direct tool invocation.
                if method.startswith("tools/"):
                    tool_name = method[6:]  # strip the "tools/" prefix
                    if tool_name in self.tools:
                        result = await self._call_tool_directly(tool_name, params)
                    else:
                        return MCPMessage.create_error(
                            MCPErrorCode.METHOD_NOT_FOUND.value,
                            f"Method '{method}' not found"
                        )
                else:
                    return MCPMessage.create_error(
                        MCPErrorCode.METHOD_NOT_FOUND.value,
                        f"Method '{method}' not found"
                    )
            return MCPMessage.create_response(result, request_id)
        except Exception as e:
            # Any handler failure becomes a JSON-RPC internal error.
            logging.error(f"Error handling request: {e}")
            return MCPMessage.create_error(
                MCPErrorCode.INTERNAL_ERROR.value,
                f"Internal server error: {str(e)}"
            )
    async def handle_tools_list(self, params: Dict) -> Dict:
        """Handle tools/list: describe every tool with a JSON-schema-like
        inputSchema built from the extracted parameters."""
        tools = []
        for name, tool in self.tools.items():
            tools.append({
                "name": name,
                "description": tool["description"],
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        param["name"]: {
                            "type": param["type"],
                            "description": f"Parameter: {param['name']}",
                            "required": param["required"]
                        }
                        for param in tool["parameters"]
                    }
                }
            })
        return {"tools": tools}
    async def handle_tools_call(self, params: Dict) -> Dict:
        """Handle tools/call: validate arguments, run the tool, wrap result.

        Raises ValueError for a missing/unknown tool name; tool execution
        errors are caught and reported in the result (success=False).
        """
        tool_name = params.get("name")
        arguments = params.get("arguments", {})
        if not tool_name:
            raise ValueError("Tool name is required")
        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")
        tool = self.tools[tool_name]
        # Check required parameters before invoking.
        self._validate_arguments(tool["parameters"], arguments)
        # Execute the tool; failures are reported, not raised.
        try:
            result = await tool["function"](**arguments)
            return {
                "tool": tool_name,
                "result": result,
                "success": True,
                "timestamp": asyncio.get_event_loop().time()
            }
        except Exception as e:
            logging.error(f"Tool execution failed: {e}")
            return {
                "tool": tool_name,
                "error": str(e),
                "success": False,
                "timestamp": asyncio.get_event_loop().time()
            }
    async def _call_tool_directly(self, tool_name: str, params: Dict) -> Dict:
        """Invoke a tool with *params* as keyword args (no validation step)."""
        if tool_name not in self.tools:
            raise ValueError(f"Tool '{tool_name}' not found")
        tool = self.tools[tool_name]
        result = await tool["function"](**params)
        return {
            "tool": tool_name,
            "result": result,
            "success": True
        }
    def _validate_arguments(self, expected_params: List[Dict], provided_args: Dict):
        """Raise ValueError when a required parameter is missing."""
        for param in expected_params:
            param_name = param["name"]
            if param["required"] and param_name not in provided_args:
                raise ValueError(f"Required parameter '{param_name}' is missing")
            if param_name in provided_args:
                # Type validation could be added here.
                pass
    async def start(self):
        """Start the MCP server.

        Placeholder: a real implementation would serve HTTP or WebSocket,
        e.g. via FastAPI or a similar framework."""
        pass
2.3 MCP服务实例:智能数据分析服务
2.3.1 完整MCP服务实现
class DataAnalysisMCPService(MCPServer):
    """MCP service specialised for data analysis on tabular datasets.

    Extends MCPServer with loading/cleaning/statistics/visualisation/ML
    tools and two bundled demo datasets."""
    def __init__(self):
        super().__init__(host="localhost", port=8003)
        # Register the data-analysis tool set.
        self._register_data_analysis_tools()
        # Register the bundled example datasets.
        self._register_sample_datasets()
def _register_data_analysis_tools(self):
    """Register the seven data-analysis tools (method of
    DataAnalysisMCPService)."""
    # 1. Dataset loading
    self.register_tool(
        name="load_dataset",
        function=self._tool_load_dataset,
        description="从各种来源加载数据集"
    )
    # 2. Data cleaning
    self.register_tool(
        name="clean_data",
        function=self._tool_clean_data,
        description="清洗和预处理数据"
    )
    # 3. Statistical analysis
    self.register_tool(
        name="statistical_summary",
        function=self._tool_statistical_summary,
        description="生成数据集的统计摘要"
    )
    # 4. Visualisation
    self.register_tool(
        name="create_visualization",
        function=self._tool_create_visualization,
        description="创建数据可视化图表"
    )
    # 5. Machine-learning training
    self.register_tool(
        name="train_model",
        function=self._tool_train_model,
        description="训练机器学习模型"
    )
    # 6. Prediction
    self.register_tool(
        name="make_predictions",
        function=self._tool_make_predictions,
        description="使用模型进行预测"
    )
    # 7. Report generation
    self.register_tool(
        name="generate_report",
        function=self._tool_generate_report,
        description="生成数据分析报告"
    )
def _register_sample_datasets(self):
    """Register two randomly generated demo datasets as resources
    (method of DataAnalysisMCPService)."""
    import pandas as pd
    import numpy as np
    # Example 1: 100 days of synthetic sales records.
    dates = pd.date_range('2023-01-01', periods=100, freq='D')
    sales_data = pd.DataFrame({
        'date': dates,
        'product': np.random.choice(['A', 'B', 'C', 'D'], 100),
        'region': np.random.choice(['North', 'South', 'East', 'West'], 100),
        'sales_amount': np.random.randint(100, 10000, 100),
        'units_sold': np.random.randint(1, 100, 100),
        'customer_rating': np.random.uniform(1.0, 5.0, 100).round(1)
    })
    self.register_resource(
        name="sales_dataset",
        resource=sales_data.to_dict(orient='records'),
        metadata={
            "description": "示例销售数据集",
            "columns": list(sales_data.columns),
            "rows": len(sales_data),
            "source": "generated",
            "last_updated": "2023-01-01"
        }
    )
    # Example 2: 50 synthetic customer profiles (with a churn-risk label).
    customer_data = pd.DataFrame({
        'customer_id': range(1, 51),
        'age': np.random.randint(18, 70, 50),
        'income': np.random.randint(30000, 150000, 50),
        'gender': np.random.choice(['Male', 'Female'], 50),
        'city': np.random.choice(['Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen'], 50),
        'loyalty_score': np.random.randint(1, 100, 50),
        'churn_risk': np.random.choice([0, 1], 50, p=[0.7, 0.3])
    })
    self.register_resource(
        name="customer_dataset",
        resource=customer_data.to_dict(orient='records'),
        metadata={
            "description": "示例客户数据集",
            "columns": list(customer_data.columns),
            "rows": len(customer_data),
            "source": "generated",
            "last_updated": "2023-01-01"
        }
    )
async def _tool_load_dataset(self, source_type: str, source_path: str = None, **kwargs):
    """Load a dataset from csv/excel/database/builtin sources, register it
    as a new resource, and return its id plus a summary (method of
    DataAnalysisMCPService).

    Raises ValueError for unsupported source types or missing arguments.
    """
    import pandas as pd
    if source_type == "csv":
        if not source_path:
            raise ValueError("source_path is required for CSV")
        df = pd.read_csv(source_path)
    elif source_type == "excel":
        if not source_path:
            raise ValueError("source_path is required for Excel")
        df = pd.read_excel(source_path)
    elif source_type == "database":
        # Simplified placeholder; a real implementation would connect here.
        connection_string = kwargs.get("connection_string")
        query = kwargs.get("query")
        if not connection_string or not query:
            raise ValueError("connection_string and query are required for database")
        # Stand-in frame instead of an actual database query.
        df = pd.DataFrame({"example": [1, 2, 3]})
    elif source_type == "builtin":
        dataset_name = kwargs.get("dataset_name")
        if dataset_name and dataset_name in self.resources:
            data = self.resources[dataset_name]["data"]
            df = pd.DataFrame(data)
        else:
            raise ValueError(f"Built-in dataset '{dataset_name}' not found")
    else:
        raise ValueError(f"Unsupported source type: {source_type}")
    # Build a summary of the loaded frame.
    summary = {
        "rows": len(df),
        "columns": len(df.columns),
        "column_names": list(df.columns),
        "data_types": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "missing_values": df.isnull().sum().to_dict(),
        "memory_usage": f"{df.memory_usage(deep=True).sum() / 1024:.2f} KB",
        "sample_data": df.head(5).to_dict(orient='records')
    }
    # Generate a unique id and store the data as a resource.
    dataset_id = f"dataset_{int(asyncio.get_event_loop().time())}"
    self.register_resource(
        name=dataset_id,
        resource=df.to_dict(orient='records'),
        metadata={
            "source_type": source_type,
            "source_path": source_path,
            "loaded_at": asyncio.get_event_loop().time(),
            "summary": summary
        }
    )
    return {
        "dataset_id": dataset_id,
        "summary": summary,
        "message": f"Dataset loaded successfully with {len(df)} rows and {len(df.columns)} columns"
    }
async def _tool_clean_data(self, dataset_id: str, operations: List[Dict]):
    """Apply a sequence of cleaning operations to a registered dataset
    (method of DataAnalysisMCPService).

    Supported operation types: handle_missing (drop/fill),
    remove_duplicates, remove_outliers (iqr), normalize (minmax/zscore),
    encode_categorical (onehot/label).  The cleaned frame is registered as
    ``cleaned_<dataset_id>`` and a summary dict is returned.

    Raises ValueError when *dataset_id* is unknown.
    """
    # FIX: this method used ``pd`` without importing pandas -- the module
    # has no top-level pandas import and, unlike the sibling tools, this
    # one had no local import either, so it raised NameError at runtime.
    import pandas as pd
    if dataset_id not in self.resources:
        raise ValueError(f"Dataset '{dataset_id}' not found")
    data = self.resources[dataset_id]["data"]
    df = pd.DataFrame(data)
    cleaning_log = []
    for op in operations:
        op_type = op.get("type")
        if op_type == "handle_missing":
            method = op.get("method", "drop")
            columns = op.get("columns", [])
            if method == "drop":
                # Drop rows with NaNs (optionally restricted to columns).
                if columns:
                    df.dropna(subset=columns, inplace=True)
                else:
                    df.dropna(inplace=True)
                cleaning_log.append(f"Dropped missing values using method: {method}")
            elif method == "fill":
                value = op.get("value", 0)
                if columns:
                    df[columns] = df[columns].fillna(value)
                else:
                    df.fillna(value, inplace=True)
                cleaning_log.append(f"Filled missing values with: {value}")
        elif op_type == "remove_duplicates":
            subset = op.get("subset", None)
            df.drop_duplicates(subset=subset, inplace=True)
            cleaning_log.append("Removed duplicate rows")
        elif op_type == "remove_outliers":
            column = op.get("column")
            method = op.get("method", "iqr")
            if column not in df.columns:
                cleaning_log.append(f"Column {column} not found, skipping outlier removal")
                continue
            if method == "iqr":
                # Keep values within [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
                Q1 = df[column].quantile(0.25)
                Q3 = df[column].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                mask = (df[column] >= lower_bound) & (df[column] <= upper_bound)
                df = df[mask]
                cleaning_log.append(f"Removed outliers from {column} using IQR method")
        elif op_type == "normalize":
            column = op.get("column")
            method = op.get("method", "minmax")
            if column not in df.columns:
                cleaning_log.append(f"Column {column} not found, skipping normalization")
                continue
            if method == "minmax":
                min_val = df[column].min()
                max_val = df[column].max()
                if max_val > min_val:  # avoid dividing by zero on constant columns
                    df[column] = (df[column] - min_val) / (max_val - min_val)
                    cleaning_log.append(f"Normalized {column} using min-max scaling")
            elif method == "zscore":
                mean_val = df[column].mean()
                std_val = df[column].std()
                if std_val > 0:  # avoid dividing by zero on constant columns
                    df[column] = (df[column] - mean_val) / std_val
                    cleaning_log.append(f"Normalized {column} using z-score normalization")
        elif op_type == "encode_categorical":
            column = op.get("column")
            method = op.get("method", "onehot")
            if column not in df.columns:
                cleaning_log.append(f"Column {column} not found, skipping encoding")
                continue
            if method == "onehot":
                dummies = pd.get_dummies(df[column], prefix=column)
                df = pd.concat([df.drop(column, axis=1), dummies], axis=1)
                cleaning_log.append(f"One-hot encoded column: {column}")
            elif method == "label":
                # Map each unique value to its first-occurrence index.
                unique_vals = df[column].unique()
                mapping = {val: i for i, val in enumerate(unique_vals)}
                df[column] = df[column].map(mapping)
                cleaning_log.append(f"Label encoded column: {column}")
    # Register the cleaned frame as a new resource.
    cleaned_dataset_id = f"cleaned_{dataset_id}"
    self.register_resource(
        name=cleaned_dataset_id,
        resource=df.to_dict(orient='records'),
        metadata={
            "original_dataset": dataset_id,
            "operations_applied": operations,
            "cleaning_log": cleaning_log,
            "cleaned_at": asyncio.get_event_loop().time(),
            "summary": {
                "rows": len(df),
                "columns": len(df.columns),
                "remaining_missing": df.isnull().sum().sum()
            }
        }
    )
    return {
        "cleaned_dataset_id": cleaned_dataset_id,
        "original_rows": len(data),
        "cleaned_rows": len(df),
        "operations_applied": len(operations),
        "cleaning_log": cleaning_log,
        "message": f"Data cleaning completed. Removed {len(data) - len(df)} rows."
    }
async def _tool_statistical_summary(self, dataset_id: str, columns: List[str] = None):
    """Compute descriptive statistics for a registered dataset (method of
    DataAnalysisMCPService).

    Returns numeric stats (count/mean/std/quartiles/skew/kurtosis),
    categorical stats (uniques/mode/distribution/missing), and a pairwise
    correlation matrix when two or more numeric columns exist.  Optionally
    restricts the analysis to *columns*.

    Raises ValueError when *dataset_id* is unknown.
    """
    # FIX: this method used ``pd`` and ``np`` without importing pandas or
    # numpy -- the module-level imports do not include them and there was
    # no local import, so the method raised NameError at runtime.
    import numpy as np
    import pandas as pd
    if dataset_id not in self.resources:
        raise ValueError(f"Dataset '{dataset_id}' not found")
    data = self.resources[dataset_id]["data"]
    df = pd.DataFrame(data)
    if columns:
        df = df[columns]
    # Numeric columns: classical descriptive statistics.
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    numeric_summary = {}
    for col in numeric_cols:
        numeric_summary[col] = {
            "count": int(df[col].count()),
            "mean": float(df[col].mean()),
            "std": float(df[col].std()),
            "min": float(df[col].min()),
            "25%": float(df[col].quantile(0.25)),
            "50%": float(df[col].median()),
            "75%": float(df[col].quantile(0.75)),
            "max": float(df[col].max()),
            "skew": float(df[col].skew()),
            "kurtosis": float(df[col].kurtosis())
        }
    # Categorical columns: cardinality, mode and top-10 distribution.
    categorical_cols = df.select_dtypes(exclude=[np.number]).columns
    categorical_summary = {}
    for col in categorical_cols:
        value_counts = df[col].value_counts()
        categorical_summary[col] = {
            "unique_count": int(df[col].nunique()),
            "mode": str(value_counts.index[0]) if len(value_counts) > 0 else None,
            "mode_frequency": int(value_counts.iloc[0]) if len(value_counts) > 0 else 0,
            "value_distribution": value_counts.head(10).to_dict(),
            "missing_count": int(df[col].isnull().sum())
        }
    # Correlation analysis (only meaningful with >= 2 numeric columns).
    correlation_matrix = {}
    if len(numeric_cols) > 1:
        corr_df = df[numeric_cols].corr()
        correlation_matrix = corr_df.to_dict()
    return {
        "dataset": dataset_id,
        "summary": {
            "total_rows": len(df),
            "total_columns": len(df.columns),
            "numeric_columns": list(numeric_cols),
            "categorical_columns": list(categorical_cols),
            "missing_values_total": int(df.isnull().sum().sum()),
            "memory_usage": f"{df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB"
        },
        "numeric_statistics": numeric_summary,
        "categorical_statistics": categorical_summary,
        "correlation_matrix": correlation_matrix
    }
async def _tool_create_visualization(self, dataset_id: str, chart_type: str, **kwargs):
    """Render a chart for a registered dataset and store it as a resource.

    Args:
        dataset_id: key of a dataset previously stored in ``self.resources``.
        chart_type: one of "histogram", "scatter", "bar", "box",
            "correlation_heatmap".
        **kwargs: chart-specific options — column, x_column/y_column, bins,
            top_n, figsize.

    Returns:
        dict with the visualization id, the chart type and the rendered PNG
        as a base64 data URI.

    Raises:
        ValueError: unknown dataset, unsupported chart type, or missing or
            invalid column arguments.
    """
    if dataset_id not in self.resources:
        raise ValueError(f"Dataset '{dataset_id}' not found")
    # Imported lazily so the server can run without plotting libraries
    # until a visualization is actually requested.
    import matplotlib.pyplot as plt
    import seaborn as sns
    import io
    import base64
    data = self.resources[dataset_id]["data"]
    df = pd.DataFrame(data)
    plt.figure(figsize=kwargs.get("figsize", (10, 6)))
    if chart_type == "histogram":
        column = kwargs.get("column")
        if not column or column not in df.columns:
            raise ValueError("Valid column required for histogram")
        bins = kwargs.get("bins", 30)
        plt.hist(df[column].dropna(), bins=bins, edgecolor='black', alpha=0.7)
        plt.title(f"Histogram of {column}")
        plt.xlabel(column)
        plt.ylabel("Frequency")
    elif chart_type == "scatter":
        x_column = kwargs.get("x_column")
        y_column = kwargs.get("y_column")
        if not x_column or not y_column:
            raise ValueError("Both x_column and y_column are required for scatter plot")
        # Fix: validate that the columns exist, matching the other branches,
        # instead of surfacing a raw pandas KeyError.
        missing = [c for c in (x_column, y_column) if c not in df.columns]
        if missing:
            raise ValueError(f"Column(s) not found for scatter plot: {', '.join(missing)}")
        plt.scatter(df[x_column], df[y_column], alpha=0.6)
        plt.title(f"Scatter Plot: {x_column} vs {y_column}")
        plt.xlabel(x_column)
        plt.ylabel(y_column)
    elif chart_type == "bar":
        column = kwargs.get("column")
        if not column or column not in df.columns:
            raise ValueError("Valid column required for bar chart")
        value_counts = df[column].value_counts().head(kwargs.get("top_n", 10))
        value_counts.plot(kind='bar')
        plt.title(f"Bar Chart of {column}")
        plt.xlabel(column)
        plt.ylabel("Count")
        plt.xticks(rotation=45)
    elif chart_type == "box":
        column = kwargs.get("column")
        if not column or column not in df.columns:
            raise ValueError("Valid column required for box plot")
        df.boxplot(column=column)
        plt.title(f"Box Plot of {column}")
        plt.ylabel(column)
    elif chart_type == "correlation_heatmap":
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) < 2:
            raise ValueError("Need at least 2 numeric columns for correlation heatmap")
        corr_matrix = df[numeric_cols].corr()
        sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0)
        plt.title("Correlation Heatmap")
    else:
        raise ValueError(f"Unsupported chart type: {chart_type}")
    # Serialize the figure to base64-encoded PNG and release the figure.
    buffer = io.BytesIO()
    plt.tight_layout()
    plt.savefig(buffer, format='png', dpi=100)
    plt.close()
    buffer.seek(0)
    image_base64 = base64.b64encode(buffer.read()).decode('utf-8')
    # NOTE(review): ids derived from whole-second loop time can collide for
    # charts created within the same second.
    visualization_id = f"viz_{int(asyncio.get_event_loop().time())}"
    # Register the rendered chart so later tools/clients can fetch it.
    self.register_resource(
        name=visualization_id,
        resource={"image_base64": image_base64},
        metadata={
            "chart_type": chart_type,
            "dataset_id": dataset_id,
            "created_at": asyncio.get_event_loop().time(),
            "parameters": kwargs
        }
    )
    return {
        "visualization_id": visualization_id,
        "chart_type": chart_type,
        "image_base64": f"data:image/png;base64,{image_base64}",
        "message": "Visualization created successfully"
    }
async def _tool_generate_report(self, dataset_id: str, analysis_type: str = "comprehensive", **kwargs):
    """Build a multi-section data-analysis report for a registered dataset.

    Args:
        dataset_id: key of a dataset previously stored in ``self.resources``.
        analysis_type: "comprehensive" additionally renders charts via
            ``_tool_create_visualization``; any other value skips them.
        **kwargs: accepted for forward compatibility; currently unused.

    Returns:
        dict with the report id, the ordered section list and a summary.

    Raises:
        ValueError: if ``dataset_id`` is not a registered resource.
    """
    if dataset_id not in self.resources:
        raise ValueError(f"Dataset '{dataset_id}' not found")
    data = self.resources[dataset_id]["data"]
    df = pd.DataFrame(data)
    # Sections are appended in display order.
    report_sections = []
    # 1. Dataset overview: size, memory footprint, missing and duplicate rows.
    overview = {
        "section_title": "数据集概览",
        "content": {
            "行数": len(df),
            "列数": len(df.columns),
            "内存使用": f"{df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB",
            "缺失值总数": int(df.isnull().sum().sum()),
            "重复行数": int(df.duplicated().sum())
        }
    }
    report_sections.append(overview)
    # 2. Data-quality scan: flag columns with >5% missing values and
    #    object columns that look like unique identifiers (>90% unique).
    quality_issues = []
    for col in df.columns:
        missing_pct = df[col].isnull().mean() * 100
        if missing_pct > 5:
            quality_issues.append(f"列 '{col}' 有 {missing_pct:.1f}% 的缺失值")
        if df[col].dtype == 'object':
            unique_ratio = df[col].nunique() / len(df)
            if unique_ratio > 0.9:
                quality_issues.append(f"列 '{col}' 可能是唯一标识符 (唯一值比例: {unique_ratio:.1%})")
    quality_section = {
        "section_title": "数据质量评估",
        "content": {
            "发现的问题": quality_issues if quality_issues else ["未发现显著数据质量问题"],
            "建议": [
                "对于高缺失率列考虑填充或删除",
                "对于唯一标识符列考虑是否保留",
                "检查异常值和数据分布"
            ]
        }
    }
    report_sections.append(quality_section)
    # 3. Key statistics for (at most) the first five numeric columns.
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        stats_summary = {}
        for col in numeric_cols[:5]:  # cap at five columns to keep the report short
            stats_summary[col] = {
                "平均值": float(df[col].mean()),
                "标准差": float(df[col].std()),
                "最小值": float(df[col].min()),
                "中位数": float(df[col].median()),
                "最大值": float(df[col].max())
            }
        stats_section = {
            "section_title": "关键统计指标",
            "content": stats_summary
        }
        report_sections.append(stats_section)
    # 4. Visualizations (comprehensive mode only): delegate rendering to the
    #    visualization tool, which also registers each chart as a resource.
    if analysis_type == "comprehensive":
        visualizations = []
        # Histograms for up to the first three numeric columns.
        for col in numeric_cols[:3]:
            viz_result = await self._tool_create_visualization(
                dataset_id=dataset_id,
                chart_type="histogram",
                column=col,
                bins=20
            )
            visualizations.append({
                "column": col,
                "chart_type": "histogram",
                "visualization_id": viz_result["visualization_id"]
            })
        # With two or more numeric columns, add a correlation heatmap.
        if len(numeric_cols) >= 2:
            viz_result = await self._tool_create_visualization(
                dataset_id=dataset_id,
                chart_type="correlation_heatmap"
            )
            visualizations.append({
                "chart_type": "correlation_heatmap",
                "visualization_id": viz_result["visualization_id"]
            })
        viz_section = {
            "section_title": "数据可视化",
            "content": {
                "生成的图表": visualizations,
                "解读": "直方图显示数据分布,热力图显示变量间相关性"
            }
        }
        report_sections.append(viz_section)
    # 5. Auto-generated insights: strongly skewed columns (|skew| > 1) and
    #    highly correlated numeric pairs (|r| > 0.7).
    insights = []
    if len(numeric_cols) > 0:
        for col in numeric_cols:
            skewness = df[col].skew()
            if abs(skewness) > 1:
                direction = "右偏" if skewness > 0 else "左偏"
                insights.append(f"列 '{col}' 呈{direction}分布 (偏度: {skewness:.2f})")
        if len(numeric_cols) >= 2:
            corr_matrix = df[numeric_cols].corr().abs()
            high_corr_pairs = []
            # Upper triangle only, so each pair is reported once.
            for i in range(len(numeric_cols)):
                for j in range(i+1, len(numeric_cols)):
                    col1, col2 = numeric_cols[i], numeric_cols[j]
                    corr = corr_matrix.loc[col1, col2]
                    if corr > 0.7:
                        high_corr_pairs.append(f"'{col1}' 和 '{col2}' (相关系数: {corr:.2f})")
            if high_corr_pairs:
                insights.append(f"发现强相关变量对: {', '.join(high_corr_pairs)}")
    insights_section = {
        "section_title": "关键洞察",
        "content": insights if insights else ["数据集特征较为均衡,无明显异常模式"]
    }
    report_sections.append(insights_section)
    # 6. Recommendations; data-quality fixes are prioritized when issues exist.
    recommendations = [
        "根据业务目标选择合适的机器学习算法",
        "考虑数据标准化或归一化处理",
        "建立数据质量监控机制",
        "定期更新和验证数据集"
    ]
    if quality_issues:
        recommendations.insert(0, "优先解决数据质量问题")
    recommendations_section = {
        "section_title": "建议",
        "content": recommendations
    }
    report_sections.append(recommendations_section)
    # Register the assembled report as a resource.
    # NOTE(review): ids derived from whole-second loop time can collide with
    # other artifacts created within the same second.
    report_id = f"report_{int(asyncio.get_event_loop().time())}"
    self.register_resource(
        name=report_id,
        resource=report_sections,
        metadata={
            "dataset_id": dataset_id,
            "analysis_type": analysis_type,
            "generated_at": asyncio.get_event_loop().time(),
            "sections_count": len(report_sections)
        }
    )
    return {
        "report_id": report_id,
        "sections": report_sections,
        "summary": {
            "total_sections": len(report_sections),
            "analysis_type": analysis_type,
            "generation_time": datetime.now().isoformat()
        },
        "message": f"数据分析报告生成完成,包含 {len(report_sections)} 个部分"
    }
2.3.2 MCP客户端使用示例
class MCPClient:
    """MCP client speaking JSON-RPC 2.0 over HTTP.

    Connection lifecycle: ``connect()`` pings the server, caches the tool
    list and opens a session; subsequent requests automatically carry the
    session id.
    """

    def __init__(self, server_url: str):
        self.server_url = server_url  # base URL of the MCP server endpoint
        self.session_id = None        # populated by a successful connect()
        self.available_tools = []     # tool descriptors from tools/list
        self._request_id = 0          # monotonically increasing JSON-RPC id

    async def connect(self):
        """Connect to the MCP server.

        Pings the server, fetches the tool list and creates a session.
        Returns True on success, False if the ping fails.
        """
        ping_response = await self._send_request("tools/ping", {})
        if ping_response.get("success"):
            tools_response = await self._send_request("tools/list", {})
            self.available_tools = tools_response.get("result", {}).get("tools", [])
            session_response = await self._send_request("session/create", {
                "client_info": {
                    "name": "data_analysis_client",
                    "version": "1.0.0",
                    "capabilities": ["data_loading", "analysis", "visualization"]
                }
            })
            self.session_id = session_response.get("result", {}).get("session_id")
            return True
        return False

    async def call_tool(self, tool_name: str, arguments: dict):
        """Invoke a named tool via the standard tools/call method.

        Added because MCPEnabledAgent / MCPServiceDiscovery call this method
        but it was missing from the client.
        """
        return await self._send_request("tools/call", {
            "name": tool_name,
            "arguments": arguments
        })

    async def _send_request(self, method: str, params: dict):
        """POST one JSON-RPC 2.0 request and return the decoded response."""
        import aiohttp
        # Copy params so the caller's dict is never mutated by the
        # session-id injection (the original wrote into the caller's dict).
        params = dict(params)
        if self.session_id:
            params["session_id"] = self.session_id
        # Unique id per request instead of a constant 1, as JSON-RPC expects.
        self._request_id += 1
        request_data = {
            "jsonrpc": "2.0",
            "id": self._request_id,
            "method": method,
            "params": params
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.server_url,
                json=request_data,
                headers={"Content-Type": "application/json"}
            ) as response:
                return await response.json()

    async def execute_data_analysis_pipeline(self):
        """Run the full demo pipeline: load → clean → stats → chart → report."""
        # Step 1: load the built-in demo dataset.
        load_result = await self._send_request("tools/call", {
            "name": "load_dataset",
            "arguments": {
                "source_type": "builtin",
                "dataset_name": "sales_dataset"
            }
        })
        dataset_id = load_result.get("result", {}).get("dataset_id")
        # Step 2: clean it (fill missing numerics with 0, drop duplicates).
        clean_result = await self._send_request("tools/call", {
            "name": "clean_data",
            "arguments": {
                "dataset_id": dataset_id,
                "operations": [
                    {
                        "type": "handle_missing",
                        "method": "fill",
                        "value": 0,
                        "columns": ["sales_amount", "units_sold"]
                    },
                    {
                        "type": "remove_duplicates"
                    }
                ]
            }
        })
        cleaned_dataset_id = clean_result.get("result", {}).get("cleaned_dataset_id")
        # Step 3: statistical summary of the cleaned data.
        stats_result = await self._send_request("tools/call", {
            "name": "statistical_summary",
            "arguments": {
                "dataset_id": cleaned_dataset_id,
                "columns": ["sales_amount", "units_sold", "customer_rating"]
            }
        })
        # Step 4: one sample visualization.
        viz_result = await self._send_request("tools/call", {
            "name": "create_visualization",
            "arguments": {
                "dataset_id": cleaned_dataset_id,
                "chart_type": "histogram",
                "column": "sales_amount",
                "bins": 20
            }
        })
        # Step 5: generate the comprehensive report.
        report_result = await self._send_request("tools/call", {
            "name": "generate_report",
            "arguments": {
                "dataset_id": cleaned_dataset_id,
                "analysis_type": "comprehensive"
            }
        })
        return {
            "pipeline_steps": {
                "data_loading": load_result.get("result", {}),
                "data_cleaning": clean_result.get("result", {}),
                "statistical_analysis": stats_result.get("result", {}),
                "visualization": viz_result.get("result", {}),
                "report_generation": report_result.get("result", {})
            },
            "final_report_id": report_result.get("result", {}).get("report_id")
        }
2.4 MCP与多智能体系统的集成
2.4.1 智能体作为MCP客户端
class MCPEnabledAgent(BaseAgent):
    """Agent base class with MCP client support.

    Holds one MCPClient per configured server and exposes a thin wrapper
    for invoking remote tools.
    """

    def __init__(self, name: str, description: str, mcp_servers: Dict[str, str]):
        super().__init__(name, description)
        self.mcp_servers = mcp_servers  # server name -> server URL
        self.mcp_clients = {}           # server name -> connected MCPClient

    async def initialize_mcp_connections(self):
        """Try to connect to every configured MCP server, logging each outcome."""
        for server_name, server_url in self.mcp_servers.items():
            candidate = MCPClient(server_url)
            connected = await candidate.connect()
            if not connected:
                logging.warning(f"Failed to connect to MCP server: {server_name}")
                continue
            self.mcp_clients[server_name] = candidate
            logging.info(f"Connected to MCP server: {server_name}")

    async def call_mcp_tool(self, server_name: str, tool_name: str, arguments: Dict) -> Dict:
        """Invoke a tool on a previously connected MCP server.

        Raises:
            ValueError: if ``server_name`` has no established connection.
        """
        if server_name not in self.mcp_clients:
            raise ValueError(f"MCP server '{server_name}' not connected")
        return await self.mcp_clients[server_name].call_tool(tool_name, arguments)

    def get_available_mcp_tools(self) -> Dict[str, List]:
        """Return the advertised tools of every connected server, keyed by name."""
        return {name: client.available_tools for name, client in self.mcp_clients.items()}
2.4.2 多智能体系统中的MCP服务发现与使用
class MCPServiceDiscovery:
    """Discovery, routing and capability reporting for MCP services."""

    def __init__(self):
        self.services = {}          # service_name -> connected MCPClient
        self.service_registry = {}  # service registry (reserved for future use)

    async def discover_services(self, network_range: str = "localhost"):
        """Discover available MCP services from a static configuration.

        Attempts to connect to each configured endpoint and records the
        outcome. A real deployment could also use port scanning or a
        registry lookup. Returns a list of per-service status dicts.
        """
        config_services = {
            "data_analysis": "http://localhost:8003",
            "web_search": "http://localhost:8001",
            "calculator": "http://localhost:8002",
            "weather": "http://localhost:8004"
        }
        discovered_services = []
        for service_name, service_url in config_services.items():
            try:
                candidate = MCPClient(service_url)
                if await candidate.connect():
                    self.services[service_name] = candidate
                    discovered_services.append({
                        "name": service_name,
                        "url": service_url,
                        "tools": candidate.available_tools,
                        "status": "connected"
                    })
                else:
                    discovered_services.append({
                        "name": service_name,
                        "url": service_url,
                        "status": "connection_failed"
                    })
            except Exception as e:
                # A broken endpoint must not abort discovery of the others.
                logging.error(f"Failed to connect to {service_name}: {e}")
        return discovered_services

    async def route_request(self, agent_id: str, service_type: str, request: Dict) -> Dict:
        """Route a tool request: by service type first, then by tool name.

        Raises:
            ValueError: when no connected service advertises the tool.
        """
        arguments = request.get("arguments", {})
        # Direct match on the requested service type.
        direct = self.services.get(service_type)
        if direct is not None:
            return await direct.call_tool(request["tool"], arguments)
        # Fall back to any service whose tool list contains the tool.
        for service_name, client in self.services.items():
            advertised = [tool["name"] for tool in client.available_tools]
            if request["tool"] in advertised:
                logging.info(f"Routing tool '{request['tool']}' to service '{service_name}'")
                return await client.call_tool(request["tool"], arguments)
        raise ValueError(f"No MCP service available for tool '{request['tool']}'")

    def get_service_capabilities(self) -> Dict:
        """Report url, tool list and session id for every connected service."""
        return {
            name: {
                "url": client.server_url,
                "tools": client.available_tools,
                "session_id": client.session_id
            }
            for name, client in self.services.items()
        }
2.5 实际应用场景示例:智能商业分析助手
class BusinessAnalyticsAssistant:
    """Intelligent business analytics assistant (multi-agent + MCP).

    Orchestrates four specialist agents plus MCP service discovery to turn
    raw company data into an executive business-performance report.

    NOTE(review): the agent classes (DataCollectorAgent, DataAnalyzerAgent,
    InsightGeneratorAgent, ReportWriterAgent) and Orchestrator are defined
    elsewhere; their interfaces are inferred here only from the calls below.
    """
    def __init__(self):
        # Specialist agents, keyed by role.
        self.agents = {
            "data_collector": DataCollectorAgent(),
            "data_analyzer": DataAnalyzerAgent(),
            "insight_generator": InsightGeneratorAgent(),
            "report_writer": ReportWriterAgent()
        }
        # Discovers and holds connections to external MCP services.
        self.mcp_discovery = MCPServiceDiscovery()
        # Coordinates the agents' workflow.
        self.orchestrator = Orchestrator(self.agents)
    async def analyze_business_performance(self, company_data: Dict) -> Dict:
        """Analyze company performance end to end.

        Args:
            company_data: company profile; must contain at least "name" and
                "industry" — TODO confirm full expected schema with callers.

        Returns:
            dict with key insights, a performance score, recommendations and
            the full (possibly MCP-enhanced) report.
        """
        # Step 1: discover available MCP services.
        # NOTE(review): the returned status list is not used further; the call
        # matters for its side effect of populating mcp_discovery.services.
        discovered_services = await self.mcp_discovery.discover_services()
        # Step 2: the collector agent gathers financial and market data,
        # using the discovered MCP services as data sources.
        financial_data = await self.agents["data_collector"].collect_financial_data(
            company_data,
            mcp_services=self.mcp_discovery.services
        )
        market_data = await self.agents["data_collector"].collect_market_data(
            company_data["industry"],
            mcp_services=self.mcp_discovery.services
        )
        # Step 3: the analyzer agent processes both datasets, delegating
        # statistics and trend analysis to the data_analysis MCP service.
        analysis_results = await self.agents["data_analyzer"].analyze_performance(
            financial_data=financial_data,
            market_data=market_data,
            mcp_tools={
                "statistical_analysis": self.mcp_discovery.services["data_analysis"],
                "trend_analysis": self.mcp_discovery.services["data_analysis"]
            }
        )
        # Step 4: turn analysis results into business insights.
        insights = await self.agents["insight_generator"].generate_insights(
            analysis_results=analysis_results,
            business_context=company_data
        )
        # Step 5: the writer agent assembles the final report.
        report = await self.agents["report_writer"].generate_report(
            insights=insights,
            data_sources={
                "financial": financial_data,
                "market": market_data,
                "analysis": analysis_results
            },
            report_format="executive_summary"
        )
        # Step 6: optionally enhance the report via the data-analysis MCP
        # service when it was discovered.
        if "data_analysis" in self.mcp_discovery.services:
            enhanced_report = await self.mcp_discovery.services["data_analysis"].call_tool(
                "enhance_report",
                {
                    "report_content": report,
                    "enhancement_type": "visual_summary",
                    "target_audience": "executive"
                }
            )
            report = enhanced_report["result"]
        return {
            "company": company_data["name"],
            "analysis_date": datetime.now().isoformat(),
            "key_insights": insights["top_insights"],
            "performance_score": analysis_results["performance_score"],
            "recommendations": insights["recommendations"],
            "full_report": report,
            "data_sources_used": [
                "financial_statements",
                "market_analysis",
                "competitor_data"
            ],
            "mcp_services_utilized": list(self.mcp_discovery.services.keys())
        }
3. 技术深度解析
3.1 多智能体协作的核心算法
3.1.1 共识算法(Consensus Algorithms)
class PaxosConsensus:
    """Simplified single-decree Paxos (proposer side).

    Only the proposer role is implemented here; the messaging helpers
    (_send_accept_request, _send_learn, _send_message) are expected to be
    provided elsewhere — TODO confirm.
    """
    def __init__(self, agent_id: str, all_agents: List[str]):
        self.agent_id = agent_id          # this node's identity
        self.all_agents = all_agents      # full membership, including self
        self.proposal_number = 0          # local proposal counter
        self.accepted_proposals = {}      # acceptor-side state (unused in this excerpt)
        self.learned_values = {}          # learner-side state (unused in this excerpt)
    async def propose(self, value: Any, proposal_id: str) -> bool:
        """Propose a value; returns True when a majority accepts it.

        Runs prepare (phase 1), accept (phase 2) and, on majority
        acceptance, broadcasts the learned value (phase 3).
        """
        # Phase 1a: send prepare to all peers and collect promises.
        prepare_responses = await self._send_prepare(proposal_id)
        # Paxos safety: if any acceptor already accepted a proposal, we must
        # re-propose the value of the highest-numbered accepted one.
        # NOTE(review): proposal ids are compared with ">" as-is; for string
        # ids this is lexicographic — confirm ids are constructed to order
        # correctly.
        highest_accepted = None
        for response in prepare_responses:
            if response.get("accepted_value") and (
                highest_accepted is None or
                response["accepted_proposal_id"] > highest_accepted["proposal_id"]
            ):
                highest_accepted = {
                    "proposal_id": response["accepted_proposal_id"],
                    "value": response["accepted_value"]
                }
        # Use the previously accepted value when one exists, else our own.
        if highest_accepted:
            value_to_propose = highest_accepted["value"]
        else:
            value_to_propose = value
        # Phase 2a: ask the acceptors to accept the chosen value.
        accept_responses = await self._send_accept_request(
            proposal_id,
            value_to_propose
        )
        # A strict majority of ALL agents is required, not just responders.
        accept_count = sum(1 for r in accept_responses if r.get("accepted", False))
        majority = len(self.all_agents) // 2 + 1
        if accept_count >= majority:
            # Phase 3: inform learners that the value has been chosen.
            await self._send_learn(proposal_id, value_to_propose)
            return True
        return False
    async def _send_prepare(self, proposal_id: str) -> List[Dict]:
        """Broadcast a prepare message to every other agent; return replies."""
        responses = []
        message = {
            "type": "prepare",
            "proposal_id": proposal_id,
            "sender": self.agent_id
        }
        # Send to all peers except ourselves and gather their responses.
        for agent in self.all_agents:
            if agent != self.agent_id:
                response = await self._send_message(agent, message)
                responses.append(response)
        return responses
3.1.2 拍卖机制(Auction Mechanisms)
class CombinatorialAuction:
    """Combinatorial auction that assigns tasks to bidding agents."""

    def __init__(self, tasks: List[Dict], agents: List[str]):
        self.tasks = tasks
        self.agents = agents
        self.bids = {}     # agent_id -> {task_id -> bid info}
        self.winners = {}  # task_id -> winning bid, filled by the solver

    async def conduct_auction(self):
        """Collect bids from every agent, then determine the winners."""
        for agent in self.agents:
            self.bids[agent] = await self._collect_bids_from_agent(agent)
        # Winner determination is NP-hard; use a greedy heuristic.
        return self._solve_wdp_heuristic()

    def _solve_wdp_heuristic(self) -> Dict:
        """Greedy solver for the Winner Determination Problem.

        Bids are ranked by capability_score / bid_amount * efficiency and
        each task goes to the highest-ranked bid that names it.

        Returns:
            dict task_id -> {"agent_id", "bid_amount", "capability_score"};
            also stored in ``self.winners``.
        """
        # Flatten agent bids into a single list for global ranking.
        all_bids = []
        for agent_id, agent_bids in self.bids.items():
            for task_id, bid_info in agent_bids.items():
                all_bids.append({
                    "agent_id": agent_id,
                    "task_id": task_id,
                    "bid_amount": bid_info["amount"],
                    "capability_score": bid_info["capability_score"],
                    "efficiency_score": bid_info.get("efficiency", 1.0)
                })

        def value_for_money(bid):
            # Fix: a zero-price bid previously crashed with ZeroDivisionError;
            # rank a free bid as the best possible value instead.
            if bid["bid_amount"] == 0:
                return float("inf")
            return bid["capability_score"] / bid["bid_amount"] * bid["efficiency_score"]

        all_bids.sort(key=value_for_money, reverse=True)
        # Greedily assign each task to the first (best) bid that names it.
        assigned_tasks = set()
        winners = {}
        for bid in all_bids:
            if bid["task_id"] not in assigned_tasks:
                winners[bid["task_id"]] = {
                    "agent_id": bid["agent_id"],
                    "bid_amount": bid["bid_amount"],
                    "capability_score": bid["capability_score"]
                }
                assigned_tasks.add(bid["task_id"])
        # Fix: record the outcome on the instance as well — self.winners was
        # initialized in __init__ but never written.
        self.winners = winners
        return winners
3.2 MCP协议的安全机制
3.2.1 认证与授权
class MCPSecurityManager:
    """API-key authentication, permission checks and per-client rate limiting."""

    def __init__(self):
        self.api_keys = {}     # api key -> list of permission names
        self.sessions = {}     # session_id -> session_info
        self.rate_limits = {}  # client_id -> {"count", "window_start"}

    def authenticate_request(self, request: Dict) -> Tuple[bool, str]:
        """Authenticate one request; returns (ok, reason)."""
        api_key = request.get("headers", {}).get("X-API-Key")
        if not api_key:
            return False, "API key is required"
        if api_key not in self.api_keys:
            return False, "Invalid API key"
        permissions = self.api_keys[api_key]
        if not self._check_permissions(permissions, request.get("method", "")):
            return False, "Insufficient permissions"
        if not self._check_rate_limit(request.get("client_id", "")):
            return False, "Rate limit exceeded"
        return True, "authenticated"

    def _check_permissions(self, permissions: List[str], method: str) -> bool:
        """Map the JSON-RPC method to required permissions and verify them."""
        method_permissions = {
            "tools/list": ["read_tools"],
            "tools/call": ["execute_tools"],
            "resources/read": ["read_resources"],
            "resources/subscribe": ["subscribe_resources"]
        }
        needed = method_permissions.get(method, [])
        # Methods with no mapping require no specific permission.
        return not needed or any(perm in permissions for perm in needed)

    def _check_rate_limit(self, client_id: str) -> bool:
        """Fixed 60-second window, at most 100 requests per client."""
        now = time.time()
        bucket = self.rate_limits.get(client_id)
        if bucket is None or now - bucket["window_start"] > 60:
            # First request ever, or the previous window expired: start fresh.
            self.rate_limits[client_id] = {
                "count": 1,
                "window_start": now
            }
            return True
        if bucket["count"] >= 100:  # 100 requests per minute
            return False
        bucket["count"] += 1
        return True
3.2.2 数据加密与完整性
class MCPEncryption:
    """Hybrid AES-256-GCM / RSA-OAEP message encryption for MCP payloads.

    NOTE(review): relies on the `cryptography` package primitives (Cipher,
    algorithms, modes, padding, hashes) and the key-management helpers
    (_load_private_key, _load_public_key, _generate_key_pair) being provided
    at file level — confirm.
    """

    def __init__(self, private_key_path: str = None, public_key_path: str = None):
        if private_key_path and public_key_path:
            self.private_key = self._load_private_key(private_key_path)
            self.public_key = self._load_public_key(public_key_path)
        else:
            # No key files supplied: generate a fresh key pair.
            self.private_key, self.public_key = self._generate_key_pair()

    def encrypt_message(self, message: str, recipient_public_key=None) -> Dict:
        """Encrypt a message with AES-GCM; wrap the key with RSA-OAEP.

        Returns:
            dict with base64-encoded ciphertext, wrapped key, GCM nonce and
            GCM authentication tag.
        """
        # Fresh symmetric key per message.
        symmetric_key = secrets.token_bytes(32)
        # BUG FIX: the nonce must be kept and transmitted. The original code
        # generated it inline, discarded it, and shipped the GCM *tag* under
        # the "nonce" key — making decryption impossible.
        nonce = secrets.token_bytes(12)
        cipher = Cipher(algorithms.AES(symmetric_key), modes.GCM(nonce))
        encryptor = cipher.encryptor()
        ciphertext = encryptor.update(message.encode()) + encryptor.finalize()
        # Wrap the symmetric key for the recipient when a key is provided.
        if recipient_public_key:
            encrypted_key = recipient_public_key.encrypt(
                symmetric_key,
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None
                )
            )
        else:
            encrypted_key = symmetric_key  # simplified demo: key sent in the clear
        return {
            "ciphertext": base64.b64encode(ciphertext).decode(),
            "encrypted_key": base64.b64encode(encrypted_key).decode(),
            "nonce": base64.b64encode(nonce).decode(),
            "tag": base64.b64encode(encryptor.tag).decode(),
            "algorithm": "AES256-GCM-RSA2048-OAEP"
        }

    def decrypt_message(self, encrypted_data: Dict, sender_public_key=None) -> str:
        """Decrypt a payload produced by :meth:`encrypt_message`."""
        ciphertext = base64.b64decode(encrypted_data["ciphertext"])
        encrypted_key = base64.b64decode(encrypted_data["encrypted_key"])
        nonce = base64.b64decode(encrypted_data["nonce"])
        # BUG FIX: read the authentication tag from its own field instead of
        # reusing the nonce bytes as the tag.
        tag = base64.b64decode(encrypted_data["tag"])
        # Unwrap the symmetric key with our private key when it was wrapped.
        if sender_public_key:
            symmetric_key = self.private_key.decrypt(
                encrypted_key,
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None
                )
            )
        else:
            symmetric_key = encrypted_key  # simplified demo: key was sent in the clear
        cipher = Cipher(algorithms.AES(symmetric_key), modes.GCM(nonce, tag=tag))
        decryptor = cipher.decryptor()
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()
        return plaintext.decode()
4. 性能优化与最佳实践
4.1 智能体通信优化
4.1.1 消息压缩与批处理
class OptimizedAgentCommunication:
    """Buffered, batched and optionally compressed agent-to-agent sends."""

    def __init__(self):
        self.message_buffer = []   # pending outgoing messages
        self.buffer_size = 10      # flush once this many messages queue up
        self.buffer_timeout = 0.1  # 100ms: flush once the buffer is this stale
        self.compression_enabled = True
        # BUG FIX: last_flush was read in send_optimized() before ever being
        # assigned, raising AttributeError on the very first send.
        self.last_flush = 0.0

    async def send_optimized(self, message: "ACLMessage"):
        """Queue a message; flush when the buffer is full or stale."""
        self.message_buffer.append(message)
        if (len(self.message_buffer) >= self.buffer_size or
                asyncio.get_event_loop().time() - self.last_flush > self.buffer_timeout):
            await self._flush_buffer()

    async def _flush_buffer(self):
        """Send queued messages singly or as a batch, then reset the buffer."""
        if not self.message_buffer:
            return
        if len(self.message_buffer) == 1:
            # A single message does not need batch framing.
            await self._send_single(self.message_buffer[0])
        else:
            await self._send_batch(self.message_buffer)
        self.message_buffer = []
        self.last_flush = asyncio.get_event_loop().time()

    async def _send_batch(self, messages: List["ACLMessage"]):
        """Serialize a batch envelope and hand it to the transport."""
        if self.compression_enabled:
            compressed_batch = self._compress_messages(messages)
        else:
            compressed_batch = messages
        batch_data = json.dumps({
            "batch_id": str(uuid.uuid4()),
            "messages": [msg.to_dict() for msg in compressed_batch],
            "count": len(messages),
            "compressed": self.compression_enabled
        })
        # TODO: actual network send of batch_data is not implemented here.
        pass

    def _compress_messages(self, messages: List["ACLMessage"]) -> List["ACLMessage"]:
        """Placeholder compression step; currently returns messages unchanged.

        Possible strategies: strip redundant fields, dictionary-encode
        performatives, apply a compression codec.
        """
        return messages
4.2 MCP服务性能优化
4.2.1 连接池管理
class MCPConnectionPool:
    """Bounded MCP connection pool with idle-connection reuse and expiry."""

    def __init__(self, max_connections: int = 10, idle_timeout: int = 300):
        self.max_connections = max_connections  # hard cap on pooled connections
        self.idle_timeout = idle_timeout        # seconds before an idle conn is dropped
        self.active_connections = {}            # session_id -> (url, client, timestamp)
        self.idle_connections = []              # [(url, client, last_used), ...]
        self.connection_stats = {}              # reserved for usage metrics

    async def get_connection(self, server_url: str) -> "MCPClient":
        """Reuse a fresh idle connection to the server, or create a new one."""
        current_time = asyncio.get_event_loop().time()
        # Prefer an idle connection to the same server that has not expired.
        for i, (conn_url, conn, last_used) in enumerate(self.idle_connections):
            if conn_url == server_url and current_time - last_used < self.idle_timeout:
                conn_info = self.idle_connections.pop(i)
                self.active_connections[conn_info[1].session_id] = conn_info
                return conn_info[1]
        # At capacity: evict the oldest idle connection to make room.
        if len(self.active_connections) + len(self.idle_connections) >= self.max_connections:
            if self.idle_connections:
                oldest_conn = self.idle_connections.pop(0)
                await oldest_conn[1].close()
        client = MCPClient(server_url)
        await client.connect()
        self.active_connections[client.session_id] = (server_url, client, current_time)
        return client

    async def release_connection(self, client: "MCPClient"):
        """Return an active connection to the idle pool."""
        if client.session_id in self.active_connections:
            conn_url, conn, _created = self.active_connections.pop(client.session_id)
            # BUG FIX: refresh last_used on release. The original kept the
            # creation timestamp, so a connection held longer than
            # idle_timeout was treated as expired the moment it was released.
            now = asyncio.get_event_loop().time()
            self.idle_connections.append((conn_url, conn, now))

    async def cleanup_idle_connections(self):
        """Close and drop idle connections older than the idle timeout."""
        current_time = asyncio.get_event_loop().time()
        new_idle_list = []
        for conn_info in self.idle_connections:
            conn_url, conn, last_used = conn_info
            if current_time - last_used < self.idle_timeout:
                new_idle_list.append(conn_info)
            else:
                await conn.close()
        self.idle_connections = new_idle_list
5. 实际部署架构
5.1 生产环境架构
┌─────────────────────────────────────────────────────────────────┐
│ 生产环境部署 │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 负载均衡器 │ │ 负载均衡器 │ │ 负载均衡器 │ │
│ │ (Nginx) │ │ (Nginx) │ │ (Nginx) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
├─────────┼─────────────────┼─────────────────┼────────────────┤
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ API服务 │ │ 智能体服务 │ │ MCP服务 │ │
│ │ (FastAPI) │ │ (Agents) │ │ (Services) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
├─────────┼───────────────┼───────────────┼──────────────────┤
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ Redis │ │ PostgreSQL │ │ Qdrant │ │
│ │ (缓存/消息) │ │ (关系数据) │ │ (向量存储) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 监控与日志系统 │ │
│ │ • Prometheus • Grafana • ELK Stack │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 配置管理 │ │
│ │ • Consul • Vault • Kubernetes ConfigMaps │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
6. 总结
6.1 多智能体协作的核心价值
- 任务分解与专业化:复杂任务分解为子任务,由专业化智能体处理
- 并行处理与效率:多个智能体并行工作,提高系统吞吐量
- 鲁棒性与容错:单个智能体故障不影响整体系统
- 知识共享与学习:智能体间共享知识和经验
- 动态适应:智能体可根据环境变化调整行为
6.2 MCP服务的核心优势
- 标准化接口:统一的服务访问方式
- 可扩展性:易于集成新工具和服务
- 安全性:内置的安全和权限控制
- 互操作性:跨平台和跨模型兼容
- 上下文管理:智能管理模型上下文窗口
6.3 集成系统的独特优势
当多智能体协作与MCP服务结合时,系统获得以下优势:
- 增强的能力访问:智能体可通过MCP访问广泛的外部能力
- 统一的工具管理:所有工具和服务通过标准化接口管理
- 动态服务发现:智能体可自动发现和使用新服务
- 资源优化:通过连接池和缓存优化资源使用
- 安全增强:统一的安全策略和访问控制
6.4 未来发展方向
- 更智能的协调算法:基于深度强化学习的智能体协调
- 联邦学习集成:智能体间安全的知识共享和学习
- 边缘计算支持:分布式智能体部署
- 区块链集成:去中心化的智能体协作和信任机制
- 量子计算准备:量子启发的优化算法
这种集成的多智能体MCP系统代表了AI系统架构的重要发展方向,为实现真正智能、灵活、可扩展的AI应用提供了强大的基础架构。
更多推荐
所有评论(0)