RocketMQ客户端消息确认机制详解
创作时间:
作者:
@小白创作中心
RocketMQ客户端消息确认机制详解
引用
CSDN
1.
https://blog.csdn.net/m0_71845127/article/details/145960214
RocketMQ的消息确认机制(ACK)是保障消息可靠性的核心机制,分为生产者发送确认和消费者消费确认两部分。本文将详细解析这两种确认机制,并通过源码分析帮助读者深入理解其工作原理。
一、生产者消息发送确认机制
- 机制说明
- 同步发送:生产者发送消息后阻塞等待Broker返回
SendResult,包含消息状态(SEND_OK、FLUSH_DISK_TIMEOUT等)。 - 异步发送:通过回调函数
SendCallback处理成功或异常。 - 单向发送:不关心发送结果,无确认机制。
- 重试机制:默认同步发送重试2次(可配置
retryTimesWhenSendFailed)。
- 源码解析
- 核心类:
DefaultMQProducerImpl - 关键方法:
sendDefaultImpl()
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, ...) {
// 选择消息队列(负载均衡)
MessageQueue mq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
// 实际发送逻辑
sendResult = this.sendKernelImpl(msg, mq, communicationMode, ...);
// 处理结果或异常,触发重试
}
- 发送结果处理:根据
communicationMode处理同步/异步逻辑。 - 异常处理:网络异常或Broker不可用时触发重试。
二、消费者消息消费确认机制
- 机制说明
- PushConsumer:客户端监听消息,消费完成后返回状态:
ConsumeConcurrentlyStatus.CONSUME_SUCCESS:确认消费成功。ConsumeConcurrentlyStatus.RECONSUME_LATER:消费失败,触发重试。
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
- 重试策略:消息失败后进入重试队列(
%RETRY%),默认最多重试16次,之后进入死信队列(%DLQ%)。 - 顺序消费:需实现
MessageListenerOrderly,消费失败时暂停当前队列消费。
public enum ConsumeOrderlyStatus {
/**
* Success consumption
*/
SUCCESS,
/**
* Rollback consumption(only for binlog consumption)
*/
@Deprecated
ROLLBACK,
/**
* Commit offset(only for binlog consumption)
*/
@Deprecated
COMMIT,
/**
* Suspend current queue a moment
*/
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
- 源码解析
- 核心类:
ConsumeMessageConcurrentlyService - 关键方法:
processConsumeResult()
public void processConsumeResult(ConsumeConcurrentlyStatus status, ...) {
if (status == CONSUME_SUCCESS) {
// 更新消费进度
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(...);
} else {
// 发送重试消息到Broker
sendMessageBack(msg, delayLevel);
}
}
- ACK提交:消费成功后更新本地消费进度(
RemoteBrokerOffsetStore)。 - 重试处理:调用
sendMessageBack()将消息发回Broker并延迟重试。
三、Broker端的确认处理
- 消息存储:Broker将消息持久化到CommitLog后返回ACK。
- 消费进度管理:通过
ConsumerOffsetManager记录消费进度。 - 重试队列:Broker维护
SCHEDULE_TOPIC处理延迟重试消息。
四、示例代码
生产者同步发送确认
SendResult sendResult = producer.send(msg);
System.out.println("Send Status: " + sendResult.getSendStatus());
生产者异步发送确认
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// do something
}
@Override
public void onException(Throwable e) {
// do something
}
});
System.out.printf("%s%n", sendResult);
消费者消费确认
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
// 处理消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
五、总结
- 生产者确认:通过同步/异步回调确保消息发送到Broker。
- 消费者确认:通过返回状态或手动ACK控制消息重试逻辑。
- 可靠性保障:结合重试队列、消费进度持久化和死信队列实现端到端可靠性。
源码中的关键逻辑集中在DefaultMQProducerImpl(发送)和ConsumeMessageConcurrentlyService(消费),通过状态机管理消息生命周期。
热门推荐
再见了,大学绩点
美国海啸预警系统:DART项目的科学突破与应用
页岩气废水怎么处理
气膜温室大棚:现代农业高效生产的创新之选
揭秘出租车司机收入全面分析:你所不知道的司机经济现状
机器学习中的过拟合与欠拟合现象:理论与实践案例研究
深入理解机器学习中的欠拟合与过拟合
如何分析行业趋势?行业趋势分析有哪些依据?
活塞环应该如何正确安装?安装活塞环时需要注意哪些细节?
常温水温度是多少
TFT-LCD液晶显示屏芯片ST7735S技术详解
上海首份“赏樱攻略”来了!花开无声却述说着“气候密码”
跨校联合招生 启动“智能”培养方案!高校探索人才培养“试验田”
载人登月计划新动向:俄美相继推迟 日印合作推进
小叶紫檀佛珠手串价格影响因素全解析
美国空军“联网战斗”计划概况
核查|完颜慧德的含金量如何?如何评判心理咨询的收费是否合理?
油痘肌用什么保湿水
盘点:《黑神话 悟空》之后,最令人期待的10款国产单机游戏!
31家电力上市公司碳交易收入盘点
流浪地球变成纪录片?MOSS真的要来了?全球首款通用AI智能体Manus发布
肺间质纤维化最新治疗方案
有机固废垃圾制氢技术研发与产业化进展分析
宴请座次安排的九大原则及应用场景详解
从波动到粒子再到规范场论:全面了解光的多面性
江西多地倡导文明婚俗 "零彩礼"集体婚礼渐成新风尚
“步步高系” 3800 万撬动73亿市值利源股份,张源能否再创资本奇迹?
如何明确学习思路:10个实用方法助你理清学习思路
中科院遗传发育所陈化榜团队发现ZmL75调控菌根真菌定殖,提高玉米耐盐碱性
中科院遗传发育所陈化榜团队发现ZmL75调控菌根真菌定殖,提高玉米耐盐碱性