A2A + MCP 的python实现的最小可运行骨架
A2A+MCP Python骨架简介 该框架展示了A2A(代理间通信)作为编排核心、MCP(代理-工具/数据通信)作为工具接入的最小实现方案。包含一个路由器和三个子代理(语音识别、法律分析、语音合成),以及一个模拟MCP服务器,全链路支持SSE流式传输。 核心组件: A2A模块:定义了任务和事件的数据模型,支持SSE流式编解码 MCP模块:提供JSON-RPC风格的客户端实现 Router:根据任
·
A2A + MCP Python Skeleton
目的:演示 A2A(代理↔代理) 作为编排骨干、MCP(代理↔工具/数据) 作为工具接入的最小可运行骨架。包含一个 Router、三个子 Agent(ASR / Legal / TTS),以及一个 Mock MCP Server(JSON‑RPC 风格),全链路支持 SSE 流式。
目录结构
.
├─ requirements.txt
├─ .env.sample
├─ src/
│ ├─ common/
│ │ ├─ a2a.py
│ │ └─ mcp.py
│ ├─ router.py
│ ├─ agents/
│ │ ├─ asr_agent.py
│ │ ├─ legal_agent.py
│ │ └─ tts_agent.py
│ └─ tools/
│ └─ mock_mcp_server.py
requirements.txt
fastapi==0.111.0
uvicorn[standard]==0.30.0
httpx==0.27.2
pydantic==2.8.2
python-dotenv==1.0.1
.env.sample
# Router 将任务路由到各个 Agent 的地址
ASR_AGENT_URL=http://127.0.0.1:8011
LEGAL_AGENT_URL=http://127.0.0.1:8012
TTS_AGENT_URL=http://127.0.0.1:8013
# MCP Server 地址(示例使用 mock)
MCP_ENDPOINT=http://127.0.0.1:8099/mcp
运行前复制为
.env
并按需修改。
src/common/a2a.py
from __future__ import annotations
import json
import uuid
from typing import Any, AsyncIterator, Dict
from pydantic import BaseModel, Field
A2A_MIME_SSE = "text/event-stream"
# === A2A 任务与事件 ===
class A2ATask(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
type: str # 例如: "asr.transcribe", "legal.analyze", "tts.speak"
payload: Dict[str, Any] = {}
stream: bool = True # 是否请求流式
class A2AEvent(BaseModel):
task_id: str
event: str # 例如: "progress", "delta", "final", "error"
data: Dict[str, Any] = {}
# === SSE 编解码 ===
def sse_encode(event: A2AEvent) -> bytes:
"""将事件编码为 SSE 文本块。"""
line = "data: " + event.model_dump_json() + "\n\n"
return line.encode("utf-8")
async def sse_stream_generator(generator: AsyncIterator[A2AEvent]):
async for ev in generator:
yield sse_encode(ev)
# === 工具函数 ===
class A2AError(RuntimeError):
pass
src/common/mcp.py
from __future__ import annotations
import asyncio
import json
import time
import uuid
from typing import Any, Dict, List, Optional
import httpx
from pydantic import BaseModel
class MCPError(RuntimeError):
pass
class MCPClient:
"""最小 MCP JSON-RPC 客户端(HTTP 版)。
- list_tools(): 调用 JSON-RPC method="tools/list_tools"
- call(tool, args): 调用 JSON-RPC method="tools/call"
注:真实 MCP 可用 stdio / SSE 等传输,这里给出 HTTP JSON‑RPC 骨架,便于快速落地与替换。
"""
def __init__(self, endpoint: str, timeout: float = 30.0):
self.endpoint = endpoint.rstrip("/")
self.timeout = timeout
self._client = httpx.AsyncClient(timeout=timeout)
async def _rpc(self, method: str, params: Dict[str, Any]) -> Any:
req = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": method,
"params": params,
}
r = await self._client.post(self.endpoint, json=req)
r.raise_for_status()
data = r.json()
if "error" in data:
raise MCPError(str(data["error"]))
return data.get("result")
async def list_tools(self) -> List[Dict[str, Any]]:
return await self._rpc("tools/list_tools", {})
async def call(self, tool: str, args: Dict[str, Any]) -> Dict[str, Any]:
return await self._rpc("tools/call", {"tool": tool, "args": args})
async def aclose(self):
await self._client.aclose()
src/router.py
from __future__ import annotations
import os
from typing import AsyncIterator
import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from common.a2a import A2ATask, A2AEvent, sse_encode
load_dotenv()
ASR_URL = os.getenv("ASR_AGENT_URL", "http://127.0.0.1:8011")
LEGAL_URL = os.getenv("LEGAL_AGENT_URL", "http://127.0.0.1:8012")
TTS_URL = os.getenv("TTS_AGENT_URL", "http://127.0.0.1:8013")
ROUTE_TABLE = {
"asr.": ASR_URL,
"legal.": LEGAL_URL,
"tts.": TTS_URL,
}
app = FastAPI(title="A2A Router")
def _pick_agent(task_type: str) -> str:
for prefix, url in ROUTE_TABLE.items():
if task_type.startswith(prefix):
return url
raise HTTPException(status_code=400, detail=f"No agent for task type: {task_type}")
@app.post("/a2a/task")
async def route_task(task: A2ATask):
agent_base = _pick_agent(task.type)
agent_endpoint = f"{agent_base}/a2a/task"
async with httpx.AsyncClient(timeout=None) as client:
if task.stream:
# 以 SSE 方式转发,并将下游的 SSE 原样转发给调用方
resp = await client.post(agent_endpoint, json=task.model_dump(), headers={"accept": "text/event-stream"})
if resp.status_code != 200:
raise HTTPException(status_code=resp.status_code, detail=resp.text)
async def forward() -> AsyncIterator[bytes]:
async for chunk in resp.aiter_bytes():
# 直接转发下游 SSE 字节流
yield chunk
return StreamingResponse(forward(), media_type="text/event-stream")
else:
resp = await client.post(agent_endpoint, json=task.model_dump())
return JSONResponse(status_code=resp.status_code, content=resp.json())
src/agents/asr_agent.py
from __future__ import annotations
import os
import asyncio
from typing import AsyncIterator
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from common.a2a import A2ATask, A2AEvent, sse_stream_generator
from common.mcp import MCPClient
load_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")
app = FastAPI(title="ASR Agent")
async def _asr_stream(mcp: MCPClient, audio_url: str, task_id: str) -> AsyncIterator[A2AEvent]:
# 这里演示“边调用工具边产出增量”的流程。真实情况可采用 chunk 推理。
yield A2AEvent(task_id=task_id, event="progress", data={"msg": "asr_started"})
# 调用 MCP 工具(mock)
result = await mcp.call("asr.transcribe", {"audio_url": audio_url})
text = result.get("text", "")
# 模拟分词增量
for token in text.split():
await asyncio.sleep(0.05)
yield A2AEvent(task_id=task_id, event="delta", data={"token": token})
yield A2AEvent(task_id=task_id, event="final", data={"text": text})
@app.post("/a2a/task")
async def handle(task: A2ATask):
if task.type != "asr.transcribe":
raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")
audio_url = task.payload.get("audio_url")
if not audio_url:
raise HTTPException(status_code=400, detail="payload.audio_url required")
mcp = MCPClient(MCP_ENDPOINT)
if task.stream:
gen = _asr_stream(mcp, audio_url, task.id)
return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")
else:
# 非流式:一次性返回
result = await mcp.call("asr.transcribe", {"audio_url": audio_url})
return JSONResponse(result)
src/agents/legal_agent.py
from __future__ annotations
import os
import asyncio
from typing import AsyncIterator
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from common.a2a import A2ATask, A2AEvent, sse_stream_generator
from common.mcp import MCPClient
load_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")
app = FastAPI(title="Legal Expert Agent")
async def _legal_stream(mcp: MCPClient, query: str, task_id: str) -> AsyncIterator[A2AEvent]:
yield A2AEvent(task_id=task_id, event="progress", data={"msg": "legal_started"})
# 1) RAG 检索(示例:调用 MCP 的 kb.search)
kb = await mcp.call("kb.search", {"query": query, "top_k": 3})
yield A2AEvent(task_id=task_id, event="progress", data={"kb_hits": kb.get("hits", [])})
# 2) LLM 归纳(示例:调用 MCP 的 llm.complete)
prompt = f"根据以下材料做初步法律分析:\n{kb.get('context', '')}\n---\n问题:{query}\n"
llm = await mcp.call("llm.complete", {"prompt": prompt, "temperature": 0.2})
# 模拟分段流出
text = llm.get("text", "")
for chunk in [text[i:i+60] for i in range(0, len(text), 60)]:
await asyncio.sleep(0.05)
yield A2AEvent(task_id=task_id, event="delta", data={"text": chunk})
yield A2AEvent(task_id=task_id, event="final", data={"text": text})
@app.post("/a2a/task")
async def handle(task: A2ATask):
if task.type != "legal.analyze":
raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")
query = task.payload.get("query")
if not query:
raise HTTPException(status_code=400, detail="payload.query required")
mcp = MCPClient(MCP_ENDPOINT)
if task.stream:
gen = _legal_stream(mcp, query, task.id)
return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")
else:
kb = await mcp.call("kb.search", {"query": query, "top_k": 3})
llm = await mcp.call("llm.complete", {"prompt": str(kb), "temperature": 0.2})
return JSONResponse(llm)
注意:
from __future__ annotations
是 Python 3.12 写法,若报错改为from __future__ import annotations
。
src/agents/tts_agent.py
from __future__ import annotations
import os
import asyncio
from typing import AsyncIterator
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from common.a2a import A2ATask, A2AEvent, sse_stream_generator
from common.mcp import MCPClient
load_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")
app = FastAPI(title="TTS Agent")
async def _tts_stream(mcp: MCPClient, text: str, task_id: str) -> AsyncIterator[A2AEvent]:
yield A2AEvent(task_id=task_id, event="progress", data={"msg": "tts_started"})
# 调用 MCP 的 tts.speak(mock),返回“伪音频块”数组
result = await mcp.call("tts.speak", {"text": text})
chunks = result.get("chunks", [])
for i, ch in enumerate(chunks):
await asyncio.sleep(0.05)
yield A2AEvent(task_id=task_id, event="delta", data={"seq": i, "audio_chunk": ch})
yield A2AEvent(task_id=task_id, event="final", data={"ok": True})
@app.post("/a2a/task")
async def handle(task: A2ATask):
if task.type != "tts.speak":
raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")
text = task.payload.get("text")
if not text:
raise HTTPException(status_code=400, detail="payload.text required")
mcp = MCPClient(MCP_ENDPOINT)
if task.stream:
gen = _tts_stream(mcp, text, task.id)
return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")
else:
result = await mcp.call("tts.speak", {"text": text})
return JSONResponse(result)
src/tools/mock_mcp_server.py
from __future__ import annotations
import random
from typing import Any, Dict
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI(title="Mock MCP Server (JSON-RPC over HTTP)")
TOOLS = [
{"name": "asr.transcribe", "args": {"audio_url": "str"}},
{"name": "kb.search", "args": {"query": "str", "top_k": "int"}},
{"name": "llm.complete", "args": {"prompt": "str", "temperature": "float"}},
{"name": "tts.speak", "args": {"text": "str"}},
]
class JSONRPCRequest(BaseModel):
jsonrpc: str
id: str
method: str
params: Dict[str, Any] | None = None
class JSONRPCResponse(BaseModel):
jsonrpc: str = "2.0"
id: str
result: Dict[str, Any] | None = None
error: Dict[str, Any] | None = None
@app.post("/mcp")
async def mcp(req: JSONRPCRequest) -> JSONRPCResponse:
try:
if req.method == "tools/list_tools":
return JSONRPCResponse(id=req.id, result={"tools": TOOLS})
if req.method == "tools/call":
params = req.params or {}
tool = params.get("tool")
args = params.get("args", {})
if tool == "asr.transcribe":
text = f"(mock asr) transcribed from {args.get('audio_url','?')}"
return JSONRPCResponse(id=req.id, result={"text": text})
if tool == "kb.search":
hits = [
{"id": f"doc{i}", "score": round(random.random(), 3), "snippet": f"snippet {i}"}
for i in range(1, (args.get("top_k", 3) + 1))
]
context = "\n".join(h["snippet"] for h in hits)
return JSONRPCResponse(id=req.id, result={"hits": hits, "context": context})
if tool == "llm.complete":
prompt = args.get("prompt", "")
return JSONRPCResponse(id=req.id, result={"text": f"(mock llm) summary for: {prompt[:60]}..."})
if tool == "tts.speak":
text = args.get("text", "")
chunks = [f"audio-bytes-chunk-{i}" for i in range(5)]
return JSONRPCResponse(id=req.id, result={"chunks": chunks, "desc": f"(mock) tts of {text[:20]}..."})
return JSONRPCResponse(id=req.id, error={"message": f"unknown tool: {tool}"})
return JSONRPCResponse(id=req.id, error={"message": f"unknown method: {req.method}"})
except Exception as e:
return JSONRPCResponse(id=req.id, error={"message": str(e)})
启动方式
1) 安装依赖
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.sample .env
2) 分别启动 Mock MCP 与各 Agent、Router
# 终端 1:Mock MCP Server
uvicorn src.tools.mock_mcp_server:app --host 0.0.0.0 --port 8099 --reload
# 终端 2:ASR Agent
uvicorn src.agents.asr_agent:app --host 0.0.0.0 --port 8011 --reload
# 终端 3:Legal Agent
uvicorn src.agents.legal_agent:app --host 0.0.0.0 --port 8012 --reload
# 终端 4:TTS Agent
uvicorn src.agents.tts_agent:app --host 0.0.0.0 --port 8013 --reload
# 终端 5:Router
uvicorn src.router:app --host 0.0.0.0 --port 8000 --reload
测试(SSE 流式)
例如让 Router 把任务派给 Legal Agent:
curl -N -H 'Accept: text/event-stream' \
-H 'Content-Type: application/json' \
-d '{
"type": "legal.analyze",
"payload": {"query": "公司违法辞退赔偿怎么计算?"},
"stream": true
}' \
http://127.0.0.1:8000/a2a/task
你会看到 data: {"task_id":..., "event":"progress"...}
、delta
、final
等事件逐步返回。
架构要点回顾
- A2A:
/a2a/task
即代理之间的统一入口;Router 依据task.type
做分发,并透传 SSE 流式结果。 - MCP:各代理内部通过
MCPClient
调用工具(这里用mock_mcp_server
演示),未来可无缝替换为真实 MCP 工具(向量库、Redis 记忆、企业 API、LLM、TTS/ASR 服务等)。 - 可扩展:新增代理=加一个服务并在 Router 的
ROUTE_TABLE
加前缀映射;新增工具=在 MCP 端注册新tool
名并在代理里调用。
下一步可升级点
- 鉴权/租户隔离:在 Router 与 Agent 层统一加 Bearer/JWT;任务元数据里携带 tenant_id / session_id。
- 重试与超时:Router 对下游 Agent 实现熔断/超时、错误回传;Agent 内部对 MCP 工具加重试策略。
- 真正的流式工具:将 MCP 端改为 SSE/分块输出,
MCPClient
增加acall_stream
,实现端到端全链路流式。 - 记忆:通过 MCP 工具挂 Redis:
memory.write/read/append
,在 Router/Agent 层封装对话级短期记忆。 - 发现与注册:为 A2A 增加
/.well-known/agents
,支持 Agent 动态注册/心跳;或引入服务发现组件。 - 观测:统一埋点(OpenTelemetry)、任务状态表(Postgres/ClickHouse)、SSE 日志镜像通道用于回放与质检。
更多推荐
所有评论(0)