RocketMQ客户端消息确认机制详解
创作时间:
作者:
@小白创作中心
RocketMQ客户端消息确认机制详解
引用
CSDN
1.
https://blog.csdn.net/m0_71845127/article/details/145960214
RocketMQ的消息确认机制(ACK)是保障消息可靠性的核心机制,分为生产者发送确认和消费者消费确认两部分。本文将详细解析这两种确认机制,并通过源码分析帮助读者深入理解其工作原理。
一、生产者消息发送确认机制
- 机制说明
- 同步发送:生产者发送消息后阻塞等待Broker返回
SendResult,包含消息状态(SEND_OK、FLUSH_DISK_TIMEOUT等)。 - 异步发送:通过回调函数
SendCallback处理成功或异常。 - 单向发送:不关心发送结果,无确认机制。
- 重试机制:默认同步发送重试2次(可配置
retryTimesWhenSendFailed)。
- 源码解析
- 核心类:
DefaultMQProducerImpl - 关键方法:
sendDefaultImpl()
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, ...) {
// 选择消息队列(负载均衡)
MessageQueue mq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
// 实际发送逻辑
sendResult = this.sendKernelImpl(msg, mq, communicationMode, ...);
// 处理结果或异常,触发重试
}
- 发送结果处理:根据
communicationMode处理同步/异步逻辑。 - 异常处理:网络异常或Broker不可用时触发重试。
二、消费者消息消费确认机制
- 机制说明
- PushConsumer:客户端监听消息,消费完成后返回状态:
ConsumeConcurrentlyStatus.CONSUME_SUCCESS:确认消费成功。ConsumeConcurrentlyStatus.RECONSUME_LATER:消费失败,触发重试。
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
- 重试策略:消息失败后进入重试队列(
%RETRY%),默认最多重试16次,之后进入死信队列(%DLQ%)。 - 顺序消费:需实现
MessageListenerOrderly,消费失败时暂停当前队列消费。
public enum ConsumeOrderlyStatus {
/**
* Success consumption
*/
SUCCESS,
/**
* Rollback consumption(only for binlog consumption)
*/
@Deprecated
ROLLBACK,
/**
* Commit offset(only for binlog consumption)
*/
@Deprecated
COMMIT,
/**
* Suspend current queue a moment
*/
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
- 源码解析
- 核心类:
ConsumeMessageConcurrentlyService - 关键方法:
processConsumeResult()
public void processConsumeResult(ConsumeConcurrentlyStatus status, ...) {
if (status == CONSUME_SUCCESS) {
// 更新消费进度
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(...);
} else {
// 发送重试消息到Broker
sendMessageBack(msg, delayLevel);
}
}
- ACK提交:消费成功后更新本地消费进度(
RemoteBrokerOffsetStore)。 - 重试处理:调用
sendMessageBack()将消息发回Broker并延迟重试。
三、Broker端的确认处理
- 消息存储:Broker将消息持久化到CommitLog后返回ACK。
- 消费进度管理:通过
ConsumerOffsetManager记录消费进度。 - 重试队列:Broker维护
SCHEDULE_TOPIC处理延迟重试消息。
四、示例代码
生产者同步发送确认
SendResult sendResult = producer.send(msg);
System.out.println("Send Status: " + sendResult.getSendStatus());
生产者异步发送确认
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// do something
}
@Override
public void onException(Throwable e) {
// do something
}
});
System.out.printf("%s%n", sendResult);
消费者消费确认
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
// 处理消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
五、总结
- 生产者确认:通过同步/异步回调确保消息发送到Broker。
- 消费者确认:通过返回状态或手动ACK控制消息重试逻辑。
- 可靠性保障:结合重试队列、消费进度持久化和死信队列实现端到端可靠性。
源码中的关键逻辑集中在DefaultMQProducerImpl(发送)和ConsumeMessageConcurrentlyService(消费),通过状态机管理消息生命周期。
热门推荐
查文红:一位上海退休女工的砀山支教之路
腹式呼吸法:简单有效的助眠神器
睡前两小时养生法,你get了吗?
喝杯热牛奶,今晚睡个好觉!
中国睡眠研究会发布:这些睡前习惯助你一夜好眠!
热“雪”沸腾!哈工程以科技创新助力亚冬会
如何停止磨牙:原因、症状与解决方案全解析
儿童口腔粘膜病小知识
孩子夜间经常磨牙怎么处理?
無論大人還小朋友,10個最適合睡覺磨牙解決方法
大运会上热搜的花花,揭秘熊猫爪子的秘密
揭秘大熊猫“拇指”进化之谜:从伪拇指到竹子专家
揭秘大熊猫“六指琴魔”的爪子秘密
混动车保养秘籍大揭秘!
威海到大连5天自驾游:网红打卡点全攻略!
生菜炒鸡蛋和酱炒鸡蛋:简单美味的家常菜新宠
青菜鸡蛋新吃法:韭菜、番茄、蒜头大比拼!
青菜炒鸡蛋的创新搭配与烹饪技巧
《极速之星》:当芭蕾舞者遇上500公里时速的赛车
清朝黑丝:东西方时尚交流的见证
纲手黑丝造型爆红,你被圈粉了吗?
探究黄芪搭配普洱茶同饮的适宜性与功效
鹿目圆的白丝魅力大揭秘:从审美符号到角色灵魂
白丝、紫色上衣和高跟鞋:打造优雅时尚造型的绝美搭配
沙雕模拟器playmods修改菜单:让你的游戏体验飞起!
《沙雕模拟器》新版本上线:趣味玩法大揭秘!
我国科学家取得全固态锂电池研究新突破
揭秘“山羊人”:从古希腊神话到中国乡村的神秘传说
牧神潘:古希腊神话中的山羊人传奇
天津必打卡:庆王府&邮政博物馆