Spring Boot整合MQTT最详细教程(亲测有效)
创作时间:
作者:
@小白创作中心
Spring Boot整合MQTT最详细教程(亲测有效)
引用
CSDN
1.
https://blog.csdn.net/weixin_50083085/article/details/144747736
本文将详细介绍如何在Spring Boot项目中整合MQTT协议,包括依赖配置、参数设置、核心类实现以及测试验证。通过本文,你将能够掌握Spring Boot与MQTT的整合方法,实现消息的发布与订阅功能。
一、导入pom.xml依赖
在项目的pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
二、配置MQTT相关信息到application.yml
在application.yml文件中添加MQTT相关配置:
spring:
mqtt:
username: 你的账号 # 账号
password: 你的密码 # 密码
hostUrl: tcp://127.0.0.1:1883 # mqtt连接tcp地址
clientid: ${random.value} # 客户端Id,不能相同,采用随机数 ${random.value}
default-topic: /testtopic/# # 默认主题
timeout: 3000 # 超时时间
keepalive: 600 # 保持连接
subscribeFlag: true #是否进行订阅true或者false
enabled: true # 是否使用mqtt功能
三、创建MQTT相关类
在项目中创建mqtt文件夹,并添加以下类:
AjaxResult.java
public class AjaxResult extends HashMap<String, Object>
{
private static final long serialVersionUID = 1L;
/** 状态码 */
public static final String CODE_TAG = "code";
/** 返回内容 */
public static final String MSG_TAG = "msg";
/** 数据对象 */
public static final String DATA_TAG = "data";
// 构造函数和静态方法略
}
MqttConfig.java
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
@Autowired
private MqttPushClient mqttPushClient;
// 属性和getter/setter略
public MqttPushClient getMqttPushClient() {
if(enabled == true){
String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
System.out.println("开始连接======================="+clientId);
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
System.out.println("开始订阅=======================");
for(int i=0; i<mqtt_topic.length; i++){
mqttPushClient.subscribe(mqtt_topic[i], 1);
}
}
return mqttPushClient;
}
}
MqttInit.java
@Component
public class MqttInit implements ApplicationRunner {
@Autowired
private MqttConfig mqttConfig;
@Override
public void run(ApplicationArguments args) throws Exception{
mqttConfig.getMqttPushClient();
}
}
MqttPushClient.java
@Component
@Order(value = 2)
public class MqttPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
MqttPushClient.setClient(client);
client.setCallback(pushCallback);
client.connect(options);
System.out.println("连接成功==================="+clientID);
} catch (Exception e) {
e.printStackTrace();
}
}
public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
if (null == mTopic) {
logger.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
return AjaxResult.success();
} catch (Exception e) {
e.printStackTrace();
return AjaxResult.error();
}
}
public void subscribe(String topic, int qos) {
logger.info("开始订阅主题" + topic);
try {
MqttPushClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
PushCallback.java
@Component
@Order(value = 1)
public class PushCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private MqttConfig mqttConfig;
private static MqttClient client;
private static String _topic;
private static String _qos;
private static String _msg;
@Override
public void connectionLost(Throwable throwable) {
System.out.println("重连======================");
logger.info("连接断开,可以做重连");
boolean flag=true;
while(flag) {
try {
Thread.sleep(5000);
if (client == null || !client.isConnected()) {
mqttConfig.getMqttPushClient();
}
flag=false;
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
logger.info("接收消息主题 : " + topic);
logger.info("接收消息Qos : " + mqttMessage.getQos());
logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
_topic = topic;
_qos = mqttMessage.getQos()+"";
_msg = new String(mqttMessage.getPayload());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
public String receive() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", _topic);
jsonObject.put("qos", _qos);
jsonObject.put("msg", _msg);
return jsonObject.toString();
}
}
四、测试
创建一个Controller用于测试MQTT消息的发送:
@RestController
@RequestMapping("/mttpTest")
public class mttqController extends BaseController {
@Autowired
private MqttConfig mqttConfig;
@Autowired
private MqttPushClient mqttPushClient;
@GetMapping(value = "/a")
public AjaxResult a()
{
JSONObject jsonObject = new JSONObject();
jsonObject.put("hello", "你好");
jsonObject.put("msg", "成功");
AjaxResult publish = mqttPushClient.publish(2,false,"/testtopic/5",jsonObject.toString());
return success(null);
}
}
通过浏览器访问接口地址,可以看到消息成功发送。同时,使用MQTTX客户端工具可以查看到发送的消息,Spring Boot控制台也会打印出接收到的消息。
热门推荐
DIFF线的作用是什么?如何利用DIFF线进行分析?
北京租房租金费用参考(海淀区)2025版本
姿势不对,血压测不准
工作案例丨优化宿舍关系,构建和谐班级
1998年男虎最佳配偶:如何成为一位完美的伴侣
菊花与普洱茶的完美搭配:探索其独特风味与健康益处
吉他的基本和弦,新手赶紧码住!
在线PH计的重要性
安全驾驶评估是什么
海子,一位悲情的诗人
未成年犯罪会影响以后子女吗
学业预测:八字命理学中的学术成就
LED灯光设计:光学亚克力板的材质选择与厚度考量
上古有灵妖T0妖灵角色推荐:大禹、神农、玄女详解
沪深300ETF VS 中证A500ETF,怎么选?
通货膨胀率和CPI的关系
澳洲留学和香港留学费用到底有多大差别
磨砺心智:提升抗压能力的实用方法
王者荣耀体验服更新内容详解:新玩法、系统优化与问题修复全方位升级
王者荣耀新纪元:积分制改革引领公平竞技新风尚
弗兰克·哈恩:英国著名一般均衡理论学家
反思的力量:在生活中探索自我与成长的重要性
武大樱花何时开?最新答案来了
数据库设计中如何优化数据的存储和检索
网站定位的类型有哪些呢
购买AMD显卡需谨慎:性价比与兼容性问题全解析
如何提高文档信息提取的准确率和效率?
护士晨间护理的内容有哪些
危化品仓库如何管理流程
历时5年绘制《三体》图像小说 吴青松:尽量忠实原著