基于Spring Boot + Redis Pub/Sub 实现跨实例SSE消息推送
本文介绍了基于Redis Pub/Sub实现跨实例SSE消息推送的解决方案。当Web应用部署多实例时,通过Redis发布/订阅机制解决用户连接不同实例导致的消息推送问题。核心组件包括:1)SSE连接管理器维护用户连接池,支持多标签页和生命周期管理;2)Redis消息订阅者监听频道并转发消息。方案采用轻量级SSE协议,结合Redis实现消息广播,确保用户无论连接到哪个实例都能实时接收推送消息。文中提
在构建Web应用时,消息推送是一个常见需求——比如站内信、订单状态更新、告警通知等。SSE(Server-Sent Events)相比WebSocket更轻量,适合单向推送场景。但当服务部署多个实例时,问题就出现了:用户A连接的是实例1,用户B连接的是实例2,A发送的消息如何推送给B?本文介绍一种基于Redis Pub/Sub的解决方案,让SSE连接能够跨实例互通。
上图展示了完整的消息流转过程:
建立连接:用户A和B分别连接到不同的后端实例,每个实例维护着自己的SSE连接池。
发送消息:用户A发起私信请求,请求落在实例1上。
Redis广播:实例1将消息发布到Redis的station:message频道,所有订阅了该频道的实例都会收到。
推送消息:实例2发现目标用户B在自己身上,通过SSE连接将消息推送给B。实例1收到广播后也会检查,发现目标用户不在自己身上,则直接忽略。
一、引入依赖
Spring Boot Web提供了SSE的支持(SseEmitter),而spring-boot-starter-data-redis则为我们带来了Redis连接和Pub/Sub能力。commons-pool2是连接池,生产环境必备,避免频繁创建连接带来的性能损耗。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
二、SSE连接管理器
SseEmitterManager是整个方案的核心。这里有几个设计点值得注意:
支持多标签页:一个用户可能打开多个浏览器标签页,所以用Map<String, Map<String, SseEmitter>>来组织,外层key是userId,内层key是token(每个标签页唯一)。
生命周期管理:通过onCompletion、onTimeout、onError回调来清理连接,防止内存泄漏。
不超时设置:new SseEmitter(0L)表示永不超时,你也可以根据业务需要设置一个合理的超时时间(如30分钟)。
全站广播:broadcast()方法会遍历所有在线用户并推送,适合系统公告类消息。
@Component
@Slf4j
public class SseEmitterManager {
/**
* 用户ID -> (连接token -> SseEmitter)
* 一个用户可能有多个浏览器标签页,用token区分
*/
private final Map<String, Map<String, SseEmitter>> userEmitters = new ConcurrentHashMap<>();
/**
* 建立SSE连接
* @param userId 用户ID
* @param token 连接标识(可用UUID)
*/
public SseEmitter connect(String userId, String token) {
// 超时时间设为0表示不超时,也可设置具体毫秒数
SseEmitter emitter = new SseEmitter(0L);
// 注册回调:连接关闭时清理
emitter.onCompletion(() -> removeEmitter(userId, token));
emitter.onTimeout(() -> removeEmitter(userId, token));
emitter.onError(e -> removeEmitter(userId, token));
// 存储连接
userEmitters.computeIfAbsent(userId, k -> new ConcurrentHashMap<>())
.put(token, emitter);
log.info("SSE connected: userId={}, token={}, total users={}",
userId, token, userEmitters.size());
return emitter;
}
/**
* 向指定用户推送消息
*/
public void sendToUser(String userId, String message) {
Map<String, SseEmitter> emitters = userEmitters.get(userId);
if (emitters == null || emitters.isEmpty()) {
log.debug("User {} not online, message stored for later", userId);
return;
}
// 向该用户所有连接推送
emitters.forEach((token, emitter) -> {
try {
emitter.send(SseEmitter.event()
.name("message")
.data(message));
} catch (IOException e) {
log.error("Send to user {} failed, removing emitter", userId, e);
removeEmitter(userId, token);
}
});
}
/**
* 全站广播
*/
public void broadcast(String message) {
userEmitters.forEach((userId, emitters) -> {
sendToUser(userId, message);
});
log.info("Broadcast message to {} users", userEmitters.size());
}
/**
* 获取当前在线人数
*/
public int getOnlineCount() {
return userEmitters.size();
}
private void removeEmitter(String userId, String token) {
Map<String, SseEmitter> emitters = userEmitters.get(userId);
if (emitters != null) {
emitters.remove(token);
if (emitters.isEmpty()) {
userEmitters.remove(userId);
}
}
}
}
三、Redis消息订阅者
RedisMessageSubscriber实现了MessageListener接口,它会监听station:message频道。当收到Redis消息时:
将JSON反序列化为StationMessage对象。
根据type字段判断是私信还是广播。
调用SseEmitterManager的方法推送给目标用户。
这里有个细节:每个后端实例都会收到自己发布的消息,所以推送前需要判断目标用户是否在当前实例上——这个判断逻辑其实隐含在sendToUser中:如果目标用户不在本实例的连接池里,就直接返回,不会报错。
@Component
@Slf4j
public class RedisMessageSubscriber implements MessageListener {
@Autowired
private SseEmitterManager sseEmitterManager;
@Autowired
private ObjectMapper objectMapper;
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String channel = new String(message.getChannel());
String body = new String(message.getBody());
// 解析消息
StationMessage msg = objectMapper.readValue(body, StationMessage.class);
log.info("Received Redis message: channel={}, type={}, target={}",
channel, msg.getType(), msg.getTargetUserId());
// 根据消息类型分发
if ("user".equals(msg.getType())) {
// 私信:发给指定用户
sseEmitterManager.sendToUser(msg.getTargetUserId(), msg.getContent());
} else if ("broadcast".equals(msg.getType())) {
// 广播:发给所有在线用户
sseEmitterManager.broadcast(msg.getContent());
}
} catch (Exception e) {
log.error("Failed to process Redis message", e);
}
}
}
四、Redis配置
RedisMessageListenerContainer是Spring Data Redis提供的消息容器,它会自动管理订阅和监听线程。注意这里订阅的是station:message频道,你可以根据业务需要定义多个频道(比如station:notice、station:system)。
RedisTemplate的序列化配置也很重要:key使用StringRedisSerializer保证可读性,value使用Jackson2JsonRedisSerializer来支持对象存储。
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory connectionFactory,
RedisMessageSubscriber subscriber) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 订阅站内信频道
container.addMessageListener(subscriber, new ChannelTopic("station:message"));
return container;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
return template;
}
}
五、消息实体
StationMessage是消息的载体,在Redis中传输的JSON格式就对应这个结构。字段设计上:
id:消息唯一标识,可用于去重和消息历史记录。
type:区分私信和广播,方便订阅者做路由。
targetUserId:私信时的接收者,广播时可为空。
timestamp:时间戳,客户端可以用来做消息排序或展示发送时间。
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class StationMessage {
private String id; // 消息ID
private String type; // user:私信, broadcast:广播
private String targetUserId; // 私信时的目标用户ID
private String content; // 消息内容
private String senderId; // 发送者ID
private Long timestamp; // 时间戳
}
六、消息发送服务
MessageService封装了发送逻辑。核心动作很简单:构造StationMessage -> 序列化为JSON -> redisTemplate.convertAndSend()。发布之后,所有实例的订阅者都会收到消息,相当于Redis帮我们做了一个“广播式”的跨实例通信。
这种设计的优点是:发送方不需要知道消息最终由哪个实例处理,也不需要维护实例之间的网络连接,所有协调工作都交给了Redis。
@Service
@Slf4j
public class MessageService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private SseEmitterManager sseEmitterManager;
@Autowired
private ObjectMapper objectMapper;
/**
* 发送私信
*/
public void sendPrivateMessage(String fromUserId, String toUserId, String content) {
StationMessage msg = StationMessage.builder()
.id(UUID.randomUUID().toString())
.type("user")
.targetUserId(toUserId)
.senderId(fromUserId)
.content(content)
.timestamp(System.currentTimeMillis())
.build();
try {
String json = objectMapper.writeValueAsString(msg);
// 发布到Redis,所有实例都会收到
redisTemplate.convertAndSend("station:message", json);
log.info("Private message sent: {} -> {}", fromUserId, toUserId);
} catch (JsonProcessingException e) {
log.error("Failed to serialize message", e);
}
}
/**
* 全站广播
*/
public void broadcast(String fromUserId, String content) {
StationMessage msg = StationMessage.builder()
.id(UUID.randomUUID().toString())
.type("broadcast")
.senderId(fromUserId)
.content(content)
.timestamp(System.currentTimeMillis())
.build();
try {
String json = objectMapper.writeValueAsString(msg);
redisTemplate.convertAndSend("station:message", json);
log.info("Broadcast message sent by: {}", fromUserId);
} catch (JsonProcessingException e) {
log.error("Failed to serialize broadcast", e);
}
}
}
七、Controller层
Controller对外暴露了三个核心接口:
GET /api/sse/connect:客户端通过EventSource或fetch API调用这个接口建立SSE连接。每个连接会生成一个唯一token,用于后续的清理。
POST /api/sse/private:发送私信,需要提供发送者、接收者和消息内容。
POST /api/sse/broadcast:发送广播消息。
GET /api/sse/online-count:查询当前在线人数,可用于展示“在线状态”或做监控。
生产环境建议给这些接口加上认证鉴权(比如从token中解析userId),避免伪造身份。
@RestController
@RequestMapping("/api/sse")
@Slf4j
public class SseController {
@Autowired
private SseEmitterManager sseEmitterManager;
@Autowired
private MessageService messageService;
/**
* SSE连接端点
*/
@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(@RequestParam String userId) {
String token = UUID.randomUUID().toString();
return sseEmitterManager.connect(userId, token);
}
/**
* 发送私信
*/
@PostMapping("/private")
public ResponseEntity<?> sendPrivate(@RequestBody PrivateMessageRequest request) {
messageService.sendPrivateMessage(
request.getFromUserId(),
request.getToUserId(),
request.getContent()
);
return ResponseEntity.ok().build();
}
/**
* 全站广播
*/
@PostMapping("/broadcast")
public ResponseEntity<?> broadcast(@RequestBody BroadcastRequest request) {
messageService.broadcast(request.getFromUserId(), request.getContent());
return ResponseEntity.ok().build();
}
/**
* 获取在线人数
*/
@GetMapping("/online-count")
public ResponseEntity<Integer> getOnlineCount() {
return ResponseEntity.ok(sseEmitterManager.getOnlineCount());
}
}
更多推荐



所有评论(0)