Spring Boot整合MQTT最详细教程(亲测有效)
创作时间:
作者:
@小白创作中心
Spring Boot整合MQTT最详细教程(亲测有效)
引用
CSDN
1.
https://blog.csdn.net/weixin_50083085/article/details/144747736
本文将详细介绍如何在Spring Boot项目中整合MQTT协议,包括依赖配置、参数设置、核心类实现以及测试验证。通过本文,你将能够掌握Spring Boot与MQTT的整合方法,实现消息的发布与订阅功能。
一、导入pom.xml依赖
在项目的pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
二、配置MQTT相关信息到application.yml
在application.yml文件中添加MQTT相关配置:
spring:
mqtt:
username: 你的账号 # 账号
password: 你的密码 # 密码
hostUrl: tcp://127.0.0.1:1883 # mqtt连接tcp地址
clientid: ${random.value} # 客户端Id,不能相同,采用随机数 ${random.value}
default-topic: /testtopic/# # 默认主题
timeout: 3000 # 超时时间
keepalive: 600 # 保持连接
subscribeFlag: true #是否进行订阅true或者false
enabled: true # 是否使用mqtt功能
三、创建MQTT相关类
在项目中创建mqtt文件夹,并添加以下类:
AjaxResult.java
public class AjaxResult extends HashMap<String, Object>
{
private static final long serialVersionUID = 1L;
/** 状态码 */
public static final String CODE_TAG = "code";
/** 返回内容 */
public static final String MSG_TAG = "msg";
/** 数据对象 */
public static final String DATA_TAG = "data";
// 构造函数和静态方法略
}
MqttConfig.java
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
@Autowired
private MqttPushClient mqttPushClient;
// 属性和getter/setter略
public MqttPushClient getMqttPushClient() {
if(enabled == true){
String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
System.out.println("开始连接======================="+clientId);
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
System.out.println("开始订阅=======================");
for(int i=0; i<mqtt_topic.length; i++){
mqttPushClient.subscribe(mqtt_topic[i], 1);
}
}
return mqttPushClient;
}
}
MqttInit.java
@Component
public class MqttInit implements ApplicationRunner {
@Autowired
private MqttConfig mqttConfig;
@Override
public void run(ApplicationArguments args) throws Exception{
mqttConfig.getMqttPushClient();
}
}
MqttPushClient.java
@Component
@Order(value = 2)
public class MqttPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
MqttPushClient.setClient(client);
client.setCallback(pushCallback);
client.connect(options);
System.out.println("连接成功==================="+clientID);
} catch (Exception e) {
e.printStackTrace();
}
}
public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
if (null == mTopic) {
logger.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
return AjaxResult.success();
} catch (Exception e) {
e.printStackTrace();
return AjaxResult.error();
}
}
public void subscribe(String topic, int qos) {
logger.info("开始订阅主题" + topic);
try {
MqttPushClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
PushCallback.java
@Component
@Order(value = 1)
public class PushCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private MqttConfig mqttConfig;
private static MqttClient client;
private static String _topic;
private static String _qos;
private static String _msg;
@Override
public void connectionLost(Throwable throwable) {
System.out.println("重连======================");
logger.info("连接断开,可以做重连");
boolean flag=true;
while(flag) {
try {
Thread.sleep(5000);
if (client == null || !client.isConnected()) {
mqttConfig.getMqttPushClient();
}
flag=false;
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
logger.info("接收消息主题 : " + topic);
logger.info("接收消息Qos : " + mqttMessage.getQos());
logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
_topic = topic;
_qos = mqttMessage.getQos()+"";
_msg = new String(mqttMessage.getPayload());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
public String receive() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", _topic);
jsonObject.put("qos", _qos);
jsonObject.put("msg", _msg);
return jsonObject.toString();
}
}
四、测试
创建一个Controller用于测试MQTT消息的发送:
@RestController
@RequestMapping("/mttpTest")
public class mttqController extends BaseController {
@Autowired
private MqttConfig mqttConfig;
@Autowired
private MqttPushClient mqttPushClient;
@GetMapping(value = "/a")
public AjaxResult a()
{
JSONObject jsonObject = new JSONObject();
jsonObject.put("hello", "你好");
jsonObject.put("msg", "成功");
AjaxResult publish = mqttPushClient.publish(2,false,"/testtopic/5",jsonObject.toString());
return success(null);
}
}
通过浏览器访问接口地址,可以看到消息成功发送。同时,使用MQTTX客户端工具可以查看到发送的消息,Spring Boot控制台也会打印出接收到的消息。
热门推荐
体检项目如何选?“1+X”来帮忙
如何利用开源软件减少成本
大人突然发烧怎么回事
引江济淮圆千年通江梦,庐江将实现“河海直达”
身体好不好,看嘴就知道?若嘴唇出现这4种颜色,当心是大病信号
【实用攻略】老年人活动假牙哪种好?树脂|金属|bps|种植牙解析
香港西贡旅游攻略:景点、美食、住宿全攻略
博人传第90话:九尾复活真相曝光,向日葵成最强人柱力
巧妙计数:揭秘组合数学中的“容斥原理”
腹股沟斜疝和直疝的鉴别诊断要点
食管卡异物处理不当会危及生命 专家告诉你该如何预防和检查
桃花庵歌原文翻译及赏析
LAMP研究揭示:0.05%阿托品对近视控制的长期疗效
卫星电话科普:定义、原理、应用及未来展望
高度近视能顺产吗?| 2024 年第二届「星辉杯」科普讲解大赛参赛作品展示
70㎡小户型逆天改造!三房一厅竟然还能塞下书房和衣帽间?
邮局发出邮件查询时间
Windows防火墙规则设置指南:创建、修改和删除规则
养己式养宠,年轻人情绪价值新代餐
负面情绪与肠易激综合征:探索其内在关联
"哈利·波特制片厂之旅"究竟是什么?一起来看
晚上喝牛奶助眠?喝牛奶的正确时间是什么?
看完《哪吒》,学一下导演讲故事的手法,考试作文高分拿下
创下五项世界纪录!看深中大桥背后的超级“智慧”
脚部总是脱皮、发痒?这几个护理细节你可能忽略了!
建筑变形缝的设计与施工要求详解
经常喝茶这五种茶不要喝,第一种经常被忽略!
欧派股指的情况如何分析?分析的结果对投资有什么指导作用?
“刘三姐”黄婉秋:与儿子先后离世,遗言催人泪下,丈夫悲痛欲绝
光遇起源故事:一场关于光与暗的奇幻之旅