RabbitMQ详细教程、包含安装、配置和使用及整合SpringBoot
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
作者:ChenZhen
博客地址:https://www.chenzhen.space/
版权:来自b站视频
【SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】【尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmq】
将视频中的笔记整理出来
如果对你有帮助,请给一个小小的star⭐
目录
1.RabbitMQ
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包
裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是
一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,
存储和转发消息数据。
四大核心
- 生产者
产生数据发送消息的程序是生产者 - 交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息
推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推
送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定 - 队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存
储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可
以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 - 消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费
者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
名词解释
- Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker Virtual
- host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace
概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost,每个用户在自己的 vhost
创建 exchange/queue 等 Connection:publisher/consumer 和 broker 之间的 TCP 连接 - Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection
的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程 序支持多线程,通常每个
thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message - broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection
极大减少了操作系统建立 TCP connection 的开销 Exchange:message 到达 broker
的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct
(point-to-point), topic (publish-subscribe) and fanout (multicast) - Queue:消息最终被送到这里等待 consumer 取走 Binding:exchange 和 queue
之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 - message 的分发依据

2.安装
安装文件:
链接:https://pan.baidu.com/s/1YA0-pBKXDbDMR9UnBGpe2g?pwd=7mo0
提取码:7mo0
上传我提供文件到/usr/local/software 目录下(如果没有 software 需要自己创建)
3. 安装文件(分别按照以下顺序安装)
1.安装socat 插件
yum install -y socat
2.安装erlang,erlang是rabbitmq需要的环境语言,el7表示可以在linux7上安装
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
- 如果安装报错:error: Failed dependencies:
libcrypto.so.10()(64bit) is needed by erlang-23.2.3-1.el7.x86_64
说明缺少依赖:执行以下命令
rpm -ivh openssl-libs-1.0.2k-19.el7.x86_64.rpm --force
- 如果继续报错:error: Failed dependencies:
libnsl.so.1()(64bit) is needed by erlang-23.2.3-1.el7.x86_64
说明缺少类库:执行以下命令
dnf install libnsl
安装完依赖和类库之后,继续执行安装erlang
3.安装rabbitmq
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
分别输入上面给出的3条命令


- 常用命令(按照以下顺序执行)
添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status

停止服务
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
重新启动服务
/sbin/service rabbitmq-server stop
/sbin/service rabbitmq-server start
使用
uname - a
可以查看版本
web管理界面会在15672端口开启,所以我们需要将15672端口开放( 需要重启防火墙服务)
firewall-cmd --zone=public --add-port=15672/tcp --permanent
重启防火墙服务
firewall-cmd --reload
访问地址 http://192.168.43.128:15672/
将ip地址改为你们自己的linux服务器的ip地址

初始账户密码都为:guest
但是一开始用默认账号密码(guest)访问会出现权限问题,guest账户只能在localhost(本机)下面访问,所以需要创建一个管理员账号我们需要添加一个新的用户来进行登录。
添加一个新的用户
创建账号(用户名:admin,密码:123)
rabbitmqctl add_user admin 123
设置用户角色(设置admin的角色为administrator)
rabbitmqctl set_user_tags admin administrator
设置用户权限
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户 user_admin 具有/ 这个 virtual host 中所有资源的配置、写、读权限
查询当前用户和角色
rabbitmqctl list_users
再次利用 admin 用户登录,成功登录

3.使用Java发送消息
我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。我们将介绍 Java API 中的一些细节。
在这之前我们要先将Linux服务器的5672端口开放,用于tcp连接。
firewall-cmd --zone=public --add-port=5672/tcp --permanent
重启防火墙服务
firewall-cmd --reload
引入依赖
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
消息生产者
java代码
public class Producer {
//队列名称
public static final String QUEUE_NAME = "queue1";
//发消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接RabbitMQ队列
factory.setHost("192.168.43.128");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
* 生成队列
* 1.队列名称
* 2.队列里面的消息是否持久化
* 3.是否只供一个消费者进行消费 是否进行消息共享
* 4.是否进行自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消息
String message = "hello world";
/**
* 发消息
* 1.发送到哪个交换机
* 2.路由的key值是哪个 本次队列的名称
* 3.其他参数
* 4.发送的消息的消息体
*/
channel.basicPublish(QUEUE_NAME,"",null, message.getBytes());
System.out.println("消息发送完毕");
}
}
运行后,查看web管理界面,可以发现已经接受到了一个消息
消息消费者
package com.chenzhen.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author ChenZhen
* @Description
* @create 2023/3/6 17:11
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
public class Consumer {
//队列名称
public static final String QUEUE_NAME = "queue1";
//接受消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接RabbitMQ队列
factory.setHost("192.168.43.128");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
//声明 接受消息
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("message = " + new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag ->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功后是否要自动应答 true:自动 false:手动
* 3.消费成功之后的回调
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
}
}
运行后
封装工具类
package com.chenzhen.rabbitmq.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author ChenZhen
* @Description
* @create 2023/3/6 17:43
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接RabbitMQ队列
factory.setHost("192.168.43.128");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
return channel;
}
}
消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
- 自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用
- 手动应答的方法
A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了
消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
消息生产者代码
package com.chenzhen.rabbitmq.three;
import com.chenzhen.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* @author ChenZhen
* @Description
* @create 2023/3/7 16:51
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
public class Task2 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextInt()){
String message = scanner.next();
channel.basicPublish(TASK_QUEUE_NAME,"", null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+ message);
}
}
}
消息消费者代码,其一
package com.chenzhen.rabbitmq.three;
import com.chenzhen.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.concurrent.TimeUnit;
/**
* @author ChenZhen
* @Description
* @create 2023/3/7 17:17
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
public class Work3 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//接收消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("c3等待接受消息处理");
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
//等待1s 模拟处理消息
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String (message.getBody(),"UTF-8"));
//手动应答
/**
* 1.消息的标记 tag
* 2.是否批量应答 false:不批量应答
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag ->{
System.out.println("消息消费被中断");
};
boolean autoAck = false;
//采用手动应答
channel.basicConsume(TASK_QUEUE_NAME, autoAck,deliverCallback,cancelCallback);
}
}
消息消费者,其二
package com.chenzhen.rabbitmq.three;
import com.chenzhen.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.util.concurrent.TimeUnit;
/**
* @author ChenZhen
* @Description
* @create 2023/3/7 17:17
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
public class Work4 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";
//接收消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("c4等待接受消息处理");
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
//等待30s 模拟处理消息
Thread.sleep(1000*30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new String (message.getBody(),"UTF-8"));
//手动应答
/**
* 1.消息的标记 tag
* 2.是否批量应答 false:不批量应答
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag ->{
System.out.println("消息消费被中断");
};
boolean autoAck = false;
//采用手动应答
channel.basicConsume(TASK_QUEUE_NAME, autoAck,deliverCallback,cancelCallback);
}
}
分别启动3个进程



生产者发送2个消息

如果第二个消费者在消费的时候挂机了,就不会进行应答,队列中的消息不会被销毁,则被转发给第一个消费者


队列持久化
//消息队列持久化
boolean durable = true;
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
在声明队列持久化前,如果该队列已存在,则需要先将该队列删除
否则会出现以下错误
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
运行代码后,ack_queue队列已开启持久化

消息持久化
要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添
加这个属性。
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
添加持久化参数后
//设置生产者发送信息为持久化消息(要求保存到磁盘上)
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了.
4. 队列模式
RabbitMQ官网介绍了,它支持六种应用场景:简单队列、工作队列、发布/订阅、路由模式、Topics主题模式、RPC,接下来分别介绍。

4.1简单队列
简单队列缺点
耦合度高,队列名在一端改动,另一端也要跟着改动。生产者和消费者一一对应,不支持多个消费者。

4.2Work Queues
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

一般在实际应用中,生产者发送消息耗时较少,反应较快,反而是消费者因为要处理业务逻辑,处理时间可能会很慢,这样队列中会积压很多消息,所以需要多个消费者分摊压力,这个时候可以使用工作队列。
工作队列的逻辑就是队列拿到生产者的消息后会在消费者中选择一个把消息发送过去,并不是把消息同时发送给两个消费者。
听不太懂,总之就是用多个消费者来并发的处理一个队列里的消息。
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。
package com.chenzhen.rabbitmq.two;
import com.chenzhen.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* @author ChenZhen
* @Description 这是一个工作线程,相当于是消费者
* @create 2023/3/6 18:02
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
//这是一个工作线程,相当于是消费者
public class Worker01 {
//队列的名称
public static final String QUEUE_NAME = "queue1";
//接受消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println(new String (message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag ->{
System.out.println("消息消费被中断");
};
System.out.println("c1等待接受消息......");
//消息的接受
channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
}
}
运行后

我们在设置中开启允许多线程。

随后修改输出语句c1为c2,然后再次启动主函数。

可以看到此时运行了2个工作进程。

用来发送多个消息的生产者的代码,即消息发送线程
package com.chenzhen.rabbitmq.two;
import com.chenzhen.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* @author ChenZhen
* @Description
* @create 2023/3/7 11:14
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
public class Task01 {
public static final String QUEUE_NAME = "queue1";
//发送大量消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
/**
* 生成队列
* 1.队列名称
* 2.队列里面的消息是否持久化
* 3.是否只供一个消费者进行消费 是否进行消息共享
* 4.是否进行自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台中接受信息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
/**
* 发消息
* 1.发送到哪个交换机
* 2.路由的key值是哪个 本次队列的名称
* 3.其他参数
* 4.发送的消息的消息体
*/
channel.basicPublish(QUEUE_NAME,"", null, message.getBytes());
System.out.println("发送消息:"+ message);
}
}
}
消息发送线程,发送4个消息abcd

第一个消费者接受a c 消息

第二个消费者接受b d消息

4.3 PubSub订阅模式

在订阅模型中,多了一个 Exchange 角色,交换机类型为 fanout,而且过程略有变化:
⚫ P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
⚫ C:消费者,消息的接收者,会一直等待消息到来
⚫ Queue:消息队列,接收消息、缓存消息
⚫ Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、
递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
➢ Fanout:广播,将消息交给所有绑定到交换机的队列
➢ Direct:定向,把消息交给符合指定routing key 的队列
➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合
路由规则的队列,那么消息会丢失!
创建生产者代码
package com.chenzhen.rabbitmq.four;
import com.chenzhen.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* @author ChenZhen
* @Description
* @create 2023/3/7 16:51
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
public class PubSub_Producer {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 创建交换机
/**
* 1.exchange:交换机名称
* 2.type : 交换机类型
* DIRECT("direct"):定向
* FANOUT("fanout"):扇形(广播)
* TOPIC("topic"):通配符
* HEADERS("headers"):参数匹配
* 3.durable : 是否持久化
* 4.autoDelete : 是否自动删除
* 5.internal : 内部使用。默认false
* 6.arguments : 参数列表
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,false,false,false,null);
//创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//消息队列持久化
boolean durable = false;
//声明队列
channel.queueDeclare(queue1Name, durable, false, false, null);
channel.queueDeclare(queue2Name, durable, false, false, null);
//绑定队列和交换机
/**
* 1. queue :队列名称
* 2.exchange: 交换机名称
* 3.routingKey : 路由键,绑定规则
* 如果交换机的类型为 fanout,routinKye设置为“”,
*/
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
//发送消息
channel.basicPublish(exchangeName,"", null ,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+ message);
}
}
}
启动后发出消息

查看队列,2个队列都接受到了消息。

4.4 Routing 路由模式
交换机类型为 direct。
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
- 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
- Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的
Routingkey 与消息的 Routing key 完全一致,才会接收到消息
代码跟前面的基本都差不多,这里就不重复打了
在生产者代码里,将模式修改为BuiltinExchangeType.DIRECT
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,false,false,false,null);
并在队列和交换机绑定时指定路由key
channel.queueBind(queue1Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
4.5 Topics 通配符模式
交换机类型为 topic。它是路由模式的升级版,支持通配符匹配。
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型
Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc
或者 item.insert,item.* 只能匹配 item.insert
这个和路由模式很像,只不过将路由key改成了通配符的形式,代码也是十分简单,修改交换机规则和路由键即可。就不将代码给出了。
5. Spring Boot整合RabbitMQ
生产者
- 创建生产者SpringBoot工程
- 引入start,依赖坐标
<!-- Spring Boot整合RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 编写yml配置,基本信息配置
# 配置rabbitmq的基本信息
spring:
rabbitmq:
host: 192.168.43.128
port: 5672
username: admin
password: 123
virtual-host: /
- 定义交换机,队列以及绑定关系的配置类
package com.chenzhen.springboot_rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author ChenZhen
* @Description
* @create 2023/3/8 17:45
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//1.交换机
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.queue队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3.队列和交换机的绑定关系
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
- 注入RabbitTemplate,调用方法,完成消息发送
package com.chenzhen.springboot_rabbitmq;
import com.chenzhen.springboot_rabbitmq.config.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
//1.注入rabbitmqTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello ");
}
}
运行测试类后,队列成功接受到消息

消费者
消费者的代码比较简单,只要创建一个监听类并加上@RabbitListener注解即可。
package com.chenzhen.springboot_rabbitmq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author ChenZhen
* @Description
* @create 2023/3/11 16:14
* @QQ 1583296383
* @WeXin(WeChat) ShockChen7
*/
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void listenerQueue(Message message){
System.out.println("message = " + new String(message.getBody()));
}
}
消息的可靠投递(代码配置实现)
RabbitMQ 的可靠投递链路:
1. 生产者确认机制(Publisher Confirm)
2. 交换机失败回退(Return Callback)
3. 队列镜像 / 持久化
4. 消费者确认(ACK)
生产者确认和 交换机失败回退
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提
供了两种方式用来控制消息的投递可靠性模式。
- confirm 确认模式
- return 退回模式
rabbitmq 整个消息投递的路径为:
producer—> rabbitmq broker—>exchange—>queue—>consumer
- 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
- 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
也就是说:
-
Confirm告诉你:消息“进了RabbitMQ” -
Return Callback告诉你:消息“能找到队列去吗?”
Broker 就是 RabbitMQ 服务器本体,是消息的中转站和调度中心。
🎯 生产者确认机制是怎么工作的?
开启 Confirm 后:
-
生产者发送消息到 RabbitMQ
-
RabbitMQ成功持久化并写入队列表后 -
RabbitMQ异步回传一个 ACK 给生产者
如果失败,则回传NACK
在rabbitmq-provider项目的application.yml文件上,加上消息确认的配置项后:
server:
port: 8021
spring:
#给项目来个名字
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#虚拟host 可以不设置,使用server默认host
virtual-host: /
#消息确认配置项
#确认消息已发送到交换机(Exchange)
publisher-confirms: true
#确认消息已发送到队列(Queue)
publisher-returns: true
然后是配置相关的消息确认回调函数,RabbitConfig.java:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
System.out.println("ConfirmCallback: "+"确认情况:"+ack);
System.out.println("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: "+"消息:"+message);
System.out.println("ReturnCallback: "+"回应码:"+replyCode);
System.out.println("ReturnCallback: "+"回应信息:"+replyText);
System.out.println("ReturnCallback: "+"交换机:"+exchange);
System.out.println("ReturnCallback: "+"路由键:"+routingKey);
}
});
return rabbitTemplate;
}
}
到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数, ConfirmCallback 和RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?
在 RabbitMQ 中,ConfirmCallback 和 ReturnCallback 是两种不同的回调函数,用于处理消息的确认和退回(返回)情况。
1. ConfirmCallback:
ConfirmCallback 用于处理消息的确认情况。当消息发送到 Exchange 时,Broker(RabbitMQ 服务器)会发送一个确认消息给生产者,告诉生产者消息已经被成功接收。这样可以确保消息成功发送到了 RabbitMQ,从而提供了一定的可靠性保证。
- 触发情况:ConfirmCallback 会在以下情况下触发:
– 当消息被成功路由到 Exchange,并且 Exchange 已将消息发送到至少一个队列时,Broker 会发送确认消息给生产者。
– 如果消息无法路由到任何队列(可能是因为没有合适的绑定),Broker 不会发送确认消息。
ReturnCallback:
2. ReturnCallback
用于处理消息退回情况。当消息无法被正确路由到任何队列时,Broker 会将该消息退回给生产者。这可以帮助生产者处理无法投递的消息。
- 触发情况:ReturnCallback 会在以下情况下触发:
– 当消息无法被正确路由到任何队列时,Broker 会发送一个退回消息给生产者。
– 这种情况通常发生在消息发送时使用了一个不存在的交换机,或者路由键无法匹配任何队列。
消费者-消息确认机制
和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
所以,消息接收的确认机制主要存在三种模式:
1、不确认, 这也是默认的消息确认情况。AcknowledgeMode.NONERabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
2、 自动确认
根据请况确认 listener.simple.acknowledge-mode: auto,主要分成以下几种情况:
- 如果消费者在消费的过程中没有抛出异常,则自动确认。
- 当消费者消费的过程中抛出
AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且该消息不会重回队列。 - 当抛出
ImmediateAcknowledgeAmqpException异常,消息会被确认。 - 如果抛出其他的异常,则消息会被拒绝,但是与前两个不同的是,该消息会重回队列,如果此时只有一个消费者监听该队列,那么该消息重回队列后又会推送给该消费者,会造成死循环的情况。
3、手动确认
这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
basic.ack用于肯定确认basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
消费者端以上的3``个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。
而basic.nack,basic.reject表示没有被正确处理:
着重讲下reject,因为有时候一些场景是需要重新入列的。
channel.basicReject(deliveryTag, true); 拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器把这个消息丢掉就行,下次不想再消费这条消息了。
使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。
但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。
顺便也简单讲讲 nack,这个也是相当于设置不消费某条消息。
channel.basicNack(deliveryTag, false, true);
第一个参数依然是当前消息到的数据的唯一id;
第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。
同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。
看了上面这么多介绍,接下来我们一起配置下,看看一般的消息接收手动确认是怎么样的。
在消费者项目里,新建MessageListenerConfig.java上添加代码相关的配置代码:
import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private MyAckReceiver myAckReceiver;//消息接收处理类
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置一个队列
container.setQueueNames("TestDirectQueue");
//如果同时设置多个如下: 前提是队列都是必须已经创建存在的
// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
//container.setQueues(new Queue("TestDirectQueue",true));
//container.addQueues(new Queue("TestDirectQueue2",true));
//container.addQueues(new Queue("TestDirectQueue3",true));
container.setMessageListener(myAckReceiver);
return container;
}
}
对应的手动确认消息监听类,MyAckReceiver.java(手动确认模式需要实现 ChannelAwareMessageListener):
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class ManualAckConsumer implements ChannelAwareMessageListener {
@RabbitListener(queues = "your-queue-name")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
// 模拟消费逻辑
String messageBody = new String(message.getBody());
System.out.println("Received message: " + messageBody);
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 发生异常时可以选择拒绝消息或进行其他处理
// 重新入队列:channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 拒绝并丢弃:channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
消息的持久化

VirtualHost虚拟机作用
VirtualHost 作用及用法
Virtual Hosts的使用场景
多租户的使用场景,比如主机资源紧缺情况下开发和测试共用一个RabbitMQ,可以使用Virtual Hosts将开发和测试隔离开
队列模式八股
RabbitMQ 的队列模式(其实准确说是交换机类型与投递模式)决定了消息如何从生产者流转到消费者。官方将其归纳为 6 种标准模式。
1. 简单模式 (Simple Queue)
这是最基础的模式。一个生产者,一个队列,一个消费者。
- 特点:点对点,消息一旦被消费就从队列中删除。
- 适用:简单的任务处理。
2. 工作队列模式 (Work Queues)
一个生产者,一个队列,多个消费者。
- 特点:消息会在多个消费者之间“轮询”分配。每条消息只能被一个消费者处理。
- 核心功能:负载均衡。通过配置
prefetchCount=1,可以让处理速度快的消费者领到更多任务(能者多劳)。
3. 发布/订阅模式 (Publish/Subscribe)
引入了 Exchange(交换机)。交换机类型为 fanout。
- 特点:生产者将消息发送给 Exchange,Exchange 将消息广播到所有绑定(Binding)到它的队列上。
- 适用:群发通知、天气预报、日志分发。
4. 路由模式 (Routing)
交换机类型为 direct。
- 特点:生产者发送消息时指定一个
Routing Key。Exchange 只有在Routing Key完全匹配的情况下,才会将消息投递到对应的 Queue。 - 适用:错误日志分级处理(如:队列 A 只接收
error级别,队列 B 接收info/warn/error)。
5. 主题模式 (Topics)
交换机类型为 topic。它是路由模式的升级版,支持通配符匹配。
-
匹配规则:
-
*:匹配一个单词。 -
#:匹配零个或多个单词。 -
例子:
item.#能匹配item.insert和item.insert.user;item.*只能匹配item.insert。 -
适用:复杂业务路由,如按“区域.业务.级别”动态订阅。
6. RPC 模式 (Remote Procedure Call)
这是 RabbitMQ 实现远程过程调用的方式。
- 特点:生产者(客户端)发送消息时,会带上一个
replyTo队列地址。消费者(服务端)处理完后,将结果发回这个回调队列。 - 适用:跨服务同步调用。
模式对比总结
| 模式 | 交换机类型 | 路由匹配 | 消息流向 |
|---|---|---|---|
| Simple | 无 (默认) | 队列名匹配 | 1对1 |
| Work | 无 (默认) | 轮询分配 | 1对多(竞争消费) |
| Publish/Subscribe | Fanout | 无需匹配 | 1对多(广播) |
| Routing | Direct | 精确匹配 | 1对多(按键筛选) |
| Topics | Topic | 通配符匹配 | 1对多(按规则筛选) |
你想通过一个具体的业务场景(比如“电商下单后的处理流”)来看看这些模式是如何组合使用的吗?
RabbitMQ-保证消息的可靠投递(八股)
RabbitMQ 保证消息可靠投递(Reliable Delivery)是一个全链路的过程,需要从 生产者、Broker(服务器)和消费者 三个端点共同发力。如果任何一环出现问题,消息都可能丢失。

1. 生产者端:确保消息成功到达 Broker
生产者发送消息后,需要知道 Broker 是否真的收到了消息。
- 确认机制 (Publisher Confirm): 这是最常用的方式。生产者将信道设置为
confirm模式,Broker 收到消息后会回传一个确认(ACK)给生产者。如果 Broker 发生故障导致消息丢失,会发送nack。 - 事务机制 (Transaction): 类似于数据库事务(
txSelect,txCommit,txRollback)。虽然安全,但性能极低(比 Confirm 慢很多),通常不建议在高性能场景使用。 - 退回机制 (Return Listener): 当消息发送到交换机(Exchange),但根据路由键(Routing Key)找不到匹配的队列时,如果设置了
mandatory参数,Broker 会将消息退回给生产者。

2. Broker 端:确保消息在服务器上不丢失
即便 Broker 收到了消息,如果此时宕机且消息只在内存中,数据依然会丢失。
- 持久化 (Persistence):
- Exchange 持久化: 声明交换机时设置
durable=true。 - Queue 持久化: 声明队列时设置
durable=true。 - Message 持久化: 发送消息时设置
deliveryMode=2。 - 只有三者同时持久化,消息才会在磁盘上保存。
3. 消费者端:确保消息被正确处理
消息发给消费者后,如果消费者在处理逻辑时崩溃(比如代码报错或断电),消息不应被视为已完成。
-
手动确认 (Manual ACK): 消费者应关闭自动确认模式(
autoAck=false)。只有在业务逻辑处理成功后,显式调用basicAck。- 如果消费者没发 ACK 就断开了连接,RabbitMQ 会认为消息处理失败,将其重新入队(Requeue)分发给其他消费者。
-
自动确认(Auto Ack / No Ack):消费者从队列接收消息时,RabbitMQ 立即认为消息已被消费,不等待业务处理完成。消费者处理失败或宕机 → 消息丢失。
-
死信队列 (Dead Letter Exchange, DLX): 当消息被拒绝(Reject/Nack)且不重新入队,或者消息过期、队列满时,消息可以被转发到“死信交换机”,由专门的逻辑去记录和审计这些失败的消息。
-

4. 极端情况:幂等性(Idempotency)
在追求“可靠投递”的过程中,由于网络波动,可能会出现 “重复投递” 的情况(例如:消费者处理完了但 ACK 发回时网络断了,Broker 重新发了一次)。
- 解决方案: 消费者端必须实现幂等处理。通常通过给每条消息分配一个唯一 ID(如订单号、UUID),在消费前先查重(如利用 Redis 或数据库唯一索引),确保同一条消息不会被处理两次。
延迟队列
消息的TTL+死信Exchange解决方案,先要了解两个概念:
- TTL:即消息的存活时间。
RabbitMQ可以对队列和消息分别设置 TTL,如果对队列设置,则队列中所有的消息都具有相同的过期时间。超过了这个时间,我们认为这个消息就死了,称之为死信。 - 死信
Exchange(DLX):一个消息在满足以下条件会进入死信交换机- 一个消息被
Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。 - TTL 到期的消息。
- 队列满了被丢弃的消息。
- 一个消息被
一个延时消息的流程如下图:
📌 实现思路
- 把消息投到一个 延迟队列(普通队列)
- 队列给消息设置 TTL(过期时间)
- 消息 TTL 到期后进入 死信交换机(DLX)
- 由 DLX 转发到真正要消费的 业务队列

- 定义一个BizQueue,用来接收死信消息,并进行业务消费。
- 定义一个死信交换机(DLXExchange),绑定BizQueue,接收延时队列的消息,并转发给BizQueue。
- 定义一组延时队列DelayQueue_xx,分别配置不同的TTL,用来处理固定延时5s、10s、30s等延时等级,并绑定到DLXExchange。
- 定义DelayExchange,用来接收业务发过来的延时消息,并根据延时时间转发到不同的延时队列中。
优点:可以支持海量延时消息,支持分布式处理。
缺点:不灵活,只能支持固定延时等级。使用复杂,要配置一堆延时队列。
面试题:






线程池 VS MQ
1️⃣ 线程池做异步的特点
✅ 优点
-
延迟低、响应快
- 线程池直接在应用进程内执行任务,没有消息中间件的网络 IO 开销。
- 适合 短任务 或 对时延敏感的异步操作。
-
部署简单
- 不依赖额外的 MQ 系统(如 RabbitMQ、Kafka)。
-
可控性强
- 线程池大小、队列长度可以直接控制。
- 可通过拒绝策略、优先级队列等调度策略来控制压力。
❌ 缺点
-
可靠性低
- 应用进程挂掉 → 任务丢失。
- 没有持久化、重试机制,需要自己实现。
-
可扩展性有限
- 单机线程池无法跨多台服务器共享任务。
- 当请求量暴涨时,线程池可能导致 OOM 或线程饥饿。
-
削峰能力差
- 线程池队列是固定大小,容易满。
- 大量请求涌入可能直接拒绝或阻塞。
2️⃣ MQ 做异步的特点
✅ 优点
-
解耦
- 生产者和消费者完全分离,模块之间依赖低。
- 系统可扩展性好,容易做分布式任务处理。
-
可靠性高
- 消息持久化、ACK机制、重试机制保证任务不丢失。
- 可以应对服务短暂不可用。
-
削峰填谷
- MQ 可以作为缓冲区,应对瞬时高并发。
- 消费者按能力慢慢处理消息,保护系统。
❌ 缺点
-
延迟高
- 网络传输 + 消息序列化 + broker 存储 → 延迟增加。
- 不适合对低延迟要求很高的业务。
-
运维复杂
- 需要部署 MQ 集群(Kafka/RabbitMQ)
- 消息丢失、重复消费等问题需要处理。
-
系统复杂性增加
- 需要考虑幂等性、消息顺序、死信队列等。
💡 面试回答示例:
“我们选择线程池做异步,主要是任务短、对延迟敏感,且任务在应用内部就能完成,不需要跨服务通信。MQ 适合高可靠、跨服务的异步解耦场景,但它增加了网络开销和运维成本。”
面试场景题
一个电商的场景。啊,有一个生产者的服务。嗯,那么这个生产服务,当用户在完成他的这个订单付款之后,这个生产者的服务会发一个消息到一个 MQ 里面。还有消费这个消息来处理发货的逻辑。
用户付完钱了,上游生产者丢个 MQ 消息,下游来做发货。那要求呃在任何的意外情况之下,都保证每个订单一定会被发货。第二个保证不会重复发货。就这么两个设计目标,那你会怎么样来做这部分的系统设计?
可以用任何你知道的数据库存储、消息队列、中间件等等的技术方案,只要达成这个目标就行。
好的,这道题是典型的 “消息队列 + 幂等 + 可靠性”业务设计题,适合用在面试中讲架构思路。我们可以从 整体架构、幂等设计、消息可靠性、异常处理几方面拆解。
一、核心问题分析
-
消息丢失:MQ 消息可能因为网络、Broker 宕机等丢失。
-
消费者宕机或处理失败:消费消息时可能异常,导致发货操作未成功。
-
重复消费:MQ 的 at-least-once 投递会导致同一条消息被消费多次,即使消息重复消费也不能再次发货
二、设计思路
- 用户支付完成 → 生成订单
- 生产者(订单服务)发送消息到 MQ(包含订单号、商品信息、用户信息等)
- 消费者(发货服务)接收消息 → 执行发货逻辑
- 消费成功 → 确认消息,更新状态
目标:
-
每个订单一定会发货 → 至少一次消费
-
不能重复发货 → 幂等设计
-
异常情况可恢复
1. 消息发送可靠性
- 生产者发送消息:
- 开启
Publisher Confirm模式,确保消息成功到Broker Broker持久化:声明队列、交换机为Durable,消息设为 Persistent。- 消费者手动确认,有在发货逻辑(包括写数据库)完成后,才调用
basicAck。
- 开启
- 生产者发送顺序:
- 事务型消息或本地事务
- 方案 1:先写入任务表,再发 MQ,用户支付完成后,生产者服务在同一个数据库事务中,这保证任务表写入和订单状态更新是原子性的。
- 可能出现 DB 写成功 MQ 发送失败 → 可通过定时补偿,避免了生产者发送 MQ 失败导致订单状态更新但消息丢失的问题。
- 方案 2:事务型消息 / 可靠消息服务(RocketMQ /事务消息)
- 方案 1:先写入任务表,再发 MQ,用户支付完成后,生产者服务在同一个数据库事务中,这保证任务表写入和订单状态更新是原子性的。
- 事务型消息或本地事务
2. 消费者处理 & 幂等保证
幂等设计:
为了保证“不会重复发货”,需要在发货操作上做幂等处理:
-
订单表加状态字段:
order_status = INIT | PAID | DELIVERED
-
更新订单状态使用 原子操作:
UPDATE orders SET order_status = '发货', delivery_time = NOW() WHERE order_id = ? AND order_status = '已支付';
如果受影响行数为 0,说明该订单已经处理过,直接给 MQ 返回 ack 即可,不再重复调用物流接口。
“原子更新指的是一次操作要么完全成功,要么完全失败,不会被中断。在订单发货场景,我们可以通过 SQL 条件更新或乐观锁实现原子更新:更新订单状态时,先检查状态是否为未发货,如果是则更新,否则忽略。这样保证了消息重复投递也不会导致重复发货,保证幂等性。”
消息确认模式:
-
手动 ACK(autoAck=false)
-
如果失败 → 不 ACK → RabbitMQ 重新投递(保证至少一次消费)
3. 异常场景处理
| 异常情况 | 解决方案 |
|---|---|
| 生产者发送 MQ 失败 | 使用事务型消息(outbox 表)或者本地重试/定时任务确保发送 |
| 消费者处理发货失败 | MQ 重试机制 + 数据库幂等性保证不会重复发货 |
| 消费者宕机或 MQ 重启 | 消费者重启后继续消费未 ack 的消息 |
| MQ 消息丢失(极少) | 持久化消息 + 生产者 Confirm / 事务消息 |
| 网络抖动导致消息重复投递 | 数据库幂等操作(UPDATE ... WHERE status='PAID')避免重复发货 |
| 数据库宕机或事务异常 | 消息处理事务化,保证失败时回滚,不更新状态,重试消费即可 |
关键点:
1. MQ持久化 + Publisher Confirm → 保证消息投递
2. 消费者手动ACK + 幂等操作 → 保证只发货一次
3. 异常重试机制 / DLQ → 消息不会丢
更多推荐


所有评论(0)