IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)

这篇具有很好参考价值的文章主要介绍了IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本系列教程包括:
IOT云平台 simple(0)IOT云平台简介
IOT云平台 simple(1)netty入门
IOT云平台 simple(2)springboot入门
IOT云平台 simple(3)springboot netty实现TCP Server
IOT云平台 simple(4)springboot netty实现简单的mqtt broker
IOT云平台 simple(5)springboot netty实现modbus TCP Master
IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)

本章首先简单介绍了IOT云平台最基本的架构,然后基于springboot netty实现IOT Server;最后进行了测试验证。

测试环境:

  1. mqtt终端,这里用MQTT.fx 工具软件模拟;
  2. IOT server:基于springboot netty进行开发;
  3. Rabbitmq broker;本地安装Windows 64位环境;
  4. Rabbitmq consumer,订阅mq message的server,这里用MQTT Assistant工具软件模拟;

1 IOT云平台最基本的架构

本章涉及的IOT云平台最基本架构图:
IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)
说明
1)为了简单,这里只包括mqtt上行链路,即mqtt终端上传数据;
2)这里通过Rabbitmq 进行消息的分发,也可以用其他mq中间件,如kafka。
3)Mqtt terminal:mqtt终端,指的是具体的设备传感器或者mqtt协议网关。

具体流程
第1步,mqtt终端
实现mqtt协议或者其他协议(modbus、wifi、蓝牙)转换为mqtt协议。

第2步,mqtt终端->IOT server
mqtt终端publish message到IOT server;

第3步,IOT server->Rabbitmq broker
IOT Server中mqtt broker的模块收到mqtt message,进行解析,然后通过Rabbitmq producer模块,publish message到Rabbitmq broker;

第4步,Rabbitmq broker->不同server
Rabbitmq broker收到消息存入指定的queue,然后分发到订阅消息的不同server;

第5步,不同server
不同server监听收到消息进行相应的处理;如:存入到时序数据库、进行大数据流的计算、具体业务的处理。

2 集成开发

第1步:POM文件引入netty、Rabbitmq的依赖:

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.63.Final</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>3.0.1</version>
        </dependency>

2.1实现mqtt broker

创建主要的类:
1)TCPServer
server类,实现mqtt broker。
2 )TCPServerStartListener
监听到springboot启动后,启动server。
3)TCPServerChannelInitializer
server channel初始化的类

包括两个server channel处理的类:
1) MqttMessageChannelHandler:
server channel处理的类;实现mqtt消息的解析;

@Component
@Slf4j
@ChannelHandler.Sharable
public class MqttMessageChannelHandler extends ChannelInboundHandlerAdapter  {

    @Autowired
    MessageStrategyManager messageStrategyManager;

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        MqttMessage mqttMessage = (MqttMessage) msg;
        log.info("--------------------------channelRead begin---------------------------*");
        log.info("from client:" + channelHandlerContext.channel().remoteAddress());
        log.info("receive message:" + mqttMessage.toString());
        try {
            MqttMessageType type = mqttMessage.fixedHeader().messageType();
            MessageStrategy messageStrategy =  messageStrategyManager.getMessageStrategy(type);
            if(messageStrategy!=null){
                messageStrategy.sendResponseMessage(channelHandlerContext,mqttMessage);
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
        log.info("--------------------------channelRead end---------------------------*");

    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

2) MqMessageChannelHandler:
server channel处理的类;实现mq消息发布到Rabbitmq broker;

@Component
@Slf4j
public class MqMessageChannelHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    ProducerService producerService;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof MqMessage)) {
            return;
        }
        MqMessage mqMessage = (MqMessage) msg;
        log.info("转发到Rabbitmq Server:" + mqMessage.data);
        producerService.sendData(mqMessage.data);
    }

}

2.2实现Rabbitmq producer

1 定义配置类ProducerConfig:

exchange(交换机):topic_exchange
queue(队列):topic_queue
bindKey(绑定key):project1.station1.*

@Slf4j
@Configuration
public class ProducerConfig {
    String bindKey = "project1.station1.*";

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //开启Mandatory,触发回调函数
        rabbitTemplate.setMandatory(true);
        //ack
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("-----------confirm begin---------------");
                log.info("data:" + correlationData);
                if(ack){
                    log.info("Ack:true");
                }else{
                    log.info("Ack:false");
                }
                log.info("cause:" + cause);
                log.info("-----------confirm end---------------");
            }
        });
        //return
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.info("-----------return begin---------------");
                log.info("message:"+returnedMessage.getMessage());
                log.info("reply code:"+returnedMessage.getReplyCode());
                log.info("reply text:"+returnedMessage.getReplyText());
                log.info("exchange:"+returnedMessage.getExchange());
                log.info("routeKey:"+returnedMessage.getRoutingKey());
                log.info("-----------return end---------------");
            }
        });

        return rabbitTemplate;
    }

    @Bean("topic_exchange")
    public TopicExchange topicExchange() {
        return ExchangeBuilder.topicExchange("topic_exchange").durable(true).build();
    }

    @Bean("topic_queue")
    public Queue topicQueue(){
        return QueueBuilder.durable("topic_queue").build();
    }

    @Bean
    public Binding topicBind(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(bindKey);
    }
}

2 定义ProducerService类,实现发送消息的功能:
这里发送:
exchange(交换机):topic_exchange
routeKey (路由key):project1.station1.device1

@Slf4j
@Component
public class ProducerService {
    String exchange = "topic_exchange";
    String routeKey = "project1.station1.device1";
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendData(String data) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("------------topic producer begin------------");
        rabbitTemplate.convertAndSend(exchange, routeKey, data);
        log.info("exchange:"+exchange);
        log.info("routeKey:"+routeKey);
        log.info("send data:"+data);
        log.info("------------topic producer end------------");
    }
}

3 测试验证

第1步:mqtt终端发送消息:temperature is 26, 2023.1.14 17:00。
IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)
第2步:IOT Server接收到mqtt终端发送的消息;然后转发到Rabbitmq Server;
IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)
第3步:Rabbitmq Server的topic_queue中有1条消息;
IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)
第4步:MQTT Assistant中从topic_queue中消费消息:
IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)
可见,对于物联网的数据,实现了从端到平台(解析、分发、存储)的整个流程。

代码详见:
https://gitee.com/linghufeixia/iot-simple
code5文章来源地址https://www.toymoban.com/news/detail-407014.html

到了这里,关于IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【IoT网络层】STM32 + ESP8266 +MQTT + 阿里云物联网平台 |开源,附资料|

    🌟博主领域:嵌入式领域人工智能软件开发 本节目标: 通过MQTT.fx模拟连接或通过串口连接ESP8266发送AT命令,实现阿里云物联网平台发送数据同时接收数据,IOT studio界面显示数据。具体来说:使用ESP8266 ESP-01来连接网络,获取设备数据发送到阿里云物联网平台并显示且oled显

    2024年02月04日
    浏览(63)
  • springboot第37集:kafka,mqtt,Netty,nginx,CentOS,Webpack

    image.png binzookeeper-server-start.shconfigzookeeper.properties.png image.png image.png 消费 image.png image.png image.png image.png image.png image.png image.png image.png image.png Netty的优点有很多: API使用简单,学习成本低。 功能强大,内置了多种解码编码器,支持多种协议。 性能高,对比其他主流的NIO框架

    2024年02月11日
    浏览(39)
  • Mainflux IoT:Go语言轻量级开源物联网平台,支持HTTP、MQTT、WebSocket、CoAP协议

    Mainflux是一个由法国的创业公司开发并维护的 安全、可扩展 的开源物联网平台,使用 Go语言开发、采用微服务的框架。Mainflux支持多种接入设备,包括设备、用户、APP;支持多种协议,包括HTTP、MQTT、WebSocket、CoAP,并支持他们之间的协议互转。 Mainflux的南向接口连接设备,北

    2024年02月01日
    浏览(115)
  • Zeus IoT : 基于 SpringBoot 的分布式开源物联网大数据平台

    Zeus IoT 是一个集设备数据 采集、存储、分析、观测 为一体的开源物联网平台,全球首创基于 Zabbix 的物联网分布式数据采集架构,具备超 百万级 物联网设备的并发监控能力,真正具备 工业级性能与稳定性 的开源物联网大数据中台。 Zeus IoT 致力于让设备接入和数据处理变得

    2024年02月05日
    浏览(60)
  • 铱塔 (iita) 开源 IoT 物联网开发平台,基于 SpringBoot + TDEngine +Vue3

    01   铱塔 (iita)  物联网平台 铱塔智联 (open-iita) 基于Java语言的开源物联网基础开发平台,提供了物联网及相关业务开发的常见基础功能, 能帮助你快速搭建自己的物联网相关业务平台。 铱塔智联平台包含了品类、物模型、消息转换、通讯组件(mqtt/EMQX通讯组件、小度音箱接

    2024年02月20日
    浏览(55)
  • MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图

    MQTT(EMQX) - Linux CentOS Docker 安装 MQTT (Message Queue Telemetry Transport) 是一个轻量级传输协议,它被设计用于轻量级的发布/订阅式消息传输,MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化。是一种简单、稳定、开放、轻量级易于实现的消息协议,在物联网的应用下的

    2023年04月10日
    浏览(35)
  • 应用开发平台前端集成vue-simple-uploader实现文件分块上传

    文件的上传是系统的必备功能,Element提供了上传组件upload,也基本能满足常见常用的文件上传功能,特别是应对小型文件(10M以下)的处理。但如果是遇到要求更多更高的场景,上传几百兆甚至上G的视频文件,要求分块上传,能断点续传,显示进度,能暂停,能重试……这

    2024年02月08日
    浏览(56)
  • Netty系列(一):Springboot整合Netty,自定义协议实现

    Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单

    2023年04月25日
    浏览(55)
  • 1-基本控制篇(阿里云物联网平台)-C#,网页,android,微信小程序,单片机等使用MQTT接入阿里云物联网平台

    \\\"ProductKey\\\": \\\"a1m7er1nJbQ\\\", \\\"DeviceName\\\": \\\"Mqtt\\\", \\\"DeviceSecret\\\": \\\"7GUrQwgDUcXWV3EIuLwdEvmRPWcl7VsU\\\" 如何使用MQTT协议模拟设备快速接入物联网平台_物联网平台(IoT)-阿里云帮助中心    阿里云的说明文档 a1m7er1nJbQ.iot-as-mqtt.${region}.aliyuncs.com a1m7er1nJbQ.iot-as-mqtt.cn-shanghai.aliyuncs.com 112233445566|securemode=3,s

    2024年04月12日
    浏览(135)
  • IOT开发---Android MQTT使用

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议。 该协议构建于TCP/IP协议上,它的设计思想是轻巧、开放、 简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于

    2024年02月10日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包