SseEmitter
3. **错误处理**:如果发生异常(如 `IOException` 或 `InterruptedException`),调用 `SseEmitter.completeWithError()` 关闭连接并报告错误。- **`Request.Builder()`**:通过 `Request` 类构建 HTTP 请求,设置 `Accept` 头为 `text/event-stream`。1. **`s
`SseEmitter` 是 Spring Framework 提供的一个类,用于实现 **服务器发送事件**(Server-Sent Events,SSE)的支持。它允许服务器通过单个 HTTP 连接不断地向客户端发送数据,而客户端则使用该连接持续接收数据。这种机制常用于需要实时更新的应用场景,例如股票行情、推送通知等。
### **SSE 与传统 HTTP 请求的区别**
- 在传统的 HTTP 请求中,客户端发送请求,服务器处理并返回响应,之后连接关闭。
- 在 SSE 中,客户端向服务器发起一个请求,服务器可以持续通过同一个连接发送多个事件更新,而不会立即关闭连接。客户端保持连接并持续接收服务器推送的消息,直到服务器关闭连接或客户端主动断开。
### **`SseEmitter` 的工作原理**
`SseEmitter` 是 Spring Web 提供的一种基于 Servlet API 的机制,用于在 Spring MVC 中实现 SSE。它的核心功能是能够异步地将数据从服务器推送到客户端。`SseEmitter` 通过 HTTP 响应流(`ResponseBody`)来持续发送消息。
### **SseEmitter 的构造方法**
- `SseEmitter()`:使用默认超时时间(30秒)。
- `SseEmitter(Long timeout)`:指定自定义的超时时间。如果在指定的时间内没有发送任何事件,连接会被关闭。
### **SseEmitter 的基本方法**
1. **`send(Object object)`**:将一个事件发送给客户端。可以发送任意 Java 对象,Spring 会将其序列化为 JSON 格式发送。
2. **`complete()`**:关闭连接,表示事件发送完成。
3. **`completeWithError(Throwable ex)`**:在发送过程中遇到错误时,关闭连接并报告错误。
4. **`onCompletion(Runnable callback)`**:指定连接完成时的回调函数。
5. **`onTimeout(Runnable callback)`**:设置超时后的回调操作。
---
### **使用 `SseEmitter` 的示例**
#### 服务端使用 `SseEmitter` 实现推送数据
```java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
public class SseController {
// 创建一个线程池,用于异步发送数据
private final ExecutorService executorService = Executors.newCachedThreadPool();
@GetMapping("/sse")
public SseEmitter handleSse() {
// 创建一个 SseEmitter 实例,超时时间设置为 5 分钟
SseEmitter emitter = new SseEmitter(5 * 60 * 1000L);
// 异步执行数据发送操作
executorService.execute(() -> {
try {
for (int i = 0; i < 10; i++) {
// 模拟数据发送,每秒发送一条数据
emitter.send("Message " + i);
Thread.sleep(1000);
}
// 发送完成,关闭连接
emitter.complete();
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e);
}
});
// 返回 SseEmitter 实例
return emitter;
}
}
```
**解释:**
1. **SseEmitter 实例化**:通过 `new SseEmitter(5 * 60 * 1000L)` 创建一个 `SseEmitter` 对象,指定超时时间为 5 分钟(即 300,000 毫秒)。超时后,如果没有任何数据发送,连接会自动关闭。
2. **异步发送数据**:在 `executorService.execute()` 中启动一个异步任务,向客户端每秒发送一条消息,持续 10 秒。使用 `SseEmitter.send()` 方法发送消息,最后通过 `SseEmitter.complete()` 完成数据推送并关闭连接。
3. **错误处理**:如果发生异常(如 `IOException` 或 `InterruptedException`),调用 `SseEmitter.completeWithError()` 关闭连接并报告错误。
---
### **客户端接收 SSE 推送**
要使用 Java 的 `HttpClient` 和 `OkHttp` 来接收来自服务器的 Server-Sent Events (SSE) 推送,可以通过监听 HTTP 响应的流式数据来模拟 SSE 消息接收。由于 SSE 是基于 `text/event-stream` 格式的流式响应,客户端需要不断读取流中的消息。
### **1. 使用 Java HttpClient 接收 SSE 推送**
从 Java 11 开始,`HttpClient` 可以方便地用于接收流式的 SSE 推送。我们可以使用异步请求并监听响应流。
#### 示例:使用 Java `HttpClient` 接收 SSE
```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
public class SseClientHttpClient {
public static void main(String[] args) {
// 创建 HttpClient 实例
HttpClient client = HttpClient.newHttpClient();
// 创建一个 HTTP GET 请求,设置 Accept 头为 text/event-stream
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8080/sse")) // 替换为你的 SSE 服务地址
.header("Accept", "text/event-stream")
.build();
// 发送异步请求并处理响应
CompletableFuture<Void> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofLines())
.thenAccept(response -> {
response.body().forEach(line -> {
// 逐行读取 SSE 消息
System.out.println("Received SSE message: " + line);
});
});
// 保持程序运行,等待异步操作完成
future.join();
}
}
```
**说明:**
- **`HttpClient.newHttpClient()`**:创建一个新的 `HttpClient` 实例。
- **`sendAsync`**:使用异步方式发送请求,`BodyHandlers.ofLines()` 会以流的形式逐行处理响应。
- **`forEach(line -> ...)`**:处理每一行 SSE 消息,`EventSource` 事件一般在一行中表示一个数据事件。
### **2. 使用 OkHttp 接收 SSE 推送**
`OkHttp` 是另一个常用的 HTTP 客户端库,可以通过它来监听 SSE 推送的流。OkHttp 的 `Call` 对象提供了处理原始 HTTP 响应体的能力,这使得它非常适合处理流式响应。
#### 示例:使用 `OkHttp` 接收 SSE
```java
import okhttp3.*;
import java.io.IOException;
public class SseClientOkHttp {
public static void main(String[] args) {
OkHttpClient client = new OkHttpClient();
// 创建 Request 对象,指定 SSE 的请求头
Request request = new Request.Builder()
.url("http://localhost:8080/sse") // 替换为你的 SSE 服务地址
.header("Accept", "text/event-stream")
.build();
// 异步调用
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
e.printStackTrace();
}
@Override
public void onResponse(Call call, Response response) throws IOException {
if (response.isSuccessful()) {
// 读取响应体
try (var reader = response.body().source()) {
while (!reader.exhausted()) {
String line = reader.readUtf8Line();
if (line != null && !line.isEmpty()) {
// 输出接收到的 SSE 消息
System.out.println("Received SSE message: " + line);
}
}
}
}
}
});
}
}
```
**说明:**
- **`OkHttpClient`**:创建一个新的 OkHttp 客户端实例。
- **`Request.Builder()`**:通过 `Request` 类构建 HTTP 请求,设置 `Accept` 头为 `text/event-stream`。
- **`enqueue(new Callback())`**:使用异步调用的方式发起请求,`onResponse` 回调中处理来自服务器的 SSE 消息流。
- **`response.body().source()`**:获取响应流并逐行读取数据。
---
### **SSE 消息格式**
SSE 消息通常是这样的:
```
data: some data
data: more data
event: customEvent
data: event-specific data
id: 12345
data: another message
```
因此,你的客户端需要解析 `data: ` 开头的行来获取消息的实际内容。通常,这些库都会直接为你逐行处理响应体,因此你可以简单地打印或处理每一行的数据。
---
### **总结**
1. **Java `HttpClient`**:可以方便地以异步的方式处理 SSE 流式响应,通过 `HttpResponse.BodyHandlers.ofLines()` 将响应按行处理。
2. **OkHttp**:同样可以轻松处理 SSE,通过监听原始响应流,逐行读取数据并处理。
这两种方式都非常适合在 Java 客户端中实现对 SSE 推送的接收,适用于后台服务、非浏览器客户端等需要实时消息更新的场景。
SSE 优缺点
优点
- 简单高效:相比于 WebSocket,SSE 的实现更为简单,特别是在只需要服务器向客户端发送数据的场景中。
- 自动重连:
EventSource支持自动重连,如果连接中断,浏览器会自动尝试重新连接。 - 文本传输:SSE 适用于文本数据传输,支持多种事件类型和 ID 管理。
缺点
- 单向通信:SSE 只能由服务器向客户端推送消息,客户端无法通过同一个连接向服务器发送消息。双向通信需要通过其他方式,如 AJAX 或 WebSocket。
- 连接限制:由于 SSE 基于 HTTP 长连接,浏览器对同一服务器的连接数有限制,通常为 6 个连接,超过后可能会发生连接阻塞。
- 数据传输量:SSE 适合中小规模的数据传输,如果传输大量二进制数据或高频更新的实时数据,SSE 的性能可能不如 WebSocket。
SSE 的常见使用场景
- 实时更新:如股票行情、新闻推送、实时评论等。
- 服务器状态通知:如服务器健康状态监控、任务进度更新。
- 简单的通知系统:如消息通知、报警系统。
总结
SseEmitter 是一个用于实现服务器向客户端推送数据的工具类,适用于需要单向推送实时数据的场景。它的实现相对简单,通过长连接可以减少反复创建连接的开销。与 WebSocket 不同,SSE 更加轻量级,但由于它的单向通信特性和连接数限制,适用于特定的实时推送应用。
如果你还有其他问题或需要更多示例,随时告诉我!
更多推荐



所有评论(0)