Springboot整合mqtt最新教程,完整教程,最佳实践

这篇具有很好参考价值的文章主要介绍了Springboot整合mqtt最新教程,完整教程,最佳实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:

  关于整合mqtt网上的教程很多,但大部分都是cv来cv去,中间的监听代码也没有讲清楚。教程也是很久之前的。所以决定自己来写一个教程。废话不多说直接开始教程。
本文只有教程,没有其他废话,如果需要请留言,后续更新下一版(包括主题消息的订阅方式改变,其他断线重连方式,EMQX的API对接-监听设备更加方便)。
Springboot整合mqtt采用注解进行监听(第二篇)

第一步安装 EMQX:

  MQTT服务用的是EMQX,安装方式请搜索EMQX,移步他们官网。
  官网地址:https://www.emqx.io/zh
  测试工具请下载MQTTX
  安装完成后默认管理地址:ip:18083 账户:admin 密码:public

第二步加入依赖:
  <!-- MQTT相关配置 -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
第三步加入配置:

  在yml文件中加入以下配置

#MQTT客户端
publish:
    mqtt:
        host: tcp://127.0.0.1:1883
        clientId: mqtt_publish
        options:
            userName: GuoShun
            password: qq1101165230
            # 这里表示会话不过期
            cleanSession: false
            # 配置一个默认的主题,加载时不会用到,只能在需要时手动提取
            defaultTopic: devops
            timeout: 1000
            KeepAliveInterval: 10
            #断线重连方式,自动重新连接与会话不过期配合使用会导致
            #断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我
            automaticReconnect: true
            connectionTimeout: 3000
            # 最大链接数
            maxInflight: 100
第四步创建一个MQTTConfigBuilder类,用来加载yml中的配置
/**
 * @author by Guoshun
 * @version 1.0.0
 * @description mqtt配置类
 * @date 2023/12/12 15:10
 */
@Configuration
@ConfigurationProperties(MQTTConfigBuilder.PREFIX)
@Data
public class MQTTConfigBuilder {
	
	//配置的名称
    public static final String PREFIX = "publish.mqtt";
    /**
     * 服务端地址
     */
    private String host;

    /**
     * 客户端id
     */
    private String clientId;
    /**
     * 配置链接项
     */
    private MqttConnectOptions options;

}
第五步创建一个MQTTClientUtils
@Slf4j
@Configuration
public class MQTTClientUtils {

    @Autowired
    private MQTTConfigBuilder mqttConfig;

    private MqttClient mqttClient;

    public MQTTClientUtils createDevOpsMQTTClient() {
        this.createMQTTClient();
        return this;
    }

    private MQTTClientUtils connect() {
        try {
            this.mqttClient.connect(mqttConfig.getOptions());
            log.info("MQTTClient连接成功!");
        }catch (MqttException mqttException){
            mqttException.printStackTrace();
            log.error("MQTTClient连接失败!");
        }
        return this;
    }

    private MqttClient createMQTTClient() {
        try{
            this.mqttClient = new MqttClient( mqttConfig.getHost(), mqttConfig.getClientId());
            log.info("MQTTClient创建成功!");
            return this.mqttClient;
        }catch (MqttException exception){
            exception.printStackTrace();
            log.error("MQTTClient创建失败!");
            return null;
        }
    }

    /**
     * 消息发送
     * @param topicName
     * @param message
     * @return
     */
    public boolean publish(String topicName, String message) {
        log.info("订阅主题名:{}, message:{}", topicName, message);
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
        try {
            this.mqttClient.publish(topicName, mqttMessage);
            return true;
        }catch (MqttException exception){
            exception.printStackTrace();
            return false;
        }
    }

    /**
     * 消息发送 : retained 默认为 false
     * "retained message" 指的是 Broker 会保留的最后一条发布到某个主题的消息。
     * 当新的订阅者连接到该主题时,Broker 会将这条保留消息立即发送给订阅者,即使在订阅者订阅时该消息并未被重新发布。
     * 这对于一些需要初始状态或者最后一次已知状态的应用场景非常有用。
     * @param topicName
     * @param message
     * @param qos
     * @return
     */
    public boolean publish(String topicName, int qos, String message) {
        log.info("主题名:{}, qos:{}, message:{}", topicName, qos, message);
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
        try {
            this.mqttClient.publish(topicName, mqttMessage.getPayload(), qos, false);
            return true;
        }catch (MqttException exception){
            exception.printStackTrace();
            return false;
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topicName
     * @param qos
     */
    public void subscribe(String topicName, int qos) {
        log.info("订阅主题名:{}, qos:{}", topicName, qos);
        try {
            this.mqttClient.subscribe(topicName, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topicName
     * @param qos
     */
    public void subscribe(String topicName, int qos, IMqttMessageListener messageListener) {
        log.info("订阅主题名:{}, qos:{}, Listener类:{}", topicName, qos, messageListener.getClass());
        try {
            this.mqttClient.subscribe(topicName, qos, messageListener);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 取消订阅主题
     * @param topicName 主题名称
     */
    public void cleanTopic(String topicName) {
        log.info("取消订阅主题名:{}", topicName);
        try {
            this.mqttClient.unsubscribe(topicName);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

//这里是初始化方法
    @PostConstruct
    public void initMqttClient(){
        //创建连接
        MQTTClientUtils mqttClientUtils = this.createDevOpsMQTTClient().connect();
        //这里主要是项目启动时订阅一些主题。看个人需要使用
        //mqttClientUtils.subscribe("test/#", 2, new HeartBeatListener());
        //MessageCallbackListener订阅主题,接受到该主题消息后交给MessageCallbackListener去处理
         mqttClientUtils.subscribe("message/call/back", 2, new MessageCallbackListener());
        //需要注意的是new MessageCallbackListener()虽然会接收到消息,但这么做不对。
        //举个简单列子:就是你有切面对MessageCallbackListener中重写的方法做一些其他操作,
        //那么接收到消息后该切面并不会生效,所以不建议这么做,以下是修改过后的。
      	//@Resource
    	//private MessageCallbackListener messageCallbackListener;
    	//mqttClientUtils.subscribe("message/call/back", 2, messageCallbackListener);
    }
}

使用方式创建一个MQTTService

 可以MQTTClientUtils去调用方法,也可以将MQTTService 扩展开来,使用MQTTService 去调用方法文章来源地址https://www.toymoban.com/news/detail-858638.html


/**
 * @author by Guoshun
 * @version 1.0.0
 * @description MQTT服务类,负责调用发送消息
 * @date 2023/12/12 16:53
 */
@Service
public class MQTTService {

    @Resource
    private MQTTClientUtils mqttClientUtils;

    /**
     * 向主题发送消息
     * @param topicName
     * @param message
     */
    public void sendMessage(String topicName, String message){
        mqttClientUtils.publish(topicName, message);
    }

    /**
     * 向主题发送消息
     * @param topicName 主题名称
     * @param qos qos
     * @param message 具体消息
     */
    public void sendMessage(String topicName,int qos, String message){
        mqttClientUtils.publish(topicName, qos, message);
    }
}
消息监听处理(简单实现)
/**
 * @author by Guoshun
 * @version 1.0.0
 * @description 消息回调返回
 * @date 2023/12/12 17:27
 */
@Component
public class MessageCallbackListener implements IMqttMessageListener {

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String messageBody = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("收到消息:"+topic+", 消息内容是:"+ messageBody);
    }
}

到了这里,关于Springboot整合mqtt最新教程,完整教程,最佳实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot详细整合MQTT消息

    消息队列遥测传输 (MQTT) , 是一种常用的轻量级 \\\"发布-订阅\\\"消息协议 , 非常适合通过互联网连接物联网(LOT) 或者机器对机器 (M2M) 设备与应用. MQTT可在低带宽或者低功耗环境中高效运行,因此是有众多远程客户端应用的理想之选,适合用于多个行业,包括消费类电子产品,汽车,运输

    2024年01月17日
    浏览(32)
  • SpringBoot整合EMQX(MQTT协议)

    原文:springboot当中使用EMQX(MQTT协议) 1.1、MQTT简介 MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输),是一种基于 发布/订阅 模式的 轻量级物联网消息传输协议。IBM 公司的 安迪·斯坦福-克拉克 及 Arcom 公司的 阿兰·尼普 于 1999 年撰写了该协议的第一个版本1,之

    2024年02月08日
    浏览(36)
  • SpringBoot 异常处理的最佳实践

    在 Web 开发中,异常处理是非常重要的一环。在 SpringBoot 框架中,异常处理方式有很多种,但是如何选择最佳实践呢?本文将介绍 SpringBoot 异常处理的最佳实践,并附带代码示例。 异常处理的最佳实践包括以下几个方面: 统一异常处理 在 SpringBoot 中,我们可以使用 @Controll

    2024年02月10日
    浏览(50)
  • MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图

    MQTT(EMQX) - Linux CentOS Docker 安装 MQTT (Message Queue Telemetry Transport) 是一个轻量级传输协议,它被设计用于轻量级的发布/订阅式消息传输,MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化。是一种简单、稳定、开放、轻量级易于实现的消息协议,在物联网的应用下的

    2023年04月10日
    浏览(34)
  • SpringBoot整合RabbitMQ(最新笔记)

    1.2 创建工程并导入依赖 我们使用的springboot版本为2.5.6,其他都是根据 spring-boot-starter-parent 自动选择版本 引入以下工程即可 spring-boot-starter-test 用于测试 junit 用于单元测试 spring-boot-starter-amqp SpringBoot和RabbitMQ的整合方案 1.2 创建配置文件并配置 SpringBoot配置文件名称为 applica

    2024年02月05日
    浏览(47)
  • SpringBoot整合RestTemplate用法讲解(完整详细)

    前言:本篇主要介绍了RestTemplate中的GET,POST,PUT,DELETE、文件上传和文件下载6大常用的功能,每一个方法和每一行代码都进行了详细的讲解,代码都是亲自测试过的,整篇博客写完以后自己也是受益匪浅,于是在这做个技术分享! 目录 一、RestTemplate简介 二、基础配置 2.1、

    2024年02月12日
    浏览(34)
  • SpringBoot 实战 开发中 16 条最佳实践

    Spring Boot是最流行的用于开发微服务的Java框架。在本文中,我将与你分享自2016年以来我在专业开发中使用Spring Boot所采用的最佳实践。这些内容是基于我的个人经验和一些熟知的Spring Boot专家的文章。 在本文中,我将重点介绍Spring Boot特有的实践(大多数时候,也适用于Spri

    2024年02月07日
    浏览(33)
  • vue3 整合 springboot 打完整jar包

    .env.developmen .env.production axios 配置 package.json vite.config.js pom.xml

    2024年02月09日
    浏览(34)
  • SpringBoot整合Elasticsearch(最新最全,高效安装到使用)

    为了避免使用的Elasticsearch版本和SpringBoot采用的版本不一致导致的问题,尽量使用一致的版本。下表是对应关系: 我的SpringBoot版本: 所以选择对应Elasticsearch版本为7.12.0。 Elasticsearch各版本下载 Elasticsearch7.12.0官网下载 下载上面链接的安装包 解压到任意目录 启动es /bin/elasti

    2024年02月07日
    浏览(37)
  • RabbitMQ与SpringBoot整合实践

    作者:禅与计算机程序设计艺术 2020年是一个转折点,现代化的信息社会已经开启了数字化进程,越来越多的人开始接受信息技术作为工作的一部分。相较于传统的技术岗位,人工智能、大数据、云计算领域的软件工程师更加需要具备实际项目应用能力、高超的计算机和通信

    2024年02月09日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包