Spring Boot整合MQTT最详细教程(亲测有效)
创作时间:
作者:
@小白创作中心
Spring Boot整合MQTT最详细教程(亲测有效)
引用
CSDN
1.
https://blog.csdn.net/weixin_50083085/article/details/144747736
本文将详细介绍如何在Spring Boot项目中整合MQTT协议,包括依赖配置、参数设置、核心类实现以及测试验证。通过本文,你将能够掌握Spring Boot与MQTT的整合方法,实现消息的发布与订阅功能。
一、导入pom.xml依赖
在项目的pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
二、配置MQTT相关信息到application.yml
在application.yml文件中添加MQTT相关配置:
spring:
mqtt:
username: 你的账号 # 账号
password: 你的密码 # 密码
hostUrl: tcp://127.0.0.1:1883 # mqtt连接tcp地址
clientid: ${random.value} # 客户端Id,不能相同,采用随机数 ${random.value}
default-topic: /testtopic/# # 默认主题
timeout: 3000 # 超时时间
keepalive: 600 # 保持连接
subscribeFlag: true #是否进行订阅true或者false
enabled: true # 是否使用mqtt功能
三、创建MQTT相关类
在项目中创建mqtt文件夹,并添加以下类:
AjaxResult.java
public class AjaxResult extends HashMap<String, Object>
{
private static final long serialVersionUID = 1L;
/** 状态码 */
public static final String CODE_TAG = "code";
/** 返回内容 */
public static final String MSG_TAG = "msg";
/** 数据对象 */
public static final String DATA_TAG = "data";
// 构造函数和静态方法略
}
MqttConfig.java
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
@Autowired
private MqttPushClient mqttPushClient;
// 属性和getter/setter略
public MqttPushClient getMqttPushClient() {
if(enabled == true){
String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
System.out.println("开始连接======================="+clientId);
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
System.out.println("开始订阅=======================");
for(int i=0; i<mqtt_topic.length; i++){
mqttPushClient.subscribe(mqtt_topic[i], 1);
}
}
return mqttPushClient;
}
}
MqttInit.java
@Component
public class MqttInit implements ApplicationRunner {
@Autowired
private MqttConfig mqttConfig;
@Override
public void run(ApplicationArguments args) throws Exception{
mqttConfig.getMqttPushClient();
}
}
MqttPushClient.java
@Component
@Order(value = 2)
public class MqttPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
MqttPushClient.setClient(client);
client.setCallback(pushCallback);
client.connect(options);
System.out.println("连接成功==================="+clientID);
} catch (Exception e) {
e.printStackTrace();
}
}
public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
if (null == mTopic) {
logger.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
return AjaxResult.success();
} catch (Exception e) {
e.printStackTrace();
return AjaxResult.error();
}
}
public void subscribe(String topic, int qos) {
logger.info("开始订阅主题" + topic);
try {
MqttPushClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
PushCallback.java
@Component
@Order(value = 1)
public class PushCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private MqttConfig mqttConfig;
private static MqttClient client;
private static String _topic;
private static String _qos;
private static String _msg;
@Override
public void connectionLost(Throwable throwable) {
System.out.println("重连======================");
logger.info("连接断开,可以做重连");
boolean flag=true;
while(flag) {
try {
Thread.sleep(5000);
if (client == null || !client.isConnected()) {
mqttConfig.getMqttPushClient();
}
flag=false;
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
logger.info("接收消息主题 : " + topic);
logger.info("接收消息Qos : " + mqttMessage.getQos());
logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
_topic = topic;
_qos = mqttMessage.getQos()+"";
_msg = new String(mqttMessage.getPayload());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
public String receive() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", _topic);
jsonObject.put("qos", _qos);
jsonObject.put("msg", _msg);
return jsonObject.toString();
}
}
四、测试
创建一个Controller用于测试MQTT消息的发送:
@RestController
@RequestMapping("/mttpTest")
public class mttqController extends BaseController {
@Autowired
private MqttConfig mqttConfig;
@Autowired
private MqttPushClient mqttPushClient;
@GetMapping(value = "/a")
public AjaxResult a()
{
JSONObject jsonObject = new JSONObject();
jsonObject.put("hello", "你好");
jsonObject.put("msg", "成功");
AjaxResult publish = mqttPushClient.publish(2,false,"/testtopic/5",jsonObject.toString());
return success(null);
}
}
通过浏览器访问接口地址,可以看到消息成功发送。同时,使用MQTTX客户端工具可以查看到发送的消息,Spring Boot控制台也会打印出接收到的消息。
热门推荐
豆类食品的主要营养是什么
选对睡衣,告别失眠!
写好作文的有效策略
企业如何正确与残疾人签订劳动合同及注意事项
地瓜、红薯、甘薯、番薯,是一回事吗?
建筑安全员 C 证考试:用这些策略,开启成功之门
甘肃艺术统考包含哪些专业
适合新手的计算机等级考试分析与建议
泡脚十大好处具体有哪些
美洲杯决赛前瞻:中场之争、10号之争、“同门”之争
德军黑豹坦克的现世,使苏军大为惊恐,经典T-34坦克真的被暴揍?
如何确保人脸识别考勤系统的隐私合规性?
高端酒店进化论:从网红打卡到旅游目的地
甜蜜:科学解读味觉背后的秘密
提升PPT演示效果的背景图片选择与设置技巧详解
背烂过几个蔻驰包,才能混明白职场
粤人重巧夕,灯火到天明。 你见过广州乞巧文化习俗吗?
“七夕”不只关乎爱情,看广府乞巧正酣|文化中国行
华佗风痛宝片的功效
合同正式工的定义与法律地位解析
建业引进球员:如何找到合适的球员提升球队实力
仓鼠跑了怎么找回来?教你几招快速找到它的方法
为啥只有人类,会害怕同类的尸体?
C语言数据结构——详细讲解 双链表
细菌超标!公厕这两样东西,千万别碰!
转基因食品对人体究竟有没有危害?请专家们给一个明确的说法!
千古第一才女——李清照资料详解
血脂高于多少不能献血
漫步五马街,探寻浙江千年商贸文化
呼吸法:实现内心平静和幸福的呼吸技巧