在构建Web应用时,消息推送是一个常见需求——比如站内信、订单状态更新、告警通知等。SSE(Server-Sent Events)相比WebSocket更轻量,适合单向推送场景。但当服务部署多个实例时,问题就出现了:用户A连接的是实例1,用户B连接的是实例2,A发送的消息如何推送给B?本文介绍一种基于Redis Pub/Sub的解决方案,让SSE连接能够跨实例互通。

Redis Pub/Sub 后端实例2 后端实例1 用户B浏览器 用户A浏览器 Redis Pub/Sub 后端实例2 后端实例1 用户B浏览器 用户A浏览器 1. 建立连接阶段 2. 发送消息阶段 3. Redis广播 4. 推送消息 GET /sse/connect (userId=1) SseEmitter (长连接) GET /sse/connect (userId=2) SseEmitter (长连接) POST /message/send (to userId=2, content) convertAndSend("station:message", {userId=2, content}) 广播消息 广播消息 SseEmitter.send(content)

上图展示了完整的消息流转过程:

  1. 建立连接:用户A和B分别连接到不同的后端实例,每个实例维护着自己的SSE连接池。

  2. 发送消息:用户A发起私信请求,请求落在实例1上。

  3. Redis广播:实例1将消息发布到Redis的station:message频道,所有订阅了该频道的实例都会收到。

  4. 推送消息:实例2发现目标用户B在自己身上,通过SSE连接将消息推送给B。实例1收到广播后也会检查,发现目标用户不在自己身上,则直接忽略。

一、引入依赖

Spring Boot Web提供了SSE的支持(SseEmitter),而spring-boot-starter-data-redis则为我们带来了Redis连接和Pub/Sub能力。commons-pool2是连接池,生产环境必备,避免频繁创建连接带来的性能损耗。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

二、SSE连接管理器

SseEmitterManager是整个方案的核心。这里有几个设计点值得注意:

  • 支持多标签页:一个用户可能打开多个浏览器标签页,所以用Map<String, Map<String, SseEmitter>>来组织,外层key是userId,内层key是token(每个标签页唯一)。

  • 生命周期管理:通过onCompletion、onTimeout、onError回调来清理连接,防止内存泄漏。

  • 不超时设置:new SseEmitter(0L)表示永不超时,你也可以根据业务需要设置一个合理的超时时间(如30分钟)。

  • 全站广播:broadcast()方法会遍历所有在线用户并推送,适合系统公告类消息。

@Component
@Slf4j
public class SseEmitterManager {
    
    /**
     * 用户ID -> (连接token -> SseEmitter)
     * 一个用户可能有多个浏览器标签页,用token区分
     */
    private final Map<String, Map<String, SseEmitter>> userEmitters = new ConcurrentHashMap<>();
    
    /**
     * 建立SSE连接
     * @param userId 用户ID
     * @param token 连接标识(可用UUID)
     */
    public SseEmitter connect(String userId, String token) {
        // 超时时间设为0表示不超时,也可设置具体毫秒数
        SseEmitter emitter = new SseEmitter(0L);
        
        // 注册回调:连接关闭时清理
        emitter.onCompletion(() -> removeEmitter(userId, token));
        emitter.onTimeout(() -> removeEmitter(userId, token));
        emitter.onError(e -> removeEmitter(userId, token));
        
        // 存储连接
        userEmitters.computeIfAbsent(userId, k -> new ConcurrentHashMap<>())
                    .put(token, emitter);
        
        log.info("SSE connected: userId={}, token={}, total users={}", 
                 userId, token, userEmitters.size());
        
        return emitter;
    }
    
    /**
     * 向指定用户推送消息
     */
    public void sendToUser(String userId, String message) {
        Map<String, SseEmitter> emitters = userEmitters.get(userId);
        if (emitters == null || emitters.isEmpty()) {
            log.debug("User {} not online, message stored for later", userId);
            return;
        }
        
        // 向该用户所有连接推送
        emitters.forEach((token, emitter) -> {
            try {
                emitter.send(SseEmitter.event()
                    .name("message")
                    .data(message));
            } catch (IOException e) {
                log.error("Send to user {} failed, removing emitter", userId, e);
                removeEmitter(userId, token);
            }
        });
    }
    
    /**
     * 全站广播
     */
    public void broadcast(String message) {
        userEmitters.forEach((userId, emitters) -> {
            sendToUser(userId, message);
        });
        log.info("Broadcast message to {} users", userEmitters.size());
    }
    
    /**
     * 获取当前在线人数
     */
    public int getOnlineCount() {
        return userEmitters.size();
    }
    
    private void removeEmitter(String userId, String token) {
        Map<String, SseEmitter> emitters = userEmitters.get(userId);
        if (emitters != null) {
            emitters.remove(token);
            if (emitters.isEmpty()) {
                userEmitters.remove(userId);
            }
        }
    }
}

三、Redis消息订阅者

RedisMessageSubscriber实现了MessageListener接口,它会监听station:message频道。当收到Redis消息时:

  • 将JSON反序列化为StationMessage对象。

  • 根据type字段判断是私信还是广播。

  • 调用SseEmitterManager的方法推送给目标用户。

这里有个细节:每个后端实例都会收到自己发布的消息,所以推送前需要判断目标用户是否在当前实例上——这个判断逻辑其实隐含在sendToUser中:如果目标用户不在本实例的连接池里,就直接返回,不会报错。

@Component
@Slf4j
public class RedisMessageSubscriber implements MessageListener {
    
    @Autowired
    private SseEmitterManager sseEmitterManager;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String channel = new String(message.getChannel());
            String body = new String(message.getBody());
            
            // 解析消息
            StationMessage msg = objectMapper.readValue(body, StationMessage.class);
            
            log.info("Received Redis message: channel={}, type={}, target={}", 
                     channel, msg.getType(), msg.getTargetUserId());
            
            // 根据消息类型分发
            if ("user".equals(msg.getType())) {
                // 私信:发给指定用户
                sseEmitterManager.sendToUser(msg.getTargetUserId(), msg.getContent());
            } else if ("broadcast".equals(msg.getType())) {
                // 广播:发给所有在线用户
                sseEmitterManager.broadcast(msg.getContent());
            }
            
        } catch (Exception e) {
            log.error("Failed to process Redis message", e);
        }
    }
}

四、Redis配置

RedisMessageListenerContainer是Spring Data Redis提供的消息容器,它会自动管理订阅和监听线程。注意这里订阅的是station:message频道,你可以根据业务需要定义多个频道(比如station:notice、station:system)。

RedisTemplate的序列化配置也很重要:key使用StringRedisSerializer保证可读性,value使用Jackson2JsonRedisSerializer来支持对象存储。

@Configuration
public class RedisConfig {
    
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            RedisMessageSubscriber subscriber) {
        
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅站内信频道
        container.addMessageListener(subscriber, new ChannelTopic("station:message"));
        return container;
    }
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        return template;
    }
}

五、消息实体

StationMessage是消息的载体,在Redis中传输的JSON格式就对应这个结构。字段设计上:

  • id:消息唯一标识,可用于去重和消息历史记录。

  • type:区分私信和广播,方便订阅者做路由。

  • targetUserId:私信时的接收者,广播时可为空。

  • timestamp:时间戳,客户端可以用来做消息排序或展示发送时间。

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class StationMessage {
    private String id;           // 消息ID
    private String type;         // user:私信, broadcast:广播
    private String targetUserId; // 私信时的目标用户ID
    private String content;      // 消息内容
    private String senderId;     // 发送者ID
    private Long timestamp;      // 时间戳
}

六、消息发送服务

MessageService封装了发送逻辑。核心动作很简单:构造StationMessage -> 序列化为JSON -> redisTemplate.convertAndSend()。发布之后,所有实例的订阅者都会收到消息,相当于Redis帮我们做了一个“广播式”的跨实例通信。

这种设计的优点是:发送方不需要知道消息最终由哪个实例处理,也不需要维护实例之间的网络连接,所有协调工作都交给了Redis。

@Service
@Slf4j
public class MessageService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private SseEmitterManager sseEmitterManager;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    /**
     * 发送私信
     */
    public void sendPrivateMessage(String fromUserId, String toUserId, String content) {
        StationMessage msg = StationMessage.builder()
            .id(UUID.randomUUID().toString())
            .type("user")
            .targetUserId(toUserId)
            .senderId(fromUserId)
            .content(content)
            .timestamp(System.currentTimeMillis())
            .build();
        
        try {
            String json = objectMapper.writeValueAsString(msg);
            // 发布到Redis,所有实例都会收到
            redisTemplate.convertAndSend("station:message", json);
            log.info("Private message sent: {} -> {}", fromUserId, toUserId);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize message", e);
        }
    }
    
    /**
     * 全站广播
     */
    public void broadcast(String fromUserId, String content) {
        StationMessage msg = StationMessage.builder()
            .id(UUID.randomUUID().toString())
            .type("broadcast")
            .senderId(fromUserId)
            .content(content)
            .timestamp(System.currentTimeMillis())
            .build();
        
        try {
            String json = objectMapper.writeValueAsString(msg);
            redisTemplate.convertAndSend("station:message", json);
            log.info("Broadcast message sent by: {}", fromUserId);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize broadcast", e);
        }
    }
}

七、Controller层

Controller对外暴露了三个核心接口:

  • GET /api/sse/connect:客户端通过EventSource或fetch API调用这个接口建立SSE连接。每个连接会生成一个唯一token,用于后续的清理。

  • POST /api/sse/private:发送私信,需要提供发送者、接收者和消息内容。

  • POST /api/sse/broadcast:发送广播消息。

  • GET /api/sse/online-count:查询当前在线人数,可用于展示“在线状态”或做监控。

生产环境建议给这些接口加上认证鉴权(比如从token中解析userId),避免伪造身份。

@RestController
@RequestMapping("/api/sse")
@Slf4j
public class SseController {
    
    @Autowired
    private SseEmitterManager sseEmitterManager;
    
    @Autowired
    private MessageService messageService;
    
    /**
     * SSE连接端点
     */
    @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(@RequestParam String userId) {
        String token = UUID.randomUUID().toString();
        return sseEmitterManager.connect(userId, token);
    }
    
    /**
     * 发送私信
     */
    @PostMapping("/private")
    public ResponseEntity<?> sendPrivate(@RequestBody PrivateMessageRequest request) {
        messageService.sendPrivateMessage(
            request.getFromUserId(),
            request.getToUserId(),
            request.getContent()
        );
        return ResponseEntity.ok().build();
    }
    
    /**
     * 全站广播
     */
    @PostMapping("/broadcast")
    public ResponseEntity<?> broadcast(@RequestBody BroadcastRequest request) {
        messageService.broadcast(request.getFromUserId(), request.getContent());
        return ResponseEntity.ok().build();
    }
    
    /**
     * 获取在线人数
     */
    @GetMapping("/online-count")
    public ResponseEntity<Integer> getOnlineCount() {
        return ResponseEntity.ok(sseEmitterManager.getOnlineCount());
    }
}
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐