想象一下这些常见的需求:

  • 一个实时更新的数据大屏,展示最新的业务指标。
  • 一个新闻网站,向用户推送突发新闻。
  • 一个后台系统,当耗时任务完成后给用户发送通知。

在这些场景中,数据流是单向的:从服务器到客户端。客户端只是一个被动的接收者。如果这时我们依然选择 WebSocket,就好像建立了一条双向的私人高速公路——功能强大,但过于复杂且成本高昂。

是时候认识一下 WebSocket 的轻量级表亲了:Server-Sent Events (SSE)。它用一种极其优雅和简单的方式,完美解决了单向数据推送的难题。与 WebSocket 需要通过 ws:// 协议进行复杂的“升级握手”不同,SSE 完全运行在标准的 HTTP/HTTPS 之上。

一. 什么是 SSE?

Server-Sent Events (SSE) ,这是一种非常适合实时数据推送的技术。与 WebSocket 不同,SSE 只支持服务器到客户端的单向通信。它是基于 HTTP 协议的,能够让服务器推送数据到客户端。简单来说,SSE 就是让服务器能够在不需要客户端不断请求的情况下,主动推送数据

核心特点:

  • 基于 HTTP 协议,易于配置
  • 单向通信(服务器到客户端),但不限制消息流的复杂性
  • 自动重连机制,连接中断后会自动重新建立
  • 轻量级,使用简单,适合做实时更新
  • 纯文本数据格式,易于调试和查看

二. SSE vs WebSocket

在这里插入图片描述

如何选择?

  • 如果你的需求是 服务器推送实时数据,而且只是 单向通信,SSE 是一个非常轻便高效的选择。
  • 如果需要 双向通信,如聊天、多人协作等功能,WebSocket 则是更合适的解决方案。

三. SSE 的应用场景

SSE 是为一些特定的 实时数据推送场景设计的,尤其适合以下几种使用场景:

  • 大语言模型的流式输出:比如 AI 辅助工具实时输出文本。
  • 实时通知和提醒:如实时的后台任务进度推送、消息提醒。
  • 实时日志显示:例如服务器日志实时更新、调试信息推送。
  • 股票价格更新:金融市场数据、实时股票价格变化等。
  • 社交媒体信息流:动态推送用户的更新信息。

四. 代码实现

客户端简单到令人惊喜

// 链接到服务器的事件流端点
const eventSource = new EventSource("/api/sse");
eventSource.onmessage = function (event) {
   console.log("新消息:",event.data);
};
eventSource.onerror = (error) => {
  console.error("SSE: 连接错误,等待自动重连", error);
};

就是这么简单!没有复杂的连接状态管理,没有心跳检测,更没有手动重连逻辑。浏览器为你搞定了一切。

实战演示:

DEMO1:一个简单的实时时钟

const express = require("express");
const app = express();
const cors = require("cors");
// app.use(express.static("public"));

app.use(cors());
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.get("/", async (req, res) => {
  res.send(`
    <div>SSE 实时推送</div>
    <div id="clock"></div>
    <script >
      const clock = document.getElementById("clock");
      const eventSource = new EventSource("/sse");
      eventSource.onmessage = function (event) {
        clock.textContent = event.data;
      };
    </script>
    `);
});

app.get("/sse", async (req, res) => {
  try {
    res.setHeader("Content-Type", "text/event-stream");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");
    const intervalId = setInterval(() => {
      res.write(`data: ${new Date().toLocaleTimeString()}\n\n`); // write不会关闭连接
    }, 1000);
    res.on("close", () => {
      clearInterval(intervalId);
      console.log("Connection closed");
      res.end(); // end将内容返回后关闭连接;
    });

    // res.status(200).send({ status: "200", data: 'ok', msg: "ok" });
  } catch (error) {
    res.status(500).send({ status: "500", data: [], error });
  }
});

// 定义错误处理中间件
app.use((err, req, res, next) => {
  res.status(500).send(err.message);
});
app.listen(8888, () => {
  console.log("listening on 8888...");
});

DEMO2:模拟大模型流式响应
在这里插入图片描述

  1. 客户端
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <title>Document</title>
  </head>
  <body>
    <div>SSE 实时推送</div>
    <script>
      const eventSource = new EventSource("http://localhost:3001/stream");
      // ✅ 标准事件可以用两种方式
      eventSource.onmessage = function (event) {
        console.log(event); // 输出: "标准消息"

        var div = document.createElement("div");
        div.textContent = event.data;
        document.body.appendChild(div);
      };
      // eventSource.addEventListener("message", function (event) {
      //   console.log("addEventListener message:", event.data); // 输出: "标准消息"
      // });
      eventSource.onerror = (error) => {
        // SSE 的自动重连机制是基于 EventSource 的自动重连机制实现的
        console.error("SSE: 连接错误,等待自动重连", error);
      };
      // ✅ 正确的方式 - 使用 addEventListener
      eventSource.addEventListener("complete", function (event) {
        console.log("监听到完成事件:", event.data); // 输出: "流已完成"
        const data = JSON.parse(event.data);
        console.log("流已完成:", data.message);
        eventSource.close();
        console.log("EventSource 已关闭");
      });
      // ❌ 错误的方式 - oncomplete 属性不存在
      eventSource.oncomplete = function (event) {
        console.log("这永远不会执行"); // 不会执行!
      };
      // EventSource 的标准事件
      /* class EventSource {
        只有这些标准的事件处理器属性
        onopen: null | function;
        onmessage: null | function;
        onerror: null | function;
        没有这些属性!
        oncomplete: null | function
        onclose: null | function
        onend: null | function
      } */
    </script>
  </body>
</html>

  1. 服务端
const http = require("http");

// 模拟大模型的响应内容
const mockResponses = [
  "你好!我是AI助手,",
  "我正在处理你的请求。",
  "这是一个流式输出的演示,",
  "我会每隔一段时间发送消息。",
  "现在演示即将结束。",
  "再见!",
];

// 创建 HTTP 服务器
const server = http.createServer((req, res) => {
  // 设置响应头
  res.setHeader("Access-Control-Allow-Origin", "*");

  // SSE 接口
  if (req.url === "/stream") {
    // 设置 SSE 相关的响应头
    res.writeHead(200, {
      "Content-Type": "text/event-stream; charset=UTF-8",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    });

    let messageIndex = 0;

    // 发送初始化消息
    res.write(`: This is comment\n`); // 这是一个注释
    res.write(`retry: 3000\n`); // 如果断开,3秒后重连

    // 定义发送消息的函数
    const sendMessage = () => {
      if (messageIndex < mockResponses.length) {
        // 发送消息
        res.write(`id: ${messageIndex + 1}\n`); // 消息ID
        res.write(`event: message\n`); // 事件类型
        res.write(
          `data: ${JSON.stringify({
            content: mockResponses[messageIndex],
          })}\n\n`
        );

        messageIndex++;
        // 延迟发送下一条消息
        setTimeout(sendMessage, 500);
      } else {
        // 发送结束消息
        res.write(`id: final\n`);
        res.write(`event: complete\n`);
        res.write(`data: "stream completed"\n\n`);
        res.end();
      }
    };

    // 开始发送消息
    sendMessage();
    // 监听客户端断开连接
    req.on("close", () => {
      console.log("客户端断开连接");
    });
  }
});

// 服务器监听 3001 端口
const PORT = 3001;
server.listen(PORT, () => {
  console.log(`服务器正在运行,端口: ${PORT}`);
});

服务端实现要点

  1. 设置正确的响应头:Content-Type: text/event-stream:指定数据流格式为 SSE。Cache-Control: no-cache:避免缓存。Connection:keep-alive:保持连接活跃。
  2. :开头的行会被当作注释,服务器可以用它发送调试信息,但客户端会忽略这些行。
  3. retry:开头的行用于指定自动重连的时间间隔,单位为毫秒。
  4. id:开头的行用于指定消息的 ID,用于区分不同的消息。客户端可以通过 eventSource.lastEventId 获取最后收到的消息 ID,当需要断线重连时,请求会自动发送 Last-Event-ID 头,服务器可以用它来续传消息。
  5. event:开头的行用于指定消息的类型,如果不指定,默认为 message 事件。客户端可以用 addEventListener(event, callback) 来监听响应的事件(message事件使用 onmessage监听)。另外,自定义的事件类型也必须包含 data 字段,否则不会被触发。
  6. data:开头的行用于指定消息的内容,客户端可以通过 event.data 获取消息的内容,并且消息必须以 JSON 格式传递,结尾必须是\n\n
  7. 每个字段都必须独占一行,多个字段之间不需要特定顺序,但通常的顺序是:
id: 消息ID
event: 事件类型
data: 消息内容

前端实现要点:

  1. 事件处理,需要处理以下三个关键事件:

    onmessage:接收消息 onerror:处理错误 onopen:连接建立 其余事件使用addEventListener监听

  2. 避免内存泄漏: 在请求完成或者组件卸载时,记得调用 eventSource.close() 关闭连接

EventSource 的缺点

EventSource API 存在很多限制,它允许传递的参数只有urlwithCredentials。所以会有以下缺点:

  • 无法传递请求体,所有参数都必须编码在 url 中,而浏览器对 url 的长度有限制(大多在 2000 字符左右);
  • 无法自定义请求头;
  • 只能使用 GET 请求;
  • 自动重连机制无法手动控制;

为了解决以上问题,我们可以使用第三方库 fetch-event-source 实现。

// BEFORE:
const sse = new EventSource('/api/sse');
sse.onmessage = (ev) => {
    console.log(ev.data);
};

// AFTER:
import { fetchEventSource } from '@microsoft/fetch-event-source';

await fetchEventSource('/api/sse', {
    onmessage(ev) {
        console.log(ev.data);
    }
});
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐