IM 系统开发中的 5 个常见问题及解决方案

在 IM 系统开发中,会遇到消息重复、死连接、消息丢失、性能瓶颈等问题。本文介绍 AQChat 中这些问题的解决方案。

一、问题一:消息重复处理问题

问题描述

场景

  1. 网络重传:客户端发送消息后未收到 ACK,自动重发
  2. 用户重复点击:用户快速点击发送按钮
  3. 并发请求:同一消息被并发处理

后果

  • 同一条消息被处理多次
  • 数据库重复插入
  • 用户看到重复消息

解决方案:Redis 去重机制

实现原理

@Component
public class AQMessageHolder implements IMessageHolder {
    
    @Resource
    private RedisCacheHelper redisCacheHelper;
    
    /**
     * 检查消息是否已存在
     */
    @Override
    public boolean isExistMessageId(String roomId, String msgId) {
        // 从Redis Hash中获取房间的消息ID列表
        List<String> messageIds = redisCacheHelper.getCacheMapValue(
            AQRedisKeyPrefix.AQ_ROOM_PREFIX + roomId,
            AQRedisKeyPrefix.AQ_ROOM_MESSAGE_PREFIX
        );
        
        if (messageIds == null) {
            return false;  // 房间没有消息,肯定不存在
        }
        
        // 检查消息ID是否在列表中
        return messageIds.contains(msgId);
    }
    
    /**
     * 保存消息ID
     */
    @Override
    public void putMessageId(String roomId, String msgId) {
        List<String> messageIds = redisCacheHelper.getCacheMapValue(
            AQRedisKeyPrefix.AQ_ROOM_PREFIX + roomId,
            AQRedisKeyPrefix.AQ_ROOM_MESSAGE_PREFIX
        );
        
        if (messageIds == null) {
            messageIds = new ArrayList<>();
        }
        
        messageIds.add(msgId);
        // 保存到Redis
        redisCacheHelper.setCacheMapValue(
            AQRedisKeyPrefix.AQ_ROOM_PREFIX + roomId,
            AQRedisKeyPrefix.AQ_ROOM_MESSAGE_PREFIX,
            messageIds
        );
    }
}

使用流程

// SendMsgCmdHandler.java
public void handle(ChannelHandlerContext ctx, AQChatMsgProtocol.SendMsgCmd cmd) {
    String msgId = cmd.getMsgId();
    String roomId = cmd.getRoomId();
    
    // 1. 检查消息是否已处理(去重)
    if (messageHolder.isExistMessageId(roomId, msgId)) {
        LOGGER.info("当前消息已发送: msgId = {}", msgId);
        // 直接返回ACK,不重复处理
        ctx.writeAndFlush(buildSendMsgAck(roomId, userId, msgId));
        return;
    }
    
    // 2. 处理消息
    // ... 消息广播、持久化等 ...
    
    // 3. 保存消息ID(标记为已处理)
    messageHolder.putMessageId(roomId, msgId);
}

Redis 存储结构

Key: AQChat:room:roomData:{roomId}
Field: message -> List<String> (消息ID列表)

优势

  • 性能:Redis 查询 O(1)
  • 可靠性:Redis 持久化,服务重启不丢失
  • 简单:使用 List.contains() 判断

优化建议:

当前实现使用 List,可以优化为 Set:

// 优化方案:使用Redis Set
public boolean isExistMessageId(String roomId, String msgId) {
    // 使用SISMEMBER,O(1)时间复杂度
    return redisCacheHelper.sIsMember(
        AQRedisKeyPrefix.AQ_ROOM_MESSAGE_PREFIX + roomId,
        msgId
    );
}

二、问题二:死连接导致的内存泄漏

问题描述

场景

  1. 网络异常:客户端突然断网,服务端未感知
  2. 客户端崩溃:应用异常退出,连接未正常关闭
  3. 防火墙断开:NAT 设备超时断开连接

后果

  • Channel 对象无法释放
  • ConcurrentHashMap 中保留无效引用
  • 内存持续增长,最终 OOM

解决方案:心跳检测 + 连接清理

  1. 心跳超时检测
// AQChatNettyStarter.java
ch.pipeline().addLast(new IdleStateHandler(0, 0, 10, TimeUnit.MINUTES));
// 参数说明:
// - readerIdleTime: 0(不检测读空闲)
// - writerIdleTime: 0(不检测写空闲)
// - allIdleTime: 10分钟(检测读写都空闲)
@Component
public class HearBeatHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent event) {
            if (event.state() == IdleState.ALL_IDLE) {
                // 10分钟无读写,判定为死连接
                String userId = (String) ctx.channel().attr(
                    AttributeKey.valueOf(AQBusinessConstant.USER_ID)
                ).get();
                
                if (userId != null) {
                    LOGGER.info("用户{}心跳超时10分钟,发送离线消息并断开连接", userId);
                    
                    // 1. 发送离线消息
                    AQChatMsgProtocol.OfflineMsg offlineMsg = buildOfflineMsg(userId);
                    ctx.writeAndFlush(offlineMsg);
                    
                    // 2. 清理资源
                    userHolder.offline(userId);
                    channelHolder.offline(userId, (NioSocketChannel) ctx.channel());
                }
                
                // 3. 关闭连接
                ctx.channel().close();
            }
        }
    }
}
  1. 连接断开检测
// AQChatCommandHandler.java
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    super.channelInactive(ctx);
    
    String userId = (String) ctx.channel().attr(
        AttributeKey.valueOf(AQBusinessConstant.USER_ID)
    ).get();
    
    if (userId != null) {
        LOGGER.info("用户{}断开连接", userId);
        
        // 1. 设置用户离线
        userHolder.offline(userId);
        
        // 2. 移除用户连接
        globalChannelHolder.offline(userId, (NioSocketChannel) ctx.channel());
        
        // 3. 发送离线消息
        mqSendingAgent.sendOfflineMessage(userId);
    }
    
    ctx.close();
}
  1. 资源清理
// GlobalChannelHolder.java
public void offline(String userId, NioSocketChannel nioSocketChannel) {
    // 1. 从CHANNELS Map中移除
    remove(userId);
    
    // 2. 从ChannelGroup中移除
    messageBroadcaster.removeChannel(userId, nioSocketChannel);
}

清理效果:

场景 检测方式 清理时间
正常断开 channelInactive 立即
网络异常 心跳超时 10分钟
客户端崩溃 心跳超时 10分钟

三、问题三:消息丢失问题

问题描述

场景

  1. 服务崩溃:消息处理中服务重启
  2. 网络异常:消息发送失败
  3. 数据库异常:持久化失败

后果

  • 用户发送的消息丢失
  • 聊天记录不完整
  • 数据不一致

解决方案:RocketMQ 持久化 + 重试机制

  1. RocketMQ 消息持久化
// MqSendingAgent.java
public void storeMessages(MessageDto messageDto) {
    Message message = new Message();
    message.setTopic(AQChatMQConstant.MQTopic.STORE_MESSAGE_TOPIC);
    message.setBody(JSONObject.toJSONString(messageDto).getBytes());
    
    // 发送到RocketMQ,消息会持久化到磁盘
    mqProducer.send(message);
}

RocketMQ 保障

  • 消息持久化到磁盘
  • 支持消息重试
  • 支持事务消息
  1. 消息重试机制
// RocketMQProducerConfiguration.java
@Bean
public MQProducer mqProducer() {
    DefaultMQProducer producer = new DefaultMQProducer();
    
    // 配置重试次数(默认3次)
    producer.setRetryTimesWhenSendFailed(rocketMQConfig.getProducer().getRetryTimes());
    producer.setRetryTimesWhenSendAsyncFailed(rocketMQConfig.getProducer().getRetryTimes());
    
    // 如果Broker存储失败,尝试另一个Broker
    producer.setRetryAnotherBrokerWhenNotStoreOK(true);
    
    producer.start();
    return producer;
}

配置文件

rocketmq:
  producer:
    retryTimes: 3      # 重试3次
    sendTimeOut: 3000  # 超时时间3
  1. 消费者异常处理
// StoreMessageReceiver.java
defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
    for (MessageExt messageExt : messageExtList) {
        try {
            MessageDto messageDto = JSONObject.parseObject(msgStr, MessageDto.class);
            
            // 持久化消息
            messageService.saveMessage(messageDto);
            
        } catch (Exception e) {
            LOGGER.error("存储消息失败, 重试次数={}", 
                messageExt.getReconsumeTimes(), e);
            
            // 如果重试次数超过3次,记录日志
            if (messageExt.getReconsumeTimes() >= 3) {
                LOGGER.error("消息重试3次后仍然失败,需要人工处理");
                // 可以保存到失败消息表,用于后续补偿
            }
            
            // 返回重试,RocketMQ会自动重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

保障机制

保障点 实现方式 说明
消息持久化 RocketMQ 磁盘存储 服务重启不丢失
发送重试 自动重试 3 次 网络异常自动恢复
消费重试 RocketMQ 默认重试 16 次 处理失败自动重试
最终保障 数据库持久化 双重保障

四、问题四:高并发下的性能问题

问题描述

场景

  • 同步处理:消息广播、持久化同步执行
  • 数据库压力:频繁查询用户信息、房间信息
  • 线程阻塞:AI 处理阻塞主线程

后果

  • 响应时间慢:50-80ms
  • 吞吐量低:每秒只能处理 20 条消息
  • CPU 使用率高

解决方案:异步处理 + Redis 缓存

1. RocketMQ 异步处理

// SendMsgCmdHandler.java
public void handle(ChannelHandlerContext ctx, AQChatMsgProtocol.SendMsgCmd cmd) {
    // ... 消息去重、验证等 ...
    
    // 构建消息对象
    MessageDto messageDto = buildMessageDto(cmd);
    
    // 异步发送到RocketMQ(立即返回,不阻塞)
    mqSendingAgent.sendMessageToRoom(messageDto);  // 异步广播
    mqSendingAgent.storeMessages(messageDto);      // 异步持久化
    
    // 立即返回响应(< 5ms)
    ctx.writeAndFlush(buildSendMsgAck());
}

异步线程池配置

// RocketMQProducerConfiguration.java
@Bean
public MQProducer mqProducer() {
    // 配置异步发送线程池
    ThreadPoolExecutor asyncThreadPoolExecutor = new ThreadPoolExecutor(
        100, 150, 3, TimeUnit.MINUTES,
        new ArrayBlockingQueue<>(1000),
        r -> {
            Thread thread = new Thread(r);
            thread.setName("rocketmq-produce-" + ThreadLocalRandom.current().nextInt(1000));
            return thread;
        }
    );
    
    DefaultMQProducer producer = new DefaultMQProducer();
    // 设置异步发送线程池
    producer.setAsyncSenderExecutor(asyncThreadPoolExecutor);
    producer.start();
    return producer;
}

2. Redis 缓存

// AQUserHolder.java
@Override
public UserGlobalInfoDto getUserInfo(String userId) {
    // 从Redis读取,< 1ms
    return redisCacheHelper.getCacheObject(
        AQRedisKeyPrefix.AQ_USER_INFO_PREFIX + userId,
        UserGlobalInfoDto.class
    );
}

// AQRoomHolder.java
@Override
public RoomInfoDto getRoomInfoById(String roomId) {
    // 从Redis Hash读取,< 1ms
    return redisCacheHelper.getCacheMapValue(
        AQRedisKeyPrefix.AQ_ROOM_PREFIX + roomId,
        AQRedisKeyPrefix.AQ_ROOM_INFO_PREFIX
    );
}

性能对比:

操作 同步方式 异步方式 提升
消息广播 10-20ms < 1ms 10-20倍
消息持久化 20-30ms < 1ms 20-30倍
用户信息查询 5-10ms < 1ms 5-10倍
总响应时间 50-80ms < 5ms 10-16倍

五、问题五:AI 响应阻塞问题

问题描述

场景

  • AI 处理耗时:每次 AI 调用需要 5-10 秒
  • 同步处理:AI 处理阻塞 MQ 消费线程
  • 并发限制:多个 AI 请求串行处理

后果

  • MQ 消费线程被阻塞
  • 其他消息无法处理
  • 系统吞吐量下降

解决方案:独立线程池

1. 独立线程池配置

@Component
public class ThreadPoolUtil {
    private static final int CORE_POOL_SIZE = 5;   // 核心线程数
    private static final int MAX_POOL_SIZE = 10;   // 最大线程数
    private static final long KEEP_ALIVE_TIME = 1L;
    private static final BlockingQueue<Runnable> WORK_QUEUE = 
        new LinkedBlockingQueue<>(100);  // 任务队列
    
    private final ThreadPoolExecutor threadPoolExecutor;
    
    public ThreadPoolUtil() {
        this.threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.MINUTES,
            WORK_QUEUE
        );
    }
    
    public void submitTask(Runnable task) {
        this.threadPoolExecutor.submit(task);
    }
}

2. AI 处理异步化

// AIHelperReceiver.java
defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
    for (MessageExt messageExt : messageExtList) {
        String msgStr = new String(messageExt.getBody());
        
        // 提交到独立线程池,不阻塞MQ消费线程
        threadPoolUtil.submitTask(() -> {
            try {
                // AI处理(可能需要5-10秒)
                aiService.streamCallWithMessage(
                    messageDto.getMessageContent(),
                    aiResult -> {
                        // 流式响应处理
                        globalChannelHolder.sendBroadcastAIMessage(aiMessageDto, aiId);
                    }
                );
            } catch (Exception e) {
                LOGGER.error("AI助手处理消息失败", e);
                // 错误处理
            } finally {
                // 持久化AI回复消息
                messageService.saveMessage(storeMessage);
            }
        });
    }
    
    // MQ消费线程立即返回,不等待AI处理完成
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

优势:

特性 同步处理 独立线程池 说明
MQ消费线程 阻塞 不阻塞 提高吞吐量
AI处理并发 串行 并行(最多10个) 提高处理能力
错误隔离 影响其他消息 不影响 提高可靠性

性能提升

指标 同步处理 独立线程池 提升
MQ消费速度 慢(被阻塞) 快(不阻塞) 显著提升
AI处理并发 1个 10个 10倍
系统吞吐量 显著提升

六、总结

问题与解决方案对比:

问题 解决方案 关键技术
消息重复处理 Redis 去重机制 Redis List/Set
死连接内存泄漏 心跳检测 + 连接清理 IdleStateHandler + channelInactive
消息丢失 RocketMQ 持久化 + 重试 消息持久化 + 自动重试
高并发性能 异步处理 + Redis 缓存 RocketMQ + Redis
AI 响应阻塞 独立线程池 ThreadPoolExecutor

经验总结:

幂等性设计:消息去重保证幂等性
资源管理:及时清理无效连接,避免内存泄漏
可靠性保障:多重保障机制,确保消息不丢失
性能优化:异步处理 + 缓存,提升系统性能
错误隔离:独立线程池,避免相互影响

通过以上解决方案,AQChat 解决了 IM 系统开发中的常见问题,提升了系统的可靠性、性能和用户体验。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐