问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

RocketMQ订阅一致性解决方案:从ClientId到负载均衡策略

创作时间:
作者:
@小白创作中心

RocketMQ订阅一致性解决方案:从ClientId到负载均衡策略

引用
CSDN
1.
https://blog.csdn.net/qq_35323137/article/details/140219064

RocketMQ在多topic订阅场景下,如何解决订阅关系变更时可能出现的订阅不一致问题?本文将通过修改ClientId和自定义负载均衡策略,提供一个安全可行的解决方案,并通过实际部署和测试验证其可行性。

背景

RocketMQ官方对订阅关系一致性的描述是:同一个消费者分组Group ID下,所有Consumer实例所订阅的Topic和Tag必须完全一致。如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。

在实际应用中,由于历史原因,系统中可能存在多个Topic和Group的订阅关系,这导致了维护成本的增加。虽然RocketMQ支持一个消费Group消费多个Topic,但在进行订阅关系变更时(如添加或删除Topic订阅),可能会出现订阅不一致的问题。

解决方案

为了解决订阅不一致的问题,关键在于让每个客户端都知道整个Group集群中所有客户端的订阅关系。这可以通过利用ClientId的特性来实现,具体方法是在ClientId中追加当前客户端的订阅关系信息。

以下是具体的代码实现:

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());
    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }
    if (enableStreamRequestType) {
        sb.append("@");
        sb.append(RequestType.STREAM);
    }
    MessageInstance instance = MessageStorage.getInstance(this.getInstanceName());
    if (instance != null) {
        sb.append("#");
        sb.append(MessageStorage.generateInstanceSubInfoEncode(instance));
    } else {
        sb.append("#[]");
    }
    return sb.toString();
}

接下来,需要实现一个自定义的负载均衡策略,以确保订阅关系的一致性:

@Slf4j
public class EnhanceAllocateMessageQueueStrategyImpl extends AllocateMessageQueueAveragely {

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return Collections.emptyList();
        }
        if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
            return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
        }
        String topic = mqAll.get(0).getTopic();
        boolean isSomeClientVersionLower = cidAll.stream().anyMatch(c -> c.lastIndexOf(MqConstant.GROUP_ENHANCE_TAG) == -1);
        if (isSomeClientVersionLower) {
            log.warn("[enhance allocate]: group:{}sub topic:{} has lower version client,use the default avg strategy", consumerGroup, topic);
            return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
        }
        if (log.isDebugEnabled()) {
            log.info("[enhance allocate]: group:{} start topic rebalance:{},current client num:{},current queues num:{}", consumerGroup, topic, cidAll.size(), mqAll.size());
        }
        Map<String, List<MessageConsumer>> allClientsSubInfo = MessageStorage.getDecodeSubInfo(cidAll);
        Map<String, MessageConsumer> eachClientGroup = new HashMap<>(allClientsSubInfo.size());
        allClientsSubInfo.forEach((k, v) -> {
            for (MessageConsumer messageConsumer : v) {
                if (messageConsumer.getActualGroup().equals(consumerGroup)) {
                    eachClientGroup.put(k, messageConsumer);
                    break;
                }
            }
        });
        List<String> validCids = new ArrayList<>(eachClientGroup.size());
        for (Map.Entry<String, MessageConsumer> consumerEntry : eachClientGroup.entrySet()) {
            List<MessageConsumer.ListenTopic> currentConsumerSubTopics = consumerEntry.getValue().getTopics();
            if (currentConsumerSubTopics.stream()
                    .anyMatch(listenTopic ->
                            listenTopic.getActualTopic().equals(topic)
                                    || listenTopic.getTopic().equals(topic)
                                    || listenTopic.getSourceTopic().equals(topic))) {
                validCids.add(consumerEntry.getKey());
            }
        }
        if (validCids.size() != cidAll.size()) {
            List<MessageQueue> messageQueues = balanceAllocate(consumerGroup, currentCID, mqAll, validCids);
            log.warn("[enhance allocate]: group:{}sub topic:{} has not-balance-sub condition,sdk start enhance,clients {} complete {} queues rebalance,currentId:{},\n allocate result:{}", consumerGroup, topic,
                    MessageStorage.getClientsIp(validCids), mqAll.size(), currentCID, MessageStorage.joinMessageQueue(messageQueues));
            return messageQueues;
        } else {
            return doAllocate(consumerGroup, currentCID, mqAll, cidAll);
        }
    }

    public List<MessageQueue> doAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        return balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);
    }

    public final List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        return super.allocate(consumerGroup, currentCID, mqAll, cidAll);
    }

    @Override
    public String getName() {
        return "Enhance";
    }
}

该策略的主要思路如下:

  1. 排除重试Topic
  2. 通过ClientId判断是否存在不同版本的SDK
  3. 通过解密所有客户端Id,判断当前Topic有哪一些客户端在监听
  4. 如果发现订阅不一致,让有当前Topic订阅关系的客户端分配所有队列
  5. 在保证订阅一致的前提下,提供可扩展的分配算法,默认使用平均分配

验证

通过实际部署和测试验证了方案的可行性:

  1. 升级SDK版本到2.1.0,订阅关系不变,发布灰度:消费正常
  2. 升级SDK到2.1.0,直接发布上线,无订阅关系变更:队列分配正确,消费正常
  3. 新增Topic订阅关系,发布灰度:灰度Pod接管新Topic全部队列,正常Pod不受影响
  4. 减少Topic订阅关系,发布灰度:灰度Pod只参与旧Topic分配,正常Pod分配全部新Topic队列

结论

该方案被验证是安全可行的,但在实际接入时需要注意:

  1. 不要在首次升级SDK时就变更订阅关系发灰度
  2. 生产环境不要使用公网接入点

通过上述方案,可以有效解决RocketMQ在多Topic订阅场景下的订阅一致性问题,确保消息消费的可靠性和稳定性。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号