RocketMQ延迟消息实战以及实现原理深度剖析
RocketMQ延迟消息实现原理深度剖析
本文基于RocketMQ 4.6.0版本源码,深入分析延迟消息的实现机制及其内部工作原理
延迟消息使用场景
延迟消息在分布式系统中有着广泛的应用:
-
订单超时未支付自动取消
-
定时任务调度
-
消息重试机制
-
预约提醒服务
-
分布式事务超时控制
生产者发送延迟消息
java
public class DelayProducer { public static void main(String[] args) throws Exception { // 创建生产者实例 DefaultMQProducer producer = new DefaultMQProducer("delay-producer-group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); String[] tags = {"A", "B", "C"}; ThreadLocalRandom random = ThreadLocalRandom.current(); // 发送5条延迟消息 for (int i = 0; i < 5; i++) { String content = "延迟消息示例 - 序号: " + i; Message msg = new Message("delay-topic", tags[i % tags.length], content.getBytes()); // 关键:设置延迟级别(1-5) msg.setDelayTimeLevel(random.nextInt(5) + 1); SendResult result = producer.send(msg); System.out.println("发送状态: " + result.getSendStatus() + " 消息ID: " + result.getMsgId()); } producer.shutdown(); } }
核心代码解析:
-
setDelayTimeLevel()
方法设置消息的延迟级别 -
RocketMQ支持18个延迟级别(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)
Broker端处理机制
CommitLog存储处理
java
// org.apache.rocketmq.store.CommitLog#putMessage public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // 延迟消息特殊处理 if (msg.getDelayTimeLevel() > 0) { // 校验延迟级别是否有效 if (msg.getDelayTimeLevel() > maxDelayLevel) { msg.setDelayTimeLevel(maxDelayLevel); } // 修改为延迟消息专用Topic topic = ScheduleMessageService.SCHEDULE_TOPIC; // 计算队列ID:delayLevel - 1 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 备份原始Topic和QueueID MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); // 更新消息属性 msg.setTopic(topic); msg.setQueueId(queueId); } // 后续存储逻辑... }
关键处理步骤:
-
校验延迟级别:确保不超过最大级别(默认18)
-
Topic重定向:将消息Topic改为
SCHEDULE_TOPIC_XXXX
-
队列计算:
queueId = delayLevel - 1
-
备份原始信息:存储原始Topic和QueueID到消息属性
-
更新消息属性:应用新的Topic和QueueID
延迟服务初始化流程
延迟级别配置解析
java
// org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel public boolean parseDelayLevel() { // 时间单位映射表 Map<String, Long> unitMap = new HashMap<>(); unitMap.put("s", 1000L); unitMap.put("m", 60 * 1000L); unitMap.put("h", 60 * 60 * 1000L); unitMap.put("d", 24 * 60 * 60 * 1000L); // 获取配置的延迟级别字符串 String levelConfig = this.messageStoreConfig.getMessageDelayLevel(); try { String[] levels = levelConfig.split(" "); for (int i = 0; i < levels.length; i++) { String level = levels[i]; String unitChar = level.substring(level.length() - 1); Long unitValue = unitMap.get(unitChar); int levelNum = i + 1; long num = Long.parseLong(level.substring(0, level.length() - 1)); long delayMillis = unitValue * num; this.delayLevelTable.put(levelNum, delayMillis); } return true; } catch (Exception e) { log.error("解析延迟级别异常", e); return false; } }
配置格式:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
调度服务启动
java
// org.apache.rocketmq.store.schedule.ScheduleMessageService#start public void start() { if (started.compareAndSet(false, true)) { // 创建定时器线程 this.timer = new Timer("ScheduleMessageTimerThread", true); // 为每个延迟级别创建定时任务 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); this.timer.schedule( new DeliverDelayedMessageTimerTask(level, 0), FIRST_DELAY_TIME); } } }
消息投递机制
定时任务执行流程
java
// org.apache.rocketmq.store.schedule.DeliverDelayedMessageTimerTask#executeOnTimeup public void executeOnTimeup() { // 获取对应延迟级别的消费队列 ConsumeQueue cq = defaultMessageStore.findConsumeQueue( SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); // 遍历队列中的消息 SelectMappedBufferResult buffer = cq.getIndexBuffer(this.offset); for (int i = 0; i < buffer.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = buffer.getByteBuffer().getLong(); int sizePy = buffer.getByteBuffer().getInt(); long tagsCode = buffer.getByteBuffer().getLong(); // 计算消息应投递的时间 long deliverTimestamp = computeDeliverTimestamp(delayLevel, tagsCode); long now = System.currentTimeMillis(); long countdown = deliverTimestamp - now; if (countdown <= 0) { // 消息到期,执行投递 MessageExt msgExt = defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt != null) { // 重建原始消息 MessageExtBrokerInner msgInner = rebuildOriginalMessage(msgExt); // 存储到CommitLog PutMessageResult result = writeMessageStore.putMessage(msgInner); } } else { // 重新调度任务 ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(delayLevel, nextOffset), countdown); return; } } }
消息重建逻辑
java
private MessageExtBrokerInner rebuildOriginalMessage(MessageExt msgExt) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); // 恢复原始Topic和QueueID msgInner.setTopic(msgExt.getProperty(REAL_TOPIC_PROPERTY)); msgInner.setQueueId(Integer.parseInt( msgExt.getProperty(REAL_QUEUE_ID_PROPERTY))); // 复制消息内容 msgInner.setBody(msgExt.getBody()); msgInner.setFlag(msgExt.getFlag()); // 清除延迟标记 MessageAccessor.clearProperty(msgInner, DELAY_TIME_LEVEL_PROPERTY); return msgInner; }
核心设计原理
消息流转流程
-
生产者发送:设置
delayTimeLevel
属性 -
Broker接收:修改Topic为
SCHEDULE_TOPIC
,存储到特定队列 -
定时任务扫描:定期检查到期消息
-
消息重建:恢复原始Topic/QueueID
-
重新投递:作为普通消息存入CommitLog
-
消费者消费:接收延迟消息
关键技术点
-
Topic重定向:所有延迟消息统一存储,隔离普通消息
-
时间轮调度:每个延迟级别独立任务,高效管理
-
消息属性复用:通过属性保存原始信息,最小化存储开销
-
精准时间控制:毫秒级精度投递
最佳实践建议
-
合理使用延迟级别:
java
// 明确指定延迟级别(推荐) msg.setDelayTimeLevel(3); // 10秒延迟 // 避免随机级别(不推荐) // msg.setDelayTimeLevel(random.nextInt(18) + 1);
-
监控延迟队列:
bash
# 查看延迟消息堆积情况 ./mqadmin consumerProgress -n localhost:9876 -t SCHEDULE_TOPIC_XXXX
-
性能优化建议:
-
避免使用过高延迟级别(>10)
-
单个Topic延迟消息量控制在1万/秒以内
-
监控
DeliverDelayedMessageTimerTask
执行时间
-
常见问题排查
问题现象 | 可能原因 | 解决方案 |
---|---|---|
消息未按时投递 | 定时任务阻塞 | 检查Broker CPU负载 |
延迟级别不生效 | Broker配置错误 | 确认messageDelayLevel 配置 |
消息重复消费 | 投递过程异常 | 检查消息重建逻辑 |
总结
RocketMQ通过巧妙的Topic重定向和时间轮调度机制,实现了高效可靠的延迟消息功能。核心设计在于:
-
将延迟消息转换为特殊Topic存储
-
通过定时任务扫描到期消息
-
重建原始消息重新投递
-
消费者无感知接收延迟消息
这种设计既保证了消息可靠性,又实现了高性能的延迟投递,是分布式系统中实现定时任务的理想方案。
更多推荐
所有评论(0)