RabbitMQ延迟队列实现 ( 插件版 )
安装完rabbitmq-delayed-message-exchange插件后,会生成一个新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制。接收到消息后并不会立即将消息投递至目标队列,而是存储在mnesia table(一个分布式数据库)中,然后检测消息延迟时间,如果达到可投递时间( 过期时间 )后,将其通过 x-delayed-type 类型标记的交换机
一、RabbitMQ延迟队列实现 ( 插件版 )
1.1、下载 rabbitmq-delayed-message-exchange 插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
- 由于我的RabbitMQ是再Linux服务器中通过Docker安装的,所以先将rabbitmq_delayed_message_exchange-3.9.0.ez插件上传到Linux中 ( 什么位置都可以 )。

- 进入放rabbitmq_delayed_message_exchange-3.9.0.ez插件的目录
cd /home/docker
- 将插件拷贝到容器内plugins目录下(cd-rabbitmq是RabbitMQ容器的容器名称,也可以使用容器ID)
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez cd-rabbitmq:/plugins
- 进入 RabbitMQ 容器
docker exec -it cd-rabbitmq /bin/bash
- 查看 rabbitmq_delayed_message_exchange-3.9.0.ez 插件是否存在
cd plugins
ls |grep delay
- 在 plugins 内启用 rabbitmq_delayed_message_exchange-3.9.0.ez 插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 退出RabbitMQ容器
exit
- 重启 RabbitMQ 容器
docker restart cd-rabbitmq

- 容器启动成功之后,登录RabbitMQ的管理页面,找到ExchangesTab页。点击Add a new exchange,在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。

1.2、rabbitmq-delayed-message-exchange插件是实现原理

安装完rabbitmq-delayed-message-exchange插件后,会生成一个新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制。接收到消息后并不会立即将消息投递至目标队列,而是存储在mnesia table(一个分布式数据库)中,然后检测消息延迟时间,如果达到可投递时间( 过期时间 )后,将其通过 x-delayed-type 类型标记的交换机投递到目标队列中。
原理:DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,会判断消息是否具有x-delay属性,如果有x-delay属性,说明是延迟消息,那么这些消息就会存到Mnesia table ( 一个分布式数据库 )中,并读取x-delay属性值作为延迟时间。消息通过计时器调度分发,在x-delay延迟时间到期后,就会重新投递消息到指定队列中。
1.3、配置RabbitMQ连接
#[ RabbitMQ相关配置 ]
#rabbitmq服务器IP
spring.rabbitmq.host=安装RabbitMQ的服务器IP
#rabbitmq服务器端口(默认为5672)
spring.rabbitmq.port=5672
#用户名(默认用户名为guest)
spring.rabbitmq.username=admin
#用户密码(默认密码为guest)
spring.rabbitmq.password=admin
#虚拟主机(一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列)
#vhost虚拟主机地址( 默认为/ )
spring.rabbitmq.virtual-host=/
1.4、创建自定义RabbitMQ配置类,配置延迟交换机、延迟队列、以及根据路由键配置交换机和队列的绑定关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfiguration {
//延迟交换机
public static final String DELAY_EXCHANGE = "delay_exchange";
//延迟队列
public static final String DELAY_QUEUE = "delay_queue";
//延迟路由键
public static final String DELAY_QUEUE_ROUTING_KEY = "delay_queue_routing_key";
//延迟交换机
@Bean("delayExchange")
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange(DELAY_EXCHANGE)
//delayed标记当前交换机是一个具备延迟效果的交换机,类型默认是direct直接模式
.delayed()
.durable(true)
.build();
}
//延迟队列
@Bean("delayQueue")
public Queue delayQueue(){
return new Queue(DELAY_QUEUE, true, false, false);
}
//延迟队列和延迟交换机
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayQueue")Queue queue,
@Qualifier("delayExchange")DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);
}
}
1.5、创建消息生产者发送消息
1)注意:消息头Header中一定要携带上 x-delay 参数,用来指定消息的延迟时间。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.*;
/**
* 延迟消息生产者
*/
@Component
public class DelayMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送延迟消息(1 推荐)
public void sendDelayMessage(String message, Integer delayTime){
rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
//setDelay()的本质是对消息头设置 x-delay 参数,用来指定消息的延迟时间
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
//发送延迟消息(2)
public void sendDelayMessage02(String message, Integer delayTime){
rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
//消息头中一定要携带上 x-delay 参数,用来指定消息的延迟时间
msg.getMessageProperties().setHeader("x-delay", delayTime);
return msg;
});
}
//发送延迟消息(3)
public void sendDelayMessage03(String message, Integer delayTime){
// 1. 准备消息
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay", delayTime)
.build();
// 2. 准备correlationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3. 发送消息
rabbitTemplate.convertAndSend("DELAY_EXCHANGE",
"DELAY_QUEUE_ROUTING_KEY",
message,
correlationData);
}
}
1.6、创建消息消费者消费消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE;
@Slf4j
@Component
public class DelayQueueConsumer {
/**
* 监听延迟队列
* @param message 接收的信息
*/
@RabbitListener(queues = DELAY_QUEUE)
public void receiveMessage(Message message) {
String msg = new String(message.getBody());
// 记录日志
log.info("当前时间:{},从延迟队列中消费的消息:{}", LocalDateTime.now(), msg);
}
}
1.7、创建控制类接收客户请求
import com.cd.springbootrabbitmq.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
@Autowired
private DelayMessageProducer producer;
/**
* @param message 客户发送的消息
* @param delayTime 消息延迟时间
*/
@RequestMapping("/send")
public void send(String message, Integer delayTime){
// 记录日志
log.info("当前时间:{},消息:{},延迟时间:{}", LocalDateTime.now(), message, delayTime );
// 发送延迟消息
producer.sendDelayMessage(message, delayTime);
}
}
1.8、测试
在浏览器中先后提交下面两个请求:
-
localhost:8088/rabbitmq/send?message=测试自定义延迟处理60s&delayTime=60000
-
localhost:8088/rabbitmq/send?message=测试自定义延迟处理10s&delayTime=10000
查看idea控制台:

解析:从控制台打印信息可以看出,虽然延迟60s的消息先发送,延迟10s的消息后发送。但延迟10s的消息无需等待延迟60s的消息被释放后,才能被消费。这些都是rabbitmq-delayed-message-exchange插件帮我们实现的。
1.9、提示
消息分发前是存储在节点下的Mnesia table中,通过计时器调度实现分发,官网写到:这个插件的设计并不适合大量延迟消息的情况(例如数百万条)。因为随着mnesia数据库的增长,延迟消息的延时时间变得难以控制,就很难达到预期的效果
1.10、登录rabbitmq management可视化控制台,遇到的错误:Not management user

原因:该用户不是管理员,也就是我们登录的账号没有管理员权限
- 进入RabbitMQ容器
docker exec -it cd-rabbitmq /bin/bash
- 查看用户列表
rabbitmqctl list_users
- 使用命令
rabbitmqctl set_user_tags 用户名 adminitrator给用户赋予管理员权限。
rabbitmqctl set_user_tags guest adminitrator
- 然后使用
rabbitmqctl list_users命令查看用户列表,如果该用户的tags标签变成adminitrator,则表示已经为该用户赋予了管理员权限
rabbitmqctl list_users

guest用户始终无法登录问题
我修改了guest用户为管理用户后,发现依然无法使用guest登录,所以新建了一个admin用户登录,创建流程如下:
- 创建一个用户名为admin,密码为admin的用户
rabbitmqctl add_user admin admin
- 由于该用户刚创建,还没有赋予管理员权限。如果此时使用该用户、密码去登录rabbitmq management可视化控制台。就会报下面这个错误:Not management user 不是管理员用户

- 为admin用户设置权限,赋予用户默认vhose的全部操作权限:
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
- 所以需要使用
rabbitmqctl set_user_tags admin administrator命令为admin用户赋予管理员权限,让这个用户变成管理员用户。
rabbitmqctl set_user_tags admin administrator
- 然后使用
rabbitmqctl list_users命令查看用户列表,如果该用户的tags标签变成adminitrator,则表示已经为该用户赋予了管理员权限
rabbitmqctl list_users
- exit退出RabbitMQ容器
exit

- 然后这次使用admin用户可以登录成功了,但不明白前面的guest用户为什么登录不了( guest用户也设置成管理用户了!但就是登陆不了 )
更多推荐


所有评论(0)