springboot+Netty搭建MQTT协议的服务端

这篇具有很好参考价值的文章主要介绍了springboot+Netty搭建MQTT协议的服务端。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文基于基础版的netty实现mqtt
在此功能基础上,进行了功能强化,新增了用户鉴权、多用户订阅推送,qos2级别消息处理,后续新增topic filter功能,本人会持续更新
Netty是业界最流行的nio框架之一,结合springboot可以满足快速开发

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上的。MQTT协议的可以用在物联网、小型设备、还有移动应用上。

Netty也可以实现MQTT协议,他的内部封装了MQTT协议的相关对象。
MQTT消息包含固定头、可变头、载体
固定头
netty搭建mqtt服务器,spring boot,java,后端,nio
可变头
netty搭建mqtt服务器,spring boot,java,后端,nio

配置文件(appliction.yml)

配置文件配置了mqtt服务的端口、账号、密码,

server:
  port: 8880
netty:
  mqtt:
    port: 1883
#    mqtt账号
    username: admin
#mqtt密码
    password: 123456
# 日记配置
logging:
  level:
    # 开启debug日记打印
    com.hyx: debug

jar包依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.10</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hyx</groupId>
    <artifactId>superNetty</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>superNetty</name>
    <description>superNetty</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--      netty包  -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.75.Final</version>
        </dependency>
        <!--   常用JSON工具包 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
<!--        mqtt服务端-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- 打包成一个可执行jar -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

代码结构

netty搭建mqtt服务器,spring boot,java,后端,nio
消息发送类

package com.hyx.supernetty.MQTTServer.callBack;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;


import io.netty.channel.ChannelId;
import io.netty.handler.codec.mqtt.*;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import org.springframework.stereotype.Component;
import com.hyx.supernetty.MQTTServer.config.MQTTServerProperties;
import static com.hyx.supernetty.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
import static com.hyx.supernetty.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.hyx.supernetty.MQTTServer.server.MQTTServer.*;

/**
 * 大黄
 */
@Component
@RequiredArgsConstructor
public class BootNettyMqttMsgBack {

	private static final Logger log =  LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
	private final MQTTServerProperties MQTTserverProperties;

	/**
	 * 	确认连接请求
	 * @param channel
	 * @param mqttMessage
	 */
	public void connack (Channel channel, MqttMessage mqttMessage) {
		MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
		MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
		MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();

		//	构建返回报文, 可变报头
		MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
		//	构建返回报文, 固定报头
		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
		//	构建CONNACK消息体
		MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
		//log.info("back--"+connAck.toString());
		log.debug("设备上线,channelId:{}", channel.id());
		MQTTdeviceAdd(channel);
		channel.writeAndFlush(connAck);
	}
	public void disconnack (Channel channel, MqttMessage mqttMessage) {
		MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
		MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
		MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
		//	构建返回报文, 可变报头
		MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());
		//	构建返回报文, 固定报头
		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
		//	构建CONNACK消息体
		MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
		//log.info("back--"+connAck.toString());
		channel.writeAndFlush(connAck);
		log.debug("设备下线,channelId:{}", channel.id());
		MQTTdeviceRemove(channel);
	}

	/**
	 * 	根据qos发布确认
	 * @param channel
	 * @param mqttMessage
	 */
	public  void puback (Channel channel, MqttMessage mqttMessage) {
		MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
		MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
		MqttQoS qos =  mqttFixedHeaderInfo.qosLevel();
		//注意:	readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置	读指针
        byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
        mqttPublishMessage.payload().readBytes(headBytes);
        String data = new String(headBytes);
        System.out.println("publish data-->"+data);
		//重置读取的指针
		mqttPublishMessage.payload().resetReaderIndex();
        switch (qos) {
	        case AT_MOST_ONCE: 		//	至多一次
				//推送到订阅的客户端
				subscribSend(mqttMessage);
	            break;
	        case AT_LEAST_ONCE:		//	至少一次
	    		//	构建返回报文, 可变报头
	    		MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
	    		//	构建返回报文, 固定报头
	    		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
	    		//	构建PUBACK消息体
	    		MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
	    		log.info("back--"+pubAck.toString());
	    		channel.writeAndFlush(pubAck);
				//推送到订阅的客户端
				subscribSend(mqttMessage);
	            break;
	        case EXACTLY_ONCE:		//	刚好一次
	            //	构建返回报文, 固定报头
	        	MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
	            //	构建返回报文, 可变报头
	            MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
	            MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
				//服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,
				// 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。
				//接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。
				int mqttMessageId=mqttPublishMessage.variableHeader().packetId();
				if(!mqttMessageIdMap.containsKey(mqttMessageId)){
					//不存在此消息,将此消息暂存
					mqttMessageIdMap.put(mqttMessageId, mqttMessage);
					log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");
				}else{
					//重复发送消息,直接返回
					log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());
					return;
				}
	    		channel.writeAndFlush(mqttMessageBack);
	            break;
			default:
				break;
        }
	}

	/**
	 * 	发布完成 qos2
	 * @param channel
	 * @param mqttMessage
	 */
	public  void pubcomp (Channel channel, MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        //	构建返回报文, 固定报头
    	MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
        //	构建返回报文, 可变报头
        MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
        MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
		//log.info("back--"+mqttMessageBack.toString());
		channel.writeAndFlush(mqttMessageBack);
	}

	/**
	 * 	订阅确认
	 * @param channel
	 * @param mqttMessage
	 */
	public  void suback(Channel channel, MqttMessage mqttMessage) {
		MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
		MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
		//	构建返回报文, 可变报头
		MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
		Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
		//log.info(topics.toString());
		List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
		for (int i = 0; i < topics.size(); i++) {
			grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
		}
		//	构建返回报文	有效负载
		MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
		//	构建返回报文	固定报头
		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
		//	构建返回报文	订阅确认
		MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
		channel.writeAndFlush(subAck);
	}

	/**
	 * 	取消订阅确认
	 * @param channel
	 * @param mqttMessage
	 */
	public  void unsuback(Channel channel, MqttMessage mqttMessage) {
		MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
		//	构建返回报文	可变报头
		MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
		//	构建返回报文	固定报头
		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
		//	构建返回报文	取消订阅确认
		MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
		channel.writeAndFlush(unSubAck);
	}

	/**
	 * 	心跳响应
	 * @param channel
	 * @param mqttMessage
	 */
	public  void pingresp (Channel channel, MqttMessage mqttMessage) {
		//	心跳响应报文	11010000 00000000  固定报文
		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
		MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
		channel.writeAndFlush(mqttMessageBack);
	}

	/**
	 * 订阅推送
	 */
	public  void subscribSend(MqttMessage mqttMessage){
		MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
		Object obj=mqttMessage.variableHeader();
		MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
		String topicName=variableHeader.topicName();
		int packetId=variableHeader.packetId();
		//固定消息头 注意此处的消息类型PUBLISH mqtt协议
		MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_LEAST_ONCE,false,0);
		//可变消息头
		MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader(topicName,packetId);
		//推送消息体
		MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, mqttPublishMessage.content());
		log.info("推送地址————》"+topicName);
		if(subscribeMap.containsKey(topicName)){
			List<ChannelId> channelList=subscribeMap.get(topicName);
			for (int i = 0; i < channelList.size(); i++) {
				//订阅次此topic的Mqtt客户端搜到此消息,
				Channel channelSub=MQTTdeviceChannelGroup.find(channelList.get(i));
				//writeAndFlush会将ByteBuf的引用释放refCnt会减去1,使用retain加1
				mqttPublishMessageResult.retain();
				channelSub.writeAndFlush(mqttPublishMessageResult);
			}
			mqttPublishMessageResult.release();

		}
	}

	/**
	 * 用户鉴权
	 */
	public boolean authentication(MqttConnectPayload payload){

		String username=MQTTserverProperties.getUsername();
		String password=MQTTserverProperties.getPassword();
		//无账号或者无密码通过
		if(stringEmptyCheck(password)||stringEmptyCheck(username)){
			return true;
		}else {
			//消息中账号密码为空
			if(payload.passwordInBytes()==null||payload.userName()==null){
				return false;
			}
			String passwordAuthen=new String(payload.passwordInBytes());
			String usernameAuthen=payload.userName();
			if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){
				return true;
			}else {
				return false;
			}
		}
	}
	//判断字符字符为空
	private boolean stringEmptyCheck(String str){
		if(str==null||"".equals(str)){
			return true;
		}else {
			return false;
		}
	}

}

初始化消息通道的handler

package com.hyx.supernetty.MQTTServer.handler;

/**
 * @author HuangYaoXuan
 * @date 2023/4/21 15:46
 */
import com.hyx.supernetty.MQTTServer.callBack.BootNettyMqttMsgBack;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static com.hyx.supernetty.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.hyx.supernetty.MQTTServer.server.MQTTServer.*;


/**
 * 消息处理,单例启动
 *
 * @author qiding
 */
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private BootNettyMqttMsgBack BootNettyMqttMsgBack;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg) {
            MqttMessage mqttMessage = (MqttMessage) msg;
            log.info("info--"+mqttMessage.toString());
            MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
            Channel channel = ctx.channel();
            if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
                //用户鉴权(配置文件中配置账号和密码,如果没有默认)
               boolean authentag=  BootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());
               if(!authentag){
                    return;
               }
                //	在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
                if(MQTTdeviceChannelGroup.contains(channel)){
                    //移除次设备channel和topic
                    BootNettyMqttMsgBack.disconnack(channel,mqttMessage);
                }
                //	to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
                BootNettyMqttMsgBack.connack(channel, mqttMessage);
            }
            //对于没有鉴权的设备,请求不处理
            if(!MQTTdeviceChannelGroup.contains(channel)){
                log.warn(channel.id()+"无鉴权操作");
                return;
            }
            switch (mqttFixedHeader.messageType()){
                case PUBLISH:		//	客户端发布消息
                    //	PUBACK报文是对QoS 1等级的PUBLISH报文的响应
                    BootNettyMqttMsgBack.puback(channel, mqttMessage);
                    break;
                // PUBREL	Qos2级别消息,客户端返回
                case PUBREL:
                    //	PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应
                    //服务端收到pubrel之后,正式将消息投递给上层应用层。
                    MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();
                    if(mqttMessageIdMap.containsKey(VariableHeader.messageId())) {
                        log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());
                        BootNettyMqttMsgBack.subscribSend(mqttMessageIdMap.get(VariableHeader.messageId()));
                        BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
                        mqttMessageIdMap.remove(VariableHeader.messageId());
                    }else {
                        //后续多次收到REL消息,制作comp响应
                        BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
                    }
                    break;
                case SUBSCRIBE:		//	客户端订阅主题
                    //	客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
                    //	为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
                    //	SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
                    // 	to do
                    BootNettyMqttMsgBack.suback(channel, mqttMessage);
                    MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;
                    for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {
                        String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();
                        boolean tag=subscribeMap.containsKey(topicname);
                        if(tag){
                            List<ChannelId> channelIds=subscribeMap.get(topicname);
                            if(!channelIds.contains(channel.id())) {
                                channelIds.add(channel.id());
                            }else {
                                log.warn(channel.id()+"重复订阅");
                            }
                            subscribeMap.put(topicname, channelIds);
                        }else {
                            List<ChannelId> channelIds=new ArrayList<>();
                            channelIds.add(channel.id());
                            subscribeMap.put(topicname,channelIds);
                        }
                        log.info(channel.id()+"订阅地址————》"+topicname);
                    }


                    break;
                case UNSUBSCRIBE:	//	客户端取消订阅
                    //	客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
                    //	to do
                    BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
                    Object Unsubscribe=mqttMessage.payload();
                    MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;
                    int len=unsubscribePayload.topics().size();
                    for (int i = 0; i < len; i++) {
                        String topicname=unsubscribePayload.topics().get(i);
                        boolean tag=subscribeMap.containsKey(topicname);
                        if(tag){
                            List<ChannelId> channelIds=subscribeMap.get(topicname);
                            channelIds.remove(channel.id());
                            subscribeMap.put(topicname,channelIds);
                        }else {
                           log.error("不存在订阅地址——>"+topicname);
                        }
                        log.info(channel.id()+"取消订阅地址————》"+topicname);
                    }

                    break;
                case PINGREQ:		//	客户端发起心跳
                    //	客户端发送PINGREQ报文给服务端的
                    //	在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
                    //	请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
                    BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
                    break;
                case DISCONNECT:	//	客户端主动断开连接
                    log.debug("设备下线,channelId:{}", channel.id());
                    MQTTdeviceRemove(channel);
                    //	DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
                    //	to do
                    break;
                default:
                    break;
            }
        }
        else {
            return;
        }
    }

    /**
     * 	从客户端收到新的数据、读取完成时调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
    }

    /**
     * 	客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    /**
     * 	客户端与服务端 断连时执行 channelInactive方法之后执行
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);

    }

    /**
     * 	当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.debug("\n");


    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
    /**
     * 	服务端 当读超时时 会调用这个方法
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        ctx.close();
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

}


配置类


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * 读取YML中的服务配置
 *
 * /**
 *  * @author HuangYaoXuan
 *  * @date 2023/4/21 15:44
 *
 */
@Configuration
@ConfigurationProperties(prefix = MQTTServerProperties.MQTTPREFIX)
@Data
public class MQTTServerProperties {

    public static final String MQTTPREFIX = "netty.mqtt";

    /**
     * 服务器端口
     */
    private Integer port;

    /**
     * mqtt服务器用户名
     */
    private String username;

    /**
     * mqtt服务器密码
     */
    private String password;


}

消息处理类


/**
 * @author HuangYaoXuan
 * @date 2023/4/21 15:46
 */
import com.hyx.supernetty.MQTTServer.callBack.BootNettyMqttMsgBack;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static com.hyx.supernetty.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.hyx.supernetty.MQTTServer.server.MQTTServer.MQTTdeviceChannelGroup;
import static com.hyx.supernetty.MQTTServer.server.MQTTServer.subscribeMap;


/**
 * 消息处理,单例启动
 *
 * @author qiding
 */
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private BootNettyMqttMsgBack BootNettyMqttMsgBack;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg) {
            MqttMessage mqttMessage = (MqttMessage) msg;
            log.info("info--"+mqttMessage.toString());
            MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
            Channel channel = ctx.channel();
            if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
                //用户鉴权(配置文件中配置账号和密码,如果没有默认通过)
               boolean authentag=  BootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());
               if(!authentag){
                    return;
               }
                //	在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
                if(MQTTdeviceChannelGroup.contains(channel)){
                    //移除次设备channel和topic
                    BootNettyMqttMsgBack.disconnack(channel,mqttMessage);
                }
                //	to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
                BootNettyMqttMsgBack.connack(channel, mqttMessage);
            }
			//对于没有鉴权的设备,拒绝发表和订阅等一系列操作
            if(!MQTTdeviceChannelGroup.contains(channel)){
                log.warn(channel.id()+"无鉴权操作");
                return;
            }
            switch (mqttFixedHeader.messageType()){
                case PUBLISH:		//	客户端发布消息
                    //	PUBACK报文是对QoS 1等级的PUBLISH报文的响应
                    BootNettyMqttMsgBack.puback(channel, mqttMessage);
                    break;
                case PUBREL:		//	发布释放
                    //	PUBREL报文是对PUBREC报文的响应
                    //	to do 回应报文
                    BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
                    break;
                case SUBSCRIBE:		//	客户端订阅主题
                    //	客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
                    //	为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
                    //	SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
                    // 	to do
                    BootNettyMqttMsgBack.suback(channel, mqttMessage);
                    MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;
                    for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {
                        String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();
                        boolean tag=subscribeMap.containsKey(topicname);
                        if(tag){
                            List<ChannelId> channelIds=subscribeMap.get(topicname);
                            if(!channelIds.contains(channel.id())) {
                                channelIds.add(channel.id());
                            }else {
                                log.warn(channel.id()+"重复订阅");
                            }
                            subscribeMap.put(topicname, channelIds);
                        }else {
                            List<ChannelId> channelIds=new ArrayList<>();
                            channelIds.add(channel.id());
                            subscribeMap.put(topicname,channelIds);
                        }
                        log.info(channel.id()+"订阅地址————》"+topicname);
                    }


                    break;
                case UNSUBSCRIBE:	//	客户端取消订阅
                    //	客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
                    //	to do
                    BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
                    Object Unsubscribe=mqttMessage.payload();
                    MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;
                    int len=unsubscribePayload.topics().size();
                    for (int i = 0; i < len; i++) {
                        String topicname=unsubscribePayload.topics().get(i);
                        boolean tag=subscribeMap.containsKey(topicname);
                        if(tag){
                            List<ChannelId> channelIds=subscribeMap.get(topicname);
                            channelIds.remove(channel.id());
                            subscribeMap.put(topicname,channelIds);
                        }else {
                           log.error("不存在订阅地址——>"+topicname);
                        }
                        log.info(channel.id()+"取消订阅地址————》"+topicname);
                    }

                    break;
                case PINGREQ:		//	客户端发起心跳
                    //	客户端发送PINGREQ报文给服务端的
                    //	在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
                    //	请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
                    BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
                    break;
                case DISCONNECT:	//	客户端主动断开连接
                    log.debug("设备下线,channelId:{}", channel.id());
                    MQTTdeviceRemove(channel);
                    //	DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
                    //	to do
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * 	从客户端收到新的数据、读取完成时调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
    }

    /**
     * 	客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    /**
     * 	客户端与服务端 断连时执行 channelInactive方法之后执行
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);

    }

    /**
     * 	当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.debug("\n");


    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
    /**
     * 	服务端 当读超时时 会调用这个方法
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        ctx.close();
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

}

netty主程序


import com.hyx.supernetty.MQTTServer.channel.MqttChannelInit;
import com.hyx.supernetty.MQTTServer.config.MQTTServerProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 启动 Server
 *
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class MQTTServer implements IMQTTServer {

    private final MqttChannelInit mqttChannelInit;

    private final MQTTServerProperties MQTTserverProperties;

    //保存接入的MQTT设备channel
    public static ChannelGroup MQTTdeviceChannelGroup;
    //保存订阅地址和chanelid,当推送数据时,会向此订阅地址的多个channel发送数据
    public static Map<String, List<ChannelId>> subscribeMap =new ConcurrentHashMap();
    //保存设备名称和通道编号,向设备发送消息可以通过名称找到通道
    public static ConcurrentHashMap<String, ChannelId> MQTTdeviceMap = new ConcurrentHashMap<>();
     //存放Qos2消息等级的消息ID,这里可使用redis之类的工具做缓存,为了简化配置,使用map暂存
    public static ConcurrentHashMap<Integer, MqttMessage> mqttMessageIdMap=new ConcurrentHashMap();
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @Override
    public void start() {
        log.info("初始化 Mqttserver ...");
        bossGroup = new NioEventLoopGroup();
        workerGroup =  new NioEventLoopGroup();
        MQTTdeviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        this.MqttServer();
    }
    /**
     * 初始化
     */
    private void MqttServer() {
        try {
            new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel( NioServerSocketChannel.class )
                    .localAddress(new InetSocketAddress(MQTTserverProperties.getPort()))
                    // 配置 编码器、解码器、业务处理
                    .childHandler(mqttChannelInit)
                    // tcp缓冲区
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // 保持长连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // 绑定端口,开始接收进来的连接
                    .bind().sync();
            log.info("MQTT服务启动成功!开始监听端口:{}", MQTTserverProperties.getPort());
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 销毁
     */
    @PreDestroy
    @Override
    public void destroy() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

}

import javax.annotation.PreDestroy;

/**
 * @author HuangYaoXuan
 * @date 2023/4/21 15:53
 */
public interface IMQTTServer {

    /**
     * 主启动程序,初始化参数
     *
     * @throws Exception 初始化异常
     */
    void start() throws Exception;

    /**
     * 优雅的结束服务器
     *
     * @throws InterruptedException 提前中断异常
     */
    @PreDestroy
    void destroy() throws InterruptedException;
}


最后,启动服务


import com.hyx.supernetty.MQTTServer.server.MQTTServer;
import com.hyx.supernetty.TcpServer.server.TcpServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author HuangYaoXuan
 * @date 2023/4/21 15:58
 */
@Component
public class startSrver {
    @Autowired
    private TcpServer tcpServer;
    @Autowired
    private MQTTServer MQTTServer;
    @PostConstruct
    public void startNetty(){

        new Thread(() -> {
            try {
                MQTTServer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

更新一下:
新增了mqtt鉴权的加密方法

	 * 用户鉴权
	 */
	public boolean authentication(MqttConnectPayload payload){
		//todo 这里只是使用了最直接的账号密码鉴权,这里可以进行diy,
		// 例如客户端用sha256算法加密(设备名(username)+系统时间戳+设备密钥(password))得到加密密钥
		// 服务器根据设备名和设备密钥再进行同样的操作,再比较服务端和客户端加密的密钥是否一致
		// 加密算法放下面了
		log.warn("clientId"+payload.clientIdentifier());
		String username=MQTTserverProperties.getUsername();
		String password=MQTTserverProperties.getPassword();
		//无账号或者无密码通过
		if(stringEmptyCheck(password)||stringEmptyCheck(username)){
			return true;
		}else {
			//消息中账号密码为空
			if(payload.passwordInBytes()==null||payload.userName()==null){
				return false;
			}
			String passwordAuthen=new String(payload.passwordInBytes());
			String usernameAuthen=payload.userName();
			log.warn("username:{},password:{}",usernameAuthen,passwordAuthen);
			if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){
				return true;
			}else {
				return false;
			}
		}
	}
	/**
	 * sha256_HMAC加密
	 * @param message 设备名+时间戳
	 * @param secret  设备秘钥
	 * @return 加密密钥字符串
	 */
	public String hmacSHA256(String secret, String message) throws Exception {
		String hash = "";
		Mac hmacSha256 = Mac.getInstance("HmacSHA256");
		SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(), "HmacSHA256");
		hmacSha256.init(secret_key);
		byte[] bytes = hmacSha256.doFinal(message.getBytes());
		hash = byteArrayToHexString(bytes);
		return hash;
	}
	/**
	 * 将加密后的字节数组转换成字符串
	 *
	 * @param b 字节数组
	 * @return 字符串
	 */
	public  String byteArrayToHexString(byte[] b) {
		StringBuilder hs = new StringBuilder();
		String stmp;
		for (int n = 0; b!=null && n < b.length; n++) {
			stmp = Integer.toHexString(b[n] & 0XFF);
			if (stmp.length() == 1)
				hs.append('0');
			hs.append(stmp);
		}
		return hs.toString().toLowerCase();
	}

应各位伙伴要求,放一个源码下载地址吧
链接:https://pan.baidu.com/s/1PAjaGdrnEuya-LF4TcX95w?pwd=8888
提取码:8888
百度网盘地址文章来源地址https://www.toymoban.com/news/detail-563401.html

到了这里,关于springboot+Netty搭建MQTT协议的服务端的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • netty学习(3):SpringBoot整合netty实现多个客户端与服务器通信

    创建一个SpringBoot工程,然后创建三个子模块 整体工程目录:一个server服务(netty服务器),两个client服务(netty客户端) pom文件引入netty依赖,springboot依赖 NettySpringBootApplication NettyServiceHandler SocketInitializer NettyServer NettyStartListener application.yml Client1 NettyClientHandler SocketInitializ

    2024年02月11日
    浏览(56)
  • MQTT通信架构 搭建MQTT服务器

    MQ 遥测传输 (MQTT) 是 基于代理 的 发布/订阅 的消息传输协议。 传输屏蔽消息内容 TCP/IP有连接传输(可靠) 小型传输,开销很小,降低网络流量 使用lastwill等机制告知客户端异常中断(本次实验用不到) 三种消息发布服务质量 至多一次:消息发布完全依赖底层TCP/IP网络,会

    2024年02月02日
    浏览(48)
  • Springboot整合Netty实现RPC服务器

    try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60)); pipeline.addLast(new JsonDecoder()); pipeline.addLast(new JsonEnco

    2024年04月09日
    浏览(39)
  • 【智能家居入门4】(FreeRTOS、MQTT服务器、MQTT协议、微信小程序)

    主控仍旧是STM32F103C8T6,实时操作系统选择的是FreeRTOS。 主要功能: ①环境信息采集并上传至微信小程序 ②微信小程序下发指令控制家电 ③由雨滴传感器和步进电机能够实现下雨自动收起衣服,停雨自动晒出衣服(由于驱动板和步进电机不在身边,这里代码中就用舵机来模拟

    2024年04月11日
    浏览(53)
  • 自己搭建mqtt服务器

            前言:网上资料大部分都是使用的云服务,我是采用自己搭建的服务器来进行试验的,接下来将记录过程。 云服务器有很多种网上也有很多教学在这里不进行过多的解释了,我实验的时候采用的阿里云国内的服务器这里以后还会进行介绍。         本实验主要

    2024年02月03日
    浏览(51)
  • mqtt服务器搭建与qt下的mqtt客户端实现

      MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(Io

    2024年02月06日
    浏览(89)
  • MQTT协议-发布消息(服务器向客户端发送)

    发布消息报文组成:https://blog.csdn.net/weixin_46251230/article/details/129414158 在了解了发布信息的PUBLISH报文后,就可以分析出阿里云服务器向本地客户端发送的报文数据了 实验前需要在阿里云创建产品和设备,并创建简单的温度和湿度物模型:https://blog.csdn.net/weixin_46251230/article/de

    2024年02月06日
    浏览(54)
  • 个人云服务器搭建MQTT服务器

    🔮🔮🔮🔮🔮相关文章🔮🔮🔮🔮🔮 ESP32连接MQ Sensor实现气味反应 🔗 https://blog.csdn.net/ws15168689087/article/details/131365573 ESP32连接云服务器【WebSocket】 🔗 https://blog.csdn.net/ws15168689087/article/details/131406163 ESP32+MQTT+MySQL实现发布订阅【气味数据收集】 🔗 https://blog.csdn.net/ws1516868

    2024年02月15日
    浏览(50)
  • Windows下搭建MQTT服务器

    MQ遥测传输(MQTT)是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于低带宽受限环境。 特点包括以下: 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。 对负载内容屏蔽的消息传输。 使用

    2024年02月03日
    浏览(71)
  • 本地MQTT服务器搭建(EMQX)

    下载地址:EMQ (emqx.com) 打开官网后,选择右边的免费试用按钮 然后单击EMQX Enterprise标签,然后选择下面的EMQX开源版,选择开源版的系统平台为Windows,单击免费下载。 在新页面下单击立即下载 将下载的emqx-5.1.6-windows-amd64.zip解压出来,解压目录不能存在中文、空格、特殊字符

    2024年02月09日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包