在 AI Agent 技术爆发的当下,单一智能体的能力边界已逐渐显现 ——LangChain 开发的 Agent 无法与 LlamaIndex 的 Agent 直接对话,Salesforce 的智能工具与 Workday 的系统难以协同工作。而 A2A(Agent2Agent)协议与 MCP(模型上下文协议)的结合,正是突破这一瓶颈的关键:MCP 让单个 Agent 拥有 “十八般武艺”,A2A 则让这些身怀绝技的 Agent 形成 “协作网络”。笔者将从技术本质、协同逻辑、代码实战三个维度,拆解两者结合的核心知识点,同时揭秘落地过程中的坑点与解决方案。

一、先理清核心:A2A 与 MCP 的协同本质

🔴在深入代码前,必须先明确两者的定位差异与协作逻辑 ——A2A 是 “Agent 间的外交协议”,MCP 是 “Agent 内部的工具接口标准”,两者在技术栈中呈现 “上层协作 + 下层执行” 的分层架构。

1. 核心定位对比

终极协同范式:当 A2A Agent 内部使用 MCP

🔴工作流示例
高层协作 (A2A)Code Agent 通过 A2A 向 GitHub Agent 请求:“请分析 X 仓库上个季度的代码提交活跃度。”
🔵底层执行 (MCP)GitHub Agent 接收到任务后,在其内部通过 MCP 调用 github_api_tool 来拉取 commit 历史,MCP 调用 filesystem_tool 将数据写入本地 CSV 文件,并且调用data_analysis_tool (Pandas) 对 CSV 文件进行分析。
🟣结果返回 (A2A)GitHub Agent 将分析报告(Artifact)通过 A2A返回给Code Agent

协同核心逻辑:上层谈 “任务”,下层做 “执行”

两者的协同并非简单叠加,而是形成 “任务拆解 - 分布式执行 - 结果汇总” 的闭环:

  1. 上层(A2A):总控 Agent 通过 A2A 协议向专业 Agent 发起 “高抽象度任务”(如 “生成 Q2 财报草稿”),并协调销售、市场、供应链等多个 Agent 分工;
  2. 中层(Agent 内部):每个专业 Agent 收到任务后,通过 MCP 协议调用具体工具完成 “低粒度执行”(如销售 Agent 用 MCP 调用 Salesforce API 拉取数据);
  3. 下层(工具交互):MCP 负责处理 Agent 与工具的通信细节(认证、数据格式转换、异常重试),屏蔽不同工具的接口差异;
  4. 反馈闭环:专业 Agent 通过 A2A 将执行结果(如销售数据、库存成本)返回总控 Agent,最终汇总形成完整成果。

简单说:A2A 解决 “谁来做、做什么” 的协作问题,MCP 解决 “怎么做、用什么做” 的执行问题

二、代码实战:A2A+MCP 协同的完整实现

🔴下面以 “分析 GitHub 仓库季度代码提交活跃度” 为例,完整实现 A2A 协同 + MCP 工具调用的流程。

🔵技术栈:Python 3.10+、A2A SDK(0.2.5 版本)、MCP Python 客户端、FastAPI(A2A Server)。

1. 前置准备:环境搭建与依赖安装

首先安装核心依赖,注意 A2A 目前处于快速迭代期,需指定稳定版本:

# 安装A2A核心依赖(含协议实现与SDK)
pip install a2a-protocol==0.2.5 fastapi uvicorn
# 安装MCP工具调用依赖
pip install mcp-client==0.1.8 python-dotenv requests pandas

2. 核心实现步骤

步骤 1:定义 MCP 工具集(Agent 内部的 “能力底座”)

🔴MCP 的核心是标准化工具调用,我们需要实现 3 个核心工具:GitHub API 调用工具、文件存储工具、数据分析工具。每个工具需遵循 MCP 的Tool接口规范,包含namedescriptionparameters三个核心字段。

# mcp_tools.py
import os
import requests
import pandas as pd
from mcp.client import MCPTool, ToolResponse
from dotenv import load_dotenv

load_dotenv()
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")

# 1. GitHub API调用工具:拉取仓库提交记录
class GitHubCommitTool(MCPTool):
    name = "github_commit_fetcher"
    description = "拉取指定GitHub仓库的提交记录,支持按时间段筛选"
    parameters = {
        "type": "object",
        "properties": {
            "repo_owner": {"type": "string", "description": "仓库所有者(如a2aproject)"},
            "repo_name": {"type": "string", "description": "仓库名称(如A2A)"},
            "start_date": {"type": "string", "description": "开始日期(YYYY-MM-DD)"},
            "end_date": {"type": "string", "description": "结束日期(YYYY-MM-DD)"}
        },
        "required": ["repo_owner", "repo_name", "start_date", "end_date"]
    }

    def execute(self, params: dict) -> ToolResponse:
        headers = {"Authorization": f"token {GITHUB_TOKEN}"}
        url = f"https://api.github.com/repos/{params['repo_owner']}/{params['repo_name']}/commits"
        params = {
            "since": f"{params['start_date']}T00:00:00Z",
            "until": f"{params['end_date']}T23:59:59Z"
        }
        
        try:
            response = requests.get(url, headers=headers, params=params, timeout=10)
            response.raise_for_status()  # 抛出HTTP错误
            commits = response.json()
            # 提取关键信息:提交SHA、作者、时间、说明
            commit_data = [
                {
                    "sha": c["sha"],
                    "author": c["commit"]["author"]["name"],
                    "date": c["commit"]["author"]["date"],
                    "message": c["commit"]["message"]
                }
                for c in commits
            ]
            return ToolResponse(success=True, data=commit_data)
        except Exception as e:
            return ToolResponse(success=False, error=str(e))

# 2. 文件存储工具:将数据写入CSV
class FileStorageTool(MCPTool):
    name = "file_storage"
    description = "将结构化数据写入本地CSV文件"
    parameters = {
        "type": "object",
        "properties": {
            "data": {"type": "array", "description": "待存储的结构化数据"},
            "file_path": {"type": "string", "description": "存储路径(如./data/commits.csv)"}
        },
        "required": ["data", "file_path"]
    }

    def execute(self, params: dict) -> ToolResponse:
        try:
            # 创建目录(如果不存在)
            os.makedirs(os.path.dirname(params["file_path"]), exist_ok=True)
            df = pd.DataFrame(params["data"])
            df.to_csv(params["file_path"], index=False)
            return ToolResponse(success=True, data={"file_path": params["file_path"]})
        except Exception as e:
            return ToolResponse(success=False, error=str(e))

# 3. 数据分析工具:统计提交活跃度
class DataAnalysisTool(MCPTool):
    name = "commit_analysis"
    description = "分析CSV文件中的提交数据,统计活跃度(按天/作者)"
    parameters = {
        "type": "object",
        "properties": {
            "file_path": {"type": "string", "description": "CSV文件路径"}
        },
        "required": ["file_path"]
    }

    def execute(self, params: dict) -> ToolResponse:
        try:
            df = pd.read_csv(params["file_path"])
            df["date"] = pd.to_datetime(df["date"]).dt.date  # 提取日期部分
            # 统计:每日提交数、作者提交数
            daily_stats = df.groupby("date").size().to_dict()
            author_stats = df.groupby("author").size().to_dict()
            total_commits = len(df)
            
            analysis_result = {
                "total_commits": total_commits,
                "daily_commit_count": daily_stats,
                "author_commit_count": author_stats
            }
            return ToolResponse(success=True, data=analysis_result)
        except Exception as e:
            return ToolResponse(success=False, error=str(e))
步骤 2:实现 A2A Server(GitHub Agent 的协作接口)

🔴A2A Server 负责对外提供 Agent 的 “能力名片”(Agent Card)和任务接收接口,其他 Agent 通过 A2A 协议与此 Server 通信。我们用 FastAPI 实现 A2A Server 的核心功能:

# a2a_github_agent.py
from fastapi import FastAPI, Request
from a2a.protocol import (
    AgentCard, Task, TaskStatus, TaskArtifact, TextPart,
    TaskStatusUpdateEvent, TaskArtifactUpdateEvent
)
from a2a.server import A2AServer
from mcp.client import MCPHost
from mcp_tools import GitHubCommitTool, FileStorageTool, DataAnalysisTool
import uvicorn
import asyncio

# 1. 初始化MCP Host(管理Agent内部的工具集)
mcp_host = MCPHost()
# 注册MCP工具
mcp_host.register_tool(GitHubCommitTool())
mcp_host.register_tool(FileStorageTool())
mcp_host.register_tool(DataAnalysisTool())

# 2. 定义A2A Agent Card(Agent的“名片”,对外暴露能力)
agent_card = AgentCard(
    name="GitHubAnalysisAgent",
    description="提供GitHub仓库提交记录拉取、存储与活跃度分析服务",
    url="http://localhost:8000/a2a",  # A2A服务接口基址
    version="0.1.0",
    capabilities=["streaming", "pushNotifications"],  # 支持流式通信和推送通知
    authentication={"scheme": "none"}  # 开发环境暂不启用认证
)

# 3. 初始化A2A Server
app = FastAPI()
a2a_server = A2AServer(app, agent_card=agent_card, base_path="/a2a")

# 4. 实现A2A任务处理逻辑(核心协同点)
@a2a_server.task_handler()
async def handle_analysis_task(task: Task) -> None:
    """处理A2A任务:接收分析请求,通过MCP调用工具执行,推送结果"""
    task_id = task.task_id
    # 1. 解析A2A任务参数(从Task的Message中提取)
    try:
        # A2A的Message内容存储在parts中,这里提取TextPart的JSON参数
        task_params = next(
            part.content for part in task.messages[0].parts 
            if part.type == "text" and part.subtype == "json"
        )
        task_params = eval(task_params)  # 实际生产环境建议用json.loads,需确保格式正确
        repo_owner = task_params["repo_owner"]
        repo_name = task_params["repo_name"]
        start_date = task_params["start_date"]
        end_date = task_params["end_date"]
    except Exception as e:
        # 推送任务失败状态
        await a2a_server.push_event(
            task_id,
            TaskStatusUpdateEvent(status=TaskStatus.FAILED, error=f"参数解析失败:{str(e)}")
        )
        return

    # 2. 推送任务开始状态(submitted -> working)
    await a2a_server.push_event(
        task_id,
        TaskStatusUpdateEvent(status=TaskStatus.WORKING, message="开始处理GitHub分析任务")
    )

    try:
        # 3. 调用MCP工具执行具体操作(下层执行)
        # 步骤1:拉取GitHub提交记录
        commit_result = await asyncio.to_thread(
            mcp_host.call_tool,
            tool_name="github_commit_fetcher",
            params={
                "repo_owner": repo_owner,
                "repo_name": repo_name,
                "start_date": start_date,
                "end_date": end_date
            }
        )
        if not commit_result.success:
            raise Exception(f"拉取提交记录失败:{commit_result.error}")
        
        # 推送中间结果(可选,提升用户体验)
        await a2a_server.push_event(
            task_id,
            TaskArtifactUpdateEvent(
                artifacts=[TaskArtifact(
                    name="commit_list",
                    parts=[TextPart(content=f"已拉取{len(commit_result.data)}条提交记录")]
                )]
            )
        )

        # 步骤2:存储数据到CSV
        storage_result = await asyncio.to_thread(
            mcp_host.call_tool,
            tool_name="file_storage",
            params={
                "data": commit_result.data,
                "file_path": f"./data/{repo_owner}_{repo_name}_commits.csv"
            }
        )
        if not storage_result.success:
            raise Exception(f"存储数据失败:{storage_result.error}")

        # 步骤3:分析提交活跃度
        analysis_result = await asyncio.to_thread(
            mcp_host.call_tool,
            tool_name="commit_analysis",
            params={"file_path": storage_result.data["file_path"]}
        )
        if not analysis_result.success:
            raise Exception(f"数据分析失败:{analysis_result.error}")

        # 4. 推送最终结果(Artifact形式)
        final_artifact = TaskArtifact(
            name="github_analysis_report",
            parts=[
                TextPart(
                    content=f"""
                    # GitHub仓库活跃度分析报告
                    - 仓库:{repo_owner}/{repo_name}
                    - 时间范围:{start_date} 至 {end_date}
                    - 总提交数:{analysis_result.data['total_commits']}
                    - 每日提交分布:{analysis_result.data['daily_commit_count']}
                    - 作者贡献分布:{analysis_result.data['author_commit_count']}
                    """
                )
            ]
        )
        await a2a_server.push_event(
            task_id,
            TaskArtifactUpdateEvent(artifacts=[final_artifact])
        )

        # 5. 推送任务完成状态
        await a2a_server.push_event(
            task_id,
            TaskStatusUpdateEvent(status=TaskStatus.COMPLETED, message="分析任务完成")
        )

    except Exception as e:
        # 推送任务失败状态
        await a2a_server.push_event(
            task_id,
            TaskStatusUpdateEvent(status=TaskStatus.FAILED, error=f"任务执行失败:{str(e)}")
        )

# 6. 暴露Agent Card接口(A2A服务发现的核心)
@app.get("/.well-known/agent.json")
async def get_agent_card():
    return agent_card.dict()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
步骤 3:实现 A2A Client(总控 Agent,发起协作请求)

🔴总控 Agent 通过 A2A 协议发现 GitHub Agent 的能力(读取 Agent Card),并发起分析任务,接收流式反馈:

# a2a_controller_agent.py
import requests
import json
from a2a.client import A2AClient
from a2a.protocol import Task, Message, TextPart

async def main():
    # 1. A2A服务发现:读取GitHub Agent的Agent Card
    agent_card_url = "http://localhost:8000/.well-known/agent.json"
    try:
        response = requests.get(agent_card_url, timeout=5)
        agent_card = response.json()
        a2a_server_url = agent_card["url"]
        print(f"发现A2A服务:{a2a_server_url}")
    except Exception as e:
        print(f"服务发现失败:{str(e)}")
        return

    # 2. 初始化A2A Client
    client = A2AClient(server_url=a2a_server_url)

    # 3. 构造A2A任务(高抽象度协作请求)
    task_params = {
        "repo_owner": "a2aproject",
        "repo_name": "A2A",
        "start_date": "2025-01-01",
        "end_date": "2025-03-31"
    }
    task = Task(
        messages=[
            Message(
                role="user",
                parts=[TextPart(content=json.dumps(task_params), subtype="json")]
            )
        ]
    )

    # 4. 发送流式任务请求(支持长任务实时反馈)
    async for event in client.send_streaming_task(task):
        if isinstance(event, TaskStatusUpdateEvent):
            print(f"\n任务状态更新:{event.status} - {event.message or ''}")
            if event.error:
                print(f"错误信息:{event.error}")
        elif isinstance(event, TaskArtifactUpdateEvent):
            for artifact in event.artifacts:
                print(f"\n收到成果:{artifact.name}")
                for part in artifact.parts:
                    print(part.content)

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

3. 运行流程与效果验证

  1. 启动 GitHub Agent 的 A2A Server:
    python a2a_github_agent.py

    2.在.env 文件中配置 GitHub Token

    GITHUB_TOKEN=your_github_personal_access_token

    3.运行总控 Agent 发起任务:

    python a2a_controller_agent.py

    三、落地坑点与解决方案(实战经验总结)
    🔴坑点 1:A2A 版本兼容性问题
    现象:A2A 协议处于快速迭代期(当前最新 0.2.5),不同版本的 SDK 接口差异较大,客户端与服务端版本不匹配会导致通信失败;
    解决方案:
    首先要明确指定依赖版本(如a2a-protocol==0.2.5),避免自动升级;在 Agent Card 中添加protocol_version字段,客户端发起请求前校验版本兼容性。
    🔵坑点 2:MCP 工具调用的异步阻塞
    现象:MCP 工具(如 GitHub API 调用)若为同步实现,会阻塞 A2A Server 的事件循环,导致无法处理其他任务;
    解决方案:
    1.用asyncio.to_thread()将同步工具调用转为异步(如代码中所示);2.对耗时工具(如大数据分析),采用异步 MCP 工具实现,避免阻塞。
    🟣坑点 3:认证与隐私保护缺失
    现象:开发环境中关闭认证(authentication={"scheme": "none"}),生产环境直接上线,导致工具被未授权调用;
    解决方案:
    首先遵循 A2A 的 “默认安全” 设计哲学,启用 OAuth2 或 API Key 认证;MCP 工具层要添加权限校验(如 GitHub Token 的权限范围检查),避免敏感操作。

  2. 🟡坑点 4:长任务的断连与重试
    现象:流式协作(Streaming 模式)中,网络波动导致 SSE 连接断开,任务执行状态丢失;
    解决方案:
    客户端实现resubscribe_task逻辑,断连后通过 task_id 重新订阅;服务端将任务状态持久化到数据库(如 Redis),避免重启后状态丢失。
    🟢坑点 5:工具参数不兼容
    现象:不同 Agent 的 MCP 工具参数定义不一致(如 “仓库所有者” 有的叫repo_owner,有的叫owner),导致协作失败;
    解决方案:
    基于 A2A 的capabilities字段,在服务发现阶段校验工具参数兼容性;还要定义行业级的 MCP 工具参数标准

    致谢˚𝜗𝜚🍒ᝰ.ᐟ

    谢谢大家的阅读,以上是我对近期文献阅读的总结,欢迎大家在评论区指出,如果我的内容对你有帮助,可以点赞 , 收藏 ,大家的支持就是我坚持下去的动力!

    “请赐予我平静,去接受我无法改变的 ;赐予我勇气,去改变我能改变的。”

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐