SpringBoot详细整合MQTT消息

这篇具有很好参考价值的文章主要介绍了SpringBoot详细整合MQTT消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

什么是Mqtt

消息队列遥测传输 (MQTT) , 是一种常用的轻量级 "发布-订阅"消息协议 , 非常适合通过互联网连接物联网(LOT) 或者机器对机器 (M2M) 设备与应用.

MQTT可在低带宽或者低功耗环境中高效运行,因此是有众多远程客户端应用的理想之选,适合用于多个行业,包括消费类电子产品,汽车,运输,制造及医疗行业.

HTTP与MQTT的区别

HTTP与MQTT都是用于通过互联网传输数据的网络协议,下面我们来看看二者的区别.

HTTP

  • 一种 "请求-响应"协议,基于该协议,客户端向服务器发送请求,服务器返回请求的数据
  • 主要设计用于在web服务器和浏览器之间传输Web内容,(如 html , 表单 , 图像 等信息)

MQTT

  • 一种轻量级 "发布-订阅"消息传递协议,基础该协议,客户端订阅主题并接受客户端围绕主题发布的消息.
  • 专为需要重点考虑低带宽,连接稳定性以及硬件设备而设计.

为何在物联网 (LOT) 中使用Mqtt

MQTT的许多特性让其成为物联网设备(物联网中的"物")和后端系统之间进行消息传递的理想协议,此处,我们将重点介绍以下四个特性

  • 轻量级 - MQTT的代码占有空间很小,适用于处理能力和内存有限的设备,例如传感器
  • 可靠性 - 许多物联网设备通过蜂窝网络连接,MQTT是一种适合低带宽网络协议,适合传输使用较少数据的简洁消息,这使得MQTT更加可靠,
    即使在网络带宽有限或者不稳定的情况下也不例外.
  • 可扩展性 - "发布-订阅"模型很容易随设备和后端系统的增加而扩展,住宅智能电表就是单台发布到两个独立后端网络 (订阅者) 的例子,
    它将公用事业使用数据发送到公用事业的系统 (用于计费) 和面向客户的应用,(房主可访问该应用了解其住宅的能源使用情况).
  • 安全性 - MQTT消息可以使用标准传输层安全防护 (TLS) 进行加密 , 并支持可用于身份验证的凭证,这让MQTT称为物联网应用中安全消息的协议,
    可处理敏感信息,例如,各种医疗设备的健康检测指标等信息.

MQTT使用什么传输协议

MQTT支持传输控制协议/互联网协议 (TCP/IP)协议,作为其底层传输协议.

  • TCP/IP 协议被认为可靠高效性的原因如下
  1. 错误检测和纠正 - 多种技术验证数据包的完整性和重传机制 , 以恢复丢失的数据包
  2. 流量控制 - 在指定网络中,数据以最佳速率进行传输,可防止传输延迟,并加强高效通信
  3. 多路复用 - 可通过单个连接发送一个数据流,因此多个应用可同时使用同一个连接
  4. 兼容性 - 可支持各种设备和操作系统
  5. 可扩展性 - 可在大型复杂网络中使用,即使在处理大量的数据流程也不影响性能

虽然TCP/IP协议是最常见的协议,但并非传输MQTT消息是唯一的选择,可可使用UDP和webSocket

搭建服务端

下载 emqx 服务器 (linxu)

官网下载地址 : https://www.emqx.io/docs/zh/v4.3/faq/use-guide.html

  1. 下载emqx 使用docker (拉取镜像)
[root@lep ~]# docker pull emqx/emqx:5.3.1
v4.0.0: Pulling from emqx/emqx
89d9c30c1d48: Pull complete
d1c907393fbf: Pull complete
4f534f3dfa46: Pull complete
c0044c0a242c: Pull complete
432bcb7ac615: Pull complete
1c89b5520019: Pull complete
e3bf682944db: Downloadin
  1. 启动镜像
[root@lep ~]# docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:5.3.1
f8ca59e1a21b040f1c28d4a3f09e908e67f11bf61f7fb11b8fd3576b283a61a0
[root@lep ~]#
  1. 访问 127.0.0.1:18083 (默认账号密码 ; admin/public)

SpringBoot详细整合MQTT消息,消息队列,spring boot,后端,java

第一次进入 ,需要修改密码

SpringBoot详细整合MQTT消息,消息队列,spring boot,后端,java

端口号

  • 控制台连接端口号 18083

  • 客户端连接端口号 1883

SpringBoot整合MQTT

  1. pom.xml文件,导入相关依赖包
        <!--        引入 mqtt 相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
            <version>2.3.12.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
  1. 修改properties / yaml 文件,增加mqtt相关的连接配置
spring.application.name=test
# MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
spring.mqtt.url=tcp://127.0.0.1:1883
# 用户名
spring.mqtt.username=admin
# 密码
spring.mqtt.password=lep-88888888
  1. mqtt连接
package com.example.springmaven.controller.conf;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * @Date 2023/11/21 18:31
 */
@Configuration
public class MqttConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.application.name}")
    private String applicationName;

    /**
     * 客户端对象
     */
    private MqttClient client;

    /**
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init() {
        this.connect();
    }

    /**
     * 断开连接
     */
    @PreDestroy
    public void disConnect() {
        try {
            client.disconnect();
            client.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 客户端连接服务端
     */
    public void connect() {
        try {
            // 创建MQTT客户端对象
            client = new MqttClient(hostUrl, applicationName, new MemoryPersistence());
            // 连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            // 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            // 设置为true表示每次连接服务器都是以新的身份
            options.setCleanSession(true);
            // 设置连接用户名
            options.setUserName(username);
            // 设置连接密码
            options.setPassword(password.toCharArray());
            // 设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            // 设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            // 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic", (applicationName + "与服务器断开连接").getBytes(), 0, false);
            // 设置回调
            client.setCallback(new MqttCallBack());
            // 连接
            client.connect(options);
            // 订阅主题 (接受此主题的消息)
            this.subscribe("warn_topic", 2);
            this.subscribe("warn_topic2", 2);
        } catch (MqttException e) {
            e.printStackTrace();
        }

    }

    /**
     * 发布消息
     *
     * @param topic
     * @param message
     */
    public boolean publish(String topic, String message) {
        MqttMessage mqttMessage = new MqttMessage();
        // 0:最多交付一次,可能丢失消息
        // 1:至少交付一次,可能消息重复
        // 2:只交付一次,既不丢失也不重复
        mqttMessage.setQos(2);
        // 是否保留最后一条消息
        mqttMessage.setRetained(false);
        // 消息内容
        mqttMessage.setPayload(message.getBytes());
        // 主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = client.getTopic(topic);
        // 提供一种机制来跟踪消息的传递进度
        // 用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            // 将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            // 一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
            return true;
        } catch (MqttException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 订阅主题
     */
    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

  1. 消息回调
package com.example.springmaven.controller.conf;

import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.annotation.Configuration;

/**
 * @Date 2023/11/21 18:32
 */
@Configuration
public class MqttCallBack implements MqttCallback {

    /**
     * 与服务器断开的回调
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("与服务器断开连接");
    }

    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println(String.format("接收消息主题 : %s", topic));
        System.out.println(String.format("接收消息Qos : %d", message.getQos()));
        System.out.println(String.format("接收消息内容 : %s", new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b", message.isRetained()));
    }

    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        IMqttAsyncClient client = token.getClient();
        System.out.println(client.getClientId() + "发布消息成功!");
    }
}

  1. 发送消息
@RestController
@RequestMapping(value = "/test")
@Slf4j
public class TestController {

    @Autowired
    private MqttConfig mqttConfig;

    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam("topic") String topic,
                              @RequestParam("message") String message) {

        boolean publish = mqttConfig.publish(topic, message);
        if (publish) {
            return "ok";
        }
        return "no";
    }
}
  1. 启动项目
    SpringBoot详细整合MQTT消息,消息队列,spring boot,后端,java

两个订阅方 , 两个主题方
SpringBoot详细整合MQTT消息,消息队列,spring boot,后端,java

  1. 模拟发送消息

SpringBoot详细整合MQTT消息,消息队列,spring boot,后端,java

  1. 查看消息是否消费

查看监控
SpringBoot详细整合MQTT消息,消息队列,spring boot,后端,java

查看控制台输出文章来源地址https://www.toymoban.com/news/detail-799231.html

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.6)

2023-11-22 10:37:29.324  INFO 16044 --- [           main] c.e.springmaven.SpringMavenApplication   : Starting SpringMavenApplication using Java 11.0.1 on lep with PID 16044 (D:\Corporation\CZGJ\spring-maven\target\classes started by lep in D:\Corporation\CZGJ\spring-maven)
2023-11-22 10:37:29.324  INFO 16044 --- [           main] c.e.springmaven.SpringMavenApplication   : No active profile set, falling back to default profiles: default
2023-11-22 10:37:29.853  INFO 16044 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-11-22 10:37:29.861  INFO 16044 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-11-22 10:37:29.892  INFO 16044 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-11-22 10:37:29.908  INFO 16044 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-11-22 10:37:29.908  INFO 16044 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-11-22 10:37:30.098  INFO 16044 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2023-11-22 10:37:30.098  INFO 16044 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2023-11-22 10:37:30.114  INFO 16044 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.54]
2023-11-22 10:37:30.114  INFO 16044 --- [           main] o.a.catalina.core.AprLifecycleListener   : Loaded Apache Tomcat Native library [1.2.31] using APR version [1.7.0].
2023-11-22 10:37:30.114  INFO 16044 --- [           main] o.a.catalina.core.AprLifecycleListener   : APR capabilities: IPv6 [true], sendfile [true], accept filters [false], random [true], UDS [true].
2023-11-22 10:37:30.114  INFO 16044 --- [           main] o.a.catalina.core.AprLifecycleListener   : APR/OpenSSL configuration: useAprConnector [false], useOpenSSL [true]
2023-11-22 10:37:30.114  INFO 16044 --- [           main] o.a.catalina.core.AprLifecycleListener   : OpenSSL successfully initialized [OpenSSL 1.1.1l  24 Aug 2021]
2023-11-22 10:37:30.178  INFO 16044 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2023-11-22 10:37:30.178  INFO 16044 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 822 ms
2023-11-22 10:37:30.871  INFO 16044 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-11-22 10:37:30.871  INFO 16044 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'test.errorChannel' has 1 subscriber(s).
2023-11-22 10:37:30.871  INFO 16044 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-11-22 10:37:30.887  INFO 16044 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2023-11-22 10:37:30.887  INFO 16044 --- [           main] c.e.springmaven.SpringMavenApplication   : Started SpringMavenApplication in 1.845 seconds (JVM running for 2.87)
2023-11-22 10:42:34.668  INFO 16044 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-11-22 10:42:34.668  INFO 16044 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-11-22 10:42:34.669  INFO 16044 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
test发布消息成功!
接收消息主题 : warn_topic
接收消息Qos : 2
接收消息内容 : 我是告警消息
接收消息retained : false

到了这里,关于SpringBoot详细整合MQTT消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rocketMq消息队列详细使用与实践整合spring

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月13日
    浏览(41)
  • 【MQTT】使用MQTT在Spring Boot项目中实现异步消息通信

    前置文章: (一)MQTT协议与指令下发;MQTT与Kafka比较 (二)用MQTT在Spring Boot项目中实现异步消息通信 MQTT(Message Queuing Telemetry Transport)是一种轻量级的、开放的消息协议,特别适用于物联网设备之间的通信。本篇文章将介绍如何在Spring Boot项目中使用MQTT来实现异步消息通信

    2024年01月17日
    浏览(54)
  • Spring Boot 整合 Shiro(后端)

    1 Shiro 什么是 Shiro 官网: http://shiro.apache.org/ 是一款主流的 Java 安全框架,不依赖任何容器,可以运行在 Java SE 和 Java EE 项目中,它的主要作用是对访问系统的用户进行身份认证、 授权、会话管理、加密等操作。 Shiro 就是用来解决安全管理的系统化框架。 2 Shiro 核心组件 用

    2024年02月09日
    浏览(49)
  • 消息队列-RabbitMQ:延迟队列、rabbitmq 插件方式实现延迟队列、整合SpringBoot

    1、延迟队列概念 延时队列内部是有序的 , 最重要的特性 就体现在它的 延时属性 上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列使用场景: 订单在十分钟之内未支付则

    2024年02月22日
    浏览(52)
  • MQTT协议-EMQX技术文档-spring-boot整合使用--发送接收-消费

    MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的通信协议,它与MQ(Message Queue,消息队列)有一定的关联,但二者并不完全相同。 MQTT是一种轻量级的通信协议,专门为在物联网(IoT)设备之间的消息传递而设计。它运行在TCP协议之上,以“发布-订阅”模式进行

    2024年02月12日
    浏览(37)
  • RabbitMQ 消息队列(Spring boot AMQP)

    几种常见MQ的对比: RabbitMQ ActiveMQ RocketMQ Kafka 公司/社区 Rabbit Apache 阿里 Apache 开发语言 Erlang Java Java ScalaJava 协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议 可用性 高 一般 高 高 单机吞吐量 一般 差 高 非常高 消息延迟 微秒级 毫秒级 毫秒级 毫

    2024年02月13日
    浏览(43)
  • Spring Boot 整合 RabbitMQ 实现延迟消息

    消息队列(Message Queuing,简写为 MQ)最初是为了解决金融行业的特定业务需求而产生的。慢慢的,MQ 被应用到了更多的领域,然而商业 MQ 高昂的价格让很多初创公司望而却步,于是 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)应运而生。 随着 AMQP 草案的发布,两个月

    2024年04月08日
    浏览(48)
  • Spring Boot如何实现分布式消息队列

    在分布式系统中,消息队列是非常重要的一部分,可以帮助开发人员实现异步处理、解耦系统、提高系统可靠性等。本文将介绍如何使用 Spring Boot 实现分布式消息队列。 消息队列是一种存储消息的容器,可以缓存消息并在需要的时候按照一定的规则将消息发送给消费者。常

    2024年02月14日
    浏览(41)
  • Spring Boot 3.x 系列【32】集成消息队列

    有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot版本3.0.5 源码地址:https://gitee.com/pearl-organization/study-spring-boot3 MQ/

    2024年02月04日
    浏览(48)
  • rocketMq消息队列原生api使用以及rocketMq整合springboot

    使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里用SpringBoot来搭建一系列消息生产者和消息消费者,来访问之前搭建的RocketMQ集群。 首先创建一个基于Maven的SpringBoot工程,引入如下依赖: RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较

    2024年02月12日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包