IM 系统开发中的 5 个常见问题及解决方案
问题解决方案关键技术消息重复处理Redis 去重机制死连接内存泄漏心跳检测 + 连接清理消息丢失RocketMQ 持久化 + 重试消息持久化 + 自动重试高并发性能异步处理 + Redis 缓存AI 响应阻塞独立线程池幂等性设计:消息去重保证幂等性资源管理:及时清理无效连接,避免内存泄漏可靠性保障:多重保障机制,确保消息不丢失性能优化:异步处理 + 缓存,提升系统性能错误隔离:独立线程池,避免相互
·
IM 系统开发中的 5 个常见问题及解决方案
在 IM 系统开发中,会遇到消息重复、死连接、消息丢失、性能瓶颈等问题。本文介绍 AQChat 中这些问题的解决方案。
一、问题一:消息重复处理问题
问题描述
场景:
- 网络重传:客户端发送消息后未收到 ACK,自动重发
- 用户重复点击:用户快速点击发送按钮
- 并发请求:同一消息被并发处理
后果:
- 同一条消息被处理多次
- 数据库重复插入
- 用户看到重复消息
解决方案:Redis 去重机制
实现原理:
@Component
public class AQMessageHolder implements IMessageHolder {
@Resource
private RedisCacheHelper redisCacheHelper;
/**
* 检查消息是否已存在
*/
@Override
public boolean isExistMessageId(String roomId, String msgId) {
// 从Redis Hash中获取房间的消息ID列表
List<String> messageIds = redisCacheHelper.getCacheMapValue(
AQRedisKeyPrefix.AQ_ROOM_PREFIX + roomId,
AQRedisKeyPrefix.AQ_ROOM_MESSAGE_PREFIX
);
if (messageIds == null) {
return false; // 房间没有消息,肯定不存在
}
// 检查消息ID是否在列表中
return messageIds.contains(msgId);
}
/**
* 保存消息ID
*/
@Override
public void putMessageId(String roomId, String msgId) {
List<String> messageIds = redisCacheHelper.getCacheMapValue(
AQRedisKeyPrefix.AQ_ROOM_PREFIX + roomId,
AQRedisKeyPrefix.AQ_ROOM_MESSAGE_PREFIX
);
if (messageIds == null) {
messageIds = new ArrayList<>();
}
messageIds.add(msgId);
// 保存到Redis
redisCacheHelper.setCacheMapValue(
AQRedisKeyPrefix.AQ_ROOM_PREFIX + roomId,
AQRedisKeyPrefix.AQ_ROOM_MESSAGE_PREFIX,
messageIds
);
}
}
使用流程:
// SendMsgCmdHandler.java
public void handle(ChannelHandlerContext ctx, AQChatMsgProtocol.SendMsgCmd cmd) {
String msgId = cmd.getMsgId();
String roomId = cmd.getRoomId();
// 1. 检查消息是否已处理(去重)
if (messageHolder.isExistMessageId(roomId, msgId)) {
LOGGER.info("当前消息已发送: msgId = {}", msgId);
// 直接返回ACK,不重复处理
ctx.writeAndFlush(buildSendMsgAck(roomId, userId, msgId));
return;
}
// 2. 处理消息
// ... 消息广播、持久化等 ...
// 3. 保存消息ID(标记为已处理)
messageHolder.putMessageId(roomId, msgId);
}
Redis 存储结构:
Key: AQChat:room:roomData:{roomId}
Field: message -> List<String> (消息ID列表)
优势:
- 性能:Redis 查询 O(1)
- 可靠性:Redis 持久化,服务重启不丢失
- 简单:使用 List.contains() 判断
优化建议:
当前实现使用 List,可以优化为 Set:
// 优化方案:使用Redis Set
public boolean isExistMessageId(String roomId, String msgId) {
// 使用SISMEMBER,O(1)时间复杂度
return redisCacheHelper.sIsMember(
AQRedisKeyPrefix.AQ_ROOM_MESSAGE_PREFIX + roomId,
msgId
);
}
二、问题二:死连接导致的内存泄漏
问题描述
场景:
- 网络异常:客户端突然断网,服务端未感知
- 客户端崩溃:应用异常退出,连接未正常关闭
- 防火墙断开:NAT 设备超时断开连接
后果:
- Channel 对象无法释放
ConcurrentHashMap中保留无效引用- 内存持续增长,最终 OOM
解决方案:心跳检测 + 连接清理
- 心跳超时检测
// AQChatNettyStarter.java
ch.pipeline().addLast(new IdleStateHandler(0, 0, 10, TimeUnit.MINUTES));
// 参数说明:
// - readerIdleTime: 0(不检测读空闲)
// - writerIdleTime: 0(不检测写空闲)
// - allIdleTime: 10分钟(检测读写都空闲)
@Component
public class HearBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent event) {
if (event.state() == IdleState.ALL_IDLE) {
// 10分钟无读写,判定为死连接
String userId = (String) ctx.channel().attr(
AttributeKey.valueOf(AQBusinessConstant.USER_ID)
).get();
if (userId != null) {
LOGGER.info("用户{}心跳超时10分钟,发送离线消息并断开连接", userId);
// 1. 发送离线消息
AQChatMsgProtocol.OfflineMsg offlineMsg = buildOfflineMsg(userId);
ctx.writeAndFlush(offlineMsg);
// 2. 清理资源
userHolder.offline(userId);
channelHolder.offline(userId, (NioSocketChannel) ctx.channel());
}
// 3. 关闭连接
ctx.channel().close();
}
}
}
}
- 连接断开检测
// AQChatCommandHandler.java
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
String userId = (String) ctx.channel().attr(
AttributeKey.valueOf(AQBusinessConstant.USER_ID)
).get();
if (userId != null) {
LOGGER.info("用户{}断开连接", userId);
// 1. 设置用户离线
userHolder.offline(userId);
// 2. 移除用户连接
globalChannelHolder.offline(userId, (NioSocketChannel) ctx.channel());
// 3. 发送离线消息
mqSendingAgent.sendOfflineMessage(userId);
}
ctx.close();
}
- 资源清理
// GlobalChannelHolder.java
public void offline(String userId, NioSocketChannel nioSocketChannel) {
// 1. 从CHANNELS Map中移除
remove(userId);
// 2. 从ChannelGroup中移除
messageBroadcaster.removeChannel(userId, nioSocketChannel);
}
清理效果:
| 场景 | 检测方式 | 清理时间 |
|---|---|---|
| 正常断开 | channelInactive | 立即 |
| 网络异常 | 心跳超时 | 10分钟 |
| 客户端崩溃 | 心跳超时 | 10分钟 |
三、问题三:消息丢失问题
问题描述
场景:
- 服务崩溃:消息处理中服务重启
- 网络异常:消息发送失败
- 数据库异常:持久化失败
后果:
- 用户发送的消息丢失
- 聊天记录不完整
- 数据不一致
解决方案:RocketMQ 持久化 + 重试机制
- RocketMQ 消息持久化
// MqSendingAgent.java
public void storeMessages(MessageDto messageDto) {
Message message = new Message();
message.setTopic(AQChatMQConstant.MQTopic.STORE_MESSAGE_TOPIC);
message.setBody(JSONObject.toJSONString(messageDto).getBytes());
// 发送到RocketMQ,消息会持久化到磁盘
mqProducer.send(message);
}
RocketMQ 保障:
- 消息持久化到磁盘
- 支持消息重试
- 支持事务消息
- 消息重试机制
// RocketMQProducerConfiguration.java
@Bean
public MQProducer mqProducer() {
DefaultMQProducer producer = new DefaultMQProducer();
// 配置重试次数(默认3次)
producer.setRetryTimesWhenSendFailed(rocketMQConfig.getProducer().getRetryTimes());
producer.setRetryTimesWhenSendAsyncFailed(rocketMQConfig.getProducer().getRetryTimes());
// 如果Broker存储失败,尝试另一个Broker
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.start();
return producer;
}
配置文件:
rocketmq:
producer:
retryTimes: 3 # 重试3次
sendTimeOut: 3000 # 超时时间3秒
- 消费者异常处理
// StoreMessageReceiver.java
defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
for (MessageExt messageExt : messageExtList) {
try {
MessageDto messageDto = JSONObject.parseObject(msgStr, MessageDto.class);
// 持久化消息
messageService.saveMessage(messageDto);
} catch (Exception e) {
LOGGER.error("存储消息失败, 重试次数={}",
messageExt.getReconsumeTimes(), e);
// 如果重试次数超过3次,记录日志
if (messageExt.getReconsumeTimes() >= 3) {
LOGGER.error("消息重试3次后仍然失败,需要人工处理");
// 可以保存到失败消息表,用于后续补偿
}
// 返回重试,RocketMQ会自动重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
保障机制
| 保障点 | 实现方式 | 说明 |
|---|---|---|
| 消息持久化 | RocketMQ 磁盘存储 | 服务重启不丢失 |
| 发送重试 | 自动重试 3 次 | 网络异常自动恢复 |
| 消费重试 | RocketMQ 默认重试 16 次 | 处理失败自动重试 |
| 最终保障 | 数据库持久化 | 双重保障 |
四、问题四:高并发下的性能问题
问题描述
场景:
- 同步处理:消息广播、持久化同步执行
- 数据库压力:频繁查询用户信息、房间信息
- 线程阻塞:AI 处理阻塞主线程
后果:
- 响应时间慢:50-80ms
- 吞吐量低:每秒只能处理 20 条消息
- CPU 使用率高
解决方案:异步处理 + Redis 缓存
1. RocketMQ 异步处理
// SendMsgCmdHandler.java
public void handle(ChannelHandlerContext ctx, AQChatMsgProtocol.SendMsgCmd cmd) {
// ... 消息去重、验证等 ...
// 构建消息对象
MessageDto messageDto = buildMessageDto(cmd);
// 异步发送到RocketMQ(立即返回,不阻塞)
mqSendingAgent.sendMessageToRoom(messageDto); // 异步广播
mqSendingAgent.storeMessages(messageDto); // 异步持久化
// 立即返回响应(< 5ms)
ctx.writeAndFlush(buildSendMsgAck());
}
异步线程池配置:
// RocketMQProducerConfiguration.java
@Bean
public MQProducer mqProducer() {
// 配置异步发送线程池
ThreadPoolExecutor asyncThreadPoolExecutor = new ThreadPoolExecutor(
100, 150, 3, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(1000),
r -> {
Thread thread = new Thread(r);
thread.setName("rocketmq-produce-" + ThreadLocalRandom.current().nextInt(1000));
return thread;
}
);
DefaultMQProducer producer = new DefaultMQProducer();
// 设置异步发送线程池
producer.setAsyncSenderExecutor(asyncThreadPoolExecutor);
producer.start();
return producer;
}
2. Redis 缓存
// AQUserHolder.java
@Override
public UserGlobalInfoDto getUserInfo(String userId) {
// 从Redis读取,< 1ms
return redisCacheHelper.getCacheObject(
AQRedisKeyPrefix.AQ_USER_INFO_PREFIX + userId,
UserGlobalInfoDto.class
);
}
// AQRoomHolder.java
@Override
public RoomInfoDto getRoomInfoById(String roomId) {
// 从Redis Hash读取,< 1ms
return redisCacheHelper.getCacheMapValue(
AQRedisKeyPrefix.AQ_ROOM_PREFIX + roomId,
AQRedisKeyPrefix.AQ_ROOM_INFO_PREFIX
);
}
性能对比:
| 操作 | 同步方式 | 异步方式 | 提升 |
|---|---|---|---|
| 消息广播 | 10-20ms | < 1ms | 10-20倍 |
| 消息持久化 | 20-30ms | < 1ms | 20-30倍 |
| 用户信息查询 | 5-10ms | < 1ms | 5-10倍 |
| 总响应时间 | 50-80ms | < 5ms | 10-16倍 |
五、问题五:AI 响应阻塞问题
问题描述
场景:
- AI 处理耗时:每次 AI 调用需要 5-10 秒
- 同步处理:AI 处理阻塞 MQ 消费线程
- 并发限制:多个 AI 请求串行处理
后果:
- MQ 消费线程被阻塞
- 其他消息无法处理
- 系统吞吐量下降
解决方案:独立线程池
1. 独立线程池配置
@Component
public class ThreadPoolUtil {
private static final int CORE_POOL_SIZE = 5; // 核心线程数
private static final int MAX_POOL_SIZE = 10; // 最大线程数
private static final long KEEP_ALIVE_TIME = 1L;
private static final BlockingQueue<Runnable> WORK_QUEUE =
new LinkedBlockingQueue<>(100); // 任务队列
private final ThreadPoolExecutor threadPoolExecutor;
public ThreadPoolUtil() {
this.threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.MINUTES,
WORK_QUEUE
);
}
public void submitTask(Runnable task) {
this.threadPoolExecutor.submit(task);
}
}
2. AI 处理异步化
// AIHelperReceiver.java
defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
for (MessageExt messageExt : messageExtList) {
String msgStr = new String(messageExt.getBody());
// 提交到独立线程池,不阻塞MQ消费线程
threadPoolUtil.submitTask(() -> {
try {
// AI处理(可能需要5-10秒)
aiService.streamCallWithMessage(
messageDto.getMessageContent(),
aiResult -> {
// 流式响应处理
globalChannelHolder.sendBroadcastAIMessage(aiMessageDto, aiId);
}
);
} catch (Exception e) {
LOGGER.error("AI助手处理消息失败", e);
// 错误处理
} finally {
// 持久化AI回复消息
messageService.saveMessage(storeMessage);
}
});
}
// MQ消费线程立即返回,不等待AI处理完成
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
优势:
| 特性 | 同步处理 | 独立线程池 | 说明 |
|---|---|---|---|
| MQ消费线程 | 阻塞 | 不阻塞 | 提高吞吐量 |
| AI处理并发 | 串行 | 并行(最多10个) | 提高处理能力 |
| 错误隔离 | 影响其他消息 | 不影响 | 提高可靠性 |
性能提升
| 指标 | 同步处理 | 独立线程池 | 提升 |
|---|---|---|---|
| MQ消费速度 | 慢(被阻塞) | 快(不阻塞) | 显著提升 |
| AI处理并发 | 1个 | 10个 | 10倍 |
| 系统吞吐量 | 低 | 高 | 显著提升 |
六、总结
问题与解决方案对比:
| 问题 | 解决方案 | 关键技术 |
|---|---|---|
| 消息重复处理 | Redis 去重机制 | Redis List/Set |
| 死连接内存泄漏 | 心跳检测 + 连接清理 | IdleStateHandler + channelInactive |
| 消息丢失 | RocketMQ 持久化 + 重试 | 消息持久化 + 自动重试 |
| 高并发性能 | 异步处理 + Redis 缓存 | RocketMQ + Redis |
| AI 响应阻塞 | 独立线程池 | ThreadPoolExecutor |
经验总结:
幂等性设计:消息去重保证幂等性
资源管理:及时清理无效连接,避免内存泄漏
可靠性保障:多重保障机制,确保消息不丢失
性能优化:异步处理 + 缓存,提升系统性能
错误隔离:独立线程池,避免相互影响
通过以上解决方案,AQChat 解决了 IM 系统开发中的常见问题,提升了系统的可靠性、性能和用户体验。
更多推荐


所有评论(0)