手写 MCP 协议入门(分层版 + 源码段 + 单元测试)

对应源码:test/mcp_framework.py
目标读者:没有 MCP 基础,但会一点 Python。


0. 先看最终分层

mcp_framework.py 把 MCP 拆成 4 层:

  1. Transport:只管帧协议(Content-Length
  2. Server:只管 JSON-RPC 方法分发与 Tool 调用
  3. Client:只管请求发送、响应接收、握手封装
  4. Agent:只管业务编排(调用哪些工具)

这个顺序很重要:先把底层协议跑通,再谈 Agent 体验


1. Layer 1: Transport(协议地基)

它在源码里就是协议地基,只干一件事:

把 JSON 消息按 MCP/LSP 的 Content-Length 帧格式在二进制流里收发

MCP 使用类似 LSP 的帧

Content-Length: <字节数>\r\n
\r\n
<JSON body>

完整代码

# ============================================================================
# Layer 1: Transport — Content-Length 帧协议
# ============================================================================
#
# MCP 使用与 LSP (Language Server Protocol) 相同的帧格式:
#
#   Content-Length: <字节数>\r\n
#   \r\n
#   <JSON body>
#
# 这一层只负责:把 Python dict 变成帧写入管道,从管道读帧还原为 dict。
# ============================================================================
class MCPTransport:
    """
    MCP 帧协议传输层。

    职责:
    - write_message: dict → Content-Length 帧 → 写入管道
    - read_message:  管道 → 解析 Content-Length 帧 → dict

    帧格式:
    ```
    Content-Length: 42\\r\\n
    \\r\\n
    {"jsonrpc":"2.0","id":1,"method":"..."}
    ```
    """

    @staticmethod
    def write_message(stream: BufferedWriter, payload: Dict[str, Any]) -> None:
        """
        将一条 JSON-RPC 消息写入字节流。

        流程:
        1. payload → json.dumps → UTF-8 bytes (body)
        2. 计算 body 字节长度
        3. 构造 header: "Content-Length: {长度}\\r\\n\\r\\n"
        4. 写入 header + body
        5. flush() 立即推送

        Args:
            stream: 可写的二进制流(如 sys.stdout.buffer 或 proc.stdin)
            payload: 待发送的 JSON-RPC 消息字典
        """
        body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
        stream.write(header)
        stream.write(body)
        stream.flush()

    @staticmethod
    def read_message(stream: BufferedReader) -> Optional[Dict[str, Any]]:
        """
        从字节流中读取一条 JSON-RPC 消息。

        流程:
        1. 逐行读取 header,直到遇到空行 (\\r\\n)
        2. 从 header 中提取 Content-Length
        3. 精确读取 Content-Length 个字节作为 body
        4. json.loads → dict

        Args:
            stream: 可读的二进制流(如 sys.stdin.buffer 或 proc.stdout)

        Returns:
            解析后的 JSON-RPC 消息字典,管道关闭时返回 None
        """
        # ── 读取 Header ──
        content_length = 0
        while True:
            line = stream.readline()
            if not line:
                # 管道关闭 / EOF
                return None

            decoded = line.decode("utf-8").strip()

            # 空行 = header 结束
            if not decoded:
                break

            # 解析 "Content-Length: 42"
            if decoded.lower().startswith("content-length:"):
                content_length = int(decoded.split(":", 1)[1].strip())

        if content_length <= 0:
            return None

        # ── 读取 Body ──
        body = stream.read(content_length)
        if not body:
            return None

        return json.loads(body.decode("utf-8"))

MCPTransport 的职责就两条:

  • write_message(stream, payload):dict -> JSON bytes -> header + body -> 写入 -> flush()
  • read_message(stream):读 header(按行) -> 解析 Content-Length -> 精确读 body -> json.loads()

1.1 写帧:write_message

来自 MCPTransport.write_message 的核心源码段:

def write_message(stream: BufferedWriter, payload: Dict[str, Any]) -> None:
    """
    将一条 JSON-RPC 消息写入字节流。

    流程:
    1. payload → json.dumps → UTF-8 bytes (body)
    2. 计算 body 字节长度
    3. 构造 header: "Content-Length: {长度}\\r\\n\\r\\n"
    4. 写入 header + body
    5. flush() 立即推送

    Args:
        stream: 可写的二进制流(如 sys.stdout.buffer 或 proc.stdin)
        payload: 待发送的 JSON-RPC 消息字典
    """
    body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
    header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
    stream.write(header)
    stream.write(body)
    stream.flush()

要点:

  • Content-Length 必须是 字节长度,不是字符数。
  • flush() 必须调用,否则对端可能一直收不到数据。

1.2 读帧:read_message

来自 MCPTransport.read_message 的核心源码段:

"""
流程:
1. 逐行读取 header,直到遇到空行 (\\r\\n)
2. 从 header 中提取 Content-Length
3. 精确读取 Content-Length 个字节作为 body
4. json.loads → dict
"""
while True:
    line = stream.readline()
    if not line:
        break

    decoded = line.decode("utf-8")

    if not decoded:
        break

    # 解析 "Content-Length: 42"
    if decoded.lower().startwith("content-length:"):
        content_length = int(decoded.split(":", 1)[1].strip())

    if content_length <= 0:
        return None

# ── 读取 Body ──
body = stream.read(content_length)
if not body:
    return None

return json.loads(body.decode("utf-8"))

要点:

  • 先读 header,空行结束。
  • 再按长度精确读取 body。

1.3 这一层先测什么

  • 写入再读出是否一致(round-trip)。
  • 空流是否返回 None

2. Layer 2: Server(方法分发 + Tool 注册)

这层的目标是:让“客户端发来的 JSON-RPC 请求”能被正确分发到 handler,并且能注册/调用工具

完整代码

# ============================================================================
# Layer 2: Server — JSON-RPC 请求处理 + Tool 注册
# ============================================================================
#
# MCP Server 的职责:
# 1. 监听 stdin,循环读取 JSON-RPC 请求
# 2. 根据 method 字段分发处理:
#    - "initialize"    → 返回服务器能力声明
#    - "tools/list"    → 返回所有注册的 Tool 列表
#    - "tools/call"    → 执行具体 Tool 并返回结果
# 3. 将响应写入 stdout
# ============================================================================

class Tool:
    """
    一个 MCP Tool 的描述。

    Attributes:
        name: 工具名称(唯一标识)
        description: 工具描述(给 LLM 看的)
        input_schema: JSON Schema 描述输入参数
        handler: 实际执行函数 (arguments: dict) → Any
    """
    def __init__(
        self, 
        name: str,
        description: str,
        input_schema: Dict[str, Any],
        handler: Callable[[Dict[str, Any]], Any]
    ) -> None:
        self.name = name
        self.description = description
        self.input_schema = input_schema
        self.handler = handler

    def to_schema(self) -> Dict[str, Any]:
        """
        转换为 MCP tools/list 响应格式。
        """
        return {
            "name": self.name,
            "description": self.description,
            "inputSchema": self.input_schema,
        }

class MCPServer:
    """
    最小 MCP Server(stdio 传输)。

    职责:
    - 注册 Tool
    - 监听 stdin 上的 JSON-RPC 请求
    - 分发并执行,将结果写回 stdout

    MCP 协议方法处理:
    ```
    initialize              → 返回 serverInfo + capabilities
    notifications/initialized → 忽略(notification 无需响应)
    tools/list              → 返回已注册 Tool 的 schema 列表
    tools/call              → 执行 Tool handler,返回结果
    ```
    """
    # MCP 协议版本
    PROTOCOL_VERSION = "2024-11-05"

    def __init__(self, name: str = "mcp-server", version: str = "0.1.0") -> None:
        self.name = name
        self.version = version
        self._tools: Dict[str, Tool] = {}
    
    def register_tool(self, tool: Tool) -> None:
        """
        注册一个 Tool。

        Args:
            tool: Tool 实例
        """
        self._tools[tool.name] = tool
    
    def tool(
        self,
        name: str,
        description: str,
        input_schema: Dict[str, Any],
    ) -> Callable:
        """
        装饰器方式注册 Tool。

        用法:
        ```python
        @server.tool("add", "两数相加", {...})
        def add(args):
            return args["a"] + args["b"]
        ```
        """

        def decorator(func: Callable[[Dict[str, Any]], Any]) -> Callable:
            self.register_tool(Tool(name, description, input_schema, func))
            return func
        return decorator

    def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理 initialize 请求。

        返回:
        - protocolVersion: 支持的协议版本
        - serverInfo: 服务器名称和版本
        - capabilities: 声明支持 tools
        """
        return {
            "protocolVersion": self.PROTOCOL_VERSION,
            "serverInfo": {
                "name": self.name,
                "version": self.version,
            },
            "capabilities": {
                "tools": {"listChanged": False},
            },
        }

    def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理 tools/list 请求。

        返回所有已注册 Tool 的 schema。
        """
        return {
            "tools": [tool.to_schema() for tool in self._tools.values()],
        }

    def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理 tools/call 请求。

        1. 从 params 中取出 tool name 和 arguments
        2. 查找对应的 Tool handler
        3. 执行 handler,封装结果

        返回格式:
        ```json
        {
          "content": [{"type": "text", "text": "..."}],
          "isError": false
        }
        ```
        """
        tool_name = params.get("name", "")
        arguments = params.get("arguments", {})

        tool = self._tools.get(tool_name)
        if tool is None:
            raise ValueError(f"未知工具: {tool_name}")
        
        result = tool.handler(arguments)

        # 统一封装为 MCP content 格式
        if isinstance(result, str):
            text = result
        else:
            text = json.dumps(result, ensure_ascii=False, indent=2)

        return {
            "content": [{"type": "text", "text": text}],
            "isError": False,
        }

    def _dispatch(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        根据 method 分发到对应处理函数。

        Args:
            method: JSON-RPC method 字段
            params: JSON-RPC params 字段

        Returns:
            result 字典

        Raises:
            ValueError: 未知 method
        """
        dispatch_table: Dict[str, Callable] = {
            "initialize": self._handle_initialize,
            "tools/list": self._handle_tools_list,
            "tools/call": self._handle_tools_call,
        }

        handler = dispatch_table.get(method)
        if handler is None:
            raise ValueError(f"不支持的方法: {method}")

        return handler(params)
    
    def run(self) -> None:
        """
        启动 Server 主循环。

        流程:
        ```
        while True:
            1. 从 stdin 读取一条 JSON-RPC 消息
            2. 如果是 notification(无 id)→ 跳过
            3. 分发到对应 handler
            4. 构造 JSON-RPC response
            5. 写入 stdout
        ```
        """
        stdin = sys.stdin.buffer
        stdout = sys.stdout.buffer

        while True:
            # 读取请求
            request = MCPTransport.read_message(stdin)
            if request is None:
                break # EOF → 管道关闭,退出

            req_id = request.get("id")
            method = request.get("method", "")
            params = request.get("params") or {}

            # Notification(无 id )不需要响应
            if req_id is None:
                continue

            # 处理请求
            try:
                result = self._dispatch(method, params)
                response = {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "result": result,
                }
            except Exception as exc:
                response = {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "error": {
                        "code": -32000,
                        "message": str(exc),
                    },
                }

                # 写回相应
                MCPTransport.write_message(stdout, response)

2.1 Tool 抽象

Tool 类最核心字段:

class Tool:
    def __init__(self, name, description, input_schema, handler):
        self.name = name
        self.description = description
        self.input_schema = input_schema
        self.handler = handler

Tool 的职责很纯粹:描述自己 + 执行自己。

2.2 Server 的方法分发

MCPServer._dispatch 核心段:

dispatch_table = {
    "initialize": self._handle_initialize,
    "tools/list": self._handle_tools_list,
    "tools/call": self._handle_tools_call,
}
handler = dispatch_table.get(method)
if handler is None:
    raise ValueError(f"不支持的方法: {method}")
return handler(params)

2.3 Tool 调用路径

_handle_tools_call 核心段:

tool_name = params.get("name", "")
arguments = params.get("arguments", {})
tool = self._tools.get(tool_name)
if tool is None:
    raise ValueError(f"未知工具: {tool_name}")
result = tool.handler(arguments)

2.4 这一层先测什么

  • initialize 返回结构是否完整。
  • tools/list 是否能列出注册工具。
  • tools/call 正常调用与未知工具异常。
  • _dispatch 未知方法异常。

3. Layer 3: Client(请求响应与握手)

第三层把“协议对话”抽象成函数调用(client side 的 SDK)

完整代码

# ============================================================================
# Layer 3: Client — JSON-RPC 请求发送 + 响应接收
# ============================================================================
#
# MCP Client 的职责:
# 1. 通过 subprocess 启动 Server 子进程
# 2. 发送 JSON-RPC 请求到子进程 stdin
# 3. 从子进程 stdout 读取响应
# 4. 提供便捷方法:initialize / list_tools / call_tool
# ============================================================================


class MCPClient:
    """
    最小 MCP Client(stdio 传输)。

    通过 subprocess.Popen 启动 Server 子进程,
    用管道进行 JSON-RPC 通信。

    生命周期:
    ```
    client = MCPClient()
    client.start("python", ["server.py", "--server"])
    client.initialize()
    tools = client.list_tools()
    result = client.call_tool("add", {"a": 1, "b": 2})
    client.stop()
    ```
    """

    def __init__(self) -> None:
        self._next_id: int = 1
        self._proc: Optional[subprocess.Popen] = None

    @property
    def is_running(self) -> bool:
        return self._proc is not None and self._proc.poll() is None

    def start(self, command: str, args: List[str], cwd: Optional[str] = None):
        """
        启动 Server 子进程。

        发生了什么:
        ```
        操作系统层面:
        1. 创建 3 对管道 (stdin/stdout/stderr)
        2. fork 子进程
        3. 子进程的 stdin/stdout/stderr 连接到管道
        4. 子进程执行 command + args

        Python 层面:
        self._proc.stdin  → 管道写入端(父进程写,子进程从stdin读)
        self._proc.stdout → 管道读取端(子进程写stdout,父进程读)
        ```

        Args:
            command: 可执行文件路径(如 "python" 或 sys.executable)
            args: 命令行参数列表
            cwd: 工作目录
        """
         self._proc = subprocess.Popen(
            [command] + args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=cwd,
        )

    def stop(self) -> None:
        """
        停止 Server 子进程。
        """
        if self._proc and self._proc.poll() is None:
            self._proc.terminate()
            try:
                self._proc.wait(timeout = 5)
            except subprocess.TimeoutExpired:
                self._proc.kill()

        self._proc = None

    def _send(self, payload: Dict[str, Any]) -> None:
        """
        发送一条 JSON-RPC 消息到 Server 的 stdin。
        """
        if not self._proc or not self._proc.stdin:
            raise RuntimeError("Server 未启动")
        MCPTransport.write_message(self._proc.stdin, payload)

    def _recv(self) -> Dict[str, Any]:
        """
        从 Server 的 stdout 读取一条 JSON-RPC 响应。
        """
        if not self._proc or not self._proc.stdout:
            raise RuntimeError("Server 未启动")
        msg = MCPTransport.read_message(self._proc.stdout)
        if msg is None:
            raise RuntimeError("Server 无响应(管道已关闭)")
        return msg 

    def request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        发送 JSON-RPC request 并等待 response。

        流程:
        ```
        1. 生成自增 id
        2. 构造 {"jsonrpc":"2.0", "id": N, "method":"...", "params":{...}}
        3. _send() 写入管道
        4. _recv() 从管道读取
        5. 匹配 id,返回 result
        ```

        Args:
            method: RPC 方法名
            params: 参数字典

        Returns:
            response 中的 result 字段
        """
        req_id = self._next_id
        self._next_id += 1

        request = {
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params or {},
        }
        self._send(request)

        # 循环读取,跳过不匹配的消息(如 notification)
        while True:
            response = self._recv()
            if response.get("id") == req_id:
                if "error: in response:
                    err = response["error"]
                    raise RuntimeError(f"RPC Error [{err.get('code')}]: {err.get('message')}")
                return response.get("result", {})
    
    def notify(self, method: str, params: Optional[Dict[str, Any]] = None) -> None:
        """
        发送 JSON-RPC notification(无 id,Server 不回复)。

        Args:
            method: 通知方法名
            params: 参数字典
        """
        notification = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
        }
        self._send(notification)
    
    # ── 便捷方法 ──

    def initialize(self) -> Dict[str, Any]:
        """
        执行 MCP 握手:initialize + notifications/initialized。

        MCP 握手流程:
        ```
        Client                          Server
          │                                │
          │──── initialize ───────────────►│
          │     {protocolVersion, ...}     │
          │                                │
          │◄─── result ───────────────────│
          │     {serverInfo, capabilities} │
          │                                │
          │──── notifications/initialized ►│
          │     (notification, 无响应)     │
          │                                │
        ```
        """
        result = self.request("initialize", {
            "protocolVersion": "2024-11-05",
            "clientInfo": {"name": "mcp-client", "version": "0.1.0"},
            "capabilities": {},
        })
        self.notify("notifications/initialized")
        return result

    def list_tools(self) -> List[Dict[str, Any]]:
        """
        获取 Server 注册的所有 Tool 列表。
        """
        result = self.request("tools/list")
        return list(result.get("tools") or [])

    def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        调用指定 Tool。

        Args:
            name: 工具名
            arguments: 工具参数

        Returns:
            Tool 执行结果(MCP content 格式)
        """
        return self.request("tools/call", {"name": name, "arguments": arguments})

3.1 发请求:request

核心段:

req_id = self._next_id
self._next_id += 1
request = {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params or {}}
self._send(request)

while True:
    response = self._recv()
    if response.get("id") == req_id:
        if "error" in response:
            raise RuntimeError(...)
        return response.get("result", {})

要点:

  • id 自增并匹配响应。
  • 可跳过不匹配消息(如别的通知)。

3.2 MCP 握手:initialize

核心段:

result = self.request("initialize", {...})
self.notify("notifications/initialized")
return result

3.3 这一层先测什么

  • request 是否正确处理 error
  • initialize 是否先 requestnotify
  • list_tools/call_tool 的封装是否正确转发。

4. Layer 4: Agent(业务编排)

有了 tools/list 和 tools/call 还不够——我要让 LLM 知道何时调用、怎么调用、拿到结果后怎么继续推理。

完整代码

# ============================================================================
# Layer 4: Agent — 业务编排层
# ============================================================================
#
# Agent 的职责:
# 1. 管理 MCP Client 的生命周期
# 2. 根据用户意图选择合适的 Tool 调用
# 3. 组合多个 Tool 调用的结果
#
# 在真实场景中,Agent 会接入 LLM 来做意图识别和 Tool 选择。
# 这里用简单的命令行参数模拟。
# ============================================================================


class MCPAgent:
    """
    最小 MCP Agent。

    管理 Client 生命周期,提供用户交互入口。

    在真实系统中,这里会接入 LLM:
    ```
    用户问题 → LLM 分析 → 选择 Tool → 调用 → LLM 总结 → 返回
    ```

    本实现简化为命令行直接指定 Tool。
    """

    def __init__(self) -> None:
        self.client = MCPClient()
    
    def start(self) -> Dict[str, Any]:
        """
        启动 Agent:拉起 Server 子进程并完成握手。
        """
        script_path = str(Path(__file__).resolve())
        self.client.start(
            command=sys.executable,
            args=[script_path, "--server"],
            cwd=str(Path(__file__).resolve().parent),
        )
        return self.client.initialize()

    def stop(self) -> None:
        """
        停止 Agent。
        """
        self.client.stop()

    def run_interactive(self) -> None:
        """
        交互式运行:展示 Tool 列表,然后执行指定 Tool。
        """
        try:
            # 握手
            init_result = self.start()
            print("=" * 60)
            print("MCP Agent 已启动")
            print(f"Server: {init_result.get('serverInfo', {}).get('name')}")
            print(f"Protocol: {init_result.get('protocolVersion')}")
            print("=" * 60)

            # 列出工具
            tools = self.client.list_tools()
            print(f"\n可用工具 ({len(tools)} 个):")
            for i, tool in enumerate(tools, 1):
                print(f"  {i}. {tool['name']}{tool.get('description', '')}")    

            # 交互循环
            print("\n输入格式: <tool_name> <json_args>")
            print("输入 'quit' 退出\n")

            while True:
                try:
                    line = input(">>> ").strip()
                except (EOFError, KeyboardInterrupt):
                    break

                if not line or line.lower() in ("quit", "exit", "q"):
                    break

                parts = line.split(" ", 1)
                tool_name = parts[0]
                tool_args = {}
                if len(parts) > 1:
                    try:
                        tool_args = json.loads(parts[1])
                    except json.JSONDecodeError as e:
                        print(f"JSON 解析出错: {e}")
                        continue

                try:
                    result = self.client.call_tool(tool_name, tool_args)
                    print(json.dumps(result, ensure_ascii=False, indent=2))
                except RuntimeError as e:
                    print(f"调用出错: {e}")
                
                finally:
                    self.stop()
                    print("\nAgent 已停止。")

4.1 启动流程

MCPAgent.start 核心段:

script_path = str(Path(__file__).resolve())
self.client.start(
    command=sys.executable,
    args=[script_path, "--server"],
    cwd=str(Path(__file__).resolve().parent),
)
return self.client.initialize()

要点:

  • Agent 不直接写帧。
  • Agent 只调用 Client 的能力,保持边界清晰。

4.2 这一层先测什么

  • start() 是否调用 client.start + client.initialize
  • stop() 是否调用 client.stop

5. 完整测试

当前路径下新建文件 mcp_framework.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MCP 协议基础架构 — 从零手写

从最底层的帧协议到最上层的 Agent,完整实现 MCP stdio 通信。
不依赖任何第三方 MCP SDK,纯 Python 标准库。

架构分层:
  Layer 1: Transport    — Content-Length 帧协议读写
  Layer 2: Server       — JSON-RPC 请求分发 + Tool 注册
  Layer 3: Client       — JSON-RPC 请求发送 + 响应接收
  Layer 4: Agent        — 业务逻辑编排(决定调用哪个 Tool)

用法:
  # 1. 启动 Server(通常由 Client 自动拉起子进程)
  python mcp_framework.py --server

  # 2. 运行 Agent Demo
  python mcp_framework.py

  # 3. 单独测试某个 Tool
  python mcp_framework.py --call add '{"a": 1, "b": 2}'
"""

from __future__ import annotations

import json
import subprocess
import sys
from abc import ABC, abstractmethod
from io import BufferedReader, BufferedWriter
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple


# ============================================================================
# Layer 1: Transport — Content-Length 帧协议
# ============================================================================
#
# MCP 使用与 LSP (Language Server Protocol) 相同的帧格式:
#
#   Content-Length: <字节数>\r\n
#   \r\n
#   <JSON body>
#
# 这一层只负责:把 Python dict 变成帧写入管道,从管道读帧还原为 dict。
# ============================================================================


class MCPTransport:
    """
    MCP 帧协议传输层。

    职责:
    - write_message: dict → Content-Length 帧 → 写入管道
    - read_message:  管道 → 解析 Content-Length 帧 → dict

    帧格式:
    ```
    Content-Length: 42\\r\\n
    \\r\\n
    {"jsonrpc":"2.0","id":1,"method":"..."}
    ```
    """

    @staticmethod
    def write_message(stream: BufferedWriter, payload: Dict[str, Any]) -> None:
        """
        将一条 JSON-RPC 消息写入字节流。

        流程:
        1. payload → json.dumps → UTF-8 bytes (body)
        2. 计算 body 字节长度
        3. 构造 header: "Content-Length: {长度}\\r\\n\\r\\n"
        4. 写入 header + body
        5. flush() 立即推送

        Args:
            stream: 可写的二进制流(如 sys.stdout.buffer 或 proc.stdin)
            payload: 待发送的 JSON-RPC 消息字典
        """
        body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
        stream.write(header)
        stream.write(body)
        stream.flush()

    @staticmethod
    def read_message(stream: BufferedReader) -> Optional[Dict[str, Any]]:
        """
        从字节流中读取一条 JSON-RPC 消息。

        流程:
        1. 逐行读取 header,直到遇到空行 (\\r\\n)
        2. 从 header 中提取 Content-Length
        3. 精确读取 Content-Length 个字节作为 body
        4. json.loads → dict

        Args:
            stream: 可读的二进制流(如 sys.stdin.buffer 或 proc.stdout)

        Returns:
            解析后的 JSON-RPC 消息字典,管道关闭时返回 None
        """
        # ── 读取 Header ──
        content_length = 0
        while True:
            line = stream.readline()
            if not line:
                # 管道关闭 / EOF
                return None

            decoded = line.decode("utf-8").strip()

            # 空行 = header 结束
            if not decoded:
                break

            # 解析 "Content-Length: 42"
            if decoded.lower().startswith("content-length:"):
                content_length = int(decoded.split(":", 1)[1].strip())

        if content_length <= 0:
            return None

        # ── 读取 Body ──
        body = stream.read(content_length)
        if not body:
            return None

        return json.loads(body.decode("utf-8"))


# ============================================================================
# Layer 2: Server — JSON-RPC 请求处理 + Tool 注册
# ============================================================================
#
# MCP Server 的职责:
# 1. 监听 stdin,循环读取 JSON-RPC 请求
# 2. 根据 method 字段分发处理:
#    - "initialize"    → 返回服务器能力声明
#    - "tools/list"    → 返回所有注册的 Tool 列表
#    - "tools/call"    → 执行具体 Tool 并返回结果
# 3. 将响应写入 stdout
# ============================================================================


class Tool:
    """
    一个 MCP Tool 的描述。

    Attributes:
        name: 工具名称(唯一标识)
        description: 工具描述(给 LLM 看的)
        input_schema: JSON Schema 描述输入参数
        handler: 实际执行函数 (arguments: dict) → Any
    """

    def __init__(
        self,
        name: str,
        description: str,
        input_schema: Dict[str, Any],
        handler: Callable[[Dict[str, Any]], Any],
    ) -> None:
        self.name = name
        self.description = description
        self.input_schema = input_schema
        self.handler = handler

    def to_schema(self) -> Dict[str, Any]:
        """
        转换为 MCP tools/list 响应格式。
        """
        return {
            "name": self.name,
            "description": self.description,
            "inputSchema": self.input_schema,
        }


class MCPServer:
    """
    最小 MCP Server(stdio 传输)。

    职责:
    - 注册 Tool
    - 监听 stdin 上的 JSON-RPC 请求
    - 分发并执行,将结果写回 stdout

    MCP 协议方法处理:
    ```
    initialize              → 返回 serverInfo + capabilities
    notifications/initialized → 忽略(notification 无需响应)
    tools/list              → 返回已注册 Tool 的 schema 列表
    tools/call              → 执行 Tool handler,返回结果
    ```
    """

    # MCP 协议版本
    PROTOCOL_VERSION = "2024-11-05"

    def __init__(self, name: str = "mcp-server", version: str = "0.1.0") -> None:
        self.name = name
        self.version = version
        self._tools: Dict[str, Tool] = {}

    def register_tool(self, tool: Tool) -> None:
        """
        注册一个 Tool。

        Args:
            tool: Tool 实例
        """
        self._tools[tool.name] = tool

    def tool(
        self,
        name: str,
        description: str,
        input_schema: Dict[str, Any],
    ) -> Callable:
        """
        装饰器方式注册 Tool。

        用法:
        ```python
        @server.tool("add", "两数相加", {...})
        def add(args):
            return args["a"] + args["b"]
        ```
        """

        def decorator(func: Callable[[Dict[str, Any]], Any]) -> Callable:
            self.register_tool(Tool(name, description, input_schema, func))
            return func

        return decorator

    def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理 initialize 请求。

        返回:
        - protocolVersion: 支持的协议版本
        - serverInfo: 服务器名称和版本
        - capabilities: 声明支持 tools
        """
        return {
            "protocolVersion": self.PROTOCOL_VERSION,
            "serverInfo": {
                "name": self.name,
                "version": self.version,
            },
            "capabilities": {
                "tools": {"listChanged": False},
            },
        }

    def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理 tools/list 请求。

        返回所有已注册 Tool 的 schema。
        """
        return {
            "tools": [tool.to_schema() for tool in self._tools.values()],
        }

    def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理 tools/call 请求。

        1. 从 params 中取出 tool name 和 arguments
        2. 查找对应的 Tool handler
        3. 执行 handler,封装结果

        返回格式:
        ```json
        {
          "content": [{"type": "text", "text": "..."}],
          "isError": false
        }
        ```
        """
        tool_name = params.get("name", "")
        arguments = params.get("arguments", {})

        tool = self._tools.get(tool_name)
        if tool is None:
            raise ValueError(f"未知工具: {tool_name}")

        result = tool.handler(arguments)

        # 统一封装为 MCP content 格式
        if isinstance(result, str):
            text = result
        else:
            text = json.dumps(result, ensure_ascii=False, indent=2)

        return {
            "content": [{"type": "text", "text": text}],
            "isError": False,
        }

    def _dispatch(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        根据 method 分发到对应处理函数。

        Args:
            method: JSON-RPC method 字段
            params: JSON-RPC params 字段

        Returns:
            result 字典

        Raises:
            ValueError: 未知 method
        """
        dispatch_table: Dict[str, Callable] = {
            "initialize": self._handle_initialize,
            "tools/list": self._handle_tools_list,
            "tools/call": self._handle_tools_call,
        }

        handler = dispatch_table.get(method)
        if handler is None:
            raise ValueError(f"不支持的方法: {method}")

        return handler(params)

    def run(self) -> None:
        """
        启动 Server 主循环。

        流程:
        ```
        while True:
            1. 从 stdin 读取一条 JSON-RPC 消息
            2. 如果是 notification(无 id)→ 跳过
            3. 分发到对应 handler
            4. 构造 JSON-RPC response
            5. 写入 stdout
        ```
        """
        stdin = sys.stdin.buffer
        stdout = sys.stdout.buffer

        while True:
            # 读取请求
            request = MCPTransport.read_message(stdin)
            if request is None:
                break  # EOF → 管道关闭,退出

            req_id = request.get("id")
            method = request.get("method", "")
            params = request.get("params") or {}

            # Notification(无 id )不需要响应
            if req_id is None:
                continue

            # 处理请求
            try:
                result = self._dispatch(method, params)
                response = {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "result": result,
                }
            except Exception as exc:
                response = {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "error": {
                        "code": -32000,
                        "message": str(exc),
                    },
                }

            # 写回响应
            MCPTransport.write_message(stdout, response)


# ============================================================================
# Layer 3: Client — JSON-RPC 请求发送 + 响应接收
# ============================================================================
#
# MCP Client 的职责:
# 1. 通过 subprocess 启动 Server 子进程
# 2. 发送 JSON-RPC 请求到子进程 stdin
# 3. 从子进程 stdout 读取响应
# 4. 提供便捷方法:initialize / list_tools / call_tool
# ============================================================================


class MCPClient:
    """
    最小 MCP Client(stdio 传输)。

    通过 subprocess.Popen 启动 Server 子进程,
    用管道进行 JSON-RPC 通信。

    生命周期:
    ```
    client = MCPClient()
    client.start("python", ["server.py", "--server"])
    client.initialize()
    tools = client.list_tools()
    result = client.call_tool("add", {"a": 1, "b": 2})
    client.stop()
    ```
    """

    def __init__(self) -> None:
        self._next_id: int = 1
        self._proc: Optional[subprocess.Popen] = None

    @property
    def is_running(self) -> bool:
        return self._proc is not None and self._proc.poll() is None

    def start(self, command: str, args: List[str], cwd: Optional[str] = None) -> None:
        """
        启动 Server 子进程。

        发生了什么:
        ```
        操作系统层面:
        1. 创建 3 对管道 (stdin/stdout/stderr)
        2. fork 子进程
        3. 子进程的 stdin/stdout/stderr 连接到管道
        4. 子进程执行 command + args

        Python 层面:
        self._proc.stdin  → 管道写入端(父进程写,子进程从stdin读)
        self._proc.stdout → 管道读取端(子进程写stdout,父进程读)
        ```

        Args:
            command: 可执行文件路径(如 "python" 或 sys.executable)
            args: 命令行参数列表
            cwd: 工作目录
        """
        self._proc = subprocess.Popen(
            [command] + args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=cwd,
        )

    def stop(self) -> None:
        """
        停止 Server 子进程。
        """
        if self._proc and self._proc.poll() is None:
            self._proc.terminate()
            try:
                self._proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self._proc.kill()
        self._proc = None

    def _send(self, payload: Dict[str, Any]) -> None:
        """
        发送一条 JSON-RPC 消息到 Server 的 stdin。
        """
        if not self._proc or not self._proc.stdin:
            raise RuntimeError("Server 未启动")
        MCPTransport.write_message(self._proc.stdin, payload)

    def _recv(self) -> Dict[str, Any]:
        """
        从 Server 的 stdout 读取一条 JSON-RPC 响应。
        """
        if not self._proc or not self._proc.stdout:
            raise RuntimeError("Server 未启动")
        msg = MCPTransport.read_message(self._proc.stdout)
        if msg is None:
            raise RuntimeError("Server 无响应(管道已关闭)")
        return msg

    def request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        发送 JSON-RPC request 并等待 response。

        流程:
        ```
        1. 生成自增 id
        2. 构造 {"jsonrpc":"2.0", "id": N, "method":"...", "params":{...}}
        3. _send() 写入管道
        4. _recv() 从管道读取
        5. 匹配 id,返回 result
        ```

        Args:
            method: RPC 方法名
            params: 参数字典

        Returns:
            response 中的 result 字段
        """
        req_id = self._next_id
        self._next_id += 1

        request = {
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params or {},
        }
        self._send(request)

        # 循环读取,跳过不匹配的消息(如 notification)
        while True:
            response = self._recv()
            if response.get("id") == req_id:
                if "error" in response:
                    err = response["error"]
                    raise RuntimeError(f"RPC Error [{err.get('code')}]: {err.get('message')}")
                return response.get("result", {})

    def notify(self, method: str, params: Optional[Dict[str, Any]] = None) -> None:
        """
        发送 JSON-RPC notification(无 id,Server 不回复)。

        Args:
            method: 通知方法名
            params: 参数字典
        """
        notification = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
        }
        self._send(notification)

    # ── 便捷方法 ──

    def initialize(self) -> Dict[str, Any]:
        """
        执行 MCP 握手:initialize + notifications/initialized。

        MCP 握手流程:
        ```
        Client                          Server
          │                                │
          │──── initialize ───────────────►│
          │     {protocolVersion, ...}     │
          │                                │
          │◄─── result ───────────────────│
          │     {serverInfo, capabilities} │
          │                                │
          │──── notifications/initialized ►│
          │     (notification, 无响应)     │
          │                                │
        ```
        """
        result = self.request("initialize", {
            "protocolVersion": "2024-11-05",
            "clientInfo": {"name": "mcp-client", "version": "0.1.0"},
            "capabilities": {},
        })
        self.notify("notifications/initialized")
        return result

    def list_tools(self) -> List[Dict[str, Any]]:
        """
        获取 Server 注册的所有 Tool 列表。
        """
        result = self.request("tools/list")
        return list(result.get("tools") or [])

    def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        调用指定 Tool。

        Args:
            name: 工具名
            arguments: 工具参数

        Returns:
            Tool 执行结果(MCP content 格式)
        """
        return self.request("tools/call", {"name": name, "arguments": arguments})


# ============================================================================
# Layer 4: Agent — 业务编排层
# ============================================================================
#
# Agent 的职责:
# 1. 管理 MCP Client 的生命周期
# 2. 根据用户意图选择合适的 Tool 调用
# 3. 组合多个 Tool 调用的结果
#
# 在真实场景中,Agent 会接入 LLM 来做意图识别和 Tool 选择。
# 这里用简单的命令行参数模拟。
# ============================================================================


class MCPAgent:
    """
    最小 MCP Agent。

    管理 Client 生命周期,提供用户交互入口。

    在真实系统中,这里会接入 LLM:
    ```
    用户问题 → LLM 分析 → 选择 Tool → 调用 → LLM 总结 → 返回
    ```

    本实现简化为命令行直接指定 Tool。
    """

    def __init__(self) -> None:
        self.client = MCPClient()

    def start(self) -> Dict[str, Any]:
        """
        启动 Agent:拉起 Server 子进程并完成握手。
        """
        script_path = str(Path(__file__).resolve())
        self.client.start(
            command=sys.executable,
            args=[script_path, "--server"],
            cwd=str(Path(__file__).resolve().parent),
        )
        return self.client.initialize()

    def stop(self) -> None:
        """
        停止 Agent。
        """
        self.client.stop()

    def run_interactive(self) -> None:
        """
        交互式运行:展示 Tool 列表,然后执行指定 Tool。
        """
        try:
            # 握手
            init_result = self.start()
            print("=" * 60)
            print("MCP Agent 已启动")
            print(f"Server: {init_result.get('serverInfo', {}).get('name')}")
            print(f"Protocol: {init_result.get('protocolVersion')}")
            print("=" * 60)

            # 列出工具
            tools = self.client.list_tools()
            print(f"\n可用工具 ({len(tools)} 个):")
            for i, tool in enumerate(tools, 1):
                print(f"  {i}. {tool['name']}{tool.get('description', '')}")

            # 交互循环
            print("\n输入格式: <tool_name> <json_args>")
            print("输入 'quit' 退出\n")

            while True:
                try:
                    line = input(">>> ").strip()
                except (EOFError, KeyboardInterrupt):
                    break

                if not line or line.lower() in ("quit", "exit", "q"):
                    break

                parts = line.split(" ", 1)
                tool_name = parts[0]
                tool_args = {}
                if len(parts) > 1:
                    try:
                        tool_args = json.loads(parts[1])
                    except json.JSONDecodeError as e:
                        print(f"JSON 解析出错: {e}")
                        continue

                try:
                    result = self.client.call_tool(tool_name, tool_args)
                    print(json.dumps(result, ensure_ascii=False, indent=2))
                except RuntimeError as e:
                    print(f"调用出错: {e}")

        finally:
            self.stop()
            print("\nAgent 已停止。")


# ============================================================================
# Demo Tools — 用于测试的示例工具
# ============================================================================


def create_demo_server() -> MCPServer:
    """
    创建一个带示例 Tool 的 MCP Server。
    """
    server = MCPServer(name="demo-mcp-server", version="0.1.0")

    # ── Tool 1: 加法 ──
    @server.tool(
        name="add",
        description="计算两个数的和",
        input_schema={
            "type": "object",
            "properties": {
                "a": {"type": "number", "description": "第一个数"},
                "b": {"type": "number", "description": "第二个数"},
            },
            "required": ["a", "b"],
        },
    )
    def add(args: Dict[str, Any]) -> Any:
        return {"result": args["a"] + args["b"]}

    # ── Tool 2: 字符串反转 ──
    @server.tool(
        name="reverse",
        description="反转字符串",
        input_schema={
            "type": "object",
            "properties": {
                "text": {"type": "string", "description": "要反转的文本"},
            },
            "required": ["text"],
        },
    )
    def reverse(args: Dict[str, Any]) -> Any:
        return {"result": args["text"][::-1]}

    # ── Tool 3: 获取当前时间 ──
    @server.tool(
        name="now",
        description="获取当前日期时间",
        input_schema={
            "type": "object",
            "properties": {},
        },
    )
    def now(args: Dict[str, Any]) -> Any:
        from datetime import datetime

        return {"result": datetime.now().isoformat()}

    # ── Tool 4: 计算器 ──
    @server.tool(
        name="calc",
        description="计算数学表达式(支持 +, -, *, /, **, %)",
        input_schema={
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "数学表达式,如 '2 + 3 * 4'"},
            },
            "required": ["expression"],
        },
    )
    def calc(args: Dict[str, Any]) -> Any:
        expr = args["expression"]
        # 安全检查:只允许数字和基本运算符
        allowed = set("0123456789.+-*/%(). ")
        if not all(c in allowed for c in expr):
            raise ValueError(f"不安全的表达式: {expr}")
        result = eval(expr)  # noqa: S307 — demo only
        return {"expression": expr, "result": result}

    return server


# ============================================================================
# CLI 入口
# ============================================================================


def main() -> None:
    """
    CLI 入口。

    模式:
    - `--server`                  → 启动 MCP Server(阻塞,等待 stdin 输入)
    - `--call <name> <json>`      → 快速调用某个 Tool
    - 无参数                      → 启动交互式 Agent
    """
    argv = sys.argv[1:]

    # ── 模式 1: 作为 MCP Server 运行 ──
    if "--server" in argv:
        server = create_demo_server()
        server.run()
        return

    # ── 模式 2: 快速调用 ──
    if "--call" in argv:
        idx = argv.index("--call")
        if idx + 2 >= len(argv):
            print("用法: python mcp_framework.py --call <tool_name> '<json_args>'")
            sys.exit(1)

        tool_name = argv[idx + 1]
        tool_args = json.loads(argv[idx + 2])

        agent = MCPAgent()
        try:
            agent.start()
            result = agent.client.call_tool(tool_name, tool_args)
            print(json.dumps(result, ensure_ascii=False, indent=2))
        finally:
            agent.stop()
        return

    # ── 模式 3: 交互式 Agent ──
    agent = MCPAgent()
    agent.run_interactive()


if __name__ == "__main__":
    main()

5.1 交互模式

python mcp_framework.py

输出

============================================================
MCP Agent 已启动
Server: demo-mcp-server
Protocol: 2024-11-05
============================================================

可用工具 (4):
  1. add — 计算两个数的和
  2. reverse — 反转字符串
  3. now — 获取当前日期时间
  4. calc — 计算数学表达式(支持 +, -, *, /, **, %)

输入格式: <tool_name> <json_args>
输入 'quit' 退出

>>> add {"a": 10, "b": 32}
{
  "content": [{"type": "text", "text": "{\"result\": 42}"}],
  "isError": false
}

>>> reverse {"text": "Hello MCP"}
{
  "content": [{"type": "text", "text": "{\"result\": \"PCM olleH\"}"}],
  "isError": false
}

>>> calc {"expression": "2 ** 10"}
{
  "content": [{"type": "text", "text": "{\"expression\": \"2 ** 10\", \"result\": 1024}"}],
  "isError": false
}

>>> quit
Agent 已停止。

5.2 快速调用

python test/mcp_framework.py --call add "{\"a\": 100, \"b\": 200}"

5.3单独启动 Server(调试用)

python test/mcp_framework.py --server
# Server 阻塞等待 stdin,可用于测试管道通信

6.数据流转图

python mcp_framework.py
        │
        ▼
   MCPAgent.start()
        │
        ├── subprocess.Popen("python mcp_framework.py --server")
        │         │
        │         ▼
        │   MCPServer.run()                     子进程
        │   while True:                         (stdin/stdout 连到管道)
        │       read_message(stdin)  ◄──────┐
        │       dispatch(method)            │
        │       write_message(stdout) ──┐   │
        │                               │   │
        │                               │   │
        ├── client.initialize()         │   │
        │     _send({initialize}) ──────┼───┘
        │     _recv() ◄─────────────────┘
        │
        ├── client.list_tools()
        │     _send({tools/list}) ──────────► Server._handle_tools_list()
        │     _recv() ◄──────────────────────
        │
        ├── client.call_tool("add", {a,b})
        │     _send({tools/call}) ──────────► Server._handle_tools_call()
        │     _recv() ◄──────────────────────   → Tool.handler({a,b})
        │                                         → return {result: a+b}
        │
        └── client.stop()
              proc.terminate()
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐