RocketMQ订阅一致性解决方案:从ClientId到负载均衡策略
创作时间:
作者:
@小白创作中心
RocketMQ订阅一致性解决方案:从ClientId到负载均衡策略
引用
CSDN
1.
https://blog.csdn.net/qq_35323137/article/details/140219064
RocketMQ在多topic订阅场景下,如何解决订阅关系变更时可能出现的订阅不一致问题?本文将通过修改ClientId和自定义负载均衡策略,提供一个安全可行的解决方案,并通过实际部署和测试验证其可行性。
背景
RocketMQ官方对订阅关系一致性的描述是:同一个消费者分组Group ID下,所有Consumer实例所订阅的Topic和Tag必须完全一致。如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。
在实际应用中,由于历史原因,系统中可能存在多个Topic和Group的订阅关系,这导致了维护成本的增加。虽然RocketMQ支持一个消费Group消费多个Topic,但在进行订阅关系变更时(如添加或删除Topic订阅),可能会出现订阅不一致的问题。
解决方案
为了解决订阅不一致的问题,关键在于让每个客户端都知道整个Group集群中所有客户端的订阅关系。这可以通过利用ClientId的特性来实现,具体方法是在ClientId中追加当前客户端的订阅关系信息。
以下是具体的代码实现:
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
if (enableStreamRequestType) {
sb.append("@");
sb.append(RequestType.STREAM);
}
MessageInstance instance = MessageStorage.getInstance(this.getInstanceName());
if (instance != null) {
sb.append("#");
sb.append(MessageStorage.generateInstanceSubInfoEncode(instance));
} else {
sb.append("#[]");
}
return sb.toString();
}
接下来,需要实现一个自定义的负载均衡策略,以确保订阅关系的一致性:
@Slf4j
public class EnhanceAllocateMessageQueueStrategyImpl extends AllocateMessageQueueAveragely {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return Collections.emptyList();
}
if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
}
String topic = mqAll.get(0).getTopic();
boolean isSomeClientVersionLower = cidAll.stream().anyMatch(c -> c.lastIndexOf(MqConstant.GROUP_ENHANCE_TAG) == -1);
if (isSomeClientVersionLower) {
log.warn("[enhance allocate]: group:{}sub topic:{} has lower version client,use the default avg strategy", consumerGroup, topic);
return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
}
if (log.isDebugEnabled()) {
log.info("[enhance allocate]: group:{} start topic rebalance:{},current client num:{},current queues num:{}", consumerGroup, topic, cidAll.size(), mqAll.size());
}
Map<String, List<MessageConsumer>> allClientsSubInfo = MessageStorage.getDecodeSubInfo(cidAll);
Map<String, MessageConsumer> eachClientGroup = new HashMap<>(allClientsSubInfo.size());
allClientsSubInfo.forEach((k, v) -> {
for (MessageConsumer messageConsumer : v) {
if (messageConsumer.getActualGroup().equals(consumerGroup)) {
eachClientGroup.put(k, messageConsumer);
break;
}
}
});
List<String> validCids = new ArrayList<>(eachClientGroup.size());
for (Map.Entry<String, MessageConsumer> consumerEntry : eachClientGroup.entrySet()) {
List<MessageConsumer.ListenTopic> currentConsumerSubTopics = consumerEntry.getValue().getTopics();
if (currentConsumerSubTopics.stream()
.anyMatch(listenTopic ->
listenTopic.getActualTopic().equals(topic)
|| listenTopic.getTopic().equals(topic)
|| listenTopic.getSourceTopic().equals(topic))) {
validCids.add(consumerEntry.getKey());
}
}
if (validCids.size() != cidAll.size()) {
List<MessageQueue> messageQueues = balanceAllocate(consumerGroup, currentCID, mqAll, validCids);
log.warn("[enhance allocate]: group:{}sub topic:{} has not-balance-sub condition,sdk start enhance,clients {} complete {} queues rebalance,currentId:{},\n allocate result:{}", consumerGroup, topic,
MessageStorage.getClientsIp(validCids), mqAll.size(), currentCID, MessageStorage.joinMessageQueue(messageQueues));
return messageQueues;
} else {
return doAllocate(consumerGroup, currentCID, mqAll, cidAll);
}
}
public List<MessageQueue> doAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
return balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);
}
public final List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
}
@Override
public String getName() {
return "Enhance";
}
}
该策略的主要思路如下:
- 排除重试Topic
- 通过ClientId判断是否存在不同版本的SDK
- 通过解密所有客户端Id,判断当前Topic有哪一些客户端在监听
- 如果发现订阅不一致,让有当前Topic订阅关系的客户端分配所有队列
- 在保证订阅一致的前提下,提供可扩展的分配算法,默认使用平均分配
验证
通过实际部署和测试验证了方案的可行性:
- 升级SDK版本到2.1.0,订阅关系不变,发布灰度:消费正常
- 升级SDK到2.1.0,直接发布上线,无订阅关系变更:队列分配正确,消费正常
- 新增Topic订阅关系,发布灰度:灰度Pod接管新Topic全部队列,正常Pod不受影响
- 减少Topic订阅关系,发布灰度:灰度Pod只参与旧Topic分配,正常Pod分配全部新Topic队列
结论
该方案被验证是安全可行的,但在实际接入时需要注意:
- 不要在首次升级SDK时就变更订阅关系发灰度
- 生产环境不要使用公网接入点
通过上述方案,可以有效解决RocketMQ在多Topic订阅场景下的订阅一致性问题,确保消息消费的可靠性和稳定性。
热门推荐
农学专业毕业后可以干什么?2025年就业前景分析
南皋和奶西,差距越来越大了。
宋太宗赵光义:统一事业与文治武功
利差高至150BP 提前还贷潮持续 存量房贷利率会否调降?
“131”机制在行动:邵阳县筑牢森林防火坚固防线
何足为重:探索一个古代成语的现代意义
大理旅游攻略:机场交通便利性分析
亚麻籽:微小的种子,具有巨大的健康益处
【儀式感】烹飪儀式感,讓忙碌的生活更有味道

厦门鼓浪屿至东山岛旅行指南:距离、交通方式及行程规划一览
探究翡翠手镯丰富色彩之美:揭秘绿意盎然的秘密
皮肤烂了、发烧了?小心是坏死性筋膜炎!
警惕寒冷性荨麻疹——别让“冰爽”变成“痒爽”
如何分析金上涨的原因和影响?这种分析对投资有哪些指导意义?
GAC课程在中国及其他国家的开办情况
「辟谷」好处多,但须配合服气、药食才算正确
2025年AMC8考试的详细流程和注意事项来啦!哪些行为是违规行为?
婴儿荨麻疹的原因及日常护理要点
中国最凉快的3座城市,19°C的夏天让空调下岗,适合中老年人康养
如何理解股票K线图中的形态?这种形态对股票趋势有哪些预示?
股票日K线是什么意思?如何设置适合自己的交易策略?
打坐与养生
河北综合实力较强的4所大学,王牌专业有特色,就业率高,可报考
三角洲行动深蓝有什么技能 新干员阿列克谢技能机制
在西游记中,玉帝和如来谁更厉害呢?原著中的细节早已说明一切
贵州必游十大景点排行榜:自然与文化的完美融合
三国全面战争秘籍及最强兵种排名 全阵营人物介绍和玩法解析
如何分辨猫便便的正常情况?(教你识别猫的大便是否健康,保护它的消化系统)
大肠癌会遗传吗?如何才能有效预防?
运城:高舞文旅产业发展新龙头