问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Spring Boot整合MQTT最详细教程(亲测有效)

创作时间:
作者:
@小白创作中心

Spring Boot整合MQTT最详细教程(亲测有效)

引用
CSDN
1.
https://blog.csdn.net/weixin_50083085/article/details/144747736

本文将详细介绍如何在Spring Boot项目中整合MQTT协议,包括依赖配置、参数设置、核心类实现以及测试验证。通过本文,你将能够掌握Spring Boot与MQTT的整合方法,实现消息的发布与订阅功能。

一、导入pom.xml依赖

在项目的pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

二、配置MQTT相关信息到application.yml

在application.yml文件中添加MQTT相关配置:

spring:
  mqtt:
    username: 你的账号                           # 账号
    password: 你的密码                        # 密码
    hostUrl: tcp://127.0.0.1:1883           # mqtt连接tcp地址
    clientid: ${random.value}                # 客户端Id,不能相同,采用随机数 ${random.value}
    default-topic: /testtopic/#                      # 默认主题
    timeout: 3000                           # 超时时间
    keepalive: 600                            # 保持连接
    subscribeFlag: true                     #是否进行订阅true或者false
    enabled: true                 # 是否使用mqtt功能

三、创建MQTT相关类

在项目中创建mqtt文件夹,并添加以下类:

AjaxResult.java

public class AjaxResult extends HashMap<String, Object>
{
    private static final long serialVersionUID = 1L;
    /** 状态码 */
    public static final String CODE_TAG = "code";
    /** 返回内容 */
    public static final String MSG_TAG = "msg";
    /** 数据对象 */
    public static final String DATA_TAG = "data";

    // 构造函数和静态方法略
}

MqttConfig.java

@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
    @Autowired
    private MqttPushClient mqttPushClient;

    // 属性和getter/setter略

    public MqttPushClient getMqttPushClient() {
        if(enabled == true){
            String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
            System.out.println("开始连接======================="+clientId);
            mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
            System.out.println("开始订阅=======================");
            for(int i=0; i<mqtt_topic.length; i++){
                mqttPushClient.subscribe(mqtt_topic[i], 1);
            }
        }
        return mqttPushClient;
    }
}

MqttInit.java

@Component
public class MqttInit implements ApplicationRunner {
    @Autowired
    private MqttConfig mqttConfig;

    @Override
    public void run(ApplicationArguments args) throws Exception{
        mqttConfig.getMqttPushClient();
    }
}

MqttPushClient.java

@Component
@Order(value = 2)
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
    @Autowired
    private PushCallback pushCallback;
    private static MqttClient client;

    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            client.setCallback(pushCallback);
            client.connect(options);
            System.out.println("连接成功==================="+clientID);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
        if (null == mTopic) {
            logger.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mTopic.publish(message);
            token.waitForCompletion();
            return AjaxResult.success();
        } catch (Exception e) {
            e.printStackTrace();
            return AjaxResult.error();
        }
    }

    public void subscribe(String topic, int qos) {
        logger.info("开始订阅主题" + topic);
        try {
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

PushCallback.java

@Component
@Order(value = 1)
public class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
    @Autowired
    private MqttConfig mqttConfig;
    private static MqttClient client;
    private static String _topic;
    private static String _qos;
    private static String _msg;

    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("重连======================");
        logger.info("连接断开,可以做重连");
        boolean flag=true;
        while(flag) {
            try {
                Thread.sleep(5000);
                if (client == null || !client.isConnected()) {
                    mqttConfig.getMqttPushClient();
                }
                flag=false;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("接收消息主题 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + mqttMessage.getQos());
        System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
        _topic = topic;
        _qos = mqttMessage.getQos()+"";
        _msg = new String(mqttMessage.getPayload());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    public String receive() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topic", _topic);
        jsonObject.put("qos", _qos);
        jsonObject.put("msg", _msg);
        return jsonObject.toString();
    }
}

四、测试

创建一个Controller用于测试MQTT消息的发送:

@RestController
@RequestMapping("/mttpTest")
public class mttqController extends BaseController {
    @Autowired
    private MqttConfig mqttConfig;
    @Autowired
    private MqttPushClient mqttPushClient;

    @GetMapping(value = "/a")
    public AjaxResult a()
    {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("hello", "你好");
        jsonObject.put("msg", "成功");
        AjaxResult publish = mqttPushClient.publish(2,false,"/testtopic/5",jsonObject.toString());
        return success(null);
    }
}

通过浏览器访问接口地址,可以看到消息成功发送。同时,使用MQTTX客户端工具可以查看到发送的消息,Spring Boot控制台也会打印出接收到的消息。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号
Spring Boot整合MQTT最详细教程(亲测有效)