@microsoft/fetch-event-source 实现可靠的 SSE 对话流:从原理到封装实践

在构建现代 AI 对话应用时,流式响应(Streaming Response) 已成为提升用户体验的关键能力——用户不再需要等待数秒才能看到完整回答,而是像真人聊天一样“逐字输出”。而实现这一能力的核心技术之一,就是 SSE(Server-Sent Events)

本文将带你深入理解 SSE,并分享我们在实际项目中如何基于 @microsoft/fetch-event-source 封装一个健壮、可复用的 SSE 请求工具,解决原生 EventSource 的诸多痛点。


1. SSE 是什么?

SSE(Server-Sent Events) 是一种基于 HTTP 的单向通信协议,允许服务器主动向浏览器客户端推送实时数据。与 WebSocket 不同,SSE 是单工通信(仅服务端 → 客户端),且基于标准 HTTP/HTTPS,天然支持 CORS、缓存、代理等 Web 基础设施。

一个典型的 SSE 响应格式如下:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Connection: keep-alive
Cache-Control: no-cache

data: {"content": "你好"}

data: {"content": "世界"}

event: done
data: {"status": "complete"}

每条消息以 data: 开头,可选带 event: 类型。浏览器通过监听 message 或自定义事件名来处理数据。

在大模型对话场景中,后端会将生成的 token 逐个通过 data: {...} 推送给前端,前端拼接后实时渲染,从而实现“打字机”效果。


2. 原生 EventSource 有什么缺陷?

虽然浏览器提供了原生 EventSource API 来处理 SSE,但在实际项目中,我们发现它存在几个致命短板

❌ 无法发送 POST 请求(或带 body 的请求)

EventSource 构造函数只接受 URL,不支持自定义 method、headers 或 request body。这意味着你无法:

  • 在请求体中传递对话历史(messages)
  • 设置 Authorization Token
  • 发送 JSON 格式的参数

虽然可通过 URL query 传参,但长度受限且不安全,完全无法满足 LLM 对话的复杂入参需求。

❌ 错误处理机制薄弱

EventSource 只提供简单的 onerror 回调,无法区分是网络错误、服务端业务错误(如配额超限),还是流内容本身的异常。

❌ 无法精细控制连接生命周期

比如:无法在请求发出前 abort,或在特定条件下重连策略不可控。

❌ 不支持流式 JSON 解析的上下文管理

你需要手动拼接 data 字段、处理换行、解析 JSON,容易出错。

这些限制使得原生 EventSource 几乎无法用于生产级的 AI 对话流场景


3. 为什么选择 @microsoft/fetch-event-source

经过调研多个开源方案(如 event-source-polyfillsse.js 等),我们最终选定 Microsoft 官方出品的 @microsoft/fetch-event-source,原因如下:

基于 fetch 实现:天然支持 POST、自定义 headers、request body
完全兼容 SSE 协议:正确解析 data:event:id:retry: 等字段
提供 AbortController 支持:可随时取消请求
支持自定义 onopen/onmessage/onerror/onclose 钩子:便于集成业务逻辑
TypeScript 友好:官方提供类型定义
轻量无依赖:仅 ~5KB,无第三方依赖

更重要的是,它模拟了 EventSource 的行为,但底层使用 fetch + ReadableStream,完美绕过了原生 API 的限制。


4. 封装 @microsoft/fetch-event-source:打造项目级 SSE 工具

为了在团队中统一使用、降低接入成本,我们对 fetchEventSource 进行了二次封装,目标是:

  • 支持标准 JSON 请求体
  • 自动处理流式 JSON chunk
  • 提供成功/运行中/错误三种状态回调
  • 支持取消请求
  • 兼容 BFF 层错误与 SSE 内容错误

封装代码(TypeScript)

// sse.ts
import { fetchEventSource } from '@microsoft/fetch-event-source';

class FetchBffError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'FetchBffError';
  }
}

const isJSON = (str: string): boolean => {
  try {
    JSON.parse(str);
    return true;
  } catch {
    return false;
  }
};

export interface SSEOptions {
  url: string;
  data: Record<string, any>;
  headers?: Record<string, string>;
  cb: (chunk: any | null, status: 'running' | 'success') => void;
  onError?: (error: { type: 'BFF_ERROR' | 'SSE_ERROR'; message: string }) => void;
}

export function createSSEStream({
  url,
  data,
  headers = {},
  cb,
  onError,
}: SSEOptions) {
  const controller = new AbortController();

  fetchEventSource(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Accept: 'text/event-stream;charset=UTF-8',
      ...headers,
    },
    body: JSON.stringify(data),
    signal: controller.signal,
    openWhenHidden: true,

    // 处理非流式错误(如 BFF 返回 JSON 错误)
    async onopen(response) {
      if (response.headers.get('content-type')?.includes('application/json')) {
        const reader = response.body?.getReader();
        if (reader) {
          const { value } = await reader.read();
          const text = new TextDecoder().decode(value);
          if (isJSON(text)) {
            const json = JSON.parse(text);
            if (json.code && json.code !== '200') {
              onError?.({ type: 'BFF_ERROR', message: json.message || '请求失败' });
              controller.abort();
            }
          }
        }
      }
    },

    // 处理流式消息
    onmessage(msg) {
      if (msg.data && isJSON(msg.data)) {
        const chunk = JSON.parse(msg.data);
        // 这里可根据实际后端格式提取 content,例如 OpenAI 格式:
        // const content = chunk.choices?.[0]?.delta?.content;
        cb(chunk, 'running');
      }
    },

    onclose() {
      cb(null, 'success'); // 流结束
    },

    onerror(err) {
      if (!err.message?.includes('aborted')) {
        onError?.({ type: 'BFF_ERROR', message: err.message || '连接异常' });
      }
      throw err; // 阻止自动重连
    },
  });

  return {
    cancel: () => controller.abort(),
  };
}

使用示例

// 在 React 组件中使用
useEffect(() => {
  const stream = createSSEStream({
    url: '/api/chat/completions',
    data: {
      model: 'qwen-max',
      messages: [{ role: 'user', content: '天空为什么是蓝的?' }],
      stream: true,
    },
    headers: {
      Authorization: `Bearer ${token}`,
    },
    cb: (chunk, status) => {
      if (status === 'running' && chunk) {
        const content = chunk.choices?.[0]?.delta?.content;
        setContent(prev => prev + (content || ''));
      }
    },
    onError: (err) => {
      console.error('对话流错误:', err);
      toast.error(err.message);
    },
  });

  return () => {
    stream.cancel(); // 组件卸载时取消请求
  };
}, []);

结语

SSE 是实现 AI 对话流式体验的轻量级利器,而 @microsoft/fetch-event-source 则解决了原生 API 的关键缺陷。通过合理封装,我们不仅能获得灵活的请求能力,还能统一错误处理、状态管理和生命周期控制。

在我们的项目中,这套方案已稳定支撑日均百万级的对话流请求,兼具性能与可维护性。

附:相关链接

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐