RocketMQ消息发送之广播模式
创作时间:
作者:
@小白创作中心
RocketMQ消息发送之广播模式
引用
CSDN
1.
https://m.blog.csdn.net/qq_53847859/article/details/140938543
RocketMQ作为一款高性能的消息队列中间件,在分布式系统中扮演着重要的角色。本文将详细介绍RocketMQ的消息发送广播模式,包括其特点、应用场景以及具体的实现方法。
广播模式概述
在RocketMQ中,广播消息是一种特殊的消息消费模式。在广播模式下,每条消息都会被推送到集群内所有注册过的客户端,保证消息至少被每台机器消费一次。与集群模式不同,集群模式下,同一个消费者组内的消息只会被消费一次,而广播模式下,每个消费者都会收到并消费相同的消息。
广播模式特点
- 消息重复消费:广播模式下,每条消息都会被推送到所有订阅了Topic的消费者,这意味着消息可能会被重复消费多次。
- 不支持顺序消息:广播模式下不支持顺序消息,因为顺序消息需要保证消息的消费顺序,这在广播模式下很难实现的,毕竟所有消费者都会同时收到消息。
- 不支持重置消费位点:广播模式下,消费进度在客户端维护,不支持在服务端重置消费位点。如果客户端重启,它将从最新消息开始消费,并不会继续消费之前未处理完的消息。
- 消费失败不重试:在广播模式下,如果消息消费失败,RocketMQ不会自动重试。因此,我们很多时候需要自行处理消费失败的情况,比如记录日志、发送告警或这手动重试。
广播模式应用场景
当需要将一条消息通知到多个系统或组件时,可以使用广播消息模式。比如,在某多多或某宝这些电商系统中,订单状态变更需要通知库存系统、物流系统等多个下游系统,这时候就可以使用广播消息。
除此之外,在日志收集系统中,我们可以将日志消息以广播模式发送到多个日志处理组件,每个组件根据自身的职责进行日志处理和分析。
还有,在实时通知推送场景中,比如发送短信、邮件、APP推送等等,我们可以使用广播消息模式将通知消息推送给所有相关的接收者。
手动实现
首先还是使用以往的方式启动搭建的RocketMQ集群。
消息生产者代码
消息生产者比较简单,代码如下:
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 100; i++){
Message msg = new Message("TopicTest",
"TagA",
"xiaowei",
("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
消费者代码
在RocketMQ中,实现广播消息模式非常简单。我们可以通过设置消息模型为MessageModel.BROADCASTING来启用广播模式。
接下来是消费者代码,如下(流程在注释中):
// 创建DefaultMQPushConsumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
// 消费者从队列的开始位置消费消息
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费者的消息模型为广播模式,消息会被广播给所有消费者
consumer.setMessageModel(MessageModel.BROADCASTING);
// 订阅指定的Topic,使用"*"订阅Topic下的所有消息
consumer.subscribe("TopicTest", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 遍历消息列表,处理每一条消息
for(MessageExt msg:msgs){
// 打印消息内容
System.out.println("消息内容:"+new String(msg.getBody()));
}
// 返回消费状态,消息已成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
// 打印启动成功的提示信息
System.out.printf("Broadcast Consumer Started.%n");
配置生产者和消费者的环境变量
为了更方便的看到广播模式的消息影响,这里我们设置消费者允许多实例启动:
这里可以看到控制台上多个消费者实例都能够收到同样的消息:
本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!
热门推荐
日本科技之光:四大顶尖技术引领全球,全部领先甚至超越美国
铝业属于什么行业?铝业行业的市场现状和发展前景如何?
蓝牙耳机暂停和开始连接的正确方法是什么?
思域Si和Type R发动机对比:动力心脏的较量
头皮老长毛囊炎,你可以这样应对
超详细!U 盘 选购诀窍与数据防护宝典!错过血亏!速看!
揭开北海道的神秘面纱:日本北方仙境终极指南 - 顶级景点& 活动
社保基金投运保持稳健
㵲水河畔这座400年古镇的前世今生
带状光纤熔接机和普通光纤熔接机的区别
日本国债市场复盘及启示|低利率驱动因素及阶段反转必要条件
清明,汕头人可不只吃点朴籽粿!
属龙和属蛇相合吗?十二生肖婚配指南
河南新密非遗之美——大隗马家五香牛肉制作技艺
单目视觉SLAM的空间感知定位技术:从PTAM到ORB-SLAM3
湖人官方解析决胜规则:在西部有明显优势 目标分区冠军+两胜火箭
五分钟入门双拼!
脊柱结核:症状多样,如何识别与应对?
CMF年度报告:2025年中国经济发展拥有四大新机遇
青春不迷茫:大学生心理咨询指南
益生菌药品选择全解析:从肠道菌群失衡到疾病治疗
探索肠道外领域,一览益生元最新发展动态
机器人操作系统ROS2之安装(Ubuntu 24)
流苏树:我国珍贵树种,价值数万元一棵的“四月雪”
双11选购指南:SSD固态硬盘选购攻略
鲜牛奶、纯牛奶、生牛乳、复原乳到底有什么区别?搞懂再买不吃亏
上古四圣指的是哪四人?他们分别有哪些功绩?
阅文IP盛典榜单:年收入破50万新人作家大增 作品出海影响力显著提升
王昭君:从冷宫宫女到宁胡阏氏的悲剧人生
三山五岳,到底是指哪几座山?分别在什么地方?一文看懂!