RabbitMQ高级特性:消费者确认机制详解
RabbitMQ高级特性:消费者确认机制详解
RabbitMQ的消费者确认机制是保障消息可靠性的关键环节。本文将深入探讨消费者确认机制的三种处理方式(ack、nack和reject),并通过Spring AMQP框架的具体配置和代码示例,帮助读者全面理解这一机制的实现原理和应用场景。
在RabbitMQ中,为了确认消费者是否成功处理信息,提供了Consumer Acknowledgement,也就是消费者确认机制。不同于生产者确认机制,生产者确认机制更加偏重的是消息的传递是否成功,而消费者确认机制则是更加偏重消费者是否成功处理信息。在这个机制下,当消费者处理完消息结束后,会向RabbitMQ返回一个回执,告知RabbitMQ消息处理的情况,共有如下三种情况:
- ack: 成功处理消息,MQ将从队列中删除该消息。
- nack: 消息处理失败,MQ将再次投递这一消息。
- reject: 消息处理失败并且拒绝该消息,MQ将从队列中删除该消息。通常发生在消息格式有问题的情况,所以出现的概率较低。
触发返回reject的常见异常包括:
- 消息处理失败
- 消息格式错误
- 资源不足
- 业务规则不符
- 重复消息
- 消息过期
- 消费者关闭或重启
- 手动拒绝
由于对消息回执的处理较为统一,Spring AMQP为我们封装好了相应的功能,并且提供三种处理方式供开发者选择。我们只需要在消费者所在的配置文件中配置对应的属性即可:
- none: 不做任何处理,消息投递到了后直接返回ack,不管消息处理是否异常,都返回ack。
- manual: 手动处理,需要在相应的业务代码中调用对应的api,发送ack或者是reject,灵活性高。
- auto: 自动模式。当业务正常执行时则返回ack,当业务出现异常时,根据异常的结果返回nack或reject。判断的依据如我们上文所说,如果时业务异常,返回ack,如果是消息处理异常则返回reject。并且都是自动返回,无需在业务代码中调用对应api,无业务侵入。基于面向切面编程(AOP)实现的环绕(around)增强。
需要添加的yml配置信息如下:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
读者可以通过在消费者程序中捕获一个异常,通过观察不同异常与不同处理方式之间的处理方式来感受三中处理方式的差异。在这里我们做一个简单的代码演示。
我们需要先准备一个消息:
准备一个消费者并绑定发送指定消息队列,同时在异常处打上断点:
@RabbitListener(queues = "simple.queue")
public void listener(String msg) {
log.info("接收到信息:【{}】", msg);
if (true) {
throw new RuntimeException("测试Exception");
}
log.info("消息处理结束");
}
配置处理方式为none的效果如下:
消费者模块:
消息队列:
通过上述操作不难发现,消费者在未处理完对应的业务的时候,就已经返回了ack,这对大部分业务显然是不够安全也不推荐的,所以我们在使用none作为处理方案的时候,需要慎重考虑。
配置为auto的效果如下:
可以发现在修改为auto后,如果遇到非reject的业务异常,消息会重新回到队列,并且重新投递,因此便出现了上面异常截图中“接收到信息”这一日志信息出现在异常信息下的情况。此处通过这一截图也是为了体现此时消息正在被反复投递。显然这样处理方式是更加安全可靠的。读者可以根据业务需要选择合适的处理方案。
失败重试和失败处理策略:
在前文我们提到了当使用manual或auto作为处理方案时,如果遇到业务异常,消费者则会抛出异常并且返回nack报文,消息则会返回到队列中被重新投递。如果同一消息一直出现异常,此时便有可能陷入投递返回的死循环,此时无论我们的队列是否开启了持久化是否使用了LazyQueue都会带来不小的性能压力以及增加队列阻塞的风险。因此对于消息业务处理失败后重试次数的限制和重试失败后的处理策略也同样重要。
Spring AMQP允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,返回reject,丢弃消息。也是默认方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队,需要注意的是,这里的重试是在消费者本地重试,假设设置了重试次数为3次,就会在本地先重试三次后才返回队列。相对而言可以减小MQ的压力
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机,存档后交由人工处理。
我们可以通过配置文件配置本地失败(ImmediateRequeueMessageRecoverer)重试的次数:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
这里的话和生产者确认中的重连机制意思相近,读者可以配合理解。
RepublishMessageRecoverer我们需要提前声明一个用于投递失败信息的交换机和队列,接着定义一个RepublishMessageRecoverer,关联队列和交换机:
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
对于error交换机和队列的声明与普通队列交换机声明无异,再此就不再做过多的赘述。