问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

RabbitMQ如何保证消息不丢失?

创作时间:
作者:
@小白创作中心

RabbitMQ如何保证消息不丢失?

引用
CSDN
1.
https://blog.csdn.net/2401_86998737/article/details/146383653

在分布式系统中,消息队列是实现异步通信和解耦的关键组件。RabbitMQ作为一款广泛使用的开源消息队列系统,其可靠性是系统稳定运行的重要保障。本文将详细介绍RabbitMQ如何通过生产者确认机制、消息持久化、消费者确认机制等手段,确保消息不丢失,并探讨如何解决消息重复消费问题。

一、什么是RabbitMQ?

RabbitMQ是一个开源的消息代理软件,实现了高级消息队列协议(AMQP),用于在分布式系统中传递消息。它是一个功能强大的消息队列工具,可以用于在不同应用程序、服务或组件之间传递消息,实现异步通信。

RabbitMQ的主要作用包括:

  1. 消息队列:RabbitMQ充当消息队列的角色,可以存储和转发消息,实现生产者和消费者之间的解耦,提高系统的灵活性和可扩展性。

  2. 异步通信:通过RabbitMQ,可以实现应用程序之间的异步通信,不同组件之间可以通过消息进行交互,而不需要直接调用对方的接口。

  3. 消息确认机制:RabbitMQ支持消息的可靠投递机制,生产者可以确认消息是否成功发送到队列,消费者可以确认消息是否成功消费。

  4. 路由和订阅机制:RabbitMQ支持多种交换机类型,可以实现不同的消息路由策略,满足复杂的消息传递需求,同时支持发布/订阅模式。

二、生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中对事。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功

消息失败之后如何处理呢?

  • 回调方法即时重发

  • 记录日志

  • 保存到数据库然后定时重发,成功发送后即刻删除表中的数据

总结生产者确认机制流程图:

三、消息持久化

如何保证消息持久化?

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失

1、交换机持久化


@Bean  
public DirectExchange simpleExchange(){  
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除   
    return new DirectExchange("simple.direct", true, false);  
}  

2、队列持久化


@Bean  
public Queue simpleQueue(){  
    // 使用QueueBuilder构建队列,durable就是持久化的  
    return QueueBuilder.durable("simple.queue").build();  
}  

DeliveryMode来指定


Message msg = MessageBuilder  
       .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体  
       .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化   
       .build();  

四、消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。

  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,设置重试次数,当次数达到了以后,如果消息依然失败,将消息投递到异常交换机,交由人工处理

五、如何解决RabbitMQ中消息的重复消费问题

  1. 消息去重:在消费者端对接收到的消息进行去重操作。可以通过维护一个消息ID的集合或使用唯一标识来判断消息是否已经被消费过。

  2. 幂等性处理:确保即使消息被重复消费,也不会产生错误的结果。这可以通过在数据库操作中使用唯一约束、使用更新语句代替插入语句等方式实现。

  3. 消费者确认机制:消费者成功消费消息后,向RabbitMQ发送确认消息,RabbitMQ会将该消息标记为已确认并删除。如果消费者在处理消息过程中发生异常,可以选择拒绝消息并将其重新放回队列,等待下次重新消费。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号