目录

一、部署单点RocketMQ

二、原理篇

三、实操篇

1、引入依赖

2、启动自动装配

3、配置application.yml

4、启动类

5、编写一个统一格式的消息对象

6、生产者

​编辑

7、定义一个constant

8、多/单个消费者订阅一个主题

1.实现消费者

2.编写接口发送消息

3.接口测试

9、测试多个消费者分别订阅不同主题

10、一个消费者订阅多个主题

11、多个消费者组订阅相同主题

1、实现消费者

2、编写接口发送消息

3、接口测试

四、文末大佬好文


一、部署单点RocketMQ

RocketMQ5.0保姆级单点Docker部署教程-CSDN博客文章浏览阅读98次。本文提供了目前基于最新版本的rocketmq部署教程,包括dashboard的部署,还有dashboard的具体使用,当然也参考了很多大佬的文章,得出了这个干货部署和使用教程。 https://blog.csdn.net/qq_73440769/article/details/151049115?spm=1001.2014.3001.5501

二、原理篇

为什么使用RocketMQ:

为什么选择RocketMQ | RocketMQ为什么 RocketMQhttps://rocketmq.apache.org/zh/docs/

关于一些原理,感觉官网讲的也非常透彻

领域模型概述 | RocketMQ本文为您介绍 Apache RocketMQ 的领域模型。https://rocketmq.apache.org/zh/docs/domainModel/01main

还有一些功能特性:

普通消息 | RocketMQ普通消息为 Apache RocketMQ 中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息。本文为您介绍普通消息的应用场景、功能原理、使用方法和使用建议。https://rocketmq.apache.org/zh/docs/featureBehavior/01normalmessage

本文的实操篇只是讲了发送普通消息

关于中间件对比,下面我之前有看过一些很好的文章:

Kafka、RabbitMQ、RocketMQ等消息中间件的对比_rabbimq rocket 差异-CSDN博客文章浏览阅读4.5w次,点赞20次,收藏124次。消息中间件现在有不少,网上很多文章都对其做过对比,在这我对其做进一步总结与整理。RocketMQ淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011年初,Linkin开源了Kafka这个优秀的消息中间件,淘宝中间件团队在对Kafka做过充分Review之后,Kafka无限消息堆积,..._rabbimq rocket 差异 https://blog.csdn.net/belvine/article/details/80842240

rpc和zmq性能对比 rpc mq区别_mob6454cc70642f的技术博客_51CTO博客rpc和zmq性能对比 rpc mq区别,目录1、RPC2、MQ3、MQ优点:4、引入的问题:1、RPCRPC(RemoteProcedureCall)—远程过程调用 ,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发分布式程序就像开发本地程序一样简单。https://blog.51cto.com/u_16099270/10236952

RabbitMQ,RocketMQ,Kafka--区别/对比/选型_51CTO博客_rocketmq rabbitmq kafka选型RabbitMQ,RocketMQ,Kafka--区别/对比/选型,本文介绍几种MQ(消息队列)的区别,包括:RabbitMQ,RocketMQ,Kafka。本内容也是Java后端面试中常见的问题。https://blog.51cto.com/knifeedge/5011115

三、实操篇

先讲讲原理:

如果你需要不同业务,就需要不同消费者组,不要想着同一个消费者组可以通过订阅不同主题达到不同业务,因为同一个消费者组内的功能必须是一致的,可以换个角度想,既然你是一个业务,一个业务就是一个主题嘛,你用不同的业务实现,就多添加几个消费者组,分别订阅那个主题(业务),然后通过不同的Tag区分就行了,而且而且,不要想着说:一个消费者组一个主题通过不同Tag来区分,虽然我在刚刚学习的时候也这样子想,结果踩了一天的坑,看了好多博客好文来理解,在文末也有关于为什么不能这样子做。

1、引入依赖

RocketMQ的依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

可能还需要多一个依赖,我在写这个测试案例的时候只有这个依赖是没问题的,但是后来出现了一个问题,可以关注后面的可能出现的Bug

demo案例的全部依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bluefoxyu</groupId>
    <artifactId>RocketMQ-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>3.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.27</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.16</version>
        </dependency>


    </dependencies>

</project>

2、启动自动装配

2.2.3 版本的RocketMQ 没有适配 SpringBoot3,只适配SpringBoot2,所以需要自己去配置好自动装配。可以参考我下面这篇文章:

Springboot3+自动装配_springboot3自动装配-CSDN博客文章浏览阅读548次,点赞9次,收藏2次。之前有一次希望用springboot模块拿到工具模块的配置configuration的时候,想通过之前的spring.factories来实现自动装配,但是发现一直拿不到配置,找了很久才知道在springboot3版本之后这个方式已经禁用。导言:这里主要讲述springboot3以后spring.factories功能失效,带来的解决办法。官网中推荐用新的自动装配 ,META-INF/spring 这个路径下建新的配置工厂。然后格式是上述官网的例子。如果自己配置大概是这样子。_springboot3自动装配 https://blog.csdn.net/qq_73440769/article/details/139609704?spm=1001.2014.3001.5501

在项目中的 resources 目录下创建 META-INF/spring 文件夹,并创建下面这个文件。

org.springframework.boot.autoconfigure.AutoConfiguration.imports

# RocketMQ 2.2.3 version does not adapt to SpringBoot3
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

3、配置application.yml

server:
  port: 8080

spring:
  profiles:
    active: dev

rocketmq:
  name-server: xxx:9876 # NameServer 地址
  producer:
    group: rocketmq-v3-demo-sent-message-group_bluefoxyu # 全局发送者组定义
    send-message-timeout: 2000
    # 发送消息失败时的重试次数。设置为 1 表示如果发送失败,会再重试一次(总共尝试两次)。适用于同步发送消息失败时的重试次数。
    retry-times-when-send-failed: 1
    # 异步发送失败时的重试次数。设置为 1 表示在异步发送失败时会再尝试一次。适用于异步发送消息失败时的重试次数。
    retry-times-when-send-async-failed: 1

logging:
  level:
    com:
      bluefoxyu:
        producer: info
        consumer: info
        controller: info

4、启动类

相比这个不必多说了。

RocketMQApplication:
package com.bluefoxyu;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@Slf4j
@SpringBootApplication
public class RocketMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(RocketMQApplication.class, args);
    }
}

5、编写一个统一格式的消息对象

package com.bluefoxyu.message;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serial;
import java.io.Serializable;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GeneralMessageEvent implements Serializable {

    @Serial
    private static final long serialVersionUID = 1L;

    private String body;

    private String keys;

}

上述实体类实现了Serializable接口,能够正常被序列化或者反序列化。

6、生产者

编写一个生产者,统一做好发送消息的一个模板,方便简化接口实现发送消息的代码编写,显得更加优雅一点,说到发送消息,就需要知道发送到哪个主题,然后哪些消费者组去消费,然后还有每条消息的唯一标识key,唯一标识可以用uuid生成,也可以用redis生成一个增长的不重复的id,这里使用uuid简化。

注意:如果你的项目里面只有一个消费者组,只有一个消费业务,这样子是不需要传Tag(过滤标签)的,但是正常情况都会有多个消息队列任务,下面提供两种重载的方法。

code:

package com.bluefoxyu.producer;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


/**
 * 封装全体的消息生产者
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class GeneralMessageProducer {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 发送普通消息
     *
     * @param topic            消息发送主题,用于标识同一类业务逻辑的消息
     * @param keys             消息索引键,可根据关键字精确查找某条消息
     * @param messageSendEvent 普通消息发送事件,自定义对象,最终都会序列化为字符串
     * @return 消息发送 RocketMQ 返回结果
     */
    public SendResult sendMessage(String topic, String keys, GeneralMessageEvent messageSendEvent) {
        SendResult sendResult;
        try {
            Message<?> message = MessageBuilder
                    .withPayload(messageSendEvent)
                    .setHeader(MessageConst.PROPERTY_KEYS, keys)
                    .build();
            // 2000L 表示发送消息的超时时间为 2000 毫秒,即 2 秒
            sendResult = rocketMQTemplate.syncSend(
                    topic,
                    message,
                    2000L
            );
            log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);
        } catch (Throwable ex) {
            log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);
            throw ex;
        }
        return sendResult;
    }

    /**
     * 发送普通消息
     *
     * @param topic            消息发送主题,用于标识同一类业务逻辑的消息
     * @param tag              消息的过滤标签,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。
     * @param keys             消息索引键,可根据关键字精确查找某条消息
     * @param messageSendEvent 普通消息发送事件,自定义对象,最终都会序列化为字符串
     * @return 消息发送 RocketMQ 返回结果
     */
    public SendResult sendMessage(String topic, String tag, String keys, GeneralMessageEvent messageSendEvent) {
        SendResult sendResult;
        try {
            // 构建消息的 destination (主题和标签)
            StringBuilder destinationBuilder = StrUtil.builder().append(topic);
            if (StrUtil.isNotBlank(tag)) {
                destinationBuilder.append(":").append(tag);  // 设置tag
            }
            Message<?> message = MessageBuilder
                    .withPayload(messageSendEvent)
                    .setHeader(MessageConst.PROPERTY_KEYS, keys)
                    .setHeader(MessageConst.PROPERTY_TAGS, tag) // 设置消息的标签
                    .build();
            // 2000L 表示发送消息的超时时间为 2000 毫秒,即 2 秒
            sendResult = rocketMQTemplate.syncSend(
                    destinationBuilder.toString(),
                    message,
                    2000L
            );
            log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);
        } catch (Throwable ex) {
            log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);
            throw ex;
        }
        return sendResult;
    }


}

7、定义一个constant

package com.bluefoxyu.constant;

/**
 * RocketMQ 常量类
 * @author bluefoxyu
 */
public class RocketMQConstant {

    /**
     * Group 消费者组定义
     */
    public static final String GENERAL_MESSAGE_CONSUMER_GROUP = "general_message_consumer_group";
    public static final String MESSAGE_CONSUMER_GROUP_A = "message_consumer_group_A";
    public static final String MESSAGE_CONSUMER_GROUP_B = "message_consumer_group_B";
    public static final String MESSAGE_CONSUMER_GROUP_C = "message_consumer_group_C";



    /**
     * Topic 主题定义
     */
    public static final String MESSAGE_TOPIC_1 = "message_topic_1";
    public static final String MESSAGE_TOPIC_2 = "message_topic_2";

    /**
     * Tag 标签
     */
    public static final String MESSAGE_TAG_A = "message_tag_A";
    public static final String MESSAGE_TAG_B = "message_tag_B";
    public static final String MESSAGE_TAG_C = "message_tag_C";

}

8、多/单个消费者订阅一个主题

1.实现消费者

这里需要实现监听的消息的实体类类型是什么,GeneralMessageEvent 是我们之前封装的统一消息对象

implements RocketMQListener<GeneralMessageEvent>

在onMessage方法中,通过

JSON.toJSONString(message)

就可以拿到解析好的消息内容,也就是我们真正需要发送的消息,下面我编写三个消费者来进行消费,不过绑定的都是同一个主题,类似负载均衡的功能,这里只用一个消费者也是一样的,因为后续还需要测其他功能,所以这里我写了三个消费者。

GeneralMessageConsumer1:
package com.bluefoxyu.consumer;

import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;


/**
 * topic 对应主题
 * consumerGroup 指定消费的分组
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = MESSAGE_TOPIC_1,
        consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer1 implements RocketMQListener<GeneralMessageEvent>{

    @Override
    public void onMessage(GeneralMessageEvent message) {
        log.info("[消费者GeneralMessageConsumer1] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));
    }

}
GeneralMessageConsumer2:
package com.bluefoxyu.consumer;

import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;


/**
 * topic 对应主题
 * consumerGroup 指定消费的分组
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = MESSAGE_TOPIC_1,
        consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer2 implements RocketMQListener<GeneralMessageEvent>{

    @Override
    public void onMessage(GeneralMessageEvent message) {
        log.info("[消费者GeneralMessageConsumer2] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));
    }

}
GeneralMessageConsumer3:
package com.bluefoxyu.consumer;

import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import static com.bluefoxyu.constant.RocketMQConstant.GENERAL_MESSAGE_CONSUMER_GROUP;
import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;


/**
 * topic 对应主题
 * consumerGroup 指定消费的分组
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = MESSAGE_TOPIC_1,
        consumerGroup = GENERAL_MESSAGE_CONSUMER_GROUP // 相同的 consumerGroup
)
public class GeneralMessageConsumer3 implements RocketMQListener<GeneralMessageEvent> {

    @Override
    public void onMessage(GeneralMessageEvent message) {
        log.info("[消费者GeneralMessageConsumer3] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));
    }

}

2.编写接口发送消息

发送消息需要发送这四要素:

  1.  topic 主题 
  2.  key 唯一标识
  3. message 需要发送的消息
package com.bluefoxyu.controller;

import com.bluefoxyu.producer.GeneralMessageProducer;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

import static com.bluefoxyu.constant.RocketMQConstant.MESSAGE_TOPIC_1;

@RestController
@RequiredArgsConstructor
public class controller {

    private final GeneralMessageProducer generalMessageDemoProduce;

    @PostMapping("/send/topic1/general-messageA")
    public String sendTopic1GeneralMessageA() {
        String keys = UUID.randomUUID().toString();
        GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder()
                .body("Message for A")
                .keys(keys)
                .build();

        SendResult sendResult = generalMessageDemoProduce.sendMessage(
                MESSAGE_TOPIC_1,
                keys,
                generalMessageEvent
        );
        // 返回发送成功的状态名
        return sendResult.getSendStatus().name();
    }

    @PostMapping("/send/topic1/general-messageB")
    public String sendTopic1GeneralMessageB() {
        String keys = UUID.randomUUID().toString();
        GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder()
                .body("Message for B")
                .keys(keys)
                .build();
        SendResult sendResult = generalMessageDemoProduce.sendMessage(
                MESSAGE_TOPIC_1,
                keys,
                generalMessageEvent
        );
        return sendResult.getSendStatus().name();
    }

    @PostMapping("/send/topic1/general-messageC")
    public String sendTopic1GeneralMessageC() {
        String keys = UUID.randomUUID().toString();
        GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder()
                .body("Message for C")
                .keys(keys)
                .build();
        SendResult sendResult = generalMessageDemoProduce.sendMessage(
                MESSAGE_TOPIC_1,
                keys,
                generalMessageEvent
        );
        return sendResult.getSendStatus().name();
    }

}

3.接口测试

准备这三个测试接口:

开始分别测试三个接口这里就不一一展示了。

看控制台:

如结果消费了三次

9、测试多个消费者分别订阅不同主题

如果相同消费组的三个消费者组分别订阅不同主题,会怎么样呢。修改的代码如下,

当然,哈哈哈哈哈哈,就是消费不到消息(对于小白的我也被困扰了好久),由于是有问题的,代码就不粘贴了【狗头】。如下:

这里参考了一篇大佬的文章:

rocketmq问题汇总-一个consumerGroup只对应一个topic_org.apache.rocketmq.client.exception.mqbrokerexcep-CSDN博客

看完后悟了很多,大概意思就是一个消费者组中的职责应该是一致的,应该都去订阅相同主题的,如果一个消费者订阅了两个主题,那么其他同组的消费者也应该订阅那两个主题,参考评论区这几大佬的评论:

这个大佬就说的很透彻了:

10、一个消费者订阅多个主题

在上面说了既然一个消费者可以订阅多个主题,但是前提条件是同一个消费组中必须订阅相同主题,那应该怎么实现呢。

直接给代码:

package com.bluefoxyu.consumer;

import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

import java.util.List;


/**
 * topic 对应主题
 * consumerGroup 指定消费的分组
 * RocketMQPushConsumerLifecycleListener 基础改监听器可以监听多个主题
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "",
        consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class GeneralMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {

    // 由于修改成可以订阅多个主题,在下面prepareStart就已经消费了,onMessage就不会执行了
    @Override
    public void onMessage(GeneralMessageEvent message) {
        /*log.info("消费者GeneralMessageConsumer接收到消息,消息体:{}", JSON.toJSONString(message));
        System.out.println("General message = " + message);*/
    }


    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        try {
            consumer.subscribe("rocketmq-demo-message_topic_general", "*");
            consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");
            consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");
            consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {
                if (CollectionUtils.isNotEmpty(messages)) {
                    for (MessageExt message : messages) {
                        log.info("消费者GeneralMessageConsumer接收到消息,消费完成:消费主题为:{} , 消费的消息为:{}" ,message.getTopic() ,new String(message.getBody()));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
        } catch (MQClientException e) {
            log.error("消费失败,异常消息为:{}",e.getMessage());
        }
    }

}
package com.bluefoxyu.consumer;

import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

import java.util.List;


/**
 * topic 对应主题
 * consumerGroup 指定消费的分组
 * RocketMQPushConsumerLifecycleListener 基础改监听器可以监听多个主题
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "",
        consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagAMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {

    // 由于修改成可以订阅多个主题,在下面prepareStart就已经消费了,onMessage就不会执行了
    @Override
    public void onMessage(GeneralMessageEvent message) {
        /*log.info("消费者TagAMessageConsumer接收到消息,消息体:{}", JSON.toJSONString(message));
        System.out.println("tagA message = " + message);*/
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        try {
            consumer.subscribe("rocketmq-demo-message_topic_general", "*");
            consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");
            consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");
            consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {
                if (CollectionUtils.isNotEmpty(messages)) {
                    for (MessageExt message : messages) {
                        log.info("消费者TagAMessageConsumer接收到消息,消费完成:消费主题为:{} , 消费的消息为:{}" ,message.getTopic() ,new String(message.getBody()));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
        } catch (MQClientException e) {
            log.error("消费失败,异常消息为:{}",e.getMessage());
        }
    }
}
package com.bluefoxyu.consumer;

import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

import java.util.List;


/**
 * topic 对应主题
 * consumerGroup 指定消费的分组
 * RocketMQPushConsumerLifecycleListener 基础改监听器可以监听多个主题
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "",
        consumerGroup = "rocketmq-v2-demo-message-group_bluefoxyu" // 相同的 consumerGroup
)
public class TagBMessageConsumer implements RocketMQListener<GeneralMessageEvent> , RocketMQPushConsumerLifecycleListener {

    // 由于修改成可以订阅多个主题,在下面prepareStart就已经消费了,onMessage就不会执行了
    @Override
    public void onMessage(GeneralMessageEvent message) {
        /*log.info("消费者TagBMessageConsumer接收到消息,消息体:{}", JSON.toJSONString(message));
        System.out.println("tagB message = " + message);*/
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        try {
            consumer.subscribe("rocketmq-demo-message_topic_general", "*");
            consumer.subscribe("rocketmq-demo-message_topic_TagA", "*");
            consumer.subscribe("rocketmq-demo-message_topic_TagB", "*");
            consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {
                if (CollectionUtils.isNotEmpty(messages)) {
                    for (MessageExt message : messages) {
                        log.info("消费者TagBMessageConsumer接收到消息,消费完成:消费主题为:{} , 消费的消息为:{}" ,message.getTopic() ,new String(message.getBody()));
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
        } catch (MQClientException e) {
            log.error("消费失败,异常消息为:{}",e.getMessage());
        }
    }
}

分别测试三个接口:

参考这位大佬的博客:

rocketmq (消费者消费同一个消费组,不同的topic)_rocketmq一个消费组消费多个topic-CSDN博客

11、多个消费者组订阅相同主题

这个业务经常是有的,希望订阅同一种业务,但是有不同的实现,这时候就需要使用Tag过滤标签来区分了。

1、实现消费者
MessageConsumerA:

package com.bluefoxyu.consumer;

import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import static com.bluefoxyu.constant.RocketMQConstant.*;


/**
 * topic 对应主题
 * consumerGroup 指定消费的分组
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = MESSAGE_TOPIC_2,
        consumerGroup = MESSAGE_CONSUMER_GROUP_A,
        selectorExpression = MESSAGE_TAG_A
)
public class MessageConsumerA implements RocketMQListener<GeneralMessageEvent>{

    @Override
    public void onMessage(GeneralMessageEvent message) {
        log.info("[消费者MessageConsumerA] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));
    }

}
MessageConsumerB:

package com.bluefoxyu.consumer;

import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import static com.bluefoxyu.constant.RocketMQConstant.*;


/**
 * topic 对应主题
 * consumerGroup 指定消费的分组
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = MESSAGE_TOPIC_2,
        consumerGroup = MESSAGE_CONSUMER_GROUP_B,
        selectorExpression = MESSAGE_TAG_B
)
public class MessageConsumerB implements RocketMQListener<GeneralMessageEvent>{

    @Override
    public void onMessage(GeneralMessageEvent message) {
        log.info("[消费者MessageConsumerB] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));
    }

}
MessageConsumerC:

package com.bluefoxyu.consumer;

import com.alibaba.fastjson.JSON;
import com.bluefoxyu.message.GeneralMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import static com.bluefoxyu.constant.RocketMQConstant.*;


/**
 * topic 对应主题
 * consumerGroup 指定消费的分组
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = MESSAGE_TOPIC_2,
        consumerGroup = MESSAGE_CONSUMER_GROUP_C,
        selectorExpression = MESSAGE_TAG_C
)
public class MessageConsumerC implements RocketMQListener<GeneralMessageEvent>{

    @Override
    public void onMessage(GeneralMessageEvent message) {
        log.info("[消费者MessageConsumerC] 接收到消息,消息体:{},消息:{}", JSON.toJSONString(message), JSON.toJSONString(message.getBody()));
    }

}

2、编写接口发送消息

再controller添加那三个接口

    @PostMapping("/send/topic2/messageA")
    public String sendTopic2MessageA() {
        String keys = UUID.randomUUID().toString();
        GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder()
                .body("Message for A")
                .keys(keys)
                .build();

        SendResult sendResult = generalMessageDemoProduce.sendMessage(
                MESSAGE_TOPIC_2,
                MESSAGE_TAG_A,
                keys,
                generalMessageEvent
        );
        // 返回发送成功的状态名
        return sendResult.getSendStatus().name();
    }

    @PostMapping("/send/topic2/messageB")
    public String sendTopic3MessageB() {
        String keys = UUID.randomUUID().toString();
        GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder()
                .body("Message for B")
                .keys(keys)
                .build();
        SendResult sendResult = generalMessageDemoProduce.sendMessage(
                MESSAGE_TOPIC_2,
                MESSAGE_TAG_B,
                keys,
                generalMessageEvent
        );
        return sendResult.getSendStatus().name();
    }

    @PostMapping("/send/topic2/messageC")
    public String sendTopic2MessageC() {
        String keys = UUID.randomUUID().toString();
        GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder()
                .body("Message for C")
                .keys(keys)
                .build();
        SendResult sendResult = generalMessageDemoProduce.sendMessage(
                MESSAGE_TOPIC_2,
                MESSAGE_TAG_C,
                keys,
                generalMessageEvent
        );
        return sendResult.getSendStatus().name();
    }
3、接口测试

消费成功!

也可以参考一个佬的博客:RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?_rocketmq一个topic多个tag-CSDN博客

当然,如果订阅不同主题也是没问题的,这里就不作演示了。

四、可能出现的Bug

这个bug在我的测试案例中没有,但是我使用到项目中却发现的这么样的问题:

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1': org/apache/rocketmq/common/protocol/heartbeat/MessageModel类似这种报错

问题是我的消费者上面加了这个注解:

可以去掉这个注解或者看下面的解决办法,选其一就可以解决。

解决方法:

引入依赖即可解决:

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.0.0</version>
</dependency>

记得这里的版本需要和你报错提示的版本一致

五、文末大佬好文

最后加上两个大佬好文,感觉讲的都很好:

面试官:RocketMQ同一个消费组内的消费者订阅了不同tag,会有问题吗?_rocketmq 订阅多个tag-CSDN博客文章浏览阅读8.6k次,点赞8次,收藏61次。面试官:同一个消费组内的消费者,如果订阅了相同的 topic,但是订阅的 tag 不一样,会有什么问题吗?我:会出现丢消息的情况。面试官:能详细说一说吗?我:RocketMQ 要求同一个消费组内的消费者必须订阅关系一致,如果订阅关系不一致,会出现消息丢失的问题。面试官:什么是订阅关系一致呢?我:订阅关系一致是指同一个消费者组下所有消费者所订阅的 Topic、Tag 必须完全一致。如下图所示:其中,消费组 1 中的消费组都订阅了 Topic1 中的 Tag1,消费组 2 中的消费组_rocketmq 订阅多个tag https://blog.csdn.net/zjj2006/article/details/122014993?spm=1001.2014.3001.5506

面试官:RocketMQ一个消费组内订阅同一个主题不同的TAG为什么会丢消息_为什么rocketmq相同消费组不同tag会有问题-CSDN博客文章浏览阅读3.7k次,点赞7次,收藏14次。看源码并不是目的,理解内部的运作机制才是关键,为遇到问题提供解决思路。_为什么rocketmq相同消费组不同tag会有问题 https://blog.csdn.net/prestigeding/article/details/116299837?spm=1001.2014.3001.5506

对于一个消费者组订阅同一个主题不同tag会丢消息,在前几天从0到1学习的时候,以为是可以的,但是踩了大坑。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐