RabbitMQ的冒险历程

问题起源:

开发同事的RabbitMQ遇到了问题,线上的消息有两条在Unacked状态,导致其他的消息也一直无法消费,堆积到了那里,很快Ready中的消息数目已经达到了五千多条。

解决过程:

第一步:

以前遇到过类似的问题,因为网络问题,消息到了消费者那里,可是一直没有回应,导致了消息在Unacked状态,一直无法消费。我们可以通过重启应用的办法,使消息重新进入队列中,这时消费者会再次消费信息,问题解决。

第二步:

但是这次在我帮同事解决问题之前,他们已经重启过了,发现无法再次消费,那两条消息,还是会进入Unacked的状态。通过日志发现,这两条消息是因为生产者推重复了,导致消费者这边接收到消息后,结果消息就卡到那边了,再次重启也是一样的处理逻辑。

第三步:

发现配置文件中的是消费者确认模式是manual,但是发现代码中竟然没有消息确认的语句。于是最快的办法,就是先将消费者确认模式修改成了none,不必确认,结果发现那两条消息还是无法被正常消费。

spring.rabbitmq.listener.simple.acknowledge-mode: manual
spring.rabbitmq.listener.simple.acknowledge-mode: none

第四步:

当时很怀疑,好像参数没有生效的感觉,接着开始调整prefetch参数,期望一个消费者可以多处理几条,之前默认的是1,那么这一条被卡着的信息一直无法逾越,想着虽然这一条不行,但是如果消费者可以一次消费5条,至少不会因为这一条而无法继续消费消息,导致消息堆积吧,结果参数仍然无法生效。

spring.rabbitmq.listener.simple.prefetch: 5

第五步:

到这里不能再继续了,于是开始撸源码,发现配置和我想的不一样,而且已经看到控制台中我们配置的参数是否起作用了。我们配置的prefetch会在下图中提现,当时配置成了5后,控制台中还一直是1。

在这里插入图片描述

撸代码,果然就是不一样,发现正确的配置应该是这样的,配置中没有simple的那个节点。真是折磨人啊,原来新版本的和历史版本的配置改版了。以下是低版本的正确配置,我们项目中使用的1.5.1.RELEASE的spring-boot-starter-amqp,而我们一直在尝试使用新版本的配置去配置它,自然会不生效。

以下附上具体的配置:

RabbitMQ配置文件(1.5.1.RELEASE)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.5.1.RELEASE</version>
</dependency>

当我们引用的是1.5.1.RELEASE的amqp版本的时候,配置如下:

# ----------------RabbitMQ配置----------------
spring:
  rabbitmq:
    host: 10.10.10.10
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # ----------------消费者端配置----------------
    listener:
      # 消费者端手动确认机制
      acknowledge-mode: manual
      # 消费者数量
      concurrency:
      # 最大消费者数量
      max-concurrency:
      # 消费者每次从队列中获取消息的数量
      prefetch: 5
    # ----------------生产者端配置----------------
    template:
      # 重试次数
      retry:
        enabled: true
        max-attempts: 3
        # 间隔一秒
        initial-interval: 1000
    # 信息发送到了交换器上
    publisher-confirms: true
    # 发送的消息,无法路由到队列时触发
    publisher-returns: true

RabbitMQ配置文件(2.1.6.RELEASE)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>

当我们引用的是2.1.6.RELEASE的amqp版本的时候,配置如下:

# ----------------RabbitMQ配置----------------
spring:
  rabbitmq:
    host: 10.10.10.10
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # ----------------消费者端配置----------------
    listener:
      simple:
        # 消费者端手动确认机制
        acknowledge-mode: manual
        # 消费者数量
        concurrency:
        # 最大消费者数量
        max-concurrency:
        # 消费者每次从队列中获取消息的数量
        prefetch: 5
    # ----------------生产者端配置----------------
    template:
      # 重试次数
      retry:
        enabled: true
        max-attempts: 3
        # 间隔一秒
        initial-interval: 1000
    # 信息发送到了交换器上
    publisher-confirms: true
    # 发送的消息,无法路由到队列时触发
    publisher-returns: true

复盘反思

  1. 因为自己对RabbitMQ的控制台不是很熟悉,只能通过两次的线上尝试,才彻底证明了配置确实不生效。
  2. 配置是可以在IDEA中查看是否生效的,通过确定配置是否可以链接到代码中去,至少可以证明,配置是否有效,再通过查看RabbitMQ的控制台,解决问题的过程会快很多。
  3. 项目中没有显示的ACK,之前一直没有出现问题,就是因为里面的配置没有生效,默认的消费者确认机制是auto,只要消息到了消费者端,就会被确认。
  4. prefetch参数和确认机制是有关联的,只有需要确认时,这个参数才会生效,否则你配置多少,在控制台中查看都是0。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐