MQTT协议及其使用案例

这篇具有很好参考价值的文章主要介绍了MQTT协议及其使用案例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

MQTT 概述

MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。 可以在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。 使用MQTT协议,消息发送者与接收者不受时间和空间的限制。

Docker 部署 MQTT(采用docker-compose.yml)

version: "3" 
services:
    mqtt:
        image: eclipse-mosquitto
        container_name: mqtt
        privileged: true 
        ports: 
            - 1883:1883
            - 9001:9001
        volumes:
            - ./config:/mosquitto/config
            - ./data:/mosquitto/data
            - ./log:/mosquitto/log
  • 文件夹
    MQTT协议及其使用案例

  • 创建 config/mosquitto.conf文章来源地址https://www.toymoban.com/news/detail-835807.html

persistence true
listener 1883
persistence_location /mosquitto/data
log_dest file /mosquitto/log/mosquitto.log
 
# 关闭匿名模式
# allow_anonymous true
# 指定密码文件
password_file /mosquitto/config/pwfile.conf
  • docker部署执行:docker compose up -d
  • 设置访问权限(用户名:admin,密码:admin123)
docker exec -it mqtt sh
touch /mosquitto/config/pwfile.conf
chmod -R 755 /mosquitto/config/pwfile.conf
mosquitto_passwd -b /mosquitto/config/pwfile.conf admin admin123
  • 重启mqtt容器:docker compose restart

Springboot 整合

  • 依赖
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/>
    </parent>
    
    <dependencies>
    		<!--  spring mqtt协议  -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!--  lombok  -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--spring boot and web-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

        <!--Http 请求 组件-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <!--测试组件-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
        </dependency>
    </dependencies>
  • 配置文件
mqtt.host=tcp://127.0.0.1:1883
mqtt.clientId=mqttx_a071ba88
mqtt.username=admin
mqtt.password=admin123
mqtt.topic=test_topic
mqtt.timeout=36000
mqtt.keepAlive=6000
  • 配置类
@Slf4j
@Configuration
public class MyMqttConfiguration {
    @Value("${mqtt.host}")
    String broker;
    @Value("${mqtt.clientId}")
    String clientId;
    @Value("${mqtt.username}")
    String username;
    @Value("${mqtt.password}")
    String password;
    @Value("${mqtt.timeout}")
    Integer timeout;
    @Value("${mqtt.keepAlive}")
    Integer keepAlive;
    @Value("${mqtt.topic}")
    String topic;
    @Autowired
    MyHandle myHandle;

    @Bean
    public MyMqttClient myMqttClient(){
        MyMqttClient mqttClient = new MyMqttClient(broker, username, password, clientId, timeout, keepAlive,myHandle);
        for (int i = 0; i < 10; i++) {
            try {
                mqttClient.connect();
                mqttClient.subscribe(topic,0);
                return mqttClient;
            } catch (MqttException e) {
                log.error("MQTT connect exception,connect time = " + i);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return mqttClient;
    }

}
  • 客户端
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.util.ObjectUtils;

@Slf4j
public class MyMqttClient {
    private static MqttClient client;
    private String host;
    private String clientId;
    private String username;
    private String password;
    private Integer timeout;
    private Integer keepAlive;
    private MyHandle myHandle;

    public  MyMqttClient(){
        System.out.println("MyMqttClient空构造函数");
    }

    public MyMqttClient(String host, String username, String password, String clientId, Integer timeOut, Integer keepAlive,MyHandle myHandle) {
        System.out.println("MyMqttClient全参构造");
        this.host = host;
        this.username = username;
        this.password = password;
        this.clientId = clientId;
        this.timeout = timeOut;
        this.keepAlive = keepAlive;
        this.myHandle = myHandle;
    }

    public static MqttClient getClient() {
        return client;
    }

    public static void setClient(MqttClient client) {
        MyMqttClient.client = client;
    }

    /**
     * 设置mqtt连接参数
     */
     public MqttConnectOptions setMqttConnectOptions(String username,String password,Integer timeout, Integer keepAlive){
         MqttConnectOptions options = new MqttConnectOptions();
         options.setUserName(username);
         options.setPassword(password.toCharArray());
         options.setConnectionTimeout(timeout);
         options.setKeepAliveInterval(keepAlive);
         options.setCleanSession(true);
         options.setAutomaticReconnect(true);
         return options;
     }

    /**
     * 连接mqtt服务端
     */
    public void connect() throws MqttException {
        if(client == null){
            client = new MqttClient(host,clientId,new MemoryPersistence());
            client.setCallback(new MyMqttCallback(MyMqttClient.this,this.myHandle));
        }
        MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepAlive);
        if(!client.isConnected()){
            client.connect(mqttConnectOptions);
        }else{
            client.disconnect();
            client.connect(mqttConnectOptions);
        }
        log.info("MQTT connect success");
    }

    /**
     * 断开连接
     * @throws MqttException
     */
    public void disconnect()throws MqttException{
        if(null!=client && client.isConnected()){
            client.disconnect();;
        }
    }
    /**
     * 发布,qos默认为0,非持久化
     */
     public void publish(String pushMessage,String topic,int qos){
         publish(pushMessage, topic, qos, false);
     }

    /**
     * 发布消息
     *
     * @param pushMessage
     * @param topic
     * @param qos
     * @param retained:留存
     */
    public void publish(String pushMessage, String topic, int qos, boolean retained) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(pushMessage.getBytes());
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
        if(ObjectUtils.isEmpty(mqttTopic)){
            log.error("主题不存在");
        }
        synchronized (this){
            try{
                MqttDeliveryToken mqttDeliveryToken = mqttTopic.publish(mqttMessage);
                mqttDeliveryToken.waitForCompletion(1000L);
            }catch (MqttException e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 订阅
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            MyMqttClient.getClient().subscribe(topic, qos);
            log.info("订阅主题:"+topic+"成功!");
        } catch (MqttException e) {
            log.error("订阅主题:"+topic+"失败!",e);
        }
    }
    /**
     * 取消订阅
     */
    public void cleanTopic(String topic){
        if(!ObjectUtils.isEmpty(client) && client.isConnected()){
            try{
                client.unsubscribe(topic);
            }catch (MqttException e){
                log.error("取消订阅失败!"+e);
            }
        }else{
            log.info("主题不存在或未连接!");
        }
    }
}
  • 回调类(消息发送和接收时响应)
@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {
    private MyMqttClient myMqttClient;
    private MyHandle myHandle;
    public MyMqttCallback(MyMqttClient myMqttClient,MyHandle myHandle) {
        this.myMqttClient = myMqttClient;
        this.myHandle = myHandle;
    }

    /**
     * 连接完成
     * @param reconnect
     * @param serverURI
     */
    @Override
    public void connectComplete(boolean reconnect,String serverURI) {
        log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");
        //订阅主题(可以在这里订阅主题)
        try {
            MyMqttClient.getClient().subscribe("topic1");
        } catch (MqttException e) {
            log.error("主题订阅失败");
        }
    }

    /**
     * 连接丢失 进行重连操作
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.warn("mqtt connectionLost >>> 5S之后尝试重连: {}", throwable.getMessage());
        long reconnectTimes = 1;
        while (true){
            try{
                Thread.sleep(5000);
            }catch (InterruptedException ignored){}
            try{
                if(MyMqttClient.getClient().isConnected()){ // 已连接
                    return;
                }
                reconnectTimes+=1;
                log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
                MyMqttClient.getClient().reconnect();
            }catch (MqttException e){
                log.error("mqtt断链异常",e);
            }
        }
    }

    /**
     * 订阅者收到消息之后执行
     * @param topic
     * @param mqttMessage
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));
        myHandle.handle(topic,mqttMessage);
    }

    /**
     * * 消息到达后
     * subscribe后,执行的回调函数
     * publish后,配送完成后回调的方法
     *
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用");
        log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
    }
}
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MyHandle {
    @Async
    public void handle(String topic, MqttMessage message) {
        log.info("处理消息主题:" + topic + " 信息:" + message);
    }
}

到了这里,关于MQTT协议及其使用案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【STM32】基于MQTT协议实时监控项目

    MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛,相关具体内容以及MQTT.fx软件的相关操作可见上一篇:MQTT协议与使用 MQTT是基于TCP/IP协议、与编程语言无关的标准物联网通信协议,正因为其在物联网系统中有非常广泛的应用,所以各种编程语言都

    2024年02月10日
    浏览(44)
  • 基于mqtt协议的物联网项目之微信小程序mqtt连接(三)

    官方连接 https://www.emqx.com/zh/blog/how-to-use-mqtt-in-wechat-miniprogram 所以uniapp使用v4.1.0版本,试过v3.0.0也可以用 默认从阿里云下载的SSL证书为cert.pem和key.key格式,我们要使用转换工具把.key转换成.pem 在线转换地址为:https://www.myssl.cn/tools/merge-pem-cert.html 替换EMQX安装路径下etc/emqx/cer

    2024年02月10日
    浏览(53)
  • 【MQTT协议】使用c++实现mqtt协议(Mosquitto源码编译)

    MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的消息传输协议,其最初由IBM开发,现已成为OASIS标准。MQTT协议常用于物联网领域,特别是在传输 低带宽、高延迟、不稳定网络 条件下的数据,例如传感器数据和遥测数据等。 MQTT协议的优点: 轻量级:

    2023年04月12日
    浏览(90)
  • mosquitto心跳和网络重连机制(基于MQTT协议)

    在网络通信中,心跳(Heartbeat)指的是一种周期性的消息,用于维持通信连接的活动状态。心跳包的主要作用是检测连接是否处于活动状态,及时发现连接异常并重新恢复连接,维护网络通信的稳定性和可靠性。 MQTT(Message Queuing Telemetry Transport)是一种轻量级、开放式的消息

    2024年02月03日
    浏览(45)
  • 【MQTT协议】使用Mosquitto实现mqtt协议(二):编写视频帧的发布/订阅服务

    更多内容详见 【MQTT协议】使用c++实现mqtt协议(Mosquitto源码编译) MQTT协议中的QoS(Quality of Service)表示消息传输的服务质量等级,它是MQTT协议中非常重要的一个概念。 MQTT协议中定义了三个不同等级的QoS: QoS 0:最多一次(At most once)传输。消息发布者只发送一次消息,不

    2023年04月14日
    浏览(43)
  • rocketmq使用mqtt协议

    rocketmq从4.9.3开始,可以兼容mqtt协议,需要安装编译一个rocketmq-mqtt工程,参考:https://rocketmq.apache.org/zh/docs/4.x/mqtt/02RocketMQMQTTQuickStart/ 需要安装rocketmq4.9.3以上的版本 安装过程略 broker.conf配置文件中添加参数,开启多队列分发特性 安装maven配置环境变量 过程略 下载并打包 下面

    2024年02月17日
    浏览(47)
  • 基于mqtt协议的物联网项目之微信小程序(二)

    硬件设备通过 mqtt:tcp port:1883 与服务器建立连接 网页/小程序 —websocket—mqtt:ws port:8083 mqtt:wss(wxs) prot:8084与服务器连接 小程序规定只能用8084端口 HTTP 协议有一个缺陷:通信只能由客户端发起,websocket是一种新的协议,所有浏览器都支持。 WebSocket是一种在单个TCP连接上进行全双

    2024年01月20日
    浏览(60)
  • 基于MQTT协议的物联网网关实现远程数据采集及监控

    在数字化时代的浪潮中,工业界正面临着前所未有的变革与机遇。而在这场变革中,基于MQTT协议的物联网网关崭露头角,成为连接工业设备、实现远程数据采集与监控的利器。其中,HiWoo Box作为一款出色的工业边缘网关,引领着这股数字化风潮,下面我们一起探寻其在实现远

    2024年02月11日
    浏览(63)
  • 基于STM32和oneNET云平台的数据采集系统(MQTT协议)

    该篇为基于stm32+esp8266通过 mqtt 协议连接 onenet 物联网云平台,单片机部分将采集到的数据(温湿度、光照强度、压强等等)上传至云平台服务器,云平台可下发指令操控单片机,实现远程通信。 1. 第一步,注册账号后点击右上角 控制台 2. 第二步,看左上角 选择切换旧版本 3.

    2024年02月06日
    浏览(58)
  • 基于智慧家居场景的端云互通实验——基于MQTT协议的智慧家居端云互通开发

    使用Wi-Fi模组并基于MQTT协议进行联网 将温湿度传感数据上传至云平台 对平台下发的开启蜂鸣器命令进行处理 将命令响应上报至平台 在使用MQTT协议与平台进行对接时,使用的IP地址和端口号是121.36.42.100:1883 按照下表填写注册信息: 其中相连服务器的IP地址与端口即华为云物

    2023年04月08日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包