【黑马点评优化】4-基于RabbitMQ消息队列的消息可靠性优化
【黑马点评优化】4-基于RabbitMQ消息队列的消息可靠性优化
之前基于RabbitMQ实现了异步下单秒杀。但是对于异步下单秒杀这个场景,除了考虑性能外,还需要考虑可用性。
github地址如下:https://github.com/xianghua-2/hm-dianping
1 RabbitMQ异步秒杀过程
我们先回顾一下异步下单秒杀的过程。
在用户异步下单时,后端会接收到/voucher-order/seckill/{} 的请求,此时VoucherOrderController会调用seckillVoucher方法。
@PostMapping("seckill/{id}")
@SentinelResource(value = "seckill",blockHandler = "handleBlock",fallback = "handleFallback")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
long startTime = System.currentTimeMillis();
Result result = voucherOrderService.seckillVoucher(voucherId);
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
log.info("秒杀接口耗时:{}ms",duration);
// return voucherOrderService.seckillVoucher(voucherId);
return result;
}
seckillVoucher方法则通过调用VoucherOrderService的seckillVoucher方法来实现异步秒杀
VoucherOrderService.seckillVoucher方法如下
逻辑为:
- 执行lua脚本,判断用户的购买资格
- 有购买资格的话,生成orderId,并将用户id和优惠券id一起存入订单消息中,然后将订单消息存入RabbitMQ消息队列,等待消费者异步消费。
@Resource
RabbitTemplate rabbitTemplate;
@Override
public Result seckillVoucher(Long voucherId) {
//1.执行lua脚本,判断当前用户的购买资格
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString());
if (result != 0) {
//2.不为0说明没有购买资格
return Result.fail(result==1?"库存不足":"不能重复下单");
}
//3.走到这一步说明有购买资格,将订单信息存到消息队列
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
//存入消息队列等待异步消费
rabbitTemplate.convertAndSend("hmdianping.direct","direct.seckill",voucherOrder);
return Result.ok(orderId);
}
消费者消费消息RabbitMQConfig中的recieveMessage方法。
通过调用 voucherOrderService.handleVoucherOrder(voucherOrder);方法来将该订单写入到数据库中。
@Configuration
public class RabbitMQConfig {
@Resource
IVoucherOrderService voucherOrderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.seckill.queue"),
key = "direct.seckill",
exchange = @Exchange(name = "hmdianping.direct", type = ExchangeTypes.DIRECT)
))
/* public void recieveMessage(Object message){
System.out.println("监听到了"+message);
}*/
public void recieveMessage(Message message, Channel channel, VoucherOrder voucherOrder){
try {
voucherOrderService.handleVoucherOrder(voucherOrder);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
throw new RuntimeException(e);
}
System.out.println("监听到了"+message);
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
voucherOrderService.handleVoucherOrder(voucherOrder);
- 从消息实体中拿出信息
- 获取redisson锁对象
- 获取锁成功,则调用
createVoucherOrder方法,向数据库中写入订单。
@Transactional
public void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.所有信息从当前消息实体中拿
// 获取用户
Long userId = voucherOrder.getUserId();
//创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:"+userId);
//尝试获取锁
boolean isLock = redisLock.tryLock();
//4.判断是否获取锁成功
if(!isLock){
//获取锁失败,直接返回失败或者重试
log.error("不允许重复下单!");
return;
}
try{
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
proxy.createVoucherOrder(voucherOrder);
}finally {
if (redisLock.isHeldByCurrentThread()) {
redisLock.unlock();
}
}
/* Long voucherId = voucherOrder.getVoucherId();
//2.扣减库存
boolean success = seckillVoucherService.update().setSql("stock=stock-1")
.eq("voucher_id", voucherId)
//======判断当前库存是否大于0就可以决定是否能抢池子中的券了
.gt("stock", 0)
.update();
//3.创建订单
if(success) save(voucherOrder);*/
}
2 异步秒杀过程中存在的问题
抽象起来,现在我们只关注RabbitMQ消息队列。那么无非存在三个过程
- 过程1:生产者(voucherorderservice中的seckillvoucher方法)连接RabbitMQ消息队列
- 过程2:生产者(seckillvoucher)投递订单消息(VoucherOrder)到消息队列中
- 过程3:消费者(RabbitMQConfig中的recieve Messeage)从消息队列中取出消息(VoucherOrder)消费
那么,我们假设一下下列场景。
1.生产者连接RabbitMQ失败。
2.生产者将消息投递到消息队列中失败。
3.消息到消息队列后,还没来得及被消费者消费。但是RabbitMQ服务器突然宕机,重启后,消息丢失。
4.消费者从消息队列中取出消息失败。
虽然可能性不高,但是我们需要考虑到这种情况。
无非就是需要考虑,发送者的可靠性,消息队列的可靠性,消费者的可靠性。
3 项目中实现
3.1 生产者可靠性——生产者确认机制
3.1.1.开启生产者确认
修改application.yaml中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type有三种模式可选:
none:关闭confirm机制simple:同步阻塞等待MQ的回执correlated:MQ异步回调返回回执
一般我们推荐使用correlated,回调机制。
3.1.2 定义ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在config模块定义一个配置类:
src/main/java/com/hmdp/config/MqConfig.java

内容如下:
package com.hmdp.config;
import com.rabbitmq.client.ConnectionFactory;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("触发return callback");
log.debug("exchange: {}", exchange);
log.debug("routingKey: {}", routingKey);
log.debug("message: {}", message);
log.debug("replyCode: {}", replyCode);
log.debug("replyText: {}", replyText);
}
});
}
}
3.1.3 定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆SettableListenableFuture:回执结果的Future对象
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:
修改VoucherOrderServiceImpl.java 中的seckillVoucher方法如下,为MQ添加一个ConfirmCallback:
@Override
public Result seckillVoucher(Long voucherId) {
//1.执行lua脚本,判断当前用户的购买资格
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString());
if (result != 0) {
//2.不为0说明没有购买资格
return Result.fail(result==1?"库存不足":"不能重复下单");
}
//3.走到这一步说明有购买资格,将订单信息存到消息队列
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
//4. 存入消息队列等待异步消费
// 4.1 创建CorrelationData
CorrelationData cd = new CorrelationData();
// 4.2 给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>(){
@Override
public void onSuccess(CorrelationData.Confirm confirm) {
// 4.3 消息发送成功时的处理逻辑
if(confirm.isAck()){
log.debug("消息发送成功,收到ack!");
}else{
log.error("消息发送失败,收到nack!"+ confirm.getReason());
}
}
@Override
public void onFailure(Throwable throwable) {
// 4.2.2 消息发送失败时的处理逻辑
log.error("消息发送失败,发生异常!"+throwable.getMessage());
}
});
rabbitTemplate.convertAndSend("hmdianping.direct","direct.seckill",voucherOrder);
return Result.ok(orderId);
}
注意:
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
- 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
- 交换机名称错误:同样是编程错误导致
- MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。
3.2 消息可靠性——设置持久化消息
默认消息都是持久化消息,不用修改

3.3 消费者可靠性——消费者确认+失败重试
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回
nack; - 如果是消息处理或校验异常,自动返回
reject;
- 如果是业务异常,会自动返回
返回Reject的常见异常有:
Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:
- o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.
- o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a @RabbitListener method.
- o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, @Valid) is used in the listener and the validation fails.
- o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as Message but Message is received.
- java.lang.NoSuchMethodException: Added in version 1.6.3.
- java.lang.ClassCastException: Added in version 1.6.3.
通过下面的配置可以修改SpringAMQP的ACK处理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {
throw new MessageConversionException("故意的");
}
log.info("消息处理完成");
}
测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。
我们把确认机制修改为manual,当处理完后,手动发送ack确认:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 自动ack
3.3.1 失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:
当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
3.3.2 失败处理策略
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
新建ErrorMessageConfig类

1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
完整代码如下:
package com.hmdp.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
// 处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
// 定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
3.3.3 业务幂等性
[!warning]
在黑马点评中,由于多次向数据库中插入订单,结果是一样的。(因为插入时不会重复插入相同订单)
因此,幂等性能够得到保证。
下面的例子是另一个场景,帮助解决幂等性问题。
何为幂等性?
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
- 根据id删除数据
- 查询数据
- 新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
- 页面卡顿时频繁刷新导致表单重复提交
- 服务间调用的重试
- MQ消息的重复投递
我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:
- 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
- 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
- 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
- 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。
因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:
- 唯一消息ID
- 业务状态判断
3.3.3.1 唯一消息ID
这个思路非常简单:
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
3.3.3.2 业务判断
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。
以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:
@Override
public void markOrderPaySuccess(Long orderId) {
// 1.查询订单
Order old = getById(orderId);
// 2.判断订单状态
if (old == null || old.getStatus() != 1) {
// 订单不存在或者订单状态不是1,放弃处理
return;
}
// 3.尝试更新订单
Order order = new Order();
order.setId(orderId);
order.setStatus(2);
order.setPayTime(LocalDateTime.now());
updateById(order);
}
上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。
我们可以合并上述操作为这样:
@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}
注意看,上述代码等同于这样的SQL语句:
UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。
态
更多推荐



所有评论(0)