Java基于RabbitMQ实现MQTT

这篇具有很好参考价值的文章主要介绍了Java基于RabbitMQ实现MQTT。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

要想使用MQ的MQTT服务需要先开启MQTT服务,因为RabbitMQ的MQTT默认是关闭的,具体启动方法可以参考:rabbitmq使用mqtt协议_panda_225400的博客-CSDN博客_rabbitmq mqtt

下面具体实现我就直接贴代码吧,一切说明都在代码里面,方便直接

POM依赖

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置类

#RabbitMQ缓存配置MQTT
#MQTT默认端口1883,和RabbitMQ不使用同一个端口,具体端口怎么看,可以看我下面的截图
spring.rabbitmq.url=tcp://127.0.0.1:1883
#MQ登录账号
spring.rabbitmq.username=root
#MQ登录密码
spring.rabbitmq.password=123456
spring.rabbitmq.pubTopic=pubTopic
#要订阅的频道,多个用逗号隔开  admin/+/admin表示中间的加号我可以替换任何字符
spring.rabbitmq.subTopic=subTopic,test,admin/+/admin
#默认写法
spring.rabbitmq.clientId=${random.value}

java mqtt服务器搭建,java,java-rabbitmq,rabbitmq

HTTP处理相关类

package com.smart.tcp.mqtt;

import com.smart.tcp.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Objects;

/**
 * mqtt服务类
 * 一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,
 * 是物联网(Internet of Thing)中的一个标准传输协议
 * ClientId是MQTT客户端的标识。MQTT服务端用该标识来识别客户端。因此ClientId必须是独立的。
 * clientID需为全局唯一。如果不同的设备使用相同的clientID同时连接物联网平台,那么先连接的那个设备会被强制断开。
 *
 * @author WeiWei
 * @date 2022/08/08
 */
@Slf4j
@Configuration
public class MqttServer {
    /**
     * 出站通道
     */
    public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";
    /**
     * 输入通道
     */
    public static final String INPUT_CHANNEL = "mqttInputChannel";
    /**
     * mqtt配置
     */
    @Resource
    private MqttConfig mqttConfig;

    /**
     * 初始化
     */
    @PostConstruct
    public void init() {
        log.info(mqttConfig.toString());
    }

    /**
     * mqtt客户工厂
     *
     * @return {@link MqttPahoClientFactory}
     */
    @Bean
    public MqttPahoClientFactory clientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        //配置MqttConnectOptions
        options.setServerURIs(new String[]{mqttConfig.getUrl()});
        options.setUserName(mqttConfig.getUsername());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * mqtt出站通道
     *
     * @return {@link MessageChannel}
     */
    @Bean(value = OUTBOUND_CHANNEL)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * mqtt出站handler  1
     *
     * @return {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler mqttOutboundHandler() {
        //MqttPahoMessageHandler初始化
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttConfig.getClientId(), clientFactory());
        //设置默认的qos级别
        handler.setDefaultQos(1);
        //保留标志的默认值。如果没有mqtt_retained找到标题,则使用它。如果提供了自定义,则不使用它converter。这里不启用
        handler.setDefaultRetained(false);
        //设置发布的主题
        handler.setDefaultTopic(mqttConfig.getPubTopic());
        //当 时true,调用者不会阻塞。相反,它在发送消息时等待传递确认。默认值为false(在确认交付之前发送阻止)。
        handler.setAsync(false);
        //当 async 和 async-events 都为 true 时,会发出 MqttMessageSentEvent(请参阅事件)。它包含消息、主题、客户端库生成的messageId、clientId和clientInstance(每次连接客户端时递增)。当客户端库确认交付时,会发出 MqttMessageDeliveredEvent。它包含 messageId、clientId 和 clientInstance,使传递与发送相关联。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,有可能在 MqttMessageSentEvent 之前接收到 MqttMessageDeliveredEvent。默认值为false。
        handler.setAsyncEvents(false);
        return handler;
    }

    public MessageHandler mqttOutboundHandler123(String code,String topic) {
        //MqttPahoMessageHandler初始化
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(code, clientFactory());
        //设置默认的qos级别
        handler.setDefaultQos(1);
        //保留标志的默认值。如果没有mqtt_retained找到标题,则使用它。如果提供了自定义,则不使用它converter。这里不启用
        handler.setDefaultRetained(false);
        //设置发布的主题
        handler.setDefaultTopic(topic);
        //当 时true,调用者不会阻塞。相反,它在发送消息时等待传递确认。默认值为false(在确认交付之前发送阻止)。
        handler.setAsync(false);
        //当 async 和 async-events 都为 true 时,会发出 MqttMessageSentEvent(请参阅事件)。它包含消息、主题、客户端库生成的messageId、clientId和clientInstance(每次连接客户端时递增)。当客户端库确认交付时,会发出 MqttMessageDeliveredEvent。它包含 messageId、clientId 和 clientInstance,使传递与发送相关联。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,有可能在 MqttMessageSentEvent 之前接收到 MqttMessageDeliveredEvent。默认值为false。
        handler.setAsyncEvents(false);
        return handler;
    }

    /**
     * mqtt输入通道
     *
     * @return {@link MessageChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * 入站
     *
     * @return {@link MessageProducer}
     */
    @Bean
    public MessageProducer inbound() {
        //配置订阅端MqttPahoMessageDrivenChannelAdapter
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        mqttConfig.getClientId() + "_inbound", clientFactory(), mqttConfig.getSubTopic().split(","));
        //设置超时时间
        adapter.setCompletionTimeout(3000);
        //设置默认的消息转换类
        adapter.setConverter(new DefaultPahoMessageConverter());
        //设置qos级别
        adapter.setQos(1);
        //设置入站管道
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * 消息处理程序
     *
     * @return {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = INPUT_CHANNEL)
    public MessageHandler messageHandler() {
        return message -> {
            String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
            log.info("主题:"+topic+"+接收到该主题消息为: {}", message.getPayload());
        };
    }
}

网关类

package com.smart.tcp.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * mqtt网关(发布端需要用到)
 *
 * @author WeiWei
 * @date 2022/08/08
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttServer.OUTBOUND_CHANNEL)
public interface MqttGateway {
    /**
     * 发送到mqtt
     *
     * @param payload 有效载荷
     */
    void sendToMqtt(String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param qos     qos
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

控制层测试类

package com.smart.tcp.controller;

import com.ruoyi.common.model.BaseResult;
import com.smart.server.api.mall.AddressApi;
import com.smart.server.domain.Address;
import com.smart.tcp.mqtt.MqttGateway;
import com.smart.tcp.mqtt.MqttServer;
import com.smart.tcp.server.ChargeServer;
import com.smart.tcp.server.EquipmentApiServer;
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.util.Assert;
import org.springframework.web.bind.annotation.*;

import java.util.Date;
import java.util.List;

/**
 * @author HayDen
 * @date 2022-11-15
 */
@RestController
@RequestMapping("/api/top")
public class TopController
{
    @Autowired
    private MqttGateway mqttGateway;

    /**
     * 发送MQTT消息
     *
     * @param message 消息内容
     * @return 返回
     */
    @PostMapping(value = "/mqtt", produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) {
        System.out.println("================生产MQTT消息================" + message);
        mqttGateway.sendToMqtt(message);
        return new ResponseEntity<>("OK", HttpStatus.OK);
    }


    /**
     * 发送MQTT消息
     * @param key  表示要发给那个频道,例(admin/123/admin、admin/sss/admin、test、subTopic.............)
     * @param message 消息内容
     * @return 返回
     */
    @PostMapping(value = "/mqtt2", produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<String> sendMqtt2(@RequestParam(value = "key")String key,@RequestParam(value = "msg") String message) {
        System.out.println("================生产MQTT消息================" + message);
        mqttGateway.sendToMqtt(key, message);
        return new ResponseEntity<>("OK", HttpStatus.OK);
    }
}

配置文件 文章来源地址https://www.toymoban.com/news/detail-654603.html

#RabbitMQ缓存配置MQTT
#MQTT默认端口1883
spring.rabbitmq.url=tcp://127.0.0.1:1883
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.pubTopic=pubTopic
#这里表示需要处理MQTT发送的主题key,+号表示中间任意字符,当然还有#号通配符
# +号admin/+/admin :admin/a/admin  或者  admin/b/admin  或者  admin/c/admin等等
# #号admin/# :admin/a/a 或者 admin/b/a 或者 admin/a/b等等(可以替换后面两位为任意字符)
spring.rabbitmq.subTopic=subTopic,test,admin/+/admin
#默认写法
spring.rabbitmq.clientId=${random.value}

到了这里,关于Java基于RabbitMQ实现MQTT的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于C语言从0开始手撸MQTT协议代码连接标准的MQTT服务器,完成数据上传和命令下发响应(华为云IOT服务器)

    近年来,物联网的发展如火如荼,已经渗透到我们生活的方方面面。从智能家居到工业自动化,从智慧城市到智慧农业,物联网正在以前所未有的速度改变着我们的生活。 大家现在可能已经习惯了通过手机控制家里的灯光、空调和电视,这就是物联网在智能家居领域的应用,

    2024年02月05日
    浏览(65)
  • mqtt服务器搭建与qt下的mqtt客户端实现

      MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(Io

    2024年02月06日
    浏览(89)
  • Windows搭建MQTT服务器:详细步骤及代码实现

    Windows搭建MQTT服务器:详细步骤及代码实现 MQTT是一种轻量级的通信协议,常用于物联网领域中设备与设备之间的通信。在Windows操作系统下,搭建MQTT服务器可作为物联网通信的基础设施。本文将详细介绍Windows下如何搭建MQTT服务器,包括安装软件、配置服务、使用代码实现等

    2024年02月06日
    浏览(50)
  • 基于ESP32搭建物联网服务器十二(使用MQTT协议与ESP32互动)

    在之前的文章中:基于ESP32搭建物联网服务器十一(用WEB页面控制引脚(GPIO)功能)_esp32webserver 控制io_你的幻境的博客-CSDN博客 已经简单地介绍了MQTT协议,对比于其它网络协议,MQTT协议在物联网的开发中,它的特点使它适用于大多数受限的环境。例如网络代价昂贵,带宽低、不可

    2024年02月02日
    浏览(47)
  • 搭建自己的MQTT服务器,实现设备上云(Ubuntu+EMQX)

    这篇文章教大家在ECS云服务器上部署EMQX,搭建自己私有的MQTT服务器,配置EMQX实现设备上云,设备数据转发,存储;服务器我采用的华为云的ECS服务器,系统选择Ubuntu系统。 Windows版本的看这里: https://blog.csdn.net/xiaolong1126626497/article/details/134280836 EMQX是一款大规模可弹性伸缩

    2024年02月04日
    浏览(47)
  • 华为云云耀云服务器L实例评测|基于华为云云耀云服务器L实例搭建EMQX大规模分布式 MQTT 消息服务器场景体验

    EMQX 是一款国内开发的大规模分布式MQTT消息服务器,它旨在为物联网应用提供高效可靠的连接,实时处理和分发消息以及事件流数据。作为一个关键的物联网基础设施组件,EMQX为企业和开发者提供了一个强大的工具,用于构建各种规模和复杂度的物联网与云应用。 EMQX的主要

    2024年02月08日
    浏览(54)
  • nodejs 实现MQTT协议的服务器端和客户端的双向交互

    公司和第三方合作开发一个传感器项目,想要通过电脑或者手机去控制项目现场的传感器控制情况。现在的最大问题在于,现场的边缘终端设备接入的公网方式是无线接入,无法获取固定IP,所以常规的HTTP协议通信就没法做,现在打算使用MQTT来实现云平台和边缘终端(传感器

    2024年02月05日
    浏览(66)
  • esp8266模块--MQTT协议连接服务器实现数据接收和发送+源码

    首先推荐中国移动的代码,我觉得中国移动的代码更为合理:(但是有一些其他的模块在里面) OneNET开发板代码、资料--2020-09-27--标准板、Mini板bug修复 - 开发板专区 - OneNET设备云论坛 (10086.cn) 以及这位b站up做的视频:(wifi模块在p9节) 【挽救小白第一季】STM32+8266+小程序智能

    2024年02月08日
    浏览(59)
  • 【网络原理】使用Java基于UDP实现简单客户端与服务器通信

    我们用Java实现UDP数据报套接字编程,需要借用以下API来实现 网络编程, 本质上是要操作网卡. 但是网卡不方便直接操作. 在操作系统内核中, 使用了一种特殊的叫做 “socket” 这样的文件来抽象表示网卡. 因此进行网络通信, 势必需要先有一个 socket 对象. DatagramSocket 是UDP Socket,

    2024年03月11日
    浏览(58)
  • Modbus网关BL101 既实现Modbus转MQTT,还能当串口服务器使用

    随着工业4.0的迅猛发展,人们深刻认识到在工业生产和生活中,实时、可靠、安全的数据传输至关重要。在此背景下,高性能的工业电力数据传输解决方案——协议转换网关应运而生,广泛应用于工业自动化系统、远程监控和物联网应用应用环境中。 钡铼技术始终坚持以用户

    2024年01月23日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包