1.消息过期时间(TTL)

如果我们想设置消息在指定时间内没被消费就过期,有如下种设置方式:

1.1 Queue TTL

所有队列中的消息超过时间未被消费时都会过期,通过队列属性设置消息过期时间

// 使用SpringAMQP声明队列
@Bean("ttlQueue") 
public Queue queue(){ 
    Map<String,Object> map = new HashMap<String,Object>(); 
    map.put("x-message-ttl",11000);// 队列中的消息未被消费 11 秒后过期 
    return new Queue("GP_TTL_QUEUE",true,false,false,map); 
}

在这里插入图片描述

1.2 Message TTL

设置单条消息的过期时间,在发送消息的时候指定消息属性

// SpringAMQP封装的Message
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("4000"); // 消息的过期属性,单位ms
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("这条消息4秒后过期".getBytes(), messageProperties);
rabbitTemplate.send("GP_TTL_EXCHANGE", "gupao.ttl", message);

如果同时指定了Message TTL和Queue TTL,则小的那个时间生效

2.死信队列

消息在某些情况下会变成死信(Dead Letter),比如:

  1. 消息被消费者拒绝并且未设置重回队列:(NACK|| Reject)&&requeue== false
  2. 消息过期
  3. 队列达到最大长度,超过了Maxlength(消息数)或者 Maxlengthbytes(字节数),最先入队的消息会被发送到DLX。

照理说死信应该是被抛弃的,但是如果定义了死信交换机,当消息成为死信就会进入死信队列

  • DLX:死信交换机(Dead Letter Exchange),队列在创建的时候可以指定一个,实际上也是普通的交换机
  • DLQ:死信队列(DeadLetterQueue),被死信交换机绑定的队列,实际也是普通队列(例如替补球员也是普通球员)

在这里插入图片描述

2.1 死信队列使用

1). 原交换机(GP_ORI_USE_EXCHANGE)、原队列(GP_ORI_USE_QUEUE) ,相互绑定。

  • 设置队列中的消息10秒钟过期,因为没有消费者,会变成死信。
  • 指定原队列的死信交换机(GP_DEAD_LETTER_EXCHANGE)。
// 使用Spring AMQP进行声明
@Bean("oriUseExchange") // 原交换机
public DirectExchange exchange() {
    return new DirectExchange("GP_ORI_USE_EXCHANGE", true, false, new HashMap<>());
}

@Bean("oriUseQueue") // 原队列
public Queue queue() {
    Map<String, Object> map = new HashMap<String, Object>();
    // 10秒钟后成为死信
    map.put("x-message-ttl", 10000); 
    // 死信交换机,当前队列中的消息变成死信后进入死信交换机
    map.put("x-dead-letter-exchange", "GP_DEAD_LETTER_EXCHANGE"); 

    return new Queue("GP_ORI_USE_QUEUE", true, false, false, map);
}

@Bean // 绑定原队列到原交换机
public Binding binding(@Qualifier("oriUseQueue") Queue queue,@Qualifier("oriUseExchange") 
                       DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("gupao.ori.use");
}

2). 声明死信交换机 ( GP_DEAD_LETTER_EXCHANGE ) 、 死信队列(GP_DEAD_LETTER_QUEUE),相互绑定

@Bean("deatLetterExchange") // 队列的死信交换机
public TopicExchange deadLetterExchange() {
    return new TopicExchange("GP_DEAD_LETTER_EXCHANGE", true, false, new HashMap<>());
}

@Bean("deatLetterQueue") // 死信队列
public Queue deadLetterQueue() {
    return new Queue("GP_DEAD_LETTER_QUEUE", true, false, false, new HashMap<>());
}

@Bean // 绑定死信队列到死信交换机
public Binding bindingDead(@Qualifier("deatLetterQueue") Queue 
					queue,@Qualifier("deatLetterExchange") TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("#"); // 无条件路由
}

3). 最终消费者监听死信队列
4). 生产者发送消息

在这里插入图片描述

3.延迟队列

我们在实际业务中有一些需要延时发送消息的场景,例如:

  1. 家里有一台智能热水器,需要在30分钟后启动
  2. 未付款的订单,15分钟后关闭

RabbitMQ本身不支持延迟队列,总的来说有三种实现方案:

  1. 先存储到数据库,用定时任务扫描
  2. 利用RabbitMQ的死信队列(Dead Letter Queue)实现
  3. 利用rabbitmq-delayed-message-exchange插件

3.1 TTL+DLX

基于消息TTL,我们来看一下如何利用死信队列(DLQ)实现延迟队列:

  1. 创建一个交换机
  2. 创建一个队列,与上述交换机绑定,并且通过属性指定队列的死信交换机。
  3. 创建一个死信交换机
  4. 创建一个死信队列
  5. 将死信交换机绑定到死信队列
  6. 消费者监听死信队列

消息的流转流程:生产者 --> 原交换机 --> 原队列(超过TTL之后)–> 死信交换机 --> 死信队列 --> 最终消费者

使用死信队列实现延时消息的缺点:

  • 如果统一用队列来设置消息的 TTL,当梯度非常多的情况下,比如 1 分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要创建很多交换机和队列来路由消息。
  • 如果单独设置消息的TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递(比如第一条消息过期TTL是30min,第二条消息TTL是10min。10分钟后,即使第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递)。
  • 可能存在一定的时间误差

3.2 延迟队列插件

在 RabbitMQ 3.5.7 及 以后的版 本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延时队列功能。同时插件依赖Erlang/OPT 18.0及以上。

插件源码地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
插件下载地址:https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

1). 进入插件目录

whereis rabbitmq 
cd/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins

2). 下载插件

wget https://bintray.com/rabbitmq/community-plugins/download_file?
file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

如果下载的文件名带问号则需要改名,如图

在这里插入图片描述

mv download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez 
rabbitmq_delayed_message_exchange-0.0.1.ez

在这里插入图片描述

3). 启用插件

rabbitmq-plugins enable  rabbitmq_delayed_message_exchange

4). 停用插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

5). 使用插件

通过声明一个x-delayed-message类型的Exchange来使用delayed-messaging特性。 x-delayed-message是插件提供的类型,并不是rabbitmq本身的(区别于direct、topic、fanout、headers)。

在这里插入图片描述

@Bean("delayExchange")
    public TopicExchange exchange() {
        Map<String, Object> argss = new HashMap<String, Object>();
        argss.put("x-delayed-type", "direct");
        return new TopicExchange("GP_DELAY_EXCHANGE", true, false, argss);
    }

生产者:消息属性中指定x-delay参数(SpringAMQP)

MessageProperties messageProperties = new MessageProperties();
// 延迟的间隔时间,目标时刻减去当前时刻
messageProperties.setHeader("x-delay", delayTime.getTime() - now.getTime());
Message message = new Message(msg.getBytes(), messageProperties);

// 不能在本地测试,必须发送消息到安装了插件的服务端
rabbitTemplate.send("GP_DELAY_EXCHANGE", "#", message);

4.服务端流控

当RabbitMQ生产MQ消息的速度远大于消费消息的速度时,会产生大量的消息堆积,占用系统资源,导致机器的性能下降。我们想要控制服务端接收的消息的数量,应该怎么做呢?

4.1 长度控制

队列有两个控制长度的属性:

  • x-max-length:队列中最大存储最大消息数,超过这个数量,队头的消息会被丢弃。
  • x-max-length-bytes:队列中存储的最大消息容量(单位bytes),超过这个容量,队头的消息会被丢弃。

在这里插入图片描述

需要注意的是,设置队列长度只在消息堆积的情况下有意义,而且会删除先入队的消息,不能真正地实现服务端限流。

4.2 内存控制

RabbitMQ 会在启动时检测机器的物理内存数值。默认当 MQ 占用 40% 以上内存时,MQ 会主动抛出一个内存警告并阻塞所有连接(Connections)。可以通过修改rabbitmq.config 文件来调整内存阈值,默认值是 0.4,如下所示:

[{rabbit,[{vm_memory_high_watermark,0.4}]}].

也可以用命令动态设置,如果设置成0,则所有的消息都不能发布

rabbitmqctl set_vm_memory_high_watermark 0.3

4.3 磁盘控制

另一种方式是通过磁盘来控制消息的发布。当磁盘空间低于指定的值时(默认50MB),触发流控措施。

例如:指定为磁盘的30%或者2GB

disk_free_limit.relative=3.0 
disk_free_limit.absolute=2GB

更多相关配置内容可以参考 官网

5.消费端流控

默认情况下,如果不进行配置,RabbitMQ会尽可能快速地把队列中的消息发送到消费者。因为消费者会在本地缓存消息,如果消息数量过多,可能会导致OOM或者影响其他进程的正常运行。

在消费者处理消息的能力有限,例如消费者数量太少,或者单条消息的处理时间过长的情况下,如果我们希望在一定数量的消息消费完之前,不再推送消息过来,就要用到消费端的流量限制措施。

可以基于Consumer或者channel设置prefetch count的值,含义为Consumer端的最大的unackedmessages数目。当超过这个数值的消息未被确认,RabbitMQ会停止投递新的消息给该消费者。

channel.basicQos(2);// 如果超过 2 条消息没有发送 ACK,当前消费者不再接受队列消息 
channel.basicConsume(QUEUE_NAME,false,consumer);

Spring Boot配置:

spring.rabbitmq.listener.simple.prefetch=2

关于Consumer Prefetch 官网也给出了相应解释

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐