RocketMQ(国内主流、易集成Spring Boot)为例,实现最基础的「用户考试报名事件」处理场景:

  • 场景:考试报名服务(生产者) 发布「用户报名考试」事件到MQ;
  • 场景:通知服务(消费者) 订阅该事件,接收后给用户发送报名成功短信。

案例全程基于Spring Boot,代码极简、无冗余,新手可直接复制运行,快速理解MQ事件处理的核心逻辑。

一、环境准备

1. 本地安装RocketMQ(极简步骤)

MQ事件处理依赖MQ服务端,先快速搭建本地RocketMQ环境(Windows/Linux通用):

  1. 下载RocketMQ:官网下载地址(选4.9.7/5.1.0稳定版);
  2. 启动NameServer:
    # 进入bin目录
    cd rocketmq-all-5.1.0-bin-release/bin
    # 启动NameServer(Windows用mqnamesrv.cmd,Linux/mac用mqnamesrv.sh)
    mqnamesrv.cmd
    
  3. 启动Broker:
    # Windows
    mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
    # Linux/mac
    ./mqbroker.sh -n 127.0.0.1:9876 autoCreateTopicEnable=true
    

注:若启动报错,需确保本地有JDK 8+环境,且配置ROCKETMQ_HOME环境变量(指向RocketMQ解压目录)。

2. 项目依赖(Spring Boot + RocketMQ)

新建Spring Boot项目,引入核心依赖(无需Spring Cloud,纯Spring Boot即可):

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.2</version>
    <relativePath/>
</parent>

<dependencies>
    <!-- Spring Boot核心 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- RocketMQ整合Spring Boot -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version> <!-- 适配Spring Boot 3.x -->
    </dependency>
    <!-- Lombok(简化代码,可选) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
3. 配置文件(application.yml)

配置RocketMQ连接信息,极简配置即可:

spring:
  application:
    name: rocketmq-event-demo
# RocketMQ配置
rocketmq:
  name-server: 127.0.0.1:9876 # NameServer地址(本地启动的地址)
  producer:
    group: exam-signup-producer-group # 生产者组(自定义,唯一即可)
    send-message-timeout: 3000 # 发送超时时间

二、核心代码实现(MQ事件处理三要素)

1. 定义「事件消息体」(跨服务传输的事件数据)

和Spring Event的事件载体类似,MQ事件需要可序列化的消息体,封装事件核心数据:

package com.example.rocketmqdemo.event;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;

/**
 * 考试报名事件消息体(MQ传输的事件数据)
 * 必须实现Serializable,保证MQ序列化传输
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ExamSignupEvent implements Serializable {
    // 事件唯一ID(幂等用)
    private String eventId;
    // 考生ID
    private Long userId;
    // 考试ID
    private Long examId;
    // 报名时间
    private Date signupTime;
    // 考生手机号(用于发送短信)
    private String phone;
}

核心:必须实现Serializable接口,否则RocketMQ无法序列化传输对象。

2. 实现「事件生产者」(发布MQ事件)

模拟「考试报名服务」,当用户报名考试成功后,发布事件到MQ:

package com.example.rocketmqdemo.producer;

import com.example.rocketmqdemo.event.ExamSignupEvent;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.Date;

/**
 * 考试报名事件生产者(发布MQ事件)
 */
@Service
@RequiredArgsConstructor
public class ExamSignupProducer {
    // 注入RocketMQ模板(Spring Boot自动配置)
    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 发布「用户报名考试」事件到MQ
     * @param userId 考生ID
     * @param examId 考试ID
     * @param phone 考生手机号
     */
    public void publishExamSignupEvent(Long userId, Long examId, String phone) {
        // 1. 构建事件消息体(生成唯一eventId,用于幂等)
        ExamSignupEvent event = new ExamSignupEvent(
            UUID.randomUUID().toString().replace("-", ""),
            userId,
            examId,
            new Date(),
            phone
        );

        // 2. 发送MQ消息:格式为「Topic:Tag」
        // Topic:EXAM_EVENT(考试事件主题),Tag:signup(报名子事件)
        String destination = "EXAM_EVENT:signup";
        rocketMQTemplate.convertAndSend(destination, event);

        // 3. 打印日志(模拟核心业务完成)
        System.out.println("【生产者】发布考试报名事件成功 → 用户ID:" + userId + ",考试ID:" + examId + ",事件ID:" + event.getEventId());
    }
}

核心说明:

  • RocketMQTemplate:Spring Boot整合RocketMQ的核心模板,自动注入即可使用;
  • convertAndSend:将对象自动序列化为JSON发送到MQ;
  • Topic:Tag:Topic是事件大类(如EXAM_EVENT),Tag是事件细分(如signup),便于消费者精准订阅。
3. 实现「事件消费者」(订阅并处理MQ事件)

模拟「通知服务」,订阅MQ中的「报名事件」,接收后给用户发送短信:

package com.example.rocketmqdemo.consumer;

import com.example.rocketmqdemo.event.ExamSignupEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 考试报名事件消费者(订阅MQ事件并处理)
 */
@Component
@Slf4j
// RocketMQ监听注解:指定Topic、Tag、消费者组
@RocketMQMessageListener(
    topic = "EXAM_EVENT",          // 对应生产者的Topic
    selectorExpression = "signup", // 对应生产者的Tag(精准订阅)
    consumerGroup = "exam-signup-consumer-group" // 消费者组(唯一,按业务划分)
)
public class ExamSignupConsumer implements RocketMQListener<ExamSignupEvent> {

    /**
     * 接收MQ事件的核心方法(事件触发时自动执行)
     * @param event 生产者发送的考试报名事件
     */
    @Override
    public void onMessage(ExamSignupEvent event) {
        // 1. 幂等校验(极简实现:打印日志模拟,实际可存Redis/数据库)
        System.out.println("【消费者】幂等校验 → 事件ID:" + event.getEventId());

        // 2. 处理事件:模拟发送报名成功短信
        String smsContent = "【在线考试系统】您已成功报名考试ID:" + event.getExamId() + ",报名时间:" + event.getSignupTime();
        System.out.println("【消费者】处理报名事件 → 给用户" + event.getUserId() + "(手机号:" + event.getPhone() + ")发送短信:" + smsContent);

        // 3. 模拟业务处理完成
        log.info("【消费者】考试报名事件处理完成 → 用户ID:{},考试ID:{}", event.getUserId(), event.getExamId());
    }
}

核心说明:

  • @RocketMQMessageListener:核心注解,配置监听的Topic、Tag、消费者组;
  • RocketMQListener<ExamSignupEvent>:泛型指定要接收的事件类型,框架自动反序列化为对象;
  • onMessage:MQ消息到达时自动执行的方法,即事件处理逻辑。
4. 测试代码(启动类+触发事件)

编写启动类,项目启动后自动触发事件发布,验证整个流程:

package com.example.rocketmqdemo;

import com.example.rocketmqdemo.producer.ExamSignupProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@RequiredArgsConstructor
public class RocketmqEventDemoApplication {
    // 注入生产者
    private final ExamSignupProducer examSignupProducer;

    public static void main(String[] args) {
        SpringApplication.run(RocketmqEventDemoApplication.class, args);
    }

    /**
     * 项目启动后自动执行:发布考试报名事件
     */
    @Bean
    public CommandLineRunner testMqEvent() {
        return args -> {
            // 模拟用户1001报名考试9001
            examSignupProducer.publishExamSignupEvent(1001L, 9001L, "13800138000");
        };
    }
}

三、运行结果(核心日志)

启动项目后,控制台会依次输出以下日志,代表MQ事件发布+消费成功:

【生产者】发布考试报名事件成功 → 用户ID:1001,考试ID:9001,事件ID:f8e7d6c5b4a392817069584736251409
【消费者】幂等校验 → 事件ID:f8e7d6c5b4a392817069584736251409
【消费者】处理报名事件 → 给用户1001(手机号:13800138000)发送短信:【在线考试系统】您已成功报名考试ID:9001,报名时间:Fri Feb 06 15:30:00 CST 2026
【消费者】考试报名事件处理完成 → 用户ID:1001,考试ID:9001

四、核心知识点(新手必懂)

1. MQ事件处理 vs 单体Spring Event
特性 单体Spring Event MQ事件处理(分布式)
通信范围 单个服务内 跨服务/跨应用
依赖 Spring核心 MQ服务端(如RocketMQ)
可靠性 内存级(服务重启丢失) 持久化(消息落地磁盘)
异步性 可选(@Async) 天然异步
核心场景 服务内逻辑解耦 分布式服务间解耦
2. 关键概念解释
  • Topic(主题):事件的「大类」,如所有考试相关事件归为EXAM_EVENT,相当于事件的“一级分类”;
  • Tag(标签):事件的「细分」,如报名事件signup、提交事件submit,相当于“二级分类”,消费者可只订阅指定Tag,减少无用消息;
  • 生产者组(Producer Group):一组生产者的标识,用于故障排查、负载均衡;
  • 消费者组(Consumer Group):一组消费者的标识,MQ保证同一组内只有一个消费者消费同一条消息,避免重复处理。
3. 新手避坑点
  1. MQ服务未启动:运行前必须确保NameServer和Broker已启动,否则报错connect to <127.0.0.1:9876> failed
  2. 序列化问题:事件类必须实现Serializable,否则MQ无法传输对象;
  3. 消费者组重复:不同业务的消费者组不能重复,否则会导致消息分发异常;
  4. 幂等性:生产环境必须做幂等(如用eventId存Redis),防止MQ消息重复消费导致业务异常(如重复发短信)。

总结

MQ事件处理的核心要点可总结为3点:

  1. 事件消息体:需实现Serializable,封装事件数据+唯一ID(幂等);
  2. 生产者:通过RocketMQTemplate发送消息,按「Topic:Tag」分类事件;
  3. 消费者:通过@RocketMQMessageListener订阅指定Topic/Tag,实现RocketMQListener接口处理事件。

这个入门案例覆盖了MQ事件处理的完整流程,新手可基于此扩展(如添加重试、死信队列、异步处理),适配更复杂的分布式场景。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐