目录

  1. 简介
  2. 核心组件
  3. 双模式操作机制
  4. 服务端初始化参数
  5. DNS重绑定防护机制
  6. 流式写入与读取实现
  7. 事件存储与连接恢复
  8. 性能调优与安全最佳实践

简介

Streamable HTTP协议是MCP(Model Control Protocol)框架中的核心传输方式,为客户端与服务器之间的通信提供了灵活且安全的双向通道。该协议基于HTTP/1.1标准,巧妙地结合了JSON-RPC 2.0协议与服务器发送事件(SSE)技术,实现了高效的实时数据流传输。其核心设计理念在于通过单一的HTTP连接,支持两种响应模式:当客户端请求需要实时流式响应时,采用SSE(text/event-stream)格式;对于常规请求,则返回标准的JSON响应。这种双模式操作不仅满足了不同场景下的性能需求,还确保了协议的广泛兼容性。Streamable HTTP协议特别适用于需要低延迟、高吞吐量交互的应用场景,如实时AI推理、流式数据处理和长连接会话管理。

核心组件

Streamable HTTP协议的实现依赖于几个关键的Python类,它们共同构成了一个健壮的通信框架。StreamableHTTPServerTransport是协议的核心,负责处理所有HTTP请求和响应,管理读写流,并实现SSE流式传输。TransportSecurityMiddleware作为安全中间件,专门用于防御DNS重绑定攻击,通过验证Host和Origin请求头来确保请求来源的合法性。EventStore是一个抽象基类,定义了事件存储的接口,允许实现连接恢复和消息重放功能。StreamableHTTPSessionManager则负责管理整个会话的生命周期,包括会话的创建、跟踪和销毁,是连接客户端与MCP服务器的桥梁。这些组件协同工作,确保了协议在功能、性能和安全性上的卓越表现。

本节来源

双模式操作机制

Streamable HTTP协议的灵活性体现在其双模式响应机制上,该机制根据客户端的Accept请求头动态选择响应格式。

包含 text/event-stream
不包含 text/event-stream
客户端发起HTTP请求
检查Accept头
启用SSE流式响应
返回标准JSON响应
建立SSE连接
持续推送事件
一次性返回JSON结果

图源

本节来源

SSE流式响应模式

当客户端在Accept头中明确声明接受text/event-stream时,服务器将激活SSE流式响应模式。在此模式下,服务器会创建一个持久的HTTP连接,并通过该连接以SSE格式持续向客户端推送事件。每个事件都包含一个data字段,其中封装了JSON-RPC消息,以及一个可选的id字段用于事件追踪。这种模式非常适合处理长时间运行的任务,如AI模型的逐步推理过程,客户端可以实时接收中间结果,而无需轮询服务器。

标准JSON响应模式

如果客户端的Accept头不包含text/event-stream,或者服务器被配置为禁用流式响应(通过is_json_response_enabled参数),则服务器将返回标准的JSON响应。在这种模式下,服务器处理完请求后,会一次性将完整的JSON-RPC响应通过HTTP响应体返回给客户端。这种方式更符合传统的RESTful API交互模式,适用于请求-响应周期较短的场景。

服务端初始化参数

StreamableHTTPServerTransport类的初始化过程接受多个关键参数,这些参数定义了连接的行为和安全策略。

"使用"
"可选依赖"
StreamableHTTPServerTransport
+mcp_session_id : str | None
+is_json_response_enabled : bool
+_event_store : EventStore | None
+_security : TransportSecurityMiddleware
+__init__(mcp_session_id, is_json_response_enabled, event_store, security_settings)
+handle_request(scope, receive, send)
TransportSecuritySettings
+enable_dns_rebinding_protection : bool
+allowed_hosts : list[str]
+allowed_origins : list[str]
EventStore

图源

本节来源

会话ID验证

mcp_session_id参数用于标识一个唯一的会话。该ID必须仅包含可见的ASCII字符(十六进制0x21到0x7E),系统会使用正则表达式SESSION_ID_PATTERN进行严格验证。如果提供的会话ID包含无效字符,初始化将抛出ValueError。会话ID在mcp-session-id自定义HTTP头中传递,服务器会验证每个请求的会话ID是否与当前连接的会话ID匹配,从而防止会话劫持。

事件存储支持

event_store参数接受一个实现了EventStore接口的对象。如果提供了此参数,协议将启用连接恢复功能。服务器会将发送给客户端的每个事件存储起来,并分配一个唯一的event_id。当客户端因网络中断等原因断开连接后,可以使用Last-Event-ID请求头重新连接,服务器将从指定ID之后的事件开始重放,确保客户端不会丢失任何消息。

安全设置

security_settings参数用于配置TransportSecurityMiddleware,这是防御DNS重绑定攻击的核心。通过TransportSecuritySettings对象,可以精细地控制安全策略,包括是否启用防护、允许的主机列表和允许的源列表。

DNS重绑定防护机制

DNS重绑定攻击是一种利用DNS解析机制的网络攻击,攻击者可以诱使用户的浏览器向内部网络的服务器发起请求。Streamable HTTP协议通过TransportSecurityMiddleware实现了多层防护。

"客户端" "TransportSecurityMiddleware" "StreamableHTTPServerTransport" 发送HTTP请求 (Host : evil.com) validate_request() _validate_host() 返回421错误 转发请求 处理并响应 alt [防护启用且Host不匹配] [验证通过] "客户端" "TransportSecurityMiddleware" "StreamableHTTPServerTransport"

图源

本节来源

Host和Origin头验证

中间件在handle_request方法的早期阶段就会调用validate_request进行验证。它会检查HostOrigin请求头:

  • Host验证:检查Host头的值是否在allowed_hosts列表中。支持精确匹配和通配符端口模式(如localhost:*)。
  • Origin验证:检查Origin头的值是否在allowed_origins列表中,同样支持精确匹配和通配符端口模式。

允许的主机列表配置

安全策略的配置非常灵活。allowed_hostsallowed_origins是字符串列表,可以包含具体的主机名(如127.0.0.1)、域名(如localhost)或带有通配符端口的模式(如custom.host:*)。默认情况下,DNS重绑定防护是启用的,强烈建议在生产环境中保持启用状态。

安全测试用例

测试文件test_streamable_http_security.py中的用例验证了安全策略的有效性。例如,test_streamable_http_security_invalid_host_header测试用例模拟了一个带有恶意Host头(evil.com)的请求,验证了服务器是否正确地返回了421(Misdirected Request)状态码,从而证明了防护机制的有效性。

流式写入与读取实现

Streamable HTTP协议的流式能力依赖于anyio库提供的内存对象流(MemoryObjectStream)。

服务器内部
SessionMessage
ServerMessageMetadata
MemoryObjectSendStream
EventMessage
MemoryObjectReceiveStream
SSE Writer
客户端

图源

本节来源

流式写入

当服务器需要向客户端发送消息时,它会通过_write_stream(一个MemoryObjectSendStream)发送一个SessionMessage对象。这个流被_handle_post_request_handle_get_request方法中的SSE写入器监听。SSE写入器从流中读取消息,将其转换为SSE格式的字典(包含eventdata和可选的id),然后通过EventSourceResponse推送给客户端。

读取流

客户端发送的请求通过HTTP请求体到达服务器。_read_stream_writer(另一个MemoryObjectSendStream)负责将解析后的SessionMessage注入到服务器的主处理循环中。MCP服务器的run方法会从_read_stream(对应的MemoryObjectReceiveStream)中读取这些消息,并进行处理。

事件存储与连接恢复

连接恢复功能通过EventStore接口和_replay_events方法实现。

«interface»
EventStore
+store_event(stream_id, message) : EventId
+replay_events_after(last_event_id, send_callback)
InMemoryEventStore
+streams : dict[StreamId, deque[EventEntry]]
+event_index : dict[EventId, EventEntry]
+store_event(stream_id, message) : EventId
+replay_events_after(last_event_id, send_callback)
EventEntry
+event_id : EventId
+stream_id : StreamId
+message : JSONRPCMessage

图源

本节来源

事件存储实现

InMemoryEventStore是一个简单的内存实现,用于演示目的。它使用deque(双端队列)按流ID存储事件,并使用字典event_index通过事件ID快速查找。store_event方法生成一个UUID作为event_id,并将事件存入相应的队列中。为了防止内存无限增长,队列设置了最大长度,当队列满时,最旧的事件会被自动移除。

连接恢复流程

当客户端使用Last-Event-ID头发起GET请求时,服务器调用_replay_events方法。该方法首先通过event_index查找该ID对应的事件,然后从streams中找到该事件所属的流,并从该事件之后的所有事件开始,通过send_callback(即SSE写入器)逐个重放给客户端。这确保了客户端在重新连接后能够无缝地继续接收之前错过的消息。

性能调优与安全最佳实践

为了充分发挥Streamable HTTP协议的潜力并确保其安全稳定运行,建议遵循以下最佳实践。

性能调优指南

  • 会话管理:对于短生命周期的交互,考虑使用stateless=True模式,避免不必要的会话开销。
  • 事件存储:生产环境应使用持久化存储(如Redis或数据库)替代InMemoryEventStore,以防止服务器重启导致事件丢失。
  • 流控:在高并发场景下,监控内存流的缓冲区大小,避免因生产者过快而消费者过慢导致内存溢出。
  • 连接超时:为SSE连接设置合理的超时时间,防止因客户端异常断开而造成资源泄漏。

安全最佳实践

  • 强制启用防护:始终在生产环境中启用enable_dns_rebinding_protection
  • 严格配置白名单:明确配置allowed_hostsallowed_origins,避免使用过于宽松的通配符。
  • HTTPS:始终通过HTTPS提供服务,以加密传输数据并防止中间人攻击。
  • 输入验证:除了协议层的验证,应用层也应对所有输入数据进行严格的验证和清理。
  • 会话ID安全:生成足够长且随机的会话ID,并通过安全的通道(如HTTPS)传输。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐