深入解析 MCP 协议,多智能体协作通信 (1)
深入解析 MCP 协议,多智能体协作通信。
·
深入解析 MCP 协议:多智能体协作通信的基石
MCP(Model Context Protocol) 是一种面向 AI 多智能体系统(Multi-Agent Systems, MAS)设计的标准化通信协议,旨在实现上下文感知、能力互操作与任务协同。本文将从协议起源、核心设计、技术架构、消息格式、安全机制、应用场景到未来展望,全方位剖析 MCP,并提供 UML 建模、项目结构、流程图及完整可运行的 Python 实现。
1. 协议起源与背景
随着大模型(LLM)和自主智能体(Agent)生态的爆发,不同厂商、框架、语言构建的 Agent 之间缺乏统一通信标准,导致:
- 上下文割裂:Agent 无法共享对话历史、环境状态或推理链。
- 能力黑盒:无法动态发现其他 Agent 的功能接口。
- 协同困难:跨 Agent 任务编排依赖硬编码或中间件。
MCP 应运而生,受以下思想启发:
- FIPA ACL(Foundation for Intelligent Physical Agents Agent Communication Language)
- ROS 2 Service/Topic 机制
- OpenAPI + JSON Schema 能力描述
- LCEL(LangChain Expression Language)的链式调用理念
MCP 不是传输层协议,而是语义层通信规范,可运行于 HTTP/WebSocket/gRPC 等之上。
2. 核心设计理念
| 原则 | 说明 |
|---|---|
| 上下文为中心 | 所有消息携带 context_id,支持上下文继承与分支 |
| 能力可声明 | Agent 启动时广播其 capability manifest |
| 任务可分解 | 支持子任务委托、结果聚合、超时回退 |
| 协议无关传输 | 可适配多种底层通信方式 |
| 安全默认开启 | 内置身份认证、权限控制、加密通道 |
3. 技术架构
组件说明:
- Agent:实现 MCP 客户端,具备发送/接收能力。
- Broker:可选中心节点(如 MQTT、Redis PubSub),也可直连。
- MCP Router:路由消息、验证权限、记录日志。
- Context Manager:管理上下文生命周期(创建、继承、销毁)。
- Capability Registry:注册/查询 Agent 能力(JSON Schema 描述)。
- Task Orchestrator:处理任务委托、超时、重试、结果合并。
4. 消息格式(MCP Message)
所有 MCP 消息为 JSON 对象,包含以下字段:
{
"mcp_version": "1.0",
"message_id": "uuid4",
"context_id": "ctx-123", // 上下文ID,可继承
"parent_message_id": null, // 若为响应,则指向请求ID
"sender": "agent-alpha",
"recipient": "agent-beta",
"timestamp": "2026-01-08T13:00:00Z",
"type": "request|response|capability_announce|task_delegate",
"payload": { ... }, // 类型相关数据
"security": {
"signature": "base64",
"token": "jwt-or-apikey"
}
}
主要消息类型:
| 类型 | payload 结构 | 用途 |
|---|---|---|
capability_announce |
{ capabilities: [ { name, schema, endpoint } ] } |
Agent 启动时广播能力 |
request |
{ task: "summarize", args: { text: "..." } } |
发起任务请求 |
response |
{ result: "...", status: "success/error" } |
返回结果 |
task_delegate |
{ subtask: "...", to: "agent-gamma" } |
委托子任务 |
5. 安全机制
- 身份认证:JWT 或 API Key,由
security.token携带。 - 消息签名:使用 Ed25519 对 payload 签名,防止篡改。
- 能力级授权:Capability Registry 维护 ACL(谁可调用哪个能力)。
- TLS 强制:生产环境要求加密传输。
6. 应用场景示例
场景:多 Agent 协同撰写报告
- 用户 → Orchestrator Agent:“写一篇关于量子计算的综述”
- Orchestrator 广播查询:谁有“research”能力?
- Researcher Agent 响应:我可爬取 arXiv。
- Orchestrator 委托子任务:
{ task: "fetch_papers", topic: "quantum" } - Researcher 返回论文列表。
- Orchestrator 委托 Writer Agent:
{ task: "summarize", papers: [...] } - Writer 返回初稿。
- Orchestrator 委托 Reviewer Agent:校对语法。
- 最终返回用户。
所有步骤通过 MCP 消息传递,共享同一 context_id。
7. UML 建模
7.1 类图(Class Diagram)
7.2 序列图(任务委托流程)
8. 项目文件结构
mcp-protocol/
├── README.md
├── pyproject.toml
├── src/
│ └── mcp/
│ ├── __init__.py
│ ├── message.py # MCPMessage 类
│ ├── agent.py # Agent 基类
│ ├── client.py # MCPClient (WebSocket/HTTP)
│ ├── context.py # ContextManager
│ ├── capability.py # Capability 模型
│ └── security.py # 签名/验证工具
├── examples/
│ ├── orchestrator_agent.py
│ ├── researcher_agent.py
│ └── writer_agent.py
└── tests/
└── test_message.py
9. 源代码完整实现(Python)
src/mcp/message.py
import uuid
import json
from datetime import datetime, timezone
from typing import Dict, Any, Optional
from .security import sign_message, verify_signature
class MCPMessage:
def __init__(
self,
sender: str,
recipient: str,
msg_type: str,
payload: Dict[str, Any],
context_id: Optional[str] = None,
parent_message_id: Optional[str] = None,
message_id: Optional[str] = None,
security: Optional[Dict[str, str]] = None
):
self.mcp_version = "1.0"
self.message_id = message_id or str(uuid.uuid4())
self.context_id = context_id or str(uuid.uuid4())
self.parent_message_id = parent_message_id
self.sender = sender
self.recipient = recipient
self.timestamp = datetime.now(timezone.utc).isoformat()
self.type = msg_type
self.payload = payload
self.security = security or {}
def to_dict(self) -> Dict[str, Any]:
return {
"mcp_version": self.mcp_version,
"message_id": self.message_id,
"context_id": self.context_id,
"parent_message_id": self.parent_message_id,
"sender": self.sender,
"recipient": self.recipient,
"timestamp": self.timestamp,
"type": self.type,
"payload": self.payload,
"security": self.security
}
def serialize(self) -> str:
return json.dumps(self.to_dict(), ensure_ascii=False)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "MCPMessage":
return cls(
sender=data["sender"],
recipient=data["recipient"],
msg_type=data["type"],
payload=data["payload"],
context_id=data["context_id"],
parent_message_id=data.get("parent_message_id"),
message_id=data["message_id"],
security=data.get("security")
)
def sign(self, private_key: bytes) -> None:
payload_bytes = json.dumps(self.payload, sort_keys=True, ensure_ascii=False).encode()
sig = sign_message(payload_bytes, private_key)
self.security["signature"] = sig.decode()
def verify(self, public_key: bytes) -> bool:
if "signature" not in self.security:
return False
payload_bytes = json.dumps(self.payload, sort_keys=True, ensure_ascii=False).encode()
sig = self.security["signature"].encode()
return verify_signature(payload_bytes, sig, public_key)
src/mcp/capability.py
from typing import Dict, Any
class Capability:
def __init__(self, name: str, description: str, input_schema: Dict, output_schema: Dict, endpoint: str):
self.name = name
self.description = description
self.input_schema = input_schema
self.output_schema = output_schema
self.endpoint = endpoint
def to_dict(self) -> Dict[str, Any]:
return {
"name": self.name,
"description": self.description,
"input_schema": self.input_schema,
"output_schema": self.output_schema,
"endpoint": self.endpoint
}
src/mcp/context.py
import threading
from typing import Dict, Any, Optional
class ContextManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.contexts = {}
return cls._instance
def create_context(self, metadata: Optional[Dict] = None) -> str:
import uuid
ctx_id = str(uuid.uuid4())
self.contexts[ctx_id] = {"metadata": metadata or {}, "history": []}
return ctx_id
def fork_context(self, parent_id: str, metadata: Optional[Dict] = None) -> str:
if parent_id not in self.contexts:
raise ValueError("Parent context not found")
child_id = self.create_context(metadata)
self.contexts[child_id]["parent"] = parent_id
return child_id
def get_context(self, ctx_id: str) -> Dict[str, Any]:
return self.contexts.get(ctx_id, {})
src/mcp/agent.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from .message import MCPMessage
from .capability import Capability
from .client import MCPClient
from .context import ContextManager
class Agent(ABC):
def __init__(self, agent_id: str, client: MCPClient):
self.agent_id = agent_id
self.client = client
self.capabilities: List[Capability] = []
self.context_manager = ContextManager()
self.client.set_message_handler(self.on_message)
def announce_capabilities(self):
cap_list = [cap.to_dict() for cap in self.capabilities]
msg = MCPMessage(
sender=self.agent_id,
recipient="broadcast",
msg_type="capability_announce",
payload={"capabilities": cap_list}
)
self.client.publish("mcp/capabilities", msg.serialize())
@abstractmethod
def handle_task(self, task_name: str, args: Dict[str, Any], context_id: str) -> Any:
pass
def on_message(self, raw_msg: str):
msg = MCPMessage.from_dict(json.loads(raw_msg))
if msg.type == "request":
result = self.handle_task(
task_name=msg.payload["task"],
args=msg.payload.get("args", {}),
context_id=msg.context_id
)
response = MCPMessage(
sender=self.agent_id,
recipient=msg.sender,
msg_type="response",
payload={"result": result, "status": "success"},
context_id=msg.context_id,
parent_message_id=msg.message_id
)
self.client.publish(f"mcp/response/{msg.sender}", response.serialize())
注:
client.py和security.py因篇幅略,但可基于websockets+cryptography实现。
10. 未来展望
- MCP over gRPC:提升性能,支持流式上下文。
- MCP Schema Registry:集中管理能力 Schema,支持版本兼容。
- MCP Debugger:可视化上下文流、消息追踪。
- 与 AutoGen / LangGraph 集成:成为 Agent 编排标准通信层。
结语
MCP 协议为多智能体系统提供了上下文感知、能力可发现、任务可协同的通信基础。通过标准化消息格式与交互范式,它有望成为 AI Agent 生态的“TCP/IP”。
更多推荐



所有评论(0)