使用Python实现MCP协议Streamable HTTP详细教程

目录

  1. 概念引入
  2. 原理讲解
  3. 场景应用
  4. 代码实现
  5. 问题解决
  6. 总结提炼

概念引入

什么是MCP协议?

MCP(Model Context Protocol)是专为模型交互设计的协议,它定义了模型服务器与客户端之间的通信标准。想象一下,MCP就像是不同语言的人之间的"通用翻译器",让各种AI模型和应用能够顺畅地交流。

什么是Streamable HTTP?

Streamable HTTP是MCP协议的一个重要新特性,它基于HTTP协议实现了流式数据传输。我们可以把它比作一个"智能水管系统":

  • 传统HTTP就像是一次性送水的桶装水服务,每次只能送一整桶
  • Streamable HTTP则像是自来水管道,可以持续不断地输送数据,而且可以双向流动

这种设计特别适合需要持续交换上下文或进行多轮对话的大模型应用场景。

原理讲解

Streamable HTTP的核心机制

Streamable HTTP基于以下技术实现:

  1. HTTP/1.1 Chunked Transfer EncodingHTTP/2 Streams:这些技术允许数据分块传输,而不需要等待整个响应完成。

  2. 双向流式处理:支持客户端和服务器同时发送数据,实现真正的双向通信。

  3. Web友好性:基于标准HTTP协议,天然兼容现有的Web基础设施,如反向代理、负载均衡器等。

工作流程

Streamable HTTP MCP的工作流程如下:

客户端请求 → 建立HTTP连接 → 流式数据交换 → 关闭连接

与传统HTTP不同,Streamable HTTP在连接建立后可以持续交换多个请求和响应,而不需要为每个请求建立新连接,大大提高了通信效率。

场景应用

适用场景

Streamable HTTP特别适合以下场景:

  1. 长对话应用:需要保持上下文的多轮对话场景
  2. 实时数据流处理:如实时翻译、代码生成辅助等
  3. 分布式模型服务:多个模型服务需要协同工作
  4. Web集成场景:需要与现有Web基础设施无缝集成

实际价值

  • 提高效率:减少连接建立和关闭的开销
  • 降低延迟:流式传输可以边接收边处理
  • 增强扩展性:更容易与现有Web技术栈集成
  • 改善用户体验:支持实时反馈和交互

代码实现

环境准备

首先,我们需要安装必要的依赖。推荐使用FastMCP框架,它是Python中实现MCP协议的最佳选择。

# 安装FastMCP框架
pip install fastmcp

# 或者使用uv包管理器(推荐)
pip install uv
uv init mcp-streamable-http-demo
cd mcp-streamable-http-demo
uv venv
# Windows
.venv\Scripts\activate
# macOS/Linux
source .venv/bin/activate
uv add fastmcp

基础实现

下面是一个最简单的Streamable HTTP MCP服务器实现:

# server.py
from fastmcp import FastMCP

# 初始化MCP服务器
mcp = FastMCP("Demo")

@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b

if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp")

这段代码只有8行,但已经实现了一个完整的Streamable HTTP MCP服务器。让我们逐行解释:

  1. 导入FastMCP框架
  2. 创建MCP服务器实例,命名为"Demo"
  3. 使用装饰器注册一个工具函数add
  4. 定义工具函数,实现两数相加
  5. 启动服务器,指定传输方式为"streamable-http"

进阶实现:获取公网IP

下面是一个更实用的例子,演示如何获取服务器的公网IP地址:

# advanced_server.py
import json
import requests
from fastmcp import FastMCP

# 初始化MCP服务器
mcp = FastMCP("Network Tools")

@mcp.tool()
def get_public_ip_address() -> str:
    """获取服务器公网IP地址
    
    返回:
        str: 服务器的公网IP地址
    """
    try:
        # 使用多个IP查询服务以提高可靠性
        services = [
            "https://api.ipify.org?format=json",
            "https://httpbin.org/ip",
            "https://ipinfo.io/json"
        ]
        
        for service in services:
            try:
                response = requests.get(service, timeout=5)
                if response.status_code == 200:
                    data = response.json()
                    # 不同服务返回的JSON结构可能不同
                    if "ip" in data:
                        return data["ip"]
                    elif "origin" in data:
                        return data["origin"]
            except:
                continue
        
        return "无法获取公网IP地址"
    except Exception as e:
        return f"获取IP时发生错误: {str(e)}"

@mcp.tool()
def get_ip_details(ip_address: str = None) -> str:
    """获取IP地址的详细信息
    
    参数:
        ip_address (str, optional): 要查询的IP地址,如果不提供则查询当前公网IP
    
    返回:
        str: IP地址的详细信息
    """
    try:
        # 如果没有提供IP地址,先获取当前公网IP
        if not ip_address:
            ip_address = get_public_ip_address()
            if "无法获取" in ip_address or "发生错误" in ip_address:
                return ip_address
        
        # 查询IP详细信息
        response = requests.get(f"https://ipinfo.io/{ip_address}/json", timeout=5)
        if response.status_code == 200:
            data = response.json()
            return json.dumps(data, indent=2, ensure_ascii=False)
        else:
            return f"查询IP {ip_address} 详细信息失败"
    except Exception as e:
        return f"查询IP详细信息时发生错误: {str(e)}"

if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp")

文生图服务实现

下面是一个更复杂的例子,演示如何实现一个文生图服务:

# image_generation_server.py
from fastmcp import FastMCP
from fastmcp.server.dependencies import get_http_request
import requests
from starlette.requests import Request
import json
import time

# 初始化MCP服务器
mcp = FastMCP("Text to Image Service")

# 模拟的图像生成API地址
# 在实际应用中,这里应该是真实的API地址
IMAGE_SYNTHESIS_URL = "https://api.example.com/image-synthesis"
TASKS_URL = "https://api.example.com/tasks"

# 模拟的任务存储
task_storage = {}

@mcp.tool(
    name="文生图 - 创建任务",
    description="提交文生图请求,返回任务ID(task_id)。\n"
              "参数说明:\n"
              "prompt: 提示词(中英文,≤800字符);\n"
              "size: 图片尺寸(如512*512,默认1:1);\n"
              "style: 图像风格(如写实、卡通、水彩等);\n"
              "quality: 图像质量(如standard、high)"
)
def create_image_task(
    prompt: str, 
    size: str = "512*512", 
    style: str = "写实", 
    quality: str = "standard"
) -> dict:
    """创建图像生成任务
    
    参数:
        prompt (str): 图像描述提示词
        size (str): 图像尺寸,默认为"512*512"
        style (str): 图像风格,默认为"写实"
        quality (str): 图像质量,默认为"standard"
    
    返回:
        dict: 包含任务ID的响应
    """
    # 生成唯一任务ID
    task_id = f"img_{int(time.time())}_{hash(prompt) % 10000}"
    
    # 存储任务信息
    task_storage[task_id] = {
        "status": "processing",
        "prompt": prompt,
        "size": size,
        "style": style,
        "quality": quality,
        "created_at": time.time(),
        "result_url": None
    }
    
    # 在实际应用中,这里应该调用真实的图像生成API
    # 这里我们模拟一个异步处理过程
    # 实际项目中可能需要使用异步任务队列如Celery
    
    return {
        "task_id": task_id,
        "status": "submitted",
        "message": "图像生成任务已提交,请使用task_id查询结果"
    }

@mcp.tool(
    name="文生图 - 查询结果",
    description="根据task_id查询图像生成状态与结果\n"
              "参数说明:\n"
              "task_id: 图像生成任务的唯一标识符"
)
def get_image_result(task_id: str) -> dict:
    """查询图像生成结果
    
    参数:
        task_id (str): 图像生成任务的ID
    
    返回:
        dict: 任务状态和结果
    """
    # 检查任务是否存在
    if task_id not in task_storage:
        return {
            "error": f"任务ID {task_id} 不存在"
        }
    
    task = task_storage[task_id]
    
    # 模拟处理过程 - 在实际应用中,这里应该查询真实API
    current_time = time.time()
    elapsed_time = current_time - task["created_at"]
    
    # 模拟处理需要5秒
    if elapsed_time > 5 and task["status"] == "processing":
        # 模拟任务完成
        task["status"] = "completed"
        task["result_url"] = f"https://example.com/images/{task_id}.png"
        task["completed_at"] = current_time
    
    return {
        "task_id": task_id,
        "status": task["status"],
        "prompt": task["prompt"],
        "size": task["size"],
        "style": task["style"],
        "quality": task["quality"],
        "created_at": task["created_at"],
        "result_url": task["result_url"],
        "completed_at": task.get("completed_at")
    }

@mcp.tool(
    name="文生图 - 批量查询",
    description="批量查询多个图像生成任务的状态\n"
              "参数说明:\n"
              "task_ids: 图像生成任务ID列表,用逗号分隔"
)
def batch_get_results(task_ids: str) -> dict:
    """批量查询图像生成结果
    
    参数:
        task_ids (str): 逗号分隔的任务ID列表
    
    返回:
        dict: 所有任务的状态和结果
    """
    id_list = [tid.strip() for tid in task_ids.split(",") if tid.strip()]
    results = {}
    
    for task_id in id_list:
        results[task_id] = get_image_result(task_id)
    
    return {
        "batch_results": results,
        "total": len(id_list),
        "timestamp": time.time()
    }

if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=8000, path="/mcp")

客户端实现

下面是如何实现一个Streamable HTTP MCP客户端的示例:

# mcp_client.py
import requests
import json
from typing import Dict, Any, Optional

class StreamableHTTPMCPClient:
    """Streamable HTTP MCP客户端"""
    
    def __init__(self, base_url: str = "http://localhost:8000/mcp"):
        """
        初始化MCP客户端
        
        参数:
            base_url (str): MCP服务器的基础URL
        """
        self.base_url = base_url
        self.session = requests.Session()
    
    def list_tools(self) -> Dict[str, Any]:
        """获取可用工具列表"""
        response = self.session.post(
            f"{self.base_url}/tools/list",
            json={}
        )
        return response.json()
    
    def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """调用指定工具
        
        参数:
            tool_name (str): 工具名称
            arguments (Dict[str, Any]): 工具参数
            
        返回:
            Dict[str, Any]: 工具执行结果
        """
        payload = {
            "name": tool_name,
            "arguments": arguments
        }
        
        response = self.session.post(
            f"{self.base_url}/tools/call",
            json=payload
        )
        return response.json()
    
    def close(self):
        """关闭客户端连接"""
        self.session.close()

# 使用示例
if __name__ == "__main__":
    # 创建客户端实例
    client = StreamableHTTPMCPClient()
    
    try:
        # 获取可用工具列表
        tools = client.list_tools()
        print("可用工具:")
        for tool in tools.get("tools", []):
            print(f"- {tool.get('name')}: {tool.get('description')}")
        
        # 调用加法工具
        result = client.call_tool("add", {"a": 10, "b": 20})
        print("\n加法结果:", result)
        
        # 调用获取公网IP工具
        ip_result = client.call_tool("get_public_ip_address", {})
        print("\n公网IP:", ip_result)
        
        # 创建图像生成任务
        image_task = client.call_tool("文生图 - 创建任务", {
            "prompt": "一只可爱的小猫在花园里玩耍",
            "size": "512*512",
            "style": "卡通"
        })
        print("\n图像任务创建结果:", image_task)
        
        # 查询图像生成结果
        if "task_id" in image_task:
            task_id = image_task["task_id"]
            image_result = client.call_tool("文生图 - 查询结果", {"task_id": task_id})
            print("\n图像生成结果:", image_result)
    
    finally:
        # 关闭客户端连接
        client.close()

问题解决

常见问题1:连接超时

问题描述:客户端连接服务器时出现超时错误。

解决方案

  1. 检查服务器是否正在运行
  2. 确认端口号和IP地址是否正确
  3. 检查防火墙设置
# 增加超时设置和重试机制
import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class RobustMCPClient(StreamableHTTPMCPClient):
    def __init__(self, base_url: str = "http://localhost:8000/mcp", max_retries: int = 3):
        super().__init__(base_url)
        
        # 配置重试策略
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    
    def call_tool(self, tool_name: str, arguments: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
        """调用工具,增加超时设置"""
        payload = {
            "name": tool_name,
            "arguments": arguments
        }
        
        try:
            response = self.session.post(
                f"{self.base_url}/tools/call",
                json=payload,
                timeout=timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": f"请求失败: {str(e)}"}

常见问题2:数据序列化错误

问题描述:传递复杂数据结构时出现序列化错误。

解决方案

  1. 确保所有参数都是JSON可序列化的
  2. 对于特殊对象,实现自定义序列化方法
import json
from datetime import datetime

class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        # 添加其他自定义类型的处理
        return super().default(obj)

# 在客户端中使用自定义编码器
def call_tool_with_custom_serialization(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """使用自定义序列化调用工具"""
    try:
        payload = {
            "name": tool_name,
            "arguments": arguments
        }
        
        # 使用自定义编码器序列化数据
        json_data = json.dumps(payload, cls=CustomJSONEncoder)
        
        response = self.session.post(
            f"{self.base_url}/tools/call",
            data=json_data,
            headers={"Content-Type": "application/json"}
        )
        return response.json()
    except Exception as e:
        return {"error": f"序列化错误: {str(e)}"}

常见问题3:并发访问限制

问题描述:高并发场景下服务器响应缓慢或拒绝连接。

解决方案

  1. 实现连接池
  2. 添加速率限制
  3. 使用异步客户端
import asyncio
import aiohttp
from typing import List

class AsyncMCPClient:
    """异步MCP客户端,支持高并发"""
    
    def __init__(self, base_url: str = "http://localhost:8000/mcp", max_connections: int = 100):
        self.base_url = base_url
        self.connector = aiohttp.TCPConnector(limit=max_connections)
    
    async def call_tool(self, session: aiohttp.ClientSession, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """异步调用工具"""
        payload = {
            "name": tool_name,
            "arguments": arguments
        }
        
        async with session.post(
            f"{self.base_url}/tools/call",
            json=payload
        ) as response:
            return await response.json()
    
    async def batch_call_tools(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """批量调用工具"""
        async with aiohttp.ClientSession(connector=self.connector) as session:
            tasks = [
                self.call_tool(session, req["tool_name"], req["arguments"])
                for req in requests
            ]
            return await asyncio.gather(*tasks)
    
    async def close(self):
        """关闭客户端"""
        await self.connector.close()

# 使用示例
async def main():
    client = AsyncMCPClient()
    
    try:
        # 准备批量请求
        batch_requests = [
            {"tool_name": "add", "arguments": {"a": i, "b": i*2}},
            {"tool_name": "get_public_ip_address", "arguments": {}}
            for i in range(10)
        ]
        
        # 执行批量请求
        results = await client.batch_call_tools(batch_requests)
        for i, result in enumerate(results):
            print(f"请求 {i+1} 结果:", result)
    
    finally:
        await client.close()

if __name__ == "__main__":
    asyncio.run(main())

总结提炼

核心步骤

实现Python MCP协议Streamable HTTP的关键步骤:

  1. 环境准备:安装FastMCP框架和相关依赖
  2. 服务器创建:使用FastMCP创建MCP服务器实例
  3. 工具注册:使用@mcp.tool()装饰器注册工具函数
  4. 服务器启动:指定transport="streamable-http"启动服务器
  5. 客户端实现:创建HTTP客户端与服务器交互

适用场景

Streamable HTTP MCP特别适合:

  • 需要持续交互的应用:如聊天机器人、代码助手
  • 实时数据流处理:如实时翻译、数据分析
  • 分布式系统:多个服务需要协同工作
  • Web集成场景:需要与现有Web基础设施无缝集成

最佳实践

  1. 错误处理:实现完善的错误处理和重试机制
  2. 性能优化:使用连接池和异步处理提高性能
  3. 安全考虑:添加认证和授权机制保护服务
  4. 监控日志:实现详细的日志记录和监控
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐