RocketMQ订阅一致性解决方案:从ClientId到负载均衡策略
创作时间:
作者:
@小白创作中心
RocketMQ订阅一致性解决方案:从ClientId到负载均衡策略
引用
CSDN
1.
https://blog.csdn.net/qq_35323137/article/details/140219064
RocketMQ在多topic订阅场景下,如何解决订阅关系变更时可能出现的订阅不一致问题?本文将通过修改ClientId和自定义负载均衡策略,提供一个安全可行的解决方案,并通过实际部署和测试验证其可行性。
背景
RocketMQ官方对订阅关系一致性的描述是:同一个消费者分组Group ID下,所有Consumer实例所订阅的Topic和Tag必须完全一致。如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。
在实际应用中,由于历史原因,系统中可能存在多个Topic和Group的订阅关系,这导致了维护成本的增加。虽然RocketMQ支持一个消费Group消费多个Topic,但在进行订阅关系变更时(如添加或删除Topic订阅),可能会出现订阅不一致的问题。
解决方案
为了解决订阅不一致的问题,关键在于让每个客户端都知道整个Group集群中所有客户端的订阅关系。这可以通过利用ClientId的特性来实现,具体方法是在ClientId中追加当前客户端的订阅关系信息。
以下是具体的代码实现:
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
if (enableStreamRequestType) {
sb.append("@");
sb.append(RequestType.STREAM);
}
MessageInstance instance = MessageStorage.getInstance(this.getInstanceName());
if (instance != null) {
sb.append("#");
sb.append(MessageStorage.generateInstanceSubInfoEncode(instance));
} else {
sb.append("#[]");
}
return sb.toString();
}
接下来,需要实现一个自定义的负载均衡策略,以确保订阅关系的一致性:
@Slf4j
public class EnhanceAllocateMessageQueueStrategyImpl extends AllocateMessageQueueAveragely {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return Collections.emptyList();
}
if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
}
String topic = mqAll.get(0).getTopic();
boolean isSomeClientVersionLower = cidAll.stream().anyMatch(c -> c.lastIndexOf(MqConstant.GROUP_ENHANCE_TAG) == -1);
if (isSomeClientVersionLower) {
log.warn("[enhance allocate]: group:{}sub topic:{} has lower version client,use the default avg strategy", consumerGroup, topic);
return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
}
if (log.isDebugEnabled()) {
log.info("[enhance allocate]: group:{} start topic rebalance:{},current client num:{},current queues num:{}", consumerGroup, topic, cidAll.size(), mqAll.size());
}
Map<String, List<MessageConsumer>> allClientsSubInfo = MessageStorage.getDecodeSubInfo(cidAll);
Map<String, MessageConsumer> eachClientGroup = new HashMap<>(allClientsSubInfo.size());
allClientsSubInfo.forEach((k, v) -> {
for (MessageConsumer messageConsumer : v) {
if (messageConsumer.getActualGroup().equals(consumerGroup)) {
eachClientGroup.put(k, messageConsumer);
break;
}
}
});
List<String> validCids = new ArrayList<>(eachClientGroup.size());
for (Map.Entry<String, MessageConsumer> consumerEntry : eachClientGroup.entrySet()) {
List<MessageConsumer.ListenTopic> currentConsumerSubTopics = consumerEntry.getValue().getTopics();
if (currentConsumerSubTopics.stream()
.anyMatch(listenTopic ->
listenTopic.getActualTopic().equals(topic)
|| listenTopic.getTopic().equals(topic)
|| listenTopic.getSourceTopic().equals(topic))) {
validCids.add(consumerEntry.getKey());
}
}
if (validCids.size() != cidAll.size()) {
List<MessageQueue> messageQueues = balanceAllocate(consumerGroup, currentCID, mqAll, validCids);
log.warn("[enhance allocate]: group:{}sub topic:{} has not-balance-sub condition,sdk start enhance,clients {} complete {} queues rebalance,currentId:{},\n allocate result:{}", consumerGroup, topic,
MessageStorage.getClientsIp(validCids), mqAll.size(), currentCID, MessageStorage.joinMessageQueue(messageQueues));
return messageQueues;
} else {
return doAllocate(consumerGroup, currentCID, mqAll, cidAll);
}
}
public List<MessageQueue> doAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
return balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);
}
public final List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
}
@Override
public String getName() {
return "Enhance";
}
}
该策略的主要思路如下:
- 排除重试Topic
- 通过ClientId判断是否存在不同版本的SDK
- 通过解密所有客户端Id,判断当前Topic有哪一些客户端在监听
- 如果发现订阅不一致,让有当前Topic订阅关系的客户端分配所有队列
- 在保证订阅一致的前提下,提供可扩展的分配算法,默认使用平均分配
验证
通过实际部署和测试验证了方案的可行性:
- 升级SDK版本到2.1.0,订阅关系不变,发布灰度:消费正常
- 升级SDK到2.1.0,直接发布上线,无订阅关系变更:队列分配正确,消费正常
- 新增Topic订阅关系,发布灰度:灰度Pod接管新Topic全部队列,正常Pod不受影响
- 减少Topic订阅关系,发布灰度:灰度Pod只参与旧Topic分配,正常Pod分配全部新Topic队列
结论
该方案被验证是安全可行的,但在实际接入时需要注意:
- 不要在首次升级SDK时就变更订阅关系发灰度
- 生产环境不要使用公网接入点
通过上述方案,可以有效解决RocketMQ在多Topic订阅场景下的订阅一致性问题,确保消息消费的可靠性和稳定性。
热门推荐
自动化工作流:企业数字化转型的智能引擎与效率革命
保肝利胆的药有哪些中药
传承鹤山狮艺,看鹤山这样出招
高铁1小时“朋友圈”!上海出发到这些城市景美又好玩!
保终身重疾险包括哪些? 保终身重疾险有哪些?
二次函数的几何性质,动图演示让你轻松理解!
中学生如何健康饮食?
周雨彤《180天重启计划》的"辞职发疯戏",引发全网热议!
为何现在军校本科毕业,授“少尉”军衔,而不是“中尉”呢?
肉眼看紫金山彗星?可以做到吗?
影视后期制作必备:如何选择适合的笔记本电脑配置?
所有女人,请趁早戒掉对慕强的幻想
25年传承之作!日本国民级RPG《传说》系列的轨迹回顾
近现代西方奇幻文学中龙元素的发展及其作用
如何预防痔疮,得了痔疮饮食要注意什么
肝腹水怎样减轻病人的痛苦
PS5存储空间管理指南:硬件扩展与高效设置技巧全解析
2025年银行贷款利率表最新,四大银行贷款利率表一览如下
八字命理与人际关系:枭印食伤旺者如何改善社交?
反诈宣传 | 防范电信诈骗知识宣传
无极CU250摩托车省油指南:5个实用技巧助力油耗优化
南北战争:美国历史的转折点、血与火铸就的现代国度!
代替白葡萄酒:饮品与调味品的完美替代方案
固态硬盘数据恢复几率揭秘:关键因素与实用解决方案
王菲进不了前五,徐小凤排第二,十大殿堂级女歌手究竟谁是第一
关联交易存在哪些风险
猪场异常猪只诊断指南:从症状识别到病因分析
“鱼”游古厝 “丸”美邂逅!中国鱼丸之乡连江端出国庆“文旅大餐”
百度核心事业群再迎管理层变动:两位集团副总裁肖阳、王凤阳离职
玫瑰花怎么晾干不变黑(玫瑰晾晒做成干花不变黑的方法)