问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

RocketMQ客户端消息确认机制详解

创作时间:
作者:
@小白创作中心

RocketMQ客户端消息确认机制详解

引用
CSDN
1.
https://blog.csdn.net/m0_71845127/article/details/145960214

RocketMQ的消息确认机制(ACK)是保障消息可靠性的核心机制,分为生产者发送确认和消费者消费确认两部分。本文将详细解析这两种确认机制,并通过源码分析帮助读者深入理解其工作原理。

一、生产者消息发送确认机制

  1. 机制说明
  • 同步发送:生产者发送消息后阻塞等待Broker返回SendResult,包含消息状态(SEND_OKFLUSH_DISK_TIMEOUT等)。
  • 异步发送:通过回调函数SendCallback处理成功或异常。
  • 单向发送:不关心发送结果,无确认机制。
  • 重试机制:默认同步发送重试2次(可配置retryTimesWhenSendFailed)。
  1. 源码解析
  • 核心类DefaultMQProducerImpl
  • 关键方法sendDefaultImpl()
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, ...) {
    // 选择消息队列(负载均衡)
    MessageQueue mq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    // 实际发送逻辑
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, ...);
    // 处理结果或异常,触发重试
}
  • 发送结果处理:根据communicationMode处理同步/异步逻辑。
  • 异常处理:网络异常或Broker不可用时触发重试。

二、消费者消息消费确认机制

  1. 机制说明
  • PushConsumer:客户端监听消息,消费完成后返回状态:
  • ConsumeConcurrentlyStatus.CONSUME_SUCCESS:确认消费成功。
  • ConsumeConcurrentlyStatus.RECONSUME_LATER:消费失败,触发重试。
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
   System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
  • 重试策略:消息失败后进入重试队列(%RETRY%),默认最多重试16次,之后进入死信队列(%DLQ%)。
  • 顺序消费:需实现MessageListenerOrderly,消费失败时暂停当前队列消费。
public enum ConsumeOrderlyStatus {
    /**
     * Success consumption
     */
    SUCCESS,
    /**
     * Rollback consumption(only for binlog consumption)
     */
    @Deprecated
    ROLLBACK,
    /**
     * Commit offset(only for binlog consumption)
     */
    @Deprecated
    COMMIT,
    /**
     * Suspend current queue a moment
     */
    SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
  1. 源码解析
  • 核心类ConsumeMessageConcurrentlyService
  • 关键方法processConsumeResult()
public void processConsumeResult(ConsumeConcurrentlyStatus status, ...) {
    if (status == CONSUME_SUCCESS) {
        // 更新消费进度
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(...);
    } else {
        // 发送重试消息到Broker
        sendMessageBack(msg, delayLevel);
    }
}
  • ACK提交:消费成功后更新本地消费进度(RemoteBrokerOffsetStore)。
  • 重试处理:调用sendMessageBack()将消息发回Broker并延迟重试。

三、Broker端的确认处理

  • 消息存储:Broker将消息持久化到CommitLog后返回ACK。
  • 消费进度管理:通过ConsumerOffsetManager记录消费进度。
  • 重试队列:Broker维护SCHEDULE_TOPIC处理延迟重试消息。

四、示例代码

生产者同步发送确认

SendResult sendResult = producer.send(msg);
System.out.println("Send Status: " + sendResult.getSendStatus());

生产者异步发送确认

producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
       // do something
    }
    @Override
    public void onException(Throwable e) {
       // do something
    }
});
System.out.printf("%s%n", sendResult);

消费者消费确认

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    try {
        // 处理消息
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

五、总结

  • 生产者确认:通过同步/异步回调确保消息发送到Broker。
  • 消费者确认:通过返回状态或手动ACK控制消息重试逻辑。
  • 可靠性保障:结合重试队列、消费进度持久化和死信队列实现端到端可靠性。

源码中的关键逻辑集中在DefaultMQProducerImpl(发送)和ConsumeMessageConcurrentlyService(消费),通过状态机管理消息生命周期。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号