用 `@microsoft/fetch-event-source` 实现可靠的 SSE 对话流:从原理到封装实践
摘要:基于 @microsoft/fetch-event-source 实现 AI 对话流式响应 本文探讨了使用 SSE(Server-Sent Events)技术实现 AI 对话流式响应的方法,重点介绍了 @microsoft/fetch-event-source 库的封装实践。原生 EventSource 存在无法发送 POST 请求、错误处理薄弱等缺陷,而 @microsoft/fetch-
用 @microsoft/fetch-event-source 实现可靠的 SSE 对话流:从原理到封装实践
在构建现代 AI 对话应用时,流式响应(Streaming Response) 已成为提升用户体验的关键能力——用户不再需要等待数秒才能看到完整回答,而是像真人聊天一样“逐字输出”。而实现这一能力的核心技术之一,就是 SSE(Server-Sent Events)。
本文将带你深入理解 SSE,并分享我们在实际项目中如何基于 @microsoft/fetch-event-source 封装一个健壮、可复用的 SSE 请求工具,解决原生 EventSource 的诸多痛点。
1. SSE 是什么?
SSE(Server-Sent Events) 是一种基于 HTTP 的单向通信协议,允许服务器主动向浏览器客户端推送实时数据。与 WebSocket 不同,SSE 是单工通信(仅服务端 → 客户端),且基于标准 HTTP/HTTPS,天然支持 CORS、缓存、代理等 Web 基础设施。
一个典型的 SSE 响应格式如下:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Connection: keep-alive
Cache-Control: no-cache
data: {"content": "你好"}
data: {"content": "世界"}
event: done
data: {"status": "complete"}
每条消息以 data: 开头,可选带 event: 类型。浏览器通过监听 message 或自定义事件名来处理数据。
在大模型对话场景中,后端会将生成的 token 逐个通过 data: {...} 推送给前端,前端拼接后实时渲染,从而实现“打字机”效果。
2. 原生 EventSource 有什么缺陷?
虽然浏览器提供了原生 EventSource API 来处理 SSE,但在实际项目中,我们发现它存在几个致命短板:
❌ 无法发送 POST 请求(或带 body 的请求)
EventSource 构造函数只接受 URL,不支持自定义 method、headers 或 request body。这意味着你无法:
- 在请求体中传递对话历史(messages)
- 设置
AuthorizationToken - 发送 JSON 格式的参数
虽然可通过 URL query 传参,但长度受限且不安全,完全无法满足 LLM 对话的复杂入参需求。
❌ 错误处理机制薄弱
EventSource 只提供简单的 onerror 回调,无法区分是网络错误、服务端业务错误(如配额超限),还是流内容本身的异常。
❌ 无法精细控制连接生命周期
比如:无法在请求发出前 abort,或在特定条件下重连策略不可控。
❌ 不支持流式 JSON 解析的上下文管理
你需要手动拼接 data 字段、处理换行、解析 JSON,容易出错。
这些限制使得原生 EventSource 几乎无法用于生产级的 AI 对话流场景。
3. 为什么选择 @microsoft/fetch-event-source?
经过调研多个开源方案(如 event-source-polyfill、sse.js 等),我们最终选定 Microsoft 官方出品的 @microsoft/fetch-event-source,原因如下:
✅ 基于 fetch 实现:天然支持 POST、自定义 headers、request body
✅ 完全兼容 SSE 协议:正确解析 data:、event:、id:、retry: 等字段
✅ 提供 AbortController 支持:可随时取消请求
✅ 支持自定义 onopen/onmessage/onerror/onclose 钩子:便于集成业务逻辑
✅ TypeScript 友好:官方提供类型定义
✅ 轻量无依赖:仅 ~5KB,无第三方依赖
更重要的是,它模拟了 EventSource 的行为,但底层使用 fetch + ReadableStream,完美绕过了原生 API 的限制。
4. 封装 @microsoft/fetch-event-source:打造项目级 SSE 工具
为了在团队中统一使用、降低接入成本,我们对 fetchEventSource 进行了二次封装,目标是:
- 支持标准 JSON 请求体
- 自动处理流式 JSON chunk
- 提供成功/运行中/错误三种状态回调
- 支持取消请求
- 兼容 BFF 层错误与 SSE 内容错误
封装代码(TypeScript)
// sse.ts
import { fetchEventSource } from '@microsoft/fetch-event-source';
class FetchBffError extends Error {
constructor(message: string) {
super(message);
this.name = 'FetchBffError';
}
}
const isJSON = (str: string): boolean => {
try {
JSON.parse(str);
return true;
} catch {
return false;
}
};
export interface SSEOptions {
url: string;
data: Record<string, any>;
headers?: Record<string, string>;
cb: (chunk: any | null, status: 'running' | 'success') => void;
onError?: (error: { type: 'BFF_ERROR' | 'SSE_ERROR'; message: string }) => void;
}
export function createSSEStream({
url,
data,
headers = {},
cb,
onError,
}: SSEOptions) {
const controller = new AbortController();
fetchEventSource(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream;charset=UTF-8',
...headers,
},
body: JSON.stringify(data),
signal: controller.signal,
openWhenHidden: true,
// 处理非流式错误(如 BFF 返回 JSON 错误)
async onopen(response) {
if (response.headers.get('content-type')?.includes('application/json')) {
const reader = response.body?.getReader();
if (reader) {
const { value } = await reader.read();
const text = new TextDecoder().decode(value);
if (isJSON(text)) {
const json = JSON.parse(text);
if (json.code && json.code !== '200') {
onError?.({ type: 'BFF_ERROR', message: json.message || '请求失败' });
controller.abort();
}
}
}
}
},
// 处理流式消息
onmessage(msg) {
if (msg.data && isJSON(msg.data)) {
const chunk = JSON.parse(msg.data);
// 这里可根据实际后端格式提取 content,例如 OpenAI 格式:
// const content = chunk.choices?.[0]?.delta?.content;
cb(chunk, 'running');
}
},
onclose() {
cb(null, 'success'); // 流结束
},
onerror(err) {
if (!err.message?.includes('aborted')) {
onError?.({ type: 'BFF_ERROR', message: err.message || '连接异常' });
}
throw err; // 阻止自动重连
},
});
return {
cancel: () => controller.abort(),
};
}
使用示例
// 在 React 组件中使用
useEffect(() => {
const stream = createSSEStream({
url: '/api/chat/completions',
data: {
model: 'qwen-max',
messages: [{ role: 'user', content: '天空为什么是蓝的?' }],
stream: true,
},
headers: {
Authorization: `Bearer ${token}`,
},
cb: (chunk, status) => {
if (status === 'running' && chunk) {
const content = chunk.choices?.[0]?.delta?.content;
setContent(prev => prev + (content || ''));
}
},
onError: (err) => {
console.error('对话流错误:', err);
toast.error(err.message);
},
});
return () => {
stream.cancel(); // 组件卸载时取消请求
};
}, []);
结语
SSE 是实现 AI 对话流式体验的轻量级利器,而 @microsoft/fetch-event-source 则解决了原生 API 的关键缺陷。通过合理封装,我们不仅能获得灵活的请求能力,还能统一错误处理、状态管理和生命周期控制。
在我们的项目中,这套方案已稳定支撑日均百万级的对话流请求,兼具性能与可维护性。
附:相关链接
更多推荐



所有评论(0)