RocketMQ延迟消息实现原理深度剖析

本文基于RocketMQ 4.6.0版本源码,深入分析延迟消息的实现机制及其内部工作原理

延迟消息使用场景

延迟消息在分布式系统中有着广泛的应用:

  • 订单超时未支付自动取消

  • 定时任务调度

  • 消息重试机制

  • 预约提醒服务

  • 分布式事务超时控制

生产者发送延迟消息

java

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("delay-producer-group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        
        String[] tags = {"A", "B", "C"};
        ThreadLocalRandom random = ThreadLocalRandom.current();
        
        // 发送5条延迟消息
        for (int i = 0; i < 5; i++) {
            String content = "延迟消息示例 - 序号: " + i;
            Message msg = new Message("delay-topic", tags[i % tags.length], content.getBytes());
            
            // 关键:设置延迟级别(1-5)
            msg.setDelayTimeLevel(random.nextInt(5) + 1);
            
            SendResult result = producer.send(msg);
            System.out.println("发送状态: " + result.getSendStatus() + 
                              " 消息ID: " + result.getMsgId());
        }
        producer.shutdown();
    }
}

核心代码解析

  • setDelayTimeLevel()方法设置消息的延迟级别

  • RocketMQ支持18个延迟级别(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)

Broker端处理机制

CommitLog存储处理

java

// org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // 延迟消息特殊处理
    if (msg.getDelayTimeLevel() > 0) {
        // 校验延迟级别是否有效
        if (msg.getDelayTimeLevel() > maxDelayLevel) {
            msg.setDelayTimeLevel(maxDelayLevel);
        }
        
        // 修改为延迟消息专用Topic
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        // 计算队列ID:delayLevel - 1
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
        
        // 备份原始Topic和QueueID
        MessageAccessor.putProperty(msg, 
            MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, 
            MessageConst.PROPERTY_REAL_QUEUE_ID, 
            String.valueOf(msg.getQueueId()));
        
        // 更新消息属性
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
    
    // 后续存储逻辑...
}

关键处理步骤

  1. 校验延迟级别:确保不超过最大级别(默认18)

  2. Topic重定向:将消息Topic改为 SCHEDULE_TOPIC_XXXX

  3. 队列计算queueId = delayLevel - 1

  4. 备份原始信息:存储原始Topic和QueueID到消息属性

  5. 更新消息属性:应用新的Topic和QueueID

延迟服务初始化流程

延迟级别配置解析

java

// org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel
public boolean parseDelayLevel() {
    // 时间单位映射表
    Map<String, Long> unitMap = new HashMap<>();
    unitMap.put("s", 1000L);
    unitMap.put("m", 60 * 1000L);
    unitMap.put("h", 60 * 60 * 1000L);
    unitMap.put("d", 24 * 60 * 60 * 1000L);

    // 获取配置的延迟级别字符串
    String levelConfig = this.messageStoreConfig.getMessageDelayLevel();
    
    try {
        String[] levels = levelConfig.split(" ");
        for (int i = 0; i < levels.length; i++) {
            String level = levels[i];
            String unitChar = level.substring(level.length() - 1);
            Long unitValue = unitMap.get(unitChar);
            
            int levelNum = i + 1;
            long num = Long.parseLong(level.substring(0, level.length() - 1));
            long delayMillis = unitValue * num;
            this.delayLevelTable.put(levelNum, delayMillis);
        }
        return true;
    } catch (Exception e) {
        log.error("解析延迟级别异常", e);
        return false;
    }
}

配置格式
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

调度服务启动

java

// org.apache.rocketmq.store.schedule.ScheduleMessageService#start
public void start() {
    if (started.compareAndSet(false, true)) {
        // 创建定时器线程
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        
        // 为每个延迟级别创建定时任务
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            this.timer.schedule(
                new DeliverDelayedMessageTimerTask(level, 0), 
                FIRST_DELAY_TIME);
        }
    }
}

消息投递机制

定时任务执行流程

java

// org.apache.rocketmq.store.schedule.DeliverDelayedMessageTimerTask#executeOnTimeup
public void executeOnTimeup() {
    // 获取对应延迟级别的消费队列
    ConsumeQueue cq = defaultMessageStore.findConsumeQueue(
        SCHEDULE_TOPIC, 
        delayLevel2QueueId(delayLevel));
    
    // 遍历队列中的消息
    SelectMappedBufferResult buffer = cq.getIndexBuffer(this.offset);
    for (int i = 0; i < buffer.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
        long offsetPy = buffer.getByteBuffer().getLong();
        int sizePy = buffer.getByteBuffer().getInt();
        long tagsCode = buffer.getByteBuffer().getLong();
        
        // 计算消息应投递的时间
        long deliverTimestamp = computeDeliverTimestamp(delayLevel, tagsCode);
        long now = System.currentTimeMillis();
        long countdown = deliverTimestamp - now;
        
        if (countdown <= 0) {
            // 消息到期,执行投递
            MessageExt msgExt = defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
            if (msgExt != null) {
                // 重建原始消息
                MessageExtBrokerInner msgInner = rebuildOriginalMessage(msgExt);
                
                // 存储到CommitLog
                PutMessageResult result = writeMessageStore.putMessage(msgInner);
            }
        } else {
            // 重新调度任务
            ScheduleMessageService.this.timer.schedule(
                new DeliverDelayedMessageTimerTask(delayLevel, nextOffset),
                countdown);
            return;
        }
    }
}

消息重建逻辑

java

private MessageExtBrokerInner rebuildOriginalMessage(MessageExt msgExt) {
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    
    // 恢复原始Topic和QueueID
    msgInner.setTopic(msgExt.getProperty(REAL_TOPIC_PROPERTY));
    msgInner.setQueueId(Integer.parseInt(
        msgExt.getProperty(REAL_QUEUE_ID_PROPERTY)));
    
    // 复制消息内容
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    
    // 清除延迟标记
    MessageAccessor.clearProperty(msgInner, DELAY_TIME_LEVEL_PROPERTY);
    
    return msgInner;
}

核心设计原理

消息流转流程

  1. 生产者发送:设置delayTimeLevel属性

  2. Broker接收:修改Topic为SCHEDULE_TOPIC,存储到特定队列

  3. 定时任务扫描:定期检查到期消息

  4. 消息重建:恢复原始Topic/QueueID

  5. 重新投递:作为普通消息存入CommitLog

  6. 消费者消费:接收延迟消息

关键技术点

  • Topic重定向:所有延迟消息统一存储,隔离普通消息

  • 时间轮调度:每个延迟级别独立任务,高效管理

  • 消息属性复用:通过属性保存原始信息,最小化存储开销

  • 精准时间控制:毫秒级精度投递

最佳实践建议

  1. 合理使用延迟级别

    java

    // 明确指定延迟级别(推荐)
    msg.setDelayTimeLevel(3); // 10秒延迟
    
    // 避免随机级别(不推荐)
    // msg.setDelayTimeLevel(random.nextInt(18) + 1);
  2. 监控延迟队列

    bash

    # 查看延迟消息堆积情况
    ./mqadmin consumerProgress -n localhost:9876 -t SCHEDULE_TOPIC_XXXX
  3. 性能优化建议

    • 避免使用过高延迟级别(>10)

    • 单个Topic延迟消息量控制在1万/秒以内

    • 监控DeliverDelayedMessageTimerTask执行时间

常见问题排查

问题现象 可能原因 解决方案
消息未按时投递 定时任务阻塞 检查Broker CPU负载
延迟级别不生效 Broker配置错误 确认messageDelayLevel配置
消息重复消费 投递过程异常 检查消息重建逻辑

总结

RocketMQ通过巧妙的Topic重定向和时间轮调度机制,实现了高效可靠的延迟消息功能。核心设计在于:

  1. 将延迟消息转换为特殊Topic存储

  2. 通过定时任务扫描到期消息

  3. 重建原始消息重新投递

  4. 消费者无感知接收延迟消息

这种设计既保证了消息可靠性,又实现了高性能的延迟投递,是分布式系统中实现定时任务的理想方案。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐