Python手写MCP基础协议
目标读者:没有 MCP 基础,但会一点 Python。
·
手写 MCP 协议入门(分层版 + 源码段 + 单元测试)
对应源码:
test/mcp_framework.py
目标读者:没有 MCP 基础,但会一点 Python。
0. 先看最终分层
mcp_framework.py 把 MCP 拆成 4 层:
- Transport:只管帧协议(
Content-Length) - Server:只管 JSON-RPC 方法分发与 Tool 调用
- Client:只管请求发送、响应接收、握手封装
- Agent:只管业务编排(调用哪些工具)
这个顺序很重要:先把底层协议跑通,再谈 Agent 体验。
1. Layer 1: Transport(协议地基)
它在源码里就是协议地基,只干一件事:
把 JSON 消息按 MCP/LSP 的 Content-Length 帧格式在二进制流里收发
MCP 使用类似 LSP 的帧
Content-Length: <字节数>\r\n
\r\n
<JSON body>
完整代码
# ============================================================================
# Layer 1: Transport — Content-Length 帧协议
# ============================================================================
#
# MCP 使用与 LSP (Language Server Protocol) 相同的帧格式:
#
# Content-Length: <字节数>\r\n
# \r\n
# <JSON body>
#
# 这一层只负责:把 Python dict 变成帧写入管道,从管道读帧还原为 dict。
# ============================================================================
class MCPTransport:
"""
MCP 帧协议传输层。
职责:
- write_message: dict → Content-Length 帧 → 写入管道
- read_message: 管道 → 解析 Content-Length 帧 → dict
帧格式:
```
Content-Length: 42\\r\\n
\\r\\n
{"jsonrpc":"2.0","id":1,"method":"..."}
```
"""
@staticmethod
def write_message(stream: BufferedWriter, payload: Dict[str, Any]) -> None:
"""
将一条 JSON-RPC 消息写入字节流。
流程:
1. payload → json.dumps → UTF-8 bytes (body)
2. 计算 body 字节长度
3. 构造 header: "Content-Length: {长度}\\r\\n\\r\\n"
4. 写入 header + body
5. flush() 立即推送
Args:
stream: 可写的二进制流(如 sys.stdout.buffer 或 proc.stdin)
payload: 待发送的 JSON-RPC 消息字典
"""
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
stream.write(header)
stream.write(body)
stream.flush()
@staticmethod
def read_message(stream: BufferedReader) -> Optional[Dict[str, Any]]:
"""
从字节流中读取一条 JSON-RPC 消息。
流程:
1. 逐行读取 header,直到遇到空行 (\\r\\n)
2. 从 header 中提取 Content-Length
3. 精确读取 Content-Length 个字节作为 body
4. json.loads → dict
Args:
stream: 可读的二进制流(如 sys.stdin.buffer 或 proc.stdout)
Returns:
解析后的 JSON-RPC 消息字典,管道关闭时返回 None
"""
# ── 读取 Header ──
content_length = 0
while True:
line = stream.readline()
if not line:
# 管道关闭 / EOF
return None
decoded = line.decode("utf-8").strip()
# 空行 = header 结束
if not decoded:
break
# 解析 "Content-Length: 42"
if decoded.lower().startswith("content-length:"):
content_length = int(decoded.split(":", 1)[1].strip())
if content_length <= 0:
return None
# ── 读取 Body ──
body = stream.read(content_length)
if not body:
return None
return json.loads(body.decode("utf-8"))
MCPTransport 的职责就两条:
write_message(stream, payload):dict -> JSON bytes -> header + body -> 写入 -> flush()read_message(stream):读 header(按行) -> 解析 Content-Length -> 精确读 body -> json.loads()
1.1 写帧:write_message
来自 MCPTransport.write_message 的核心源码段:
def write_message(stream: BufferedWriter, payload: Dict[str, Any]) -> None:
"""
将一条 JSON-RPC 消息写入字节流。
流程:
1. payload → json.dumps → UTF-8 bytes (body)
2. 计算 body 字节长度
3. 构造 header: "Content-Length: {长度}\\r\\n\\r\\n"
4. 写入 header + body
5. flush() 立即推送
Args:
stream: 可写的二进制流(如 sys.stdout.buffer 或 proc.stdin)
payload: 待发送的 JSON-RPC 消息字典
"""
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
stream.write(header)
stream.write(body)
stream.flush()
要点:
Content-Length必须是 字节长度,不是字符数。flush()必须调用,否则对端可能一直收不到数据。
1.2 读帧:read_message
来自 MCPTransport.read_message 的核心源码段:
"""
流程:
1. 逐行读取 header,直到遇到空行 (\\r\\n)
2. 从 header 中提取 Content-Length
3. 精确读取 Content-Length 个字节作为 body
4. json.loads → dict
"""
while True:
line = stream.readline()
if not line:
break
decoded = line.decode("utf-8")
if not decoded:
break
# 解析 "Content-Length: 42"
if decoded.lower().startwith("content-length:"):
content_length = int(decoded.split(":", 1)[1].strip())
if content_length <= 0:
return None
# ── 读取 Body ──
body = stream.read(content_length)
if not body:
return None
return json.loads(body.decode("utf-8"))
要点:
- 先读 header,空行结束。
- 再按长度精确读取 body。
1.3 这一层先测什么
- 写入再读出是否一致(round-trip)。
- 空流是否返回
None。
2. Layer 2: Server(方法分发 + Tool 注册)
这层的目标是:让“客户端发来的 JSON-RPC 请求”能被正确分发到 handler,并且能注册/调用工具
完整代码
# ============================================================================
# Layer 2: Server — JSON-RPC 请求处理 + Tool 注册
# ============================================================================
#
# MCP Server 的职责:
# 1. 监听 stdin,循环读取 JSON-RPC 请求
# 2. 根据 method 字段分发处理:
# - "initialize" → 返回服务器能力声明
# - "tools/list" → 返回所有注册的 Tool 列表
# - "tools/call" → 执行具体 Tool 并返回结果
# 3. 将响应写入 stdout
# ============================================================================
class Tool:
"""
一个 MCP Tool 的描述。
Attributes:
name: 工具名称(唯一标识)
description: 工具描述(给 LLM 看的)
input_schema: JSON Schema 描述输入参数
handler: 实际执行函数 (arguments: dict) → Any
"""
def __init__(
self,
name: str,
description: str,
input_schema: Dict[str, Any],
handler: Callable[[Dict[str, Any]], Any]
) -> None:
self.name = name
self.description = description
self.input_schema = input_schema
self.handler = handler
def to_schema(self) -> Dict[str, Any]:
"""
转换为 MCP tools/list 响应格式。
"""
return {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema,
}
class MCPServer:
"""
最小 MCP Server(stdio 传输)。
职责:
- 注册 Tool
- 监听 stdin 上的 JSON-RPC 请求
- 分发并执行,将结果写回 stdout
MCP 协议方法处理:
```
initialize → 返回 serverInfo + capabilities
notifications/initialized → 忽略(notification 无需响应)
tools/list → 返回已注册 Tool 的 schema 列表
tools/call → 执行 Tool handler,返回结果
```
"""
# MCP 协议版本
PROTOCOL_VERSION = "2024-11-05"
def __init__(self, name: str = "mcp-server", version: str = "0.1.0") -> None:
self.name = name
self.version = version
self._tools: Dict[str, Tool] = {}
def register_tool(self, tool: Tool) -> None:
"""
注册一个 Tool。
Args:
tool: Tool 实例
"""
self._tools[tool.name] = tool
def tool(
self,
name: str,
description: str,
input_schema: Dict[str, Any],
) -> Callable:
"""
装饰器方式注册 Tool。
用法:
```python
@server.tool("add", "两数相加", {...})
def add(args):
return args["a"] + args["b"]
```
"""
def decorator(func: Callable[[Dict[str, Any]], Any]) -> Callable:
self.register_tool(Tool(name, description, input_schema, func))
return func
return decorator
def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
处理 initialize 请求。
返回:
- protocolVersion: 支持的协议版本
- serverInfo: 服务器名称和版本
- capabilities: 声明支持 tools
"""
return {
"protocolVersion": self.PROTOCOL_VERSION,
"serverInfo": {
"name": self.name,
"version": self.version,
},
"capabilities": {
"tools": {"listChanged": False},
},
}
def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
处理 tools/list 请求。
返回所有已注册 Tool 的 schema。
"""
return {
"tools": [tool.to_schema() for tool in self._tools.values()],
}
def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
处理 tools/call 请求。
1. 从 params 中取出 tool name 和 arguments
2. 查找对应的 Tool handler
3. 执行 handler,封装结果
返回格式:
```json
{
"content": [{"type": "text", "text": "..."}],
"isError": false
}
```
"""
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
tool = self._tools.get(tool_name)
if tool is None:
raise ValueError(f"未知工具: {tool_name}")
result = tool.handler(arguments)
# 统一封装为 MCP content 格式
if isinstance(result, str):
text = result
else:
text = json.dumps(result, ensure_ascii=False, indent=2)
return {
"content": [{"type": "text", "text": text}],
"isError": False,
}
def _dispatch(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""
根据 method 分发到对应处理函数。
Args:
method: JSON-RPC method 字段
params: JSON-RPC params 字段
Returns:
result 字典
Raises:
ValueError: 未知 method
"""
dispatch_table: Dict[str, Callable] = {
"initialize": self._handle_initialize,
"tools/list": self._handle_tools_list,
"tools/call": self._handle_tools_call,
}
handler = dispatch_table.get(method)
if handler is None:
raise ValueError(f"不支持的方法: {method}")
return handler(params)
def run(self) -> None:
"""
启动 Server 主循环。
流程:
```
while True:
1. 从 stdin 读取一条 JSON-RPC 消息
2. 如果是 notification(无 id)→ 跳过
3. 分发到对应 handler
4. 构造 JSON-RPC response
5. 写入 stdout
```
"""
stdin = sys.stdin.buffer
stdout = sys.stdout.buffer
while True:
# 读取请求
request = MCPTransport.read_message(stdin)
if request is None:
break # EOF → 管道关闭,退出
req_id = request.get("id")
method = request.get("method", "")
params = request.get("params") or {}
# Notification(无 id )不需要响应
if req_id is None:
continue
# 处理请求
try:
result = self._dispatch(method, params)
response = {
"jsonrpc": "2.0",
"id": req_id,
"result": result,
}
except Exception as exc:
response = {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": -32000,
"message": str(exc),
},
}
# 写回相应
MCPTransport.write_message(stdout, response)
2.1 Tool 抽象
Tool 类最核心字段:
class Tool:
def __init__(self, name, description, input_schema, handler):
self.name = name
self.description = description
self.input_schema = input_schema
self.handler = handler
Tool 的职责很纯粹:描述自己 + 执行自己。
2.2 Server 的方法分发
MCPServer._dispatch 核心段:
dispatch_table = {
"initialize": self._handle_initialize,
"tools/list": self._handle_tools_list,
"tools/call": self._handle_tools_call,
}
handler = dispatch_table.get(method)
if handler is None:
raise ValueError(f"不支持的方法: {method}")
return handler(params)
2.3 Tool 调用路径
_handle_tools_call 核心段:
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
tool = self._tools.get(tool_name)
if tool is None:
raise ValueError(f"未知工具: {tool_name}")
result = tool.handler(arguments)
2.4 这一层先测什么
initialize返回结构是否完整。tools/list是否能列出注册工具。tools/call正常调用与未知工具异常。_dispatch未知方法异常。
3. Layer 3: Client(请求响应与握手)
第三层把“协议对话”抽象成函数调用(client side 的 SDK)
完整代码
# ============================================================================
# Layer 3: Client — JSON-RPC 请求发送 + 响应接收
# ============================================================================
#
# MCP Client 的职责:
# 1. 通过 subprocess 启动 Server 子进程
# 2. 发送 JSON-RPC 请求到子进程 stdin
# 3. 从子进程 stdout 读取响应
# 4. 提供便捷方法:initialize / list_tools / call_tool
# ============================================================================
class MCPClient:
"""
最小 MCP Client(stdio 传输)。
通过 subprocess.Popen 启动 Server 子进程,
用管道进行 JSON-RPC 通信。
生命周期:
```
client = MCPClient()
client.start("python", ["server.py", "--server"])
client.initialize()
tools = client.list_tools()
result = client.call_tool("add", {"a": 1, "b": 2})
client.stop()
```
"""
def __init__(self) -> None:
self._next_id: int = 1
self._proc: Optional[subprocess.Popen] = None
@property
def is_running(self) -> bool:
return self._proc is not None and self._proc.poll() is None
def start(self, command: str, args: List[str], cwd: Optional[str] = None):
"""
启动 Server 子进程。
发生了什么:
```
操作系统层面:
1. 创建 3 对管道 (stdin/stdout/stderr)
2. fork 子进程
3. 子进程的 stdin/stdout/stderr 连接到管道
4. 子进程执行 command + args
Python 层面:
self._proc.stdin → 管道写入端(父进程写,子进程从stdin读)
self._proc.stdout → 管道读取端(子进程写stdout,父进程读)
```
Args:
command: 可执行文件路径(如 "python" 或 sys.executable)
args: 命令行参数列表
cwd: 工作目录
"""
self._proc = subprocess.Popen(
[command] + args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
)
def stop(self) -> None:
"""
停止 Server 子进程。
"""
if self._proc and self._proc.poll() is None:
self._proc.terminate()
try:
self._proc.wait(timeout = 5)
except subprocess.TimeoutExpired:
self._proc.kill()
self._proc = None
def _send(self, payload: Dict[str, Any]) -> None:
"""
发送一条 JSON-RPC 消息到 Server 的 stdin。
"""
if not self._proc or not self._proc.stdin:
raise RuntimeError("Server 未启动")
MCPTransport.write_message(self._proc.stdin, payload)
def _recv(self) -> Dict[str, Any]:
"""
从 Server 的 stdout 读取一条 JSON-RPC 响应。
"""
if not self._proc or not self._proc.stdout:
raise RuntimeError("Server 未启动")
msg = MCPTransport.read_message(self._proc.stdout)
if msg is None:
raise RuntimeError("Server 无响应(管道已关闭)")
return msg
def request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
发送 JSON-RPC request 并等待 response。
流程:
```
1. 生成自增 id
2. 构造 {"jsonrpc":"2.0", "id": N, "method":"...", "params":{...}}
3. _send() 写入管道
4. _recv() 从管道读取
5. 匹配 id,返回 result
```
Args:
method: RPC 方法名
params: 参数字典
Returns:
response 中的 result 字段
"""
req_id = self._next_id
self._next_id += 1
request = {
"jsonrpc": "2.0",
"id": req_id,
"method": method,
"params": params or {},
}
self._send(request)
# 循环读取,跳过不匹配的消息(如 notification)
while True:
response = self._recv()
if response.get("id") == req_id:
if "error: in response:
err = response["error"]
raise RuntimeError(f"RPC Error [{err.get('code')}]: {err.get('message')}")
return response.get("result", {})
def notify(self, method: str, params: Optional[Dict[str, Any]] = None) -> None:
"""
发送 JSON-RPC notification(无 id,Server 不回复)。
Args:
method: 通知方法名
params: 参数字典
"""
notification = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
}
self._send(notification)
# ── 便捷方法 ──
def initialize(self) -> Dict[str, Any]:
"""
执行 MCP 握手:initialize + notifications/initialized。
MCP 握手流程:
```
Client Server
│ │
│──── initialize ───────────────►│
│ {protocolVersion, ...} │
│ │
│◄─── result ───────────────────│
│ {serverInfo, capabilities} │
│ │
│──── notifications/initialized ►│
│ (notification, 无响应) │
│ │
```
"""
result = self.request("initialize", {
"protocolVersion": "2024-11-05",
"clientInfo": {"name": "mcp-client", "version": "0.1.0"},
"capabilities": {},
})
self.notify("notifications/initialized")
return result
def list_tools(self) -> List[Dict[str, Any]]:
"""
获取 Server 注册的所有 Tool 列表。
"""
result = self.request("tools/list")
return list(result.get("tools") or [])
def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""
调用指定 Tool。
Args:
name: 工具名
arguments: 工具参数
Returns:
Tool 执行结果(MCP content 格式)
"""
return self.request("tools/call", {"name": name, "arguments": arguments})
3.1 发请求:request
核心段:
req_id = self._next_id
self._next_id += 1
request = {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params or {}}
self._send(request)
while True:
response = self._recv()
if response.get("id") == req_id:
if "error" in response:
raise RuntimeError(...)
return response.get("result", {})
要点:
id自增并匹配响应。- 可跳过不匹配消息(如别的通知)。
3.2 MCP 握手:initialize
核心段:
result = self.request("initialize", {...})
self.notify("notifications/initialized")
return result
3.3 这一层先测什么
request是否正确处理error。initialize是否先request再notify。list_tools/call_tool的封装是否正确转发。
4. Layer 4: Agent(业务编排)
有了 tools/list 和 tools/call 还不够——我要让 LLM 知道何时调用、怎么调用、拿到结果后怎么继续推理。
完整代码
# ============================================================================
# Layer 4: Agent — 业务编排层
# ============================================================================
#
# Agent 的职责:
# 1. 管理 MCP Client 的生命周期
# 2. 根据用户意图选择合适的 Tool 调用
# 3. 组合多个 Tool 调用的结果
#
# 在真实场景中,Agent 会接入 LLM 来做意图识别和 Tool 选择。
# 这里用简单的命令行参数模拟。
# ============================================================================
class MCPAgent:
"""
最小 MCP Agent。
管理 Client 生命周期,提供用户交互入口。
在真实系统中,这里会接入 LLM:
```
用户问题 → LLM 分析 → 选择 Tool → 调用 → LLM 总结 → 返回
```
本实现简化为命令行直接指定 Tool。
"""
def __init__(self) -> None:
self.client = MCPClient()
def start(self) -> Dict[str, Any]:
"""
启动 Agent:拉起 Server 子进程并完成握手。
"""
script_path = str(Path(__file__).resolve())
self.client.start(
command=sys.executable,
args=[script_path, "--server"],
cwd=str(Path(__file__).resolve().parent),
)
return self.client.initialize()
def stop(self) -> None:
"""
停止 Agent。
"""
self.client.stop()
def run_interactive(self) -> None:
"""
交互式运行:展示 Tool 列表,然后执行指定 Tool。
"""
try:
# 握手
init_result = self.start()
print("=" * 60)
print("MCP Agent 已启动")
print(f"Server: {init_result.get('serverInfo', {}).get('name')}")
print(f"Protocol: {init_result.get('protocolVersion')}")
print("=" * 60)
# 列出工具
tools = self.client.list_tools()
print(f"\n可用工具 ({len(tools)} 个):")
for i, tool in enumerate(tools, 1):
print(f" {i}. {tool['name']} — {tool.get('description', '')}")
# 交互循环
print("\n输入格式: <tool_name> <json_args>")
print("输入 'quit' 退出\n")
while True:
try:
line = input(">>> ").strip()
except (EOFError, KeyboardInterrupt):
break
if not line or line.lower() in ("quit", "exit", "q"):
break
parts = line.split(" ", 1)
tool_name = parts[0]
tool_args = {}
if len(parts) > 1:
try:
tool_args = json.loads(parts[1])
except json.JSONDecodeError as e:
print(f"JSON 解析出错: {e}")
continue
try:
result = self.client.call_tool(tool_name, tool_args)
print(json.dumps(result, ensure_ascii=False, indent=2))
except RuntimeError as e:
print(f"调用出错: {e}")
finally:
self.stop()
print("\nAgent 已停止。")
4.1 启动流程
MCPAgent.start 核心段:
script_path = str(Path(__file__).resolve())
self.client.start(
command=sys.executable,
args=[script_path, "--server"],
cwd=str(Path(__file__).resolve().parent),
)
return self.client.initialize()
要点:
- Agent 不直接写帧。
- Agent 只调用 Client 的能力,保持边界清晰。
4.2 这一层先测什么
start()是否调用client.start + client.initialize。stop()是否调用client.stop。
5. 完整测试
当前路径下新建文件 mcp_framework.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MCP 协议基础架构 — 从零手写
从最底层的帧协议到最上层的 Agent,完整实现 MCP stdio 通信。
不依赖任何第三方 MCP SDK,纯 Python 标准库。
架构分层:
Layer 1: Transport — Content-Length 帧协议读写
Layer 2: Server — JSON-RPC 请求分发 + Tool 注册
Layer 3: Client — JSON-RPC 请求发送 + 响应接收
Layer 4: Agent — 业务逻辑编排(决定调用哪个 Tool)
用法:
# 1. 启动 Server(通常由 Client 自动拉起子进程)
python mcp_framework.py --server
# 2. 运行 Agent Demo
python mcp_framework.py
# 3. 单独测试某个 Tool
python mcp_framework.py --call add '{"a": 1, "b": 2}'
"""
from __future__ import annotations
import json
import subprocess
import sys
from abc import ABC, abstractmethod
from io import BufferedReader, BufferedWriter
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
# ============================================================================
# Layer 1: Transport — Content-Length 帧协议
# ============================================================================
#
# MCP 使用与 LSP (Language Server Protocol) 相同的帧格式:
#
# Content-Length: <字节数>\r\n
# \r\n
# <JSON body>
#
# 这一层只负责:把 Python dict 变成帧写入管道,从管道读帧还原为 dict。
# ============================================================================
class MCPTransport:
"""
MCP 帧协议传输层。
职责:
- write_message: dict → Content-Length 帧 → 写入管道
- read_message: 管道 → 解析 Content-Length 帧 → dict
帧格式:
```
Content-Length: 42\\r\\n
\\r\\n
{"jsonrpc":"2.0","id":1,"method":"..."}
```
"""
@staticmethod
def write_message(stream: BufferedWriter, payload: Dict[str, Any]) -> None:
"""
将一条 JSON-RPC 消息写入字节流。
流程:
1. payload → json.dumps → UTF-8 bytes (body)
2. 计算 body 字节长度
3. 构造 header: "Content-Length: {长度}\\r\\n\\r\\n"
4. 写入 header + body
5. flush() 立即推送
Args:
stream: 可写的二进制流(如 sys.stdout.buffer 或 proc.stdin)
payload: 待发送的 JSON-RPC 消息字典
"""
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("utf-8")
stream.write(header)
stream.write(body)
stream.flush()
@staticmethod
def read_message(stream: BufferedReader) -> Optional[Dict[str, Any]]:
"""
从字节流中读取一条 JSON-RPC 消息。
流程:
1. 逐行读取 header,直到遇到空行 (\\r\\n)
2. 从 header 中提取 Content-Length
3. 精确读取 Content-Length 个字节作为 body
4. json.loads → dict
Args:
stream: 可读的二进制流(如 sys.stdin.buffer 或 proc.stdout)
Returns:
解析后的 JSON-RPC 消息字典,管道关闭时返回 None
"""
# ── 读取 Header ──
content_length = 0
while True:
line = stream.readline()
if not line:
# 管道关闭 / EOF
return None
decoded = line.decode("utf-8").strip()
# 空行 = header 结束
if not decoded:
break
# 解析 "Content-Length: 42"
if decoded.lower().startswith("content-length:"):
content_length = int(decoded.split(":", 1)[1].strip())
if content_length <= 0:
return None
# ── 读取 Body ──
body = stream.read(content_length)
if not body:
return None
return json.loads(body.decode("utf-8"))
# ============================================================================
# Layer 2: Server — JSON-RPC 请求处理 + Tool 注册
# ============================================================================
#
# MCP Server 的职责:
# 1. 监听 stdin,循环读取 JSON-RPC 请求
# 2. 根据 method 字段分发处理:
# - "initialize" → 返回服务器能力声明
# - "tools/list" → 返回所有注册的 Tool 列表
# - "tools/call" → 执行具体 Tool 并返回结果
# 3. 将响应写入 stdout
# ============================================================================
class Tool:
"""
一个 MCP Tool 的描述。
Attributes:
name: 工具名称(唯一标识)
description: 工具描述(给 LLM 看的)
input_schema: JSON Schema 描述输入参数
handler: 实际执行函数 (arguments: dict) → Any
"""
def __init__(
self,
name: str,
description: str,
input_schema: Dict[str, Any],
handler: Callable[[Dict[str, Any]], Any],
) -> None:
self.name = name
self.description = description
self.input_schema = input_schema
self.handler = handler
def to_schema(self) -> Dict[str, Any]:
"""
转换为 MCP tools/list 响应格式。
"""
return {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema,
}
class MCPServer:
"""
最小 MCP Server(stdio 传输)。
职责:
- 注册 Tool
- 监听 stdin 上的 JSON-RPC 请求
- 分发并执行,将结果写回 stdout
MCP 协议方法处理:
```
initialize → 返回 serverInfo + capabilities
notifications/initialized → 忽略(notification 无需响应)
tools/list → 返回已注册 Tool 的 schema 列表
tools/call → 执行 Tool handler,返回结果
```
"""
# MCP 协议版本
PROTOCOL_VERSION = "2024-11-05"
def __init__(self, name: str = "mcp-server", version: str = "0.1.0") -> None:
self.name = name
self.version = version
self._tools: Dict[str, Tool] = {}
def register_tool(self, tool: Tool) -> None:
"""
注册一个 Tool。
Args:
tool: Tool 实例
"""
self._tools[tool.name] = tool
def tool(
self,
name: str,
description: str,
input_schema: Dict[str, Any],
) -> Callable:
"""
装饰器方式注册 Tool。
用法:
```python
@server.tool("add", "两数相加", {...})
def add(args):
return args["a"] + args["b"]
```
"""
def decorator(func: Callable[[Dict[str, Any]], Any]) -> Callable:
self.register_tool(Tool(name, description, input_schema, func))
return func
return decorator
def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
处理 initialize 请求。
返回:
- protocolVersion: 支持的协议版本
- serverInfo: 服务器名称和版本
- capabilities: 声明支持 tools
"""
return {
"protocolVersion": self.PROTOCOL_VERSION,
"serverInfo": {
"name": self.name,
"version": self.version,
},
"capabilities": {
"tools": {"listChanged": False},
},
}
def _handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
处理 tools/list 请求。
返回所有已注册 Tool 的 schema。
"""
return {
"tools": [tool.to_schema() for tool in self._tools.values()],
}
def _handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
处理 tools/call 请求。
1. 从 params 中取出 tool name 和 arguments
2. 查找对应的 Tool handler
3. 执行 handler,封装结果
返回格式:
```json
{
"content": [{"type": "text", "text": "..."}],
"isError": false
}
```
"""
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
tool = self._tools.get(tool_name)
if tool is None:
raise ValueError(f"未知工具: {tool_name}")
result = tool.handler(arguments)
# 统一封装为 MCP content 格式
if isinstance(result, str):
text = result
else:
text = json.dumps(result, ensure_ascii=False, indent=2)
return {
"content": [{"type": "text", "text": text}],
"isError": False,
}
def _dispatch(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""
根据 method 分发到对应处理函数。
Args:
method: JSON-RPC method 字段
params: JSON-RPC params 字段
Returns:
result 字典
Raises:
ValueError: 未知 method
"""
dispatch_table: Dict[str, Callable] = {
"initialize": self._handle_initialize,
"tools/list": self._handle_tools_list,
"tools/call": self._handle_tools_call,
}
handler = dispatch_table.get(method)
if handler is None:
raise ValueError(f"不支持的方法: {method}")
return handler(params)
def run(self) -> None:
"""
启动 Server 主循环。
流程:
```
while True:
1. 从 stdin 读取一条 JSON-RPC 消息
2. 如果是 notification(无 id)→ 跳过
3. 分发到对应 handler
4. 构造 JSON-RPC response
5. 写入 stdout
```
"""
stdin = sys.stdin.buffer
stdout = sys.stdout.buffer
while True:
# 读取请求
request = MCPTransport.read_message(stdin)
if request is None:
break # EOF → 管道关闭,退出
req_id = request.get("id")
method = request.get("method", "")
params = request.get("params") or {}
# Notification(无 id )不需要响应
if req_id is None:
continue
# 处理请求
try:
result = self._dispatch(method, params)
response = {
"jsonrpc": "2.0",
"id": req_id,
"result": result,
}
except Exception as exc:
response = {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": -32000,
"message": str(exc),
},
}
# 写回响应
MCPTransport.write_message(stdout, response)
# ============================================================================
# Layer 3: Client — JSON-RPC 请求发送 + 响应接收
# ============================================================================
#
# MCP Client 的职责:
# 1. 通过 subprocess 启动 Server 子进程
# 2. 发送 JSON-RPC 请求到子进程 stdin
# 3. 从子进程 stdout 读取响应
# 4. 提供便捷方法:initialize / list_tools / call_tool
# ============================================================================
class MCPClient:
"""
最小 MCP Client(stdio 传输)。
通过 subprocess.Popen 启动 Server 子进程,
用管道进行 JSON-RPC 通信。
生命周期:
```
client = MCPClient()
client.start("python", ["server.py", "--server"])
client.initialize()
tools = client.list_tools()
result = client.call_tool("add", {"a": 1, "b": 2})
client.stop()
```
"""
def __init__(self) -> None:
self._next_id: int = 1
self._proc: Optional[subprocess.Popen] = None
@property
def is_running(self) -> bool:
return self._proc is not None and self._proc.poll() is None
def start(self, command: str, args: List[str], cwd: Optional[str] = None) -> None:
"""
启动 Server 子进程。
发生了什么:
```
操作系统层面:
1. 创建 3 对管道 (stdin/stdout/stderr)
2. fork 子进程
3. 子进程的 stdin/stdout/stderr 连接到管道
4. 子进程执行 command + args
Python 层面:
self._proc.stdin → 管道写入端(父进程写,子进程从stdin读)
self._proc.stdout → 管道读取端(子进程写stdout,父进程读)
```
Args:
command: 可执行文件路径(如 "python" 或 sys.executable)
args: 命令行参数列表
cwd: 工作目录
"""
self._proc = subprocess.Popen(
[command] + args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
)
def stop(self) -> None:
"""
停止 Server 子进程。
"""
if self._proc and self._proc.poll() is None:
self._proc.terminate()
try:
self._proc.wait(timeout=5)
except subprocess.TimeoutExpired:
self._proc.kill()
self._proc = None
def _send(self, payload: Dict[str, Any]) -> None:
"""
发送一条 JSON-RPC 消息到 Server 的 stdin。
"""
if not self._proc or not self._proc.stdin:
raise RuntimeError("Server 未启动")
MCPTransport.write_message(self._proc.stdin, payload)
def _recv(self) -> Dict[str, Any]:
"""
从 Server 的 stdout 读取一条 JSON-RPC 响应。
"""
if not self._proc or not self._proc.stdout:
raise RuntimeError("Server 未启动")
msg = MCPTransport.read_message(self._proc.stdout)
if msg is None:
raise RuntimeError("Server 无响应(管道已关闭)")
return msg
def request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
发送 JSON-RPC request 并等待 response。
流程:
```
1. 生成自增 id
2. 构造 {"jsonrpc":"2.0", "id": N, "method":"...", "params":{...}}
3. _send() 写入管道
4. _recv() 从管道读取
5. 匹配 id,返回 result
```
Args:
method: RPC 方法名
params: 参数字典
Returns:
response 中的 result 字段
"""
req_id = self._next_id
self._next_id += 1
request = {
"jsonrpc": "2.0",
"id": req_id,
"method": method,
"params": params or {},
}
self._send(request)
# 循环读取,跳过不匹配的消息(如 notification)
while True:
response = self._recv()
if response.get("id") == req_id:
if "error" in response:
err = response["error"]
raise RuntimeError(f"RPC Error [{err.get('code')}]: {err.get('message')}")
return response.get("result", {})
def notify(self, method: str, params: Optional[Dict[str, Any]] = None) -> None:
"""
发送 JSON-RPC notification(无 id,Server 不回复)。
Args:
method: 通知方法名
params: 参数字典
"""
notification = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
}
self._send(notification)
# ── 便捷方法 ──
def initialize(self) -> Dict[str, Any]:
"""
执行 MCP 握手:initialize + notifications/initialized。
MCP 握手流程:
```
Client Server
│ │
│──── initialize ───────────────►│
│ {protocolVersion, ...} │
│ │
│◄─── result ───────────────────│
│ {serverInfo, capabilities} │
│ │
│──── notifications/initialized ►│
│ (notification, 无响应) │
│ │
```
"""
result = self.request("initialize", {
"protocolVersion": "2024-11-05",
"clientInfo": {"name": "mcp-client", "version": "0.1.0"},
"capabilities": {},
})
self.notify("notifications/initialized")
return result
def list_tools(self) -> List[Dict[str, Any]]:
"""
获取 Server 注册的所有 Tool 列表。
"""
result = self.request("tools/list")
return list(result.get("tools") or [])
def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""
调用指定 Tool。
Args:
name: 工具名
arguments: 工具参数
Returns:
Tool 执行结果(MCP content 格式)
"""
return self.request("tools/call", {"name": name, "arguments": arguments})
# ============================================================================
# Layer 4: Agent — 业务编排层
# ============================================================================
#
# Agent 的职责:
# 1. 管理 MCP Client 的生命周期
# 2. 根据用户意图选择合适的 Tool 调用
# 3. 组合多个 Tool 调用的结果
#
# 在真实场景中,Agent 会接入 LLM 来做意图识别和 Tool 选择。
# 这里用简单的命令行参数模拟。
# ============================================================================
class MCPAgent:
"""
最小 MCP Agent。
管理 Client 生命周期,提供用户交互入口。
在真实系统中,这里会接入 LLM:
```
用户问题 → LLM 分析 → 选择 Tool → 调用 → LLM 总结 → 返回
```
本实现简化为命令行直接指定 Tool。
"""
def __init__(self) -> None:
self.client = MCPClient()
def start(self) -> Dict[str, Any]:
"""
启动 Agent:拉起 Server 子进程并完成握手。
"""
script_path = str(Path(__file__).resolve())
self.client.start(
command=sys.executable,
args=[script_path, "--server"],
cwd=str(Path(__file__).resolve().parent),
)
return self.client.initialize()
def stop(self) -> None:
"""
停止 Agent。
"""
self.client.stop()
def run_interactive(self) -> None:
"""
交互式运行:展示 Tool 列表,然后执行指定 Tool。
"""
try:
# 握手
init_result = self.start()
print("=" * 60)
print("MCP Agent 已启动")
print(f"Server: {init_result.get('serverInfo', {}).get('name')}")
print(f"Protocol: {init_result.get('protocolVersion')}")
print("=" * 60)
# 列出工具
tools = self.client.list_tools()
print(f"\n可用工具 ({len(tools)} 个):")
for i, tool in enumerate(tools, 1):
print(f" {i}. {tool['name']} — {tool.get('description', '')}")
# 交互循环
print("\n输入格式: <tool_name> <json_args>")
print("输入 'quit' 退出\n")
while True:
try:
line = input(">>> ").strip()
except (EOFError, KeyboardInterrupt):
break
if not line or line.lower() in ("quit", "exit", "q"):
break
parts = line.split(" ", 1)
tool_name = parts[0]
tool_args = {}
if len(parts) > 1:
try:
tool_args = json.loads(parts[1])
except json.JSONDecodeError as e:
print(f"JSON 解析出错: {e}")
continue
try:
result = self.client.call_tool(tool_name, tool_args)
print(json.dumps(result, ensure_ascii=False, indent=2))
except RuntimeError as e:
print(f"调用出错: {e}")
finally:
self.stop()
print("\nAgent 已停止。")
# ============================================================================
# Demo Tools — 用于测试的示例工具
# ============================================================================
def create_demo_server() -> MCPServer:
"""
创建一个带示例 Tool 的 MCP Server。
"""
server = MCPServer(name="demo-mcp-server", version="0.1.0")
# ── Tool 1: 加法 ──
@server.tool(
name="add",
description="计算两个数的和",
input_schema={
"type": "object",
"properties": {
"a": {"type": "number", "description": "第一个数"},
"b": {"type": "number", "description": "第二个数"},
},
"required": ["a", "b"],
},
)
def add(args: Dict[str, Any]) -> Any:
return {"result": args["a"] + args["b"]}
# ── Tool 2: 字符串反转 ──
@server.tool(
name="reverse",
description="反转字符串",
input_schema={
"type": "object",
"properties": {
"text": {"type": "string", "description": "要反转的文本"},
},
"required": ["text"],
},
)
def reverse(args: Dict[str, Any]) -> Any:
return {"result": args["text"][::-1]}
# ── Tool 3: 获取当前时间 ──
@server.tool(
name="now",
description="获取当前日期时间",
input_schema={
"type": "object",
"properties": {},
},
)
def now(args: Dict[str, Any]) -> Any:
from datetime import datetime
return {"result": datetime.now().isoformat()}
# ── Tool 4: 计算器 ──
@server.tool(
name="calc",
description="计算数学表达式(支持 +, -, *, /, **, %)",
input_schema={
"type": "object",
"properties": {
"expression": {"type": "string", "description": "数学表达式,如 '2 + 3 * 4'"},
},
"required": ["expression"],
},
)
def calc(args: Dict[str, Any]) -> Any:
expr = args["expression"]
# 安全检查:只允许数字和基本运算符
allowed = set("0123456789.+-*/%(). ")
if not all(c in allowed for c in expr):
raise ValueError(f"不安全的表达式: {expr}")
result = eval(expr) # noqa: S307 — demo only
return {"expression": expr, "result": result}
return server
# ============================================================================
# CLI 入口
# ============================================================================
def main() -> None:
"""
CLI 入口。
模式:
- `--server` → 启动 MCP Server(阻塞,等待 stdin 输入)
- `--call <name> <json>` → 快速调用某个 Tool
- 无参数 → 启动交互式 Agent
"""
argv = sys.argv[1:]
# ── 模式 1: 作为 MCP Server 运行 ──
if "--server" in argv:
server = create_demo_server()
server.run()
return
# ── 模式 2: 快速调用 ──
if "--call" in argv:
idx = argv.index("--call")
if idx + 2 >= len(argv):
print("用法: python mcp_framework.py --call <tool_name> '<json_args>'")
sys.exit(1)
tool_name = argv[idx + 1]
tool_args = json.loads(argv[idx + 2])
agent = MCPAgent()
try:
agent.start()
result = agent.client.call_tool(tool_name, tool_args)
print(json.dumps(result, ensure_ascii=False, indent=2))
finally:
agent.stop()
return
# ── 模式 3: 交互式 Agent ──
agent = MCPAgent()
agent.run_interactive()
if __name__ == "__main__":
main()
5.1 交互模式
python mcp_framework.py
输出
============================================================
MCP Agent 已启动
Server: demo-mcp-server
Protocol: 2024-11-05
============================================================
可用工具 (4 个):
1. add — 计算两个数的和
2. reverse — 反转字符串
3. now — 获取当前日期时间
4. calc — 计算数学表达式(支持 +, -, *, /, **, %)
输入格式: <tool_name> <json_args>
输入 'quit' 退出
>>> add {"a": 10, "b": 32}
{
"content": [{"type": "text", "text": "{\"result\": 42}"}],
"isError": false
}
>>> reverse {"text": "Hello MCP"}
{
"content": [{"type": "text", "text": "{\"result\": \"PCM olleH\"}"}],
"isError": false
}
>>> calc {"expression": "2 ** 10"}
{
"content": [{"type": "text", "text": "{\"expression\": \"2 ** 10\", \"result\": 1024}"}],
"isError": false
}
>>> quit
Agent 已停止。
5.2 快速调用
python test/mcp_framework.py --call add "{\"a\": 100, \"b\": 200}"
5.3单独启动 Server(调试用)
python test/mcp_framework.py --server
# Server 阻塞等待 stdin,可用于测试管道通信
6.数据流转图
python mcp_framework.py
│
▼
MCPAgent.start()
│
├── subprocess.Popen("python mcp_framework.py --server")
│ │
│ ▼
│ MCPServer.run() 子进程
│ while True: (stdin/stdout 连到管道)
│ read_message(stdin) ◄──────┐
│ dispatch(method) │
│ write_message(stdout) ──┐ │
│ │ │
│ │ │
├── client.initialize() │ │
│ _send({initialize}) ──────┼───┘
│ _recv() ◄─────────────────┘
│
├── client.list_tools()
│ _send({tools/list}) ──────────► Server._handle_tools_list()
│ _recv() ◄──────────────────────
│
├── client.call_tool("add", {a,b})
│ _send({tools/call}) ──────────► Server._handle_tools_call()
│ _recv() ◄────────────────────── → Tool.handler({a,b})
│ → return {result: a+b}
│
└── client.stop()
proc.terminate()
更多推荐

所有评论(0)