SpringBoot集成Mqtt发送消息

这篇具有很好参考价值的文章主要介绍了SpringBoot集成Mqtt发送消息。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. MQTT简介

MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即消息队列传输探测,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。

相关概念

三种身份:

spring 如何发送mqtt消息,技术学习,spring boot,java

  • 客户端(Client):MQTT 客户端是发送接收消息的应用程序。
  • 服务器(Broker):也叫“代理”,服务器是处理消息的应用程序,位于发布者和订阅者中间,负责接收消息,并按照某种规则发送给订阅者。
  • 主题(Topic): 主题是消息的标识符,用于区分不同类型的消息。

MQTT 消息

MQTT传输的消息可以分为:主题(topic)和负载(payload)两部分

  • 主题,可以理解为消息的类型
  • 负载,可以理解为消息的内容

消息服务质量QoS(Quality of Service)

Qos用于保证在不同的网络环境下消息传递的可靠性,分为3个等级

  • 0 消息最多传递一次,消息发布完全依赖底层TCP/IP网络,可能会发生消息丢失, 也就是发出去就不管了,也被叫做“即发即弃”
  • 1 消息传递至少 1 次,确保消息到达,但消息重复可能会发生,发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
  • 2 消息仅传送一次,确保消息到达一次

2. SpringBoot集成Mqtt

Spring集成Mqtt常用的有两种方式,一种是直接使用Mqtt的客户端库,如Eclipse Paho,另外一种是spring integration mqtt
第一种:使用Mqtt客户端库
依赖引入:org.eclipse.paho.client.mqttv3

<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.0</version>
</dependency>

服务端配置

public class MqttSendMsgService {
    private static String clientId = "test";
    private static String username = "admin";
    private static String password = "xxxxxx";
    private static String broker = "tcp://xxxxx:1883";
    public ReturnT<String> mqttSend(String param) {
        MqttClient client;
        try {
            client = new MqttClient(broker, clientId, new MemoryPersistence());
            client.setCallback(new MqttCallback() {
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost: " + cause.getMessage());
                }
                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println("Message arrived: " + mqttMessage.getPayload());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("Delivery complete");
                }
            });

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(username);
            connOpts.setPassword(password.toCharArray());

            client.connect(connOpts);
            log.info("Connected to MQTT Broker!");


            //主题
            String topic="test/simple";
            //消息
            String content="发送测试";

            MqttMessage message = new MqttMessage();
            message.setQos(1);
            message.setRetained(false);
            message.setPayload(content.getBytes());
            //消息发送
            client.publish(topic,message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        return ReturnT.SUCCESS;
    }
}

上面这种使用起来比较简单,生产环境使用最多的还是下面这种

第二种:使用 Spring integration进行集成,这里以发送消息为例
依赖引入

<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-mqtt</artifactId>
	<version>5.5.14</version>
</dependency>

添加yaml配置

mqtt.url = tcp://xxxxx:1883
mqtt.username = admin
mqtt.password = 123456
mqtt.clientId = test
mqtt.defaultTopic = /test/send
mqtt.keepAliveInterval = 60
mqtt.automaticReconnect = true
mqtt.cleanSession = false
mqtt.connectionTimeout = 30
mqtt.maxInflight = 1024

添加对应的属性配置类

@Component
public class MqttConfigProperties {
    @Value("${mqtt.url}")
    private String url;
    @Value("${mqtt.username}")
    private String username;
    @Value("${mqtt.password}")
    private String password;
    @Value("${mqtt.clientId}")
    private String clientId;
    @Value("${mqtt.defaultTopic}")
    private String defaultTopic;
    @Value("${mqtt.keepAliveInterval}")
    private Integer keepAliveInterval;
    @Value("${mqtt.automaticReconnect}")
    private Boolean automaticReconnect;
    @Value("${mqtt.cleanSession}")
    private Boolean cleanSession;
    @Value("${mqtt.connectionTimeout}")
    private Integer connectionTimeout;
    @Value("${mqtt.maxInflight}")
    private Integer maxInflight;
}

创建客户端配置类

@Configuration
@IntegrationComponentScan
public class MqttConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties));
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttConfigProperties.getUsername());
        options.setPassword(mqttConfigProperties.getPassword().toCharArray());
        options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
        options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval());
        options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect());
        options.setCleanSession(mqttConfigProperties.getCleanSession());
        options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout());
        options.setMaxInflight(mqttConfigProperties.getMaxInflight());
        return options;
    }
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

    // 推送通道
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic());
        log.info("初始化mqttOutputChannel...");
        return messageHandler;
    }


}

发送网关接口

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
    /**
     * 发送消息
     *
     * @param topic
     * @param data
     */
    void send(@Header(MqttHeaders.TOPIC) String topic, String data);
}

这样,在发送消息时,直接将消息网关注入,调用发送方法就可以发送了

mqttGateway.send(topic, JSONObject.toJSONString(msg));

参考:
https://mqtt.org/文章来源地址https://www.toymoban.com/news/detail-858125.html

到了这里,关于SpringBoot集成Mqtt发送消息的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MQTT协议-EMQX技术文档-spring-boot整合使用--发送接收-消费

    MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的通信协议,它与MQ(Message Queue,消息队列)有一定的关联,但二者并不完全相同。 MQTT是一种轻量级的通信协议,专门为在物联网(IoT)设备之间的消息传递而设计。它运行在TCP协议之上,以“发布-订阅”模式进行

    2024年02月12日
    浏览(27)
  • 一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布&MQTT 客户端重连

    简介: 之前介绍了RabbitMQ以及如何在SpringBoot项目中整合使用RabbitMQ,看过的朋友都说写的比较详细,希望再总结一下目前比较流行的MQTT。所以接下来,就来介绍什么MQTT?它在IoT中有着怎样的作用?如何在项目中使用MQTT? 之前介绍了RabbitMQ以及如何在SpringBoot项目中整合使用

    2024年02月05日
    浏览(29)
  • MQTT协议-发布消息(服务器向客户端发送)

    发布消息报文组成:https://blog.csdn.net/weixin_46251230/article/details/129414158 在了解了发布信息的PUBLISH报文后,就可以分析出阿里云服务器向本地客户端发送的报文数据了 实验前需要在阿里云创建产品和设备,并创建简单的温度和湿度物模型:https://blog.csdn.net/weixin_46251230/article/de

    2024年02月06日
    浏览(44)
  • Android集成MQTT教程:实现高效通信和实时消息传输

        随着物联网技术的不断发展,Android应用程序对于实时通信和消息传输的需求越来越迫切。MQTT(Message Queuing Telemetry Transport)作为一种轻量级的、可扩展的通信协议,被广泛应用于物联网领域。本文将为您详细介绍如何在Android应用中集成MQTT,实现高效通信和实时消息传输

    2024年02月07日
    浏览(30)
  • 【MQTT】使用MQTT在Spring Boot项目中实现异步消息通信

    前置文章: (一)MQTT协议与指令下发;MQTT与Kafka比较 (二)用MQTT在Spring Boot项目中实现异步消息通信 MQTT(Message Queuing Telemetry Transport)是一种轻量级的、开放的消息协议,特别适用于物联网设备之间的通信。本篇文章将介绍如何在Spring Boot项目中使用MQTT来实现异步消息通信

    2024年01月17日
    浏览(42)
  • MQTT 与 Kafka|物联网消息与流数据集成实践

    MQTT (Message Queuing Telemetry Transport) 是一种轻量级的消息传输协议,专为受限网络环境下的设备通信而设计。Apache Kafka 是一个分布式流处理平台,旨在处理大规模的实时数据流。 Kafka 和 MQTT 是实现物联网数据端到端集成的互补技术。通过结合使用 Kafka 和 MQTT,企业可以构建一个

    2024年02月16日
    浏览(28)
  • SpringBoot详细整合MQTT消息

    消息队列遥测传输 (MQTT) , 是一种常用的轻量级 \\\"发布-订阅\\\"消息协议 , 非常适合通过互联网连接物联网(LOT) 或者机器对机器 (M2M) 设备与应用. MQTT可在低带宽或者低功耗环境中高效运行,因此是有众多远程客户端应用的理想之选,适合用于多个行业,包括消费类电子产品,汽车,运输

    2024年01月17日
    浏览(23)
  • springboot集成mqtt(超级无敌详细)

    1. 引入pom依赖 2. application.yml application.properties 3. MqttConfiguration.java 4. MyMQTTClient.java 5. MyMQTTCallback.java 6. MqttMsg.java 7. MqttController.java 8.SpringUtils.java 8.测试 发送和接收 springboot后台日志

    2024年02月03日
    浏览(30)
  • MQTT记录(概述,docker部署,基于spring-integration-mqtt实现消息订阅与发布,客户端工具测试)

    需要spring-boot集成spring-integration-mqtt代码的直接跳到第5部分 1.1 MQTT是什么呢? message queue telemetry translation 是一种基于发布与订阅的轻量级消息传输协议.适用于低带宽或网络不稳定的物联网应用.开发者可以使用极少的代码来实现物联网设备之间的消息传输.mqtt协议广泛应用于物

    2024年02月12日
    浏览(37)
  • 【Spring Boot】集成Kafka实现消息发送和订阅

    最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目 注意Type选Maven,java选8,其他默认 点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来 在maven配置没问

    2024年02月09日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包