从个体赋能到群体智能:A2A 与 MCP 协同的技术深解(含代码实战)
本文探讨了AIAgent协作中的关键技术与解决方案。针对当前单一智能体能力受限、不同系统间难以协同的问题,提出A2A(Agent间协作协议)与MCP(模型上下文协议)的结合方案。A2A负责高层任务分配与协调,MCP处理底层工具调用与执行,形成"任务拆解-分布式执行-结果汇总"的闭环。通过GitHub仓库分析实例,展示了Python环境下的完整实现流程,包括MCP工具定义、A2A
在 AI Agent 技术爆发的当下,单一智能体的能力边界已逐渐显现 ——LangChain 开发的 Agent 无法与 LlamaIndex 的 Agent 直接对话,Salesforce 的智能工具与 Workday 的系统难以协同工作。而 A2A(Agent2Agent)协议与 MCP(模型上下文协议)的结合,正是突破这一瓶颈的关键:MCP 让单个 Agent 拥有 “十八般武艺”,A2A 则让这些身怀绝技的 Agent 形成 “协作网络”。笔者将从技术本质、协同逻辑、代码实战三个维度,拆解两者结合的核心知识点,同时揭秘落地过程中的坑点与解决方案。
一、先理清核心:A2A 与 MCP 的协同本质
🔴在深入代码前,必须先明确两者的定位差异与协作逻辑 ——A2A 是 “Agent 间的外交协议”,MCP 是 “Agent 内部的工具接口标准”,两者在技术栈中呈现 “上层协作 + 下层执行” 的分层架构。
1. 核心定位对比



终极协同范式:当 A2A Agent 内部使用 MCP
协同核心逻辑:上层谈 “任务”,下层做 “执行”
两者的协同并非简单叠加,而是形成 “任务拆解 - 分布式执行 - 结果汇总” 的闭环:
- 上层(A2A):总控 Agent 通过 A2A 协议向专业 Agent 发起 “高抽象度任务”(如 “生成 Q2 财报草稿”),并协调销售、市场、供应链等多个 Agent 分工;
- 中层(Agent 内部):每个专业 Agent 收到任务后,通过 MCP 协议调用具体工具完成 “低粒度执行”(如销售 Agent 用 MCP 调用 Salesforce API 拉取数据);
- 下层(工具交互):MCP 负责处理 Agent 与工具的通信细节(认证、数据格式转换、异常重试),屏蔽不同工具的接口差异;
- 反馈闭环:专业 Agent 通过 A2A 将执行结果(如销售数据、库存成本)返回总控 Agent,最终汇总形成完整成果。
简单说:A2A 解决 “谁来做、做什么” 的协作问题,MCP 解决 “怎么做、用什么做” 的执行问题。
二、代码实战:A2A+MCP 协同的完整实现
🔴下面以 “分析 GitHub 仓库季度代码提交活跃度” 为例,完整实现 A2A 协同 + MCP 工具调用的流程。
🔵技术栈:Python 3.10+、A2A SDK(0.2.5 版本)、MCP Python 客户端、FastAPI(A2A Server)。
1. 前置准备:环境搭建与依赖安装
首先安装核心依赖,注意 A2A 目前处于快速迭代期,需指定稳定版本:
# 安装A2A核心依赖(含协议实现与SDK)
pip install a2a-protocol==0.2.5 fastapi uvicorn
# 安装MCP工具调用依赖
pip install mcp-client==0.1.8 python-dotenv requests pandas
2. 核心实现步骤
步骤 1:定义 MCP 工具集(Agent 内部的 “能力底座”)
🔴MCP 的核心是标准化工具调用,我们需要实现 3 个核心工具:GitHub API 调用工具、文件存储工具、数据分析工具。每个工具需遵循 MCP 的Tool接口规范,包含name、description、parameters三个核心字段。
# mcp_tools.py
import os
import requests
import pandas as pd
from mcp.client import MCPTool, ToolResponse
from dotenv import load_dotenv
load_dotenv()
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
# 1. GitHub API调用工具:拉取仓库提交记录
class GitHubCommitTool(MCPTool):
name = "github_commit_fetcher"
description = "拉取指定GitHub仓库的提交记录,支持按时间段筛选"
parameters = {
"type": "object",
"properties": {
"repo_owner": {"type": "string", "description": "仓库所有者(如a2aproject)"},
"repo_name": {"type": "string", "description": "仓库名称(如A2A)"},
"start_date": {"type": "string", "description": "开始日期(YYYY-MM-DD)"},
"end_date": {"type": "string", "description": "结束日期(YYYY-MM-DD)"}
},
"required": ["repo_owner", "repo_name", "start_date", "end_date"]
}
def execute(self, params: dict) -> ToolResponse:
headers = {"Authorization": f"token {GITHUB_TOKEN}"}
url = f"https://api.github.com/repos/{params['repo_owner']}/{params['repo_name']}/commits"
params = {
"since": f"{params['start_date']}T00:00:00Z",
"until": f"{params['end_date']}T23:59:59Z"
}
try:
response = requests.get(url, headers=headers, params=params, timeout=10)
response.raise_for_status() # 抛出HTTP错误
commits = response.json()
# 提取关键信息:提交SHA、作者、时间、说明
commit_data = [
{
"sha": c["sha"],
"author": c["commit"]["author"]["name"],
"date": c["commit"]["author"]["date"],
"message": c["commit"]["message"]
}
for c in commits
]
return ToolResponse(success=True, data=commit_data)
except Exception as e:
return ToolResponse(success=False, error=str(e))
# 2. 文件存储工具:将数据写入CSV
class FileStorageTool(MCPTool):
name = "file_storage"
description = "将结构化数据写入本地CSV文件"
parameters = {
"type": "object",
"properties": {
"data": {"type": "array", "description": "待存储的结构化数据"},
"file_path": {"type": "string", "description": "存储路径(如./data/commits.csv)"}
},
"required": ["data", "file_path"]
}
def execute(self, params: dict) -> ToolResponse:
try:
# 创建目录(如果不存在)
os.makedirs(os.path.dirname(params["file_path"]), exist_ok=True)
df = pd.DataFrame(params["data"])
df.to_csv(params["file_path"], index=False)
return ToolResponse(success=True, data={"file_path": params["file_path"]})
except Exception as e:
return ToolResponse(success=False, error=str(e))
# 3. 数据分析工具:统计提交活跃度
class DataAnalysisTool(MCPTool):
name = "commit_analysis"
description = "分析CSV文件中的提交数据,统计活跃度(按天/作者)"
parameters = {
"type": "object",
"properties": {
"file_path": {"type": "string", "description": "CSV文件路径"}
},
"required": ["file_path"]
}
def execute(self, params: dict) -> ToolResponse:
try:
df = pd.read_csv(params["file_path"])
df["date"] = pd.to_datetime(df["date"]).dt.date # 提取日期部分
# 统计:每日提交数、作者提交数
daily_stats = df.groupby("date").size().to_dict()
author_stats = df.groupby("author").size().to_dict()
total_commits = len(df)
analysis_result = {
"total_commits": total_commits,
"daily_commit_count": daily_stats,
"author_commit_count": author_stats
}
return ToolResponse(success=True, data=analysis_result)
except Exception as e:
return ToolResponse(success=False, error=str(e))
步骤 2:实现 A2A Server(GitHub Agent 的协作接口)
🔴A2A Server 负责对外提供 Agent 的 “能力名片”(Agent Card)和任务接收接口,其他 Agent 通过 A2A 协议与此 Server 通信。我们用 FastAPI 实现 A2A Server 的核心功能:
# a2a_github_agent.py
from fastapi import FastAPI, Request
from a2a.protocol import (
AgentCard, Task, TaskStatus, TaskArtifact, TextPart,
TaskStatusUpdateEvent, TaskArtifactUpdateEvent
)
from a2a.server import A2AServer
from mcp.client import MCPHost
from mcp_tools import GitHubCommitTool, FileStorageTool, DataAnalysisTool
import uvicorn
import asyncio
# 1. 初始化MCP Host(管理Agent内部的工具集)
mcp_host = MCPHost()
# 注册MCP工具
mcp_host.register_tool(GitHubCommitTool())
mcp_host.register_tool(FileStorageTool())
mcp_host.register_tool(DataAnalysisTool())
# 2. 定义A2A Agent Card(Agent的“名片”,对外暴露能力)
agent_card = AgentCard(
name="GitHubAnalysisAgent",
description="提供GitHub仓库提交记录拉取、存储与活跃度分析服务",
url="http://localhost:8000/a2a", # A2A服务接口基址
version="0.1.0",
capabilities=["streaming", "pushNotifications"], # 支持流式通信和推送通知
authentication={"scheme": "none"} # 开发环境暂不启用认证
)
# 3. 初始化A2A Server
app = FastAPI()
a2a_server = A2AServer(app, agent_card=agent_card, base_path="/a2a")
# 4. 实现A2A任务处理逻辑(核心协同点)
@a2a_server.task_handler()
async def handle_analysis_task(task: Task) -> None:
"""处理A2A任务:接收分析请求,通过MCP调用工具执行,推送结果"""
task_id = task.task_id
# 1. 解析A2A任务参数(从Task的Message中提取)
try:
# A2A的Message内容存储在parts中,这里提取TextPart的JSON参数
task_params = next(
part.content for part in task.messages[0].parts
if part.type == "text" and part.subtype == "json"
)
task_params = eval(task_params) # 实际生产环境建议用json.loads,需确保格式正确
repo_owner = task_params["repo_owner"]
repo_name = task_params["repo_name"]
start_date = task_params["start_date"]
end_date = task_params["end_date"]
except Exception as e:
# 推送任务失败状态
await a2a_server.push_event(
task_id,
TaskStatusUpdateEvent(status=TaskStatus.FAILED, error=f"参数解析失败:{str(e)}")
)
return
# 2. 推送任务开始状态(submitted -> working)
await a2a_server.push_event(
task_id,
TaskStatusUpdateEvent(status=TaskStatus.WORKING, message="开始处理GitHub分析任务")
)
try:
# 3. 调用MCP工具执行具体操作(下层执行)
# 步骤1:拉取GitHub提交记录
commit_result = await asyncio.to_thread(
mcp_host.call_tool,
tool_name="github_commit_fetcher",
params={
"repo_owner": repo_owner,
"repo_name": repo_name,
"start_date": start_date,
"end_date": end_date
}
)
if not commit_result.success:
raise Exception(f"拉取提交记录失败:{commit_result.error}")
# 推送中间结果(可选,提升用户体验)
await a2a_server.push_event(
task_id,
TaskArtifactUpdateEvent(
artifacts=[TaskArtifact(
name="commit_list",
parts=[TextPart(content=f"已拉取{len(commit_result.data)}条提交记录")]
)]
)
)
# 步骤2:存储数据到CSV
storage_result = await asyncio.to_thread(
mcp_host.call_tool,
tool_name="file_storage",
params={
"data": commit_result.data,
"file_path": f"./data/{repo_owner}_{repo_name}_commits.csv"
}
)
if not storage_result.success:
raise Exception(f"存储数据失败:{storage_result.error}")
# 步骤3:分析提交活跃度
analysis_result = await asyncio.to_thread(
mcp_host.call_tool,
tool_name="commit_analysis",
params={"file_path": storage_result.data["file_path"]}
)
if not analysis_result.success:
raise Exception(f"数据分析失败:{analysis_result.error}")
# 4. 推送最终结果(Artifact形式)
final_artifact = TaskArtifact(
name="github_analysis_report",
parts=[
TextPart(
content=f"""
# GitHub仓库活跃度分析报告
- 仓库:{repo_owner}/{repo_name}
- 时间范围:{start_date} 至 {end_date}
- 总提交数:{analysis_result.data['total_commits']}
- 每日提交分布:{analysis_result.data['daily_commit_count']}
- 作者贡献分布:{analysis_result.data['author_commit_count']}
"""
)
]
)
await a2a_server.push_event(
task_id,
TaskArtifactUpdateEvent(artifacts=[final_artifact])
)
# 5. 推送任务完成状态
await a2a_server.push_event(
task_id,
TaskStatusUpdateEvent(status=TaskStatus.COMPLETED, message="分析任务完成")
)
except Exception as e:
# 推送任务失败状态
await a2a_server.push_event(
task_id,
TaskStatusUpdateEvent(status=TaskStatus.FAILED, error=f"任务执行失败:{str(e)}")
)
# 6. 暴露Agent Card接口(A2A服务发现的核心)
@app.get("/.well-known/agent.json")
async def get_agent_card():
return agent_card.dict()
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
步骤 3:实现 A2A Client(总控 Agent,发起协作请求)
🔴总控 Agent 通过 A2A 协议发现 GitHub Agent 的能力(读取 Agent Card),并发起分析任务,接收流式反馈:
# a2a_controller_agent.py
import requests
import json
from a2a.client import A2AClient
from a2a.protocol import Task, Message, TextPart
async def main():
# 1. A2A服务发现:读取GitHub Agent的Agent Card
agent_card_url = "http://localhost:8000/.well-known/agent.json"
try:
response = requests.get(agent_card_url, timeout=5)
agent_card = response.json()
a2a_server_url = agent_card["url"]
print(f"发现A2A服务:{a2a_server_url}")
except Exception as e:
print(f"服务发现失败:{str(e)}")
return
# 2. 初始化A2A Client
client = A2AClient(server_url=a2a_server_url)
# 3. 构造A2A任务(高抽象度协作请求)
task_params = {
"repo_owner": "a2aproject",
"repo_name": "A2A",
"start_date": "2025-01-01",
"end_date": "2025-03-31"
}
task = Task(
messages=[
Message(
role="user",
parts=[TextPart(content=json.dumps(task_params), subtype="json")]
)
]
)
# 4. 发送流式任务请求(支持长任务实时反馈)
async for event in client.send_streaming_task(task):
if isinstance(event, TaskStatusUpdateEvent):
print(f"\n任务状态更新:{event.status} - {event.message or ''}")
if event.error:
print(f"错误信息:{event.error}")
elif isinstance(event, TaskArtifactUpdateEvent):
for artifact in event.artifacts:
print(f"\n收到成果:{artifact.name}")
for part in artifact.parts:
print(part.content)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
3. 运行流程与效果验证
- 启动 GitHub Agent 的 A2A Server:
python a2a_github_agent.py2.在.env 文件中配置 GitHub Token
GITHUB_TOKEN=your_github_personal_access_token3.运行总控 Agent 发起任务:
python a2a_controller_agent.py三、落地坑点与解决方案(实战经验总结)
🔴坑点 1:A2A 版本兼容性问题
现象:A2A 协议处于快速迭代期(当前最新 0.2.5),不同版本的 SDK 接口差异较大,客户端与服务端版本不匹配会导致通信失败;
解决方案:
首先要明确指定依赖版本(如a2a-protocol==0.2.5),避免自动升级;在 Agent Card 中添加protocol_version字段,客户端发起请求前校验版本兼容性。
🔵坑点 2:MCP 工具调用的异步阻塞
现象:MCP 工具(如 GitHub API 调用)若为同步实现,会阻塞 A2A Server 的事件循环,导致无法处理其他任务;
解决方案:
1.用asyncio.to_thread()将同步工具调用转为异步(如代码中所示);2.对耗时工具(如大数据分析),采用异步 MCP 工具实现,避免阻塞。
🟣坑点 3:认证与隐私保护缺失
现象:开发环境中关闭认证(authentication={"scheme": "none"}),生产环境直接上线,导致工具被未授权调用;
解决方案:
首先遵循 A2A 的 “默认安全” 设计哲学,启用 OAuth2 或 API Key 认证;MCP 工具层要添加权限校验(如 GitHub Token 的权限范围检查),避免敏感操作。 -
🟡坑点 4:长任务的断连与重试
现象:流式协作(Streaming 模式)中,网络波动导致 SSE 连接断开,任务执行状态丢失;
解决方案:
客户端实现resubscribe_task逻辑,断连后通过 task_id 重新订阅;服务端将任务状态持久化到数据库(如 Redis),避免重启后状态丢失。
🟢坑点 5:工具参数不兼容
现象:不同 Agent 的 MCP 工具参数定义不一致(如 “仓库所有者” 有的叫repo_owner,有的叫owner),导致协作失败;
解决方案:
基于 A2A 的capabilities字段,在服务发现阶段校验工具参数兼容性;还要定义行业级的 MCP 工具参数标准致谢˚𝜗𝜚🍒ᝰ.ᐟ
谢谢大家的阅读,以上是我对近期文献阅读的总结,欢迎大家在评论区指出,如果我的内容对你有帮助,可以点赞 , 收藏 ,大家的支持就是我坚持下去的动力!
“请赐予我平静,去接受我无法改变的 ;赐予我勇气,去改变我能改变的。”

更多推荐


所有评论(0)