A2A + MCP Python Skeleton

目的:演示 A2A(代理↔代理) 作为编排骨干、MCP(代理↔工具/数据) 作为工具接入的最小可运行骨架。包含一个 Router、三个子 Agent(ASR / Legal / TTS),以及一个 Mock MCP Server(JSON‑RPC 风格),全链路支持 SSE 流式


目录结构

.
├─ requirements.txt
├─ .env.sample
├─ src/
│  ├─ common/
│  │  ├─ a2a.py
│  │  └─ mcp.py
│  ├─ router.py
│  ├─ agents/
│  │  ├─ asr_agent.py
│  │  ├─ legal_agent.py
│  │  └─ tts_agent.py
│  └─ tools/
│     └─ mock_mcp_server.py

requirements.txt

fastapi==0.111.0
uvicorn[standard]==0.30.0
httpx==0.27.2
pydantic==2.8.2
python-dotenv==1.0.1

.env.sample

# Router 将任务路由到各个 Agent 的地址
ASR_AGENT_URL=http://127.0.0.1:8011
LEGAL_AGENT_URL=http://127.0.0.1:8012
TTS_AGENT_URL=http://127.0.0.1:8013

# MCP Server 地址(示例使用 mock)
MCP_ENDPOINT=http://127.0.0.1:8099/mcp

运行前复制为 .env 并按需修改。


src/common/a2a.py

from __future__ import annotations
import json
import uuid
from typing import Any, AsyncIterator, Dict

from pydantic import BaseModel, Field

A2A_MIME_SSE = "text/event-stream"

# === A2A 任务与事件 ===
class A2ATask(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    type: str  # 例如: "asr.transcribe", "legal.analyze", "tts.speak"
    payload: Dict[str, Any] = {}
    stream: bool = True  # 是否请求流式

class A2AEvent(BaseModel):
    task_id: str
    event: str  # 例如: "progress", "delta", "final", "error"
    data: Dict[str, Any] = {}

# === SSE 编解码 ===

def sse_encode(event: A2AEvent) -> bytes:
    """将事件编码为 SSE 文本块。"""
    line = "data: " + event.model_dump_json() + "\n\n"
    return line.encode("utf-8")

async def sse_stream_generator(generator: AsyncIterator[A2AEvent]):
    async for ev in generator:
        yield sse_encode(ev)

# === 工具函数 ===
class A2AError(RuntimeError):
    pass

src/common/mcp.py

from __future__ import annotations
import asyncio
import json
import time
import uuid
from typing import Any, Dict, List, Optional

import httpx
from pydantic import BaseModel

class MCPError(RuntimeError):
    pass

class MCPClient:
    """最小 MCP JSON-RPC 客户端(HTTP 版)。

    - list_tools(): 调用 JSON-RPC method="tools/list_tools"
    - call(tool, args): 调用 JSON-RPC method="tools/call"

    注:真实 MCP 可用 stdio / SSE 等传输,这里给出 HTTP JSON‑RPC 骨架,便于快速落地与替换。
    """

    def __init__(self, endpoint: str, timeout: float = 30.0):
        self.endpoint = endpoint.rstrip("/")
        self.timeout = timeout
        self._client = httpx.AsyncClient(timeout=timeout)

    async def _rpc(self, method: str, params: Dict[str, Any]) -> Any:
        req = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "method": method,
            "params": params,
        }
        r = await self._client.post(self.endpoint, json=req)
        r.raise_for_status()
        data = r.json()
        if "error" in data:
            raise MCPError(str(data["error"]))
        return data.get("result")

    async def list_tools(self) -> List[Dict[str, Any]]:
        return await self._rpc("tools/list_tools", {})

    async def call(self, tool: str, args: Dict[str, Any]) -> Dict[str, Any]:
        return await self._rpc("tools/call", {"tool": tool, "args": args})

    async def aclose(self):
        await self._client.aclose()

src/router.py

from __future__ import annotations
import os
from typing import AsyncIterator

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse

from common.a2a import A2ATask, A2AEvent, sse_encode

load_dotenv()

ASR_URL = os.getenv("ASR_AGENT_URL", "http://127.0.0.1:8011")
LEGAL_URL = os.getenv("LEGAL_AGENT_URL", "http://127.0.0.1:8012")
TTS_URL = os.getenv("TTS_AGENT_URL", "http://127.0.0.1:8013")

ROUTE_TABLE = {
    "asr.": ASR_URL,
    "legal.": LEGAL_URL,
    "tts.": TTS_URL,
}

app = FastAPI(title="A2A Router")


def _pick_agent(task_type: str) -> str:
    for prefix, url in ROUTE_TABLE.items():
        if task_type.startswith(prefix):
            return url
    raise HTTPException(status_code=400, detail=f"No agent for task type: {task_type}")


@app.post("/a2a/task")
async def route_task(task: A2ATask):
    agent_base = _pick_agent(task.type)
    agent_endpoint = f"{agent_base}/a2a/task"

    async with httpx.AsyncClient(timeout=None) as client:
        if task.stream:
            # 以 SSE 方式转发,并将下游的 SSE 原样转发给调用方
            resp = await client.post(agent_endpoint, json=task.model_dump(), headers={"accept": "text/event-stream"})
            if resp.status_code != 200:
                raise HTTPException(status_code=resp.status_code, detail=resp.text)

            async def forward() -> AsyncIterator[bytes]:
                async for chunk in resp.aiter_bytes():
                    # 直接转发下游 SSE 字节流
                    yield chunk

            return StreamingResponse(forward(), media_type="text/event-stream")
        else:
            resp = await client.post(agent_endpoint, json=task.model_dump())
            return JSONResponse(status_code=resp.status_code, content=resp.json())

src/agents/asr_agent.py

from __future__ import annotations
import os
import asyncio
from typing import AsyncIterator

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse

from common.a2a import A2ATask, A2AEvent, sse_stream_generator
from common.mcp import MCPClient

load_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")

app = FastAPI(title="ASR Agent")

async def _asr_stream(mcp: MCPClient, audio_url: str, task_id: str) -> AsyncIterator[A2AEvent]:
    # 这里演示“边调用工具边产出增量”的流程。真实情况可采用 chunk 推理。
    yield A2AEvent(task_id=task_id, event="progress", data={"msg": "asr_started"})

    # 调用 MCP 工具(mock)
    result = await mcp.call("asr.transcribe", {"audio_url": audio_url})
    text = result.get("text", "")

    # 模拟分词增量
    for token in text.split():
        await asyncio.sleep(0.05)
        yield A2AEvent(task_id=task_id, event="delta", data={"token": token})

    yield A2AEvent(task_id=task_id, event="final", data={"text": text})


@app.post("/a2a/task")
async def handle(task: A2ATask):
    if task.type != "asr.transcribe":
        raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")

    audio_url = task.payload.get("audio_url")
    if not audio_url:
        raise HTTPException(status_code=400, detail="payload.audio_url required")

    mcp = MCPClient(MCP_ENDPOINT)

    if task.stream:
        gen = _asr_stream(mcp, audio_url, task.id)
        return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")
    else:
        # 非流式:一次性返回
        result = await mcp.call("asr.transcribe", {"audio_url": audio_url})
        return JSONResponse(result)

src/agents/legal_agent.py

from __future__ annotations
import os
import asyncio
from typing import AsyncIterator

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse

from common.a2a import A2ATask, A2AEvent, sse_stream_generator
from common.mcp import MCPClient

load_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")

app = FastAPI(title="Legal Expert Agent")

async def _legal_stream(mcp: MCPClient, query: str, task_id: str) -> AsyncIterator[A2AEvent]:
    yield A2AEvent(task_id=task_id, event="progress", data={"msg": "legal_started"})

    # 1) RAG 检索(示例:调用 MCP 的 kb.search)
    kb = await mcp.call("kb.search", {"query": query, "top_k": 3})
    yield A2AEvent(task_id=task_id, event="progress", data={"kb_hits": kb.get("hits", [])})

    # 2) LLM 归纳(示例:调用 MCP 的 llm.complete)
    prompt = f"根据以下材料做初步法律分析:\n{kb.get('context', '')}\n---\n问题:{query}\n"
    llm = await mcp.call("llm.complete", {"prompt": prompt, "temperature": 0.2})

    # 模拟分段流出
    text = llm.get("text", "")
    for chunk in [text[i:i+60] for i in range(0, len(text), 60)]:
        await asyncio.sleep(0.05)
        yield A2AEvent(task_id=task_id, event="delta", data={"text": chunk})

    yield A2AEvent(task_id=task_id, event="final", data={"text": text})


@app.post("/a2a/task")
async def handle(task: A2ATask):
    if task.type != "legal.analyze":
        raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")

    query = task.payload.get("query")
    if not query:
        raise HTTPException(status_code=400, detail="payload.query required")

    mcp = MCPClient(MCP_ENDPOINT)

    if task.stream:
        gen = _legal_stream(mcp, query, task.id)
        return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")
    else:
        kb = await mcp.call("kb.search", {"query": query, "top_k": 3})
        llm = await mcp.call("llm.complete", {"prompt": str(kb), "temperature": 0.2})
        return JSONResponse(llm)

注意:from __future__ annotations 是 Python 3.12 写法,若报错改为 from __future__ import annotations


src/agents/tts_agent.py

from __future__ import annotations
import os
import asyncio
from typing import AsyncIterator

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse

from common.a2a import A2ATask, A2AEvent, sse_stream_generator
from common.mcp import MCPClient

load_dotenv()
MCP_ENDPOINT = os.getenv("MCP_ENDPOINT", "http://127.0.0.1:8099/mcp")

app = FastAPI(title="TTS Agent")

async def _tts_stream(mcp: MCPClient, text: str, task_id: str) -> AsyncIterator[A2AEvent]:
    yield A2AEvent(task_id=task_id, event="progress", data={"msg": "tts_started"})
    # 调用 MCP 的 tts.speak(mock),返回“伪音频块”数组
    result = await mcp.call("tts.speak", {"text": text})
    chunks = result.get("chunks", [])
    for i, ch in enumerate(chunks):
        await asyncio.sleep(0.05)
        yield A2AEvent(task_id=task_id, event="delta", data={"seq": i, "audio_chunk": ch})
    yield A2AEvent(task_id=task_id, event="final", data={"ok": True})


@app.post("/a2a/task")
async def handle(task: A2ATask):
    if task.type != "tts.speak":
        raise HTTPException(status_code=400, detail=f"Unsupported task type: {task.type}")

    text = task.payload.get("text")
    if not text:
        raise HTTPException(status_code=400, detail="payload.text required")

    mcp = MCPClient(MCP_ENDPOINT)

    if task.stream:
        gen = _tts_stream(mcp, text, task.id)
        return StreamingResponse(sse_stream_generator(gen), media_type="text/event-stream")
    else:
        result = await mcp.call("tts.speak", {"text": text})
        return JSONResponse(result)

src/tools/mock_mcp_server.py

from __future__ import annotations
import random
from typing import Any, Dict

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Mock MCP Server (JSON-RPC over HTTP)")

TOOLS = [
    {"name": "asr.transcribe", "args": {"audio_url": "str"}},
    {"name": "kb.search", "args": {"query": "str", "top_k": "int"}},
    {"name": "llm.complete", "args": {"prompt": "str", "temperature": "float"}},
    {"name": "tts.speak", "args": {"text": "str"}},
]

class JSONRPCRequest(BaseModel):
    jsonrpc: str
    id: str
    method: str
    params: Dict[str, Any] | None = None

class JSONRPCResponse(BaseModel):
    jsonrpc: str = "2.0"
    id: str
    result: Dict[str, Any] | None = None
    error: Dict[str, Any] | None = None

@app.post("/mcp")
async def mcp(req: JSONRPCRequest) -> JSONRPCResponse:
    try:
        if req.method == "tools/list_tools":
            return JSONRPCResponse(id=req.id, result={"tools": TOOLS})

        if req.method == "tools/call":
            params = req.params or {}
            tool = params.get("tool")
            args = params.get("args", {})

            if tool == "asr.transcribe":
                text = f"(mock asr) transcribed from {args.get('audio_url','?')}"
                return JSONRPCResponse(id=req.id, result={"text": text})

            if tool == "kb.search":
                hits = [
                    {"id": f"doc{i}", "score": round(random.random(), 3), "snippet": f"snippet {i}"}
                    for i in range(1, (args.get("top_k", 3) + 1))
                ]
                context = "\n".join(h["snippet"] for h in hits)
                return JSONRPCResponse(id=req.id, result={"hits": hits, "context": context})

            if tool == "llm.complete":
                prompt = args.get("prompt", "")
                return JSONRPCResponse(id=req.id, result={"text": f"(mock llm) summary for: {prompt[:60]}..."})

            if tool == "tts.speak":
                text = args.get("text", "")
                chunks = [f"audio-bytes-chunk-{i}" for i in range(5)]
                return JSONRPCResponse(id=req.id, result={"chunks": chunks, "desc": f"(mock) tts of {text[:20]}..."})

            return JSONRPCResponse(id=req.id, error={"message": f"unknown tool: {tool}"})

        return JSONRPCResponse(id=req.id, error={"message": f"unknown method: {req.method}"})
    except Exception as e:
        return JSONRPCResponse(id=req.id, error={"message": str(e)})

启动方式

1) 安装依赖

python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.sample .env

2) 分别启动 Mock MCP 与各 Agent、Router

# 终端 1:Mock MCP Server
uvicorn src.tools.mock_mcp_server:app --host 0.0.0.0 --port 8099 --reload

# 终端 2:ASR Agent
uvicorn src.agents.asr_agent:app --host 0.0.0.0 --port 8011 --reload

# 终端 3:Legal Agent
uvicorn src.agents.legal_agent:app --host 0.0.0.0 --port 8012 --reload

# 终端 4:TTS Agent
uvicorn src.agents.tts_agent:app --host 0.0.0.0 --port 8013 --reload

# 终端 5:Router
uvicorn src.router:app --host 0.0.0.0 --port 8000 --reload

测试(SSE 流式)

例如让 Router 把任务派给 Legal Agent:

curl -N -H 'Accept: text/event-stream' \
  -H 'Content-Type: application/json' \
  -d '{
    "type": "legal.analyze",
    "payload": {"query": "公司违法辞退赔偿怎么计算?"},
    "stream": true
  }' \
  http://127.0.0.1:8000/a2a/task

你会看到 data: {"task_id":..., "event":"progress"...}deltafinal 等事件逐步返回。


架构要点回顾

  • A2A/a2a/task 即代理之间的统一入口;Router 依据 task.type 做分发,并透传 SSE 流式结果。
  • MCP:各代理内部通过 MCPClient 调用工具(这里用 mock_mcp_server 演示),未来可无缝替换为真实 MCP 工具(向量库、Redis 记忆、企业 API、LLM、TTS/ASR 服务等)。
  • 可扩展:新增代理=加一个服务并在 Router 的 ROUTE_TABLE 加前缀映射;新增工具=在 MCP 端注册新 tool 名并在代理里调用。

下一步可升级点

  1. 鉴权/租户隔离:在 Router 与 Agent 层统一加 Bearer/JWT;任务元数据里携带 tenant_id / session_id。
  2. 重试与超时:Router 对下游 Agent 实现熔断/超时、错误回传;Agent 内部对 MCP 工具加重试策略。
  3. 真正的流式工具:将 MCP 端改为 SSE/分块输出,MCPClient 增加 acall_stream,实现端到端全链路流式。
  4. 记忆:通过 MCP 工具挂 Redis:memory.write/read/append,在 Router/Agent 层封装对话级短期记忆。
  5. 发现与注册:为 A2A 增加 /.well-known/agents,支持 Agent 动态注册/心跳;或引入服务发现组件。
  6. 观测:统一埋点(OpenTelemetry)、任务状态表(Postgres/ClickHouse)、SSE 日志镜像通道用于回放与质检。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐