从 WebSocket 到 SSE,大模型应用绕不开的 SSE
这是一种非常适合实时数据推送的技术。与 WebSocket 不同,SSE 只支持服务器到客户端的单向通信。它是基于 HTTP 协议的,能够让服务器推送数据到客户端。简单来说,SSE 就是让服务器能够在不需要客户端不断请求的情况下,主动推送数据。
想象一下这些常见的需求:
- 一个实时更新的数据大屏,展示最新的业务指标。
- 一个新闻网站,向用户推送突发新闻。
- 一个后台系统,当耗时任务完成后给用户发送通知。
在这些场景中,数据流是单向的:从服务器到客户端。客户端只是一个被动的接收者。如果这时我们依然选择 WebSocket,就好像建立了一条双向的私人高速公路——功能强大,但过于复杂且成本高昂。
是时候认识一下 WebSocket 的轻量级表亲了:Server-Sent Events (SSE)。它用一种极其优雅和简单的方式,完美解决了单向数据推送的难题。与 WebSocket 需要通过 ws:// 协议进行复杂的“升级握手”不同,SSE 完全运行在标准的 HTTP/HTTPS 之上。
一. 什么是 SSE?
Server-Sent Events (SSE) ,这是一种非常适合实时数据推送的技术。与 WebSocket 不同,SSE 只支持服务器到客户端的单向通信。它是基于 HTTP 协议的,能够让服务器推送数据到客户端。简单来说,SSE 就是让服务器能够在不需要客户端不断请求的情况下,主动推送数据。
核心特点:
- 基于 HTTP 协议,易于配置
- 单向通信(服务器到客户端),但不限制消息流的复杂性
- 自动重连机制,连接中断后会自动重新建立
- 轻量级,使用简单,适合做实时更新
- 纯文本数据格式,易于调试和查看
二. SSE vs WebSocket

如何选择?
- 如果你的需求是 服务器推送实时数据,而且只是 单向通信,SSE 是一个非常轻便高效的选择。
- 如果需要 双向通信,如聊天、多人协作等功能,WebSocket 则是更合适的解决方案。
三. SSE 的应用场景
SSE 是为一些特定的 实时数据推送场景设计的,尤其适合以下几种使用场景:
- 大语言模型的流式输出:比如 AI 辅助工具实时输出文本。
- 实时通知和提醒:如实时的后台任务进度推送、消息提醒。
- 实时日志显示:例如服务器日志实时更新、调试信息推送。
- 股票价格更新:金融市场数据、实时股票价格变化等。
- 社交媒体信息流:动态推送用户的更新信息。
四. 代码实现
客户端简单到令人惊喜
// 链接到服务器的事件流端点
const eventSource = new EventSource("/api/sse");
eventSource.onmessage = function (event) {
console.log("新消息:",event.data);
};
eventSource.onerror = (error) => {
console.error("SSE: 连接错误,等待自动重连", error);
};
就是这么简单!没有复杂的连接状态管理,没有心跳检测,更没有手动重连逻辑。浏览器为你搞定了一切。
实战演示:
DEMO1:一个简单的实时时钟
const express = require("express");
const app = express();
const cors = require("cors");
// app.use(express.static("public"));
app.use(cors());
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.get("/", async (req, res) => {
res.send(`
<div>SSE 实时推送</div>
<div id="clock"></div>
<script >
const clock = document.getElementById("clock");
const eventSource = new EventSource("/sse");
eventSource.onmessage = function (event) {
clock.textContent = event.data;
};
</script>
`);
});
app.get("/sse", async (req, res) => {
try {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
const intervalId = setInterval(() => {
res.write(`data: ${new Date().toLocaleTimeString()}\n\n`); // write不会关闭连接
}, 1000);
res.on("close", () => {
clearInterval(intervalId);
console.log("Connection closed");
res.end(); // end将内容返回后关闭连接;
});
// res.status(200).send({ status: "200", data: 'ok', msg: "ok" });
} catch (error) {
res.status(500).send({ status: "500", data: [], error });
}
});
// 定义错误处理中间件
app.use((err, req, res, next) => {
res.status(500).send(err.message);
});
app.listen(8888, () => {
console.log("listening on 8888...");
});
DEMO2:模拟大模型流式响应
- 客户端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Document</title>
</head>
<body>
<div>SSE 实时推送</div>
<script>
const eventSource = new EventSource("http://localhost:3001/stream");
// ✅ 标准事件可以用两种方式
eventSource.onmessage = function (event) {
console.log(event); // 输出: "标准消息"
var div = document.createElement("div");
div.textContent = event.data;
document.body.appendChild(div);
};
// eventSource.addEventListener("message", function (event) {
// console.log("addEventListener message:", event.data); // 输出: "标准消息"
// });
eventSource.onerror = (error) => {
// SSE 的自动重连机制是基于 EventSource 的自动重连机制实现的
console.error("SSE: 连接错误,等待自动重连", error);
};
// ✅ 正确的方式 - 使用 addEventListener
eventSource.addEventListener("complete", function (event) {
console.log("监听到完成事件:", event.data); // 输出: "流已完成"
const data = JSON.parse(event.data);
console.log("流已完成:", data.message);
eventSource.close();
console.log("EventSource 已关闭");
});
// ❌ 错误的方式 - oncomplete 属性不存在
eventSource.oncomplete = function (event) {
console.log("这永远不会执行"); // 不会执行!
};
// EventSource 的标准事件
/* class EventSource {
只有这些标准的事件处理器属性
onopen: null | function;
onmessage: null | function;
onerror: null | function;
没有这些属性!
oncomplete: null | function
onclose: null | function
onend: null | function
} */
</script>
</body>
</html>
- 服务端
const http = require("http");
// 模拟大模型的响应内容
const mockResponses = [
"你好!我是AI助手,",
"我正在处理你的请求。",
"这是一个流式输出的演示,",
"我会每隔一段时间发送消息。",
"现在演示即将结束。",
"再见!",
];
// 创建 HTTP 服务器
const server = http.createServer((req, res) => {
// 设置响应头
res.setHeader("Access-Control-Allow-Origin", "*");
// SSE 接口
if (req.url === "/stream") {
// 设置 SSE 相关的响应头
res.writeHead(200, {
"Content-Type": "text/event-stream; charset=UTF-8",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
let messageIndex = 0;
// 发送初始化消息
res.write(`: This is comment\n`); // 这是一个注释
res.write(`retry: 3000\n`); // 如果断开,3秒后重连
// 定义发送消息的函数
const sendMessage = () => {
if (messageIndex < mockResponses.length) {
// 发送消息
res.write(`id: ${messageIndex + 1}\n`); // 消息ID
res.write(`event: message\n`); // 事件类型
res.write(
`data: ${JSON.stringify({
content: mockResponses[messageIndex],
})}\n\n`
);
messageIndex++;
// 延迟发送下一条消息
setTimeout(sendMessage, 500);
} else {
// 发送结束消息
res.write(`id: final\n`);
res.write(`event: complete\n`);
res.write(`data: "stream completed"\n\n`);
res.end();
}
};
// 开始发送消息
sendMessage();
// 监听客户端断开连接
req.on("close", () => {
console.log("客户端断开连接");
});
}
});
// 服务器监听 3001 端口
const PORT = 3001;
server.listen(PORT, () => {
console.log(`服务器正在运行,端口: ${PORT}`);
});
服务端实现要点
- 设置正确的响应头:
Content-Type: text/event-stream:指定数据流格式为 SSE。Cache-Control: no-cache:避免缓存。Connection:keep-alive:保持连接活跃。 - 以
:开头的行会被当作注释,服务器可以用它发送调试信息,但客户端会忽略这些行。 - 以
retry:开头的行用于指定自动重连的时间间隔,单位为毫秒。 - 以
id:开头的行用于指定消息的 ID,用于区分不同的消息。客户端可以通过eventSource.lastEventId获取最后收到的消息 ID,当需要断线重连时,请求会自动发送 Last-Event-ID 头,服务器可以用它来续传消息。 - 以
event:开头的行用于指定消息的类型,如果不指定,默认为message事件。客户端可以用addEventListener(event, callback)来监听响应的事件(message事件使用onmessage监听)。另外,自定义的事件类型也必须包含data字段,否则不会被触发。 - 以
data:开头的行用于指定消息的内容,客户端可以通过event.data获取消息的内容,并且消息必须以 JSON 格式传递,结尾必须是\n\n。 - 每个字段都必须独占一行,多个字段之间不需要特定顺序,但通常的顺序是:
id: 消息ID
event: 事件类型
data: 消息内容
前端实现要点:
-
事件处理,需要处理以下三个关键事件:
onmessage:接收消息onerror:处理错误onopen:连接建立 其余事件使用addEventListener监听 -
避免内存泄漏: 在请求完成或者组件卸载时,记得调用
eventSource.close()关闭连接
EventSource 的缺点
EventSource API 存在很多限制,它允许传递的参数只有url和 withCredentials。所以会有以下缺点:
- 无法传递请求体,所有参数都必须编码在
url中,而浏览器对 url 的长度有限制(大多在 2000 字符左右); - 无法自定义请求头;
- 只能使用
GET请求; - 自动重连机制无法手动控制;
为了解决以上问题,我们可以使用第三方库 fetch-event-source 实现。
// BEFORE:
const sse = new EventSource('/api/sse');
sse.onmessage = (ev) => {
console.log(ev.data);
};
// AFTER:
import { fetchEventSource } from '@microsoft/fetch-event-source';
await fetchEventSource('/api/sse', {
onmessage(ev) {
console.log(ev.data);
}
});
更多推荐


所有评论(0)