RabbitMQ特性 - 持久性
RabbitMQ特性 - 持久性
在分布式系统中,消息队列是实现异步通信和解耦的重要组件。RabbitMQ作为一款广泛使用的开源消息队列系统,其持久性特性对于保证数据的可靠传输至关重要。本文将详细介绍RabbitMQ的持久化机制,包括交换器、队列和消息的持久化设置,以及如何通过确认机制确保消息的可靠传输。
RabbitMQ持久化机制
1. 交换机持久化
交换器的持久化是通过在声明交换机时将durable
参数置为true
实现的。当MQ的服务器发生意外或关闭之后,重启RabbitMQ时不需要重新去建立交换机,交换机会自动建立,相当于一直存在。如果交换器不设置持久化,那么在RabbitMQ服务重启之后,相关的交换机元数据会丢失,对一个长期使用的交换器来说,建议将其置为持久化的。
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
2. 队列持久化
队列的持久化是通过在声明队列时将durable
参数置为true
实现的。如果队列不设置持久化,那么在RabbitMO服务重启之后,该队列就会被删掉,此时数据也会丢失。(队列没有了,消息也无处可存了)
队列的持久化能保证该队列本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会去失。要确保消息不会去失,需要将消息设置为持久化。
// 持久化
QueueBuilder.durable(Constant.ACK_QUEUE).build();
// 非持久化
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
队列的durable方法默认是true(开启的)
3. 消息持久化
消息实现持久化,需要把消息的投递模式(MessageProperties 中的deliveryMode)设置为2也就是 MessageDeliveryMode.PERSISTENT。
public enum MessageDeliveryMode {
NON_PERSISTENT,//⾮持久化
PERSISTENT;//持久化
}
设置了队列和消息的持久化,当RabbitMQ服务重启之后,消息依旧存在。如果只设置队列持久化,重启之后消息会去失,如果只设置消息的持久化,重启之后队列消失,继而消息也去失。所以单单设置消息特久化而不设置队列的持久化显得毫无意义。
如果使用RabbitTemplate发送持久化消息,代码如下:
// 要发送的消息内容
String message = "This is a persistent message";
// 创建⼀个Message对象,设置为持久化
Message messageObject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使⽤RabbitTemplate发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);
注:RabbitMQ默认情况下会将消息视为持久化的,除非队列被声明为非持久化,或者消息在发送时被标记为非持久化。
将所有的消息都设置为持久化,会严重影响RabbitMQ的性能(随机)。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。
在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的
从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就岩机了,这样也算数据丢失。这种情况很好解决,将autoAck参数设置为false,并进行手动确认。
在持久化的消息正确存入RabbitMQ之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘中。RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMo服务节点发生了机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。
这个问题怎么解决?
引入RabbitM的仲裁队列,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave)。这样有效地保证了高可用性,除非整个集群都挂掉(此方法也不能保证100%可靠,但是配置了仲裁队列要比没有配置仲裁队列的可靠性要高很多,实际生产环境中的关键业务队列一般都会设置仲裁队列)。
还可以在发送端引入事务机制或者发送方确认机制来保证消息已经正确地发送并存储至RabbitMQ中。
发送方确认
在使用RabbitMo的时候,可以通过消息持久化来解决因为服务器的异常而导致的消息丢失。但是有一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果在消息达服务器之前已经丢失(比如RabbitMQ重启,那么RabbitMQ重启期间生产者消息投递失败),持久化作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?RabbitMQ为我们提供了两种解决方案:
(1)通过事务机制实现
(2)通过发送方确认(publisherconfirm)机制实现
事务机制比较消耗性能,在实际工作中使用也不多,咱们主要介绍confirm机制采实现发送方的确认
RabbitMQ为我们提供了两个方式来控制消息的可靠性投递:
(1)confirm确认模式
Producer 在发送消息的时候,对发送端设置一个Confirmcallback的监听,无论消息是否到达 Exchange,这个监听都会被执行。如果Exchange成功收到,ACK(Acknowledge character ,确认字符)为true,如果没收到消息,ACK就为false。
步骤如下:
(1)配置RabbitMQ
(2)设置确认回调逻辑并发送消息
(3)测试
配置RabbitMQ
spring:
rabbitmq:
addresses: amqp://admin:admin
listener:
simple:
acknowledge-mode: manual #消息接收确认
publisher-confirm-type: correlated #消息发送确认
设置确认回调逻辑并发送消息
无论消息确认成功还是失败,都会调用Confirmcallback的confirm方法。如果消息成功发送到Broker.ack为true。如果消息发送失败,ack为false,并且cause提供失败的原因。
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.printf("消息已成功接收, ID: %s\n", correlationData.getId());
} else {
System.out.printf("消息接收失败, ID: %s, 原因: %s\n", correlationData.getId(), cause);
}
}
});
return rabbitTemplate;
}
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() throws InterruptedException {
CorrelationData correlationData1 = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm", "confirm test...", correlationData1);
return "确认成功";
}
方法说明:
public interface ConfirmCallback {
/**
* 确认回调
* @param correlationData 发送消息时的附加信息,通常用于在确认回调中识别特定的消息
* @param ack 表示交换机是否成功接收到消息,true为已接收,false为未接收
* @param cause 当消息确认失败时,此字符串参数将提供失败的具体原因,用于调试和错误处理。若消息确认成功,则此参数为null
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}
4.2 return 退回模式
消息到达Exchange之后,会根据路由规则匹配,把消息放入Queue中。Exchange到Queue的过程,如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等),可以选择把消息退回给发送者,消息退回给发送者时,我们可以设置一个返回回调方法,对消息进行处理。
设置确认回调逻辑并发送消息
消息无法被路由到任何队列,它将返回给发送者,这时setReturncallback设置的回调将被触发
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.printf("消息已被退回: %s", returned);
}
});
return rabbitTemplate;
}
@RequestMapping("/msgReturn")
public String msgReturn() {
CorrelationData correlationData = new CorrelationData("2");
// 发送消息到RabbitMQ,指定交换机、路由键和消息内容
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm11", "message return test...", correlationData);
return "消息发送成功";
}
使用RabbitTemplate的setMandatory方法设置消息的mandatory属性为true(默认为false)。这个属性的作用是告诉RabbitMO,如果一条消息无法被任何队列消费,RabbitMQ应该将消息返回给发送者,此时 Returncallback就会被触发。
如何保证RabbitMO消息的可靠传输?
首先,根据图解
从这个图中,可以看出,消息可能丢失的场景以及解决方案:
1. 生产者将消息发送到RabbitMQ失败
a. 可能原因:网络问题等。
b. 解决办法:发送方确认-confirm确认模式。
2. 消息在交换机中无法路由到指定队列
a. 可能原因:代码或者配置层面错误,导致消息路由失败。
b. 解决办法:发送方确认-return模式。
3. 消息队列自身数据丢失
a. 可能原因:消息到达RabbitMQ之后,RabbitMQServer机导致消息丢失。
b. 解决办法:
开启RabbitMQ持久化,就是消息写入之后会持久化到磁盘。如果RabbitMQ挂了,恢复之后会自动读取之前存储的数据。(极端情况下,RabbitMQ还未持久化就挂了,可能导致少量数据丢失,这个概率极低,也可以通过集群的方式提高可靠性。
4. 消费者异常,导致消息丢失
a. 可能原因:消息到达消费者,还没来得及消费,消费者岩机。消费者逻辑有问题。
b. 解决办法:
RabbitMO提供了消费者应答机制来使RabbitMO能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动应答的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制,当消息消费异常时,通过消息重试确保消息的可靠性 。