java集成mqtt、rabbitmq服务远程连接数dtu实现寄存器rtu数据读写

这篇具有很好参考价值的文章主要介绍了java集成mqtt、rabbitmq服务远程连接数dtu实现寄存器rtu数据读写。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

xxx智慧管控一体化平台mqtt穿透数据采集写入方案

数据采集及写入流程设计图

连接dtu 服务端java,网络,物联网,linux


一、硬件设备

硬件设备与原有设备保持不变通过配置dtu设备进行mqtt穿透功能进行数据交互

1、dtu配置详解:
1.1 dtu工具

本项目使用塔石TAS-LTE-364支持4G无线dtu模块,下载安装塔石物联网厂家提供的串口测试程序Tool V2.7.1 D20220616.exe

连接dtu 服务端java,网络,物联网,linux

1.2打开程序选择对应dtu型号

连接dtu 服务端java,网络,物联网,linux

1.3 配置串口

点击右上角三角符号选择端口(为你插入电脑的串口),波特率(dtu出厂默认9600),校验参数选择8,N,1;点击打开串口

连接dtu 服务端java,网络,物联网,linux

1.4 COM 口查看

电脑右键进入属性界面,再进入设备管理界面,最后点击“端口”查看

连接dtu 服务端java,网络,物联网,linux

1.5 连接成功状态

如果 COM 口及波特率等参数选择正确,设备上电开机后会上报 AT Ready,证明设备启动成功; 若没有出现,则先断电再重新上电即可出现;如下图:

连接dtu 服务端java,网络,物联网,linux

1.6 进入配置状态

显示框出现 AT Ready 后,点击“进入配置状态”;如果 DTU 配置工具的右边显示框返回 OK,则代表 进入配置模式成功了;如下图:

连接dtu 服务端java,网络,物联网,linux

1.7 配置mqtt穿透通道

点击通道1,工作模式选择MQTT透传,MQTT连接参数,目标地址填写搭建mqtt服务地址,目标端口1883(默认),设备账户和密码为搭建MQTT服务设置的账号密码,clientId自己设置不能有中文,如下图:

连接dtu 服务端java,网络,物联网,linux

1.8 添加订阅和推送

订阅参数订阅开关选择订阅输入主题,订阅指令选择0、1或2,订阅主题按照井厂编号/in或out(订阅为in,推送为out)规则进行设置,推送参数设置与订阅方法一致注意区分主题,如图:

连接dtu 服务端java,网络,物联网,linux

1.9 配置完成重启

配置完成后先点击一键配置参数等待右边显示框返回ok,在点击重启重启后可在mqtt服务后台查看设备是否上线;

二、搭建MQTT,rabbitmq以及开发应用程序代码;

1、MQQT服务搭建
1.1、下载并安装

在官网https://www.emqx.io/zh/downloads进行emqx开源版安装包下载并解压;

1.2、命令行下进入解压路径,启动 EMQX
./emqx/bin/emqx start
注:也可通过以下命令操作 EMQX,通过 命令行cmd进入emqx安装目录bin
emqx start  #启动服务
emqx ping  #返回pong 连接正常
emqx stop  #停止服务
1.3、在安装环境下访问http://localhost:18083 进入后台管理页面

默认账号密码

admin
public

连接dtu 服务端java,网络,物联网,linux

2、rabbitMQ服务搭建

(对mqtt消息进行缓存避免直接写入数据库并发量大击垮mysql造成服务宕机,起到流量削峰作用)

2.1 下载并安装

进入rabbitmq官网 https://www.rabbitmq.com 下载Erlang和与其对应rabbitmq-server包

2.2 安装Erlang

设置系统环境变量,然后在命令窗口输入Erl返回版本号说明安装成功

连接dtu 服务端java,网络,物联网,linux
连接dtu 服务端java,网络,物联网,linux

连接dtu 服务端java,网络,物联网,linux

2.3 安装rabbitmq

进入rabbitMQ安装目录的sbin目录点击上方的路径框输入cmd,按下回车键

连接dtu 服务端java,网络,物联网,linux

连接dtu 服务端java,网络,物联网,linux

输入命令点击回车

 rabbitmq-plugins enable rabbitmq_management

重启服务,双击rabbitmq-server.bat(双击后可能需要等待一会)连接dtu 服务端java,网络,物联网,linux

打开浏览器,地址栏输入http://127.0.0.1:15672 ,即可看到管理界面的登陆页(账号密码guest)

连接dtu 服务端java,网络,物联网,linux

3、 Java集成mqtt和rabbitmq
3.1 mqtt
3.11 添加依赖mqtt依赖
<!-- mqtt数据对接 -->
    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.5</version>
    </dependency>
3.12 mqtt配置类示例
/**
 * <b>mqtt配置类</b>
 *
 * @date 2022/11/15
 */
public class MqttConfig {
	/** 订阅消息主题 - 可为多个 现指一个作为示例 */
	public static final String TOPIC = "dtu/out";
	/** QOS */
	public static final Integer QOS = 0;
	/** 链接地址 */
	public static final String IP_ADDRESS = "tcp://localhost:1883"; //服务器地址
	/** 用户名 */
	public static final String USERNAME = "admin";
	/** 密码 */
	public static final String PASSWD = "admin@123";
    /**轮询查询语句,该语句为modbus指令 */
	public static final String []  datas={"0103012B007C35DF",
										"010301B7000A7417",
										"01030D20002AC773",
										"010303D4001405B9",
										"010303E80078C598",
										"0103046000730501",
										"010304D3000FF507",
										"010304E2006E6520",
										"0103055000698539",
										"010305B90023D53A",
										"010305DC00648517",
										"01030640005F04AE",
										"0103069F003734BA",
										"010306D6005A2481",
										"010307300055848E",
										"01030785004B1560"};
	/*QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
    QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
    QoS2 代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。*/
}

3.13 启动mqtt客户端代码

包含连接状态获取,断线重连,启动后可在mqtt服务后台查看连接

public class MqttInitialized{
	/** MQTT客户端 */
	private static MqttClient client = null;
	/** 连接选项 */
	private static MqttConnectOptions connOpts = null;
	/** 连接状态 */
	private static Boolean connectStatus = false;

	/**
	 * 设置连接信息
	 */
	static {
		try {
			// MQTT 连接选项
			connOpts = new MqttConnectOptions();
			// 设置认证信息
			connOpts.setUserName(MqttConfig.USERNAME);
			connOpts.setPassword(MqttConfig.PASSWD.toCharArray());
			connOpts.setAutomaticReconnect(true);//启用自动重新连接
			//  持久化
			MemoryPersistence persistence = new MemoryPersistence();
			// MQ客户端建立
			client = new MqttClient(MqttConfig.IP_ADDRESS, "lxb-lunxun", persistence);
			// 设置回调
			client.setCallback(new MqttHandle());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
			/**
			 * MQTT客户端启动
			 */
			@PostConstruct
			public static void connect() {
				try {
					// 建立连接
					client.connect(connOpts);
					connectStatus = client.isConnected();
					log.info("MQTT服务器连接成功~~~");
				} catch (Exception e) {
					connectStatus = client.isConnected();
					log.error("MQTT服务器连接失败!!");
					e.printStackTrace();
					reconnection();
				}
			}
			
			/**
             * 获取MQTT客户端连接状态
             * @return
             */
            public static Boolean getConnectStatus() {
                return connectStatus;
            }

			/**
			 * 断线重连
			 */
			public static void reconnection() {
				// 尝试进行重新连接
				while (true) {
					if (MqttInitialized.getConnectStatus()) {
						// 查询连接状态 连接成功则停止重连
						break;
					}
					try {
						log.info("开始进行MQTT服务器连接.......");
						// 进行连接
						connect();
						Thread.sleep(10000);
					} catch (Exception e) {
						log.error("重新连接出现异常");
						e.printStackTrace();
						break;
					}
				}
			}
		}
3.14 使用mqtt消息发布获取寄存器数据

此处只需要将modbus查询指令发送给dtu,dtu得到指令会自动取获取寄存器数据,查询语句为配置类中的datas数据,发布的主题必须要与dtu设置的订阅主题,消息质量一致;使用循环每10秒发送一条进行查询实现轮询效果也可使用dtu自定义轮询配置与此效果一致;

注意:在此还需要把modus指令16进制字符串转换为Byte型数组16进制源字符串才能进行发送(mqtt发送消息只能是byte);

			/**
			 * 消息发布
			 * @throws MqttException
			 */
			public  void publish() throws MqttException, InterruptedException {
				while (true){
					for (int i=0; i<MqttConfig.datas.length;i++) {
						byte[] bytes =HexStringToByte(MqttConfig.datas[i]);
						MqttMessage mqttMessage = new MqttMessage(bytes);
						mqttMessage.setQos(MqttConfig.QOS);
						client.publish("dtu/in", mqttMessage);
						System.out.println(MqttConfig.datas[i]);
						Thread.sleep(10000);
					}
				}
			}
3.15 16进制字符串转换为Byte型数组16进制源字符串方法
	/**
	 * 16进制字符串转换为Byte型数组16进制源字符串
	 *
	 * @param
	 * @return Byte类型数组
	 */
	public static byte[] HexStringToByte(String hexString) {
		hexString = hexString.replace(" ", "");
		int len = hexString.length();
		if (len % 2 != 0)
			return null;
		byte[] bufD = new byte[len / 2];
		byte[] tmpBuf = hexString.getBytes();
		int i = 0, j = 0;
		for (i = 0; i < len; i++) {
			if (tmpBuf[i] >= 0x30 && tmpBuf[i] <= 0x39)
				tmpBuf[i] -= 0x30;
			else if (tmpBuf[i] >= 0x41 && tmpBuf[i] <= 0x46)
				tmpBuf[i] -= 0x37;
			else if (tmpBuf[i] >= 0x61 && tmpBuf[i] <= 0x66)
				tmpBuf[i] -= 0x57;
			else
				tmpBuf[i] = 0xF;
		}
		for (i = 0, j = 0; i < len; i += 2, j++) {
			bufD[j] = (byte) ((tmpBuf[i] << 4) | tmpBuf[i + 1]);
		}
		return bufD;
	}
3.16 mqtt消息订阅主题

需要主题名称以及消息质量两个参数(此处订阅主题则为dtu设置的发布主题)

			/**
			 * 消息订阅
			 * @throws MqttException
			 */
	public static void subscribe(String topic, Integer qos) throws MqttException {
				client.subscribe(topic,qos);
			}
3.17 mqtt回调消息

处理查看订阅主题的消息和发送消息成功的回调(也就是dtu获取的寄存器数据)需要实现MqttCallback接口;

注意:mqtt收到消息默认为byte要将byte转为16进制字符串

public class MqttHandle implements MqttCallback {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private RedisTemplate redisTemplate;
	private static MqttHandle mqttHandle;
	@PostConstruct
	private void init(){
		mqttHandle = this;
		rabbitTemplate=mqttHandle.rabbitTemplate;
		redisTemplate=mqttHandle.redisTemplate;
	}
	/**
	 * 连接丢失
	 * @param cause
	 */
	@Override
	public void connectionLost(Throwable cause) {
		log.info("connection lost:" + cause.getMessage());
		MqttInitialized.reconnection();
	}

	/**
	 * 收到消息
	 * @param topic
	 * @param message
	 */
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		String string = bytesToHexString(message.getPayload());
	}

	/**
	 * 消息传递成功
	 * @param token
	 */
	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		System.out.println("发送消息---------" + token.isComplete());
	}

	/**
	 * byte[] ->string
	 * @param src
	 * @return
	 */
	public static String bytesToHexString(byte[] src){
	   StringBuilder stringBuilder = new StringBuilder("");
	    if (src == null || src.length <= 0) {
				return null;
		     }
	     for (int i = 0; i < src.length; i++) {
		        int v = src[i] & 0xFF;
		        String hv = Integer.toHexString(v);
		        if (hv.length() < 2) {
			             stringBuilder.append(0);
			         }
		         stringBuilder.append(hv);
		     }
	    return stringBuilder.toString();
	}

}
3.2 rabbitmq**
3.21 添加pom依赖
  <!-- AMQP-rabbitmq 依赖 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
3.22 添加yml配置

需要在rabbitmq管理页面添加新账号,guest账号只能本地环境使用,开启手动应答避免重复消费

spring:
  rabbitmq:
    port: 5672
    username: user
    password: password
    virtual-host: /
    host: localhost
    listener:
      simple:
        acknowledge-mode: manual #开启手动应答
        prefetch: 1 #消费者每次消费一个消息
3.23 添加配置类

​ 设置交换机、队列、绑定关系、路由key(并开启交换机及队列持久消息默认持久避免rabbitmq服务重启导致消息丢失)

//代码示例
@Configuration
public class RabbitMqConfiguration {
	//1.声明direct模式交换机
	@Bean
	public DirectExchange directExchange(){
		return new DirectExchange("direct_mqqt_exchage",true,false );
	}
	//direct模式队列
	@Bean
	public Queue directMqttQueue(){
		return new Queue("mqtt.direct.queue",true);
	}
	//3.声明direct模式绑定关系
	@Bean
	public Binding directMqttBindings(){
		return BindingBuilder.bind(directMqttQueue()).to(directExchange()).with("mqtt");
	}
}

3.24 将mqtt查询回调消息放入rabbitmq队列

修改mqtt回调消息代码,判断消息如果已0103开头则为查询数据,交换机和路由key必须和设置的一致;


@Component
@Slf4j
public class MqttHandle implements MqttCallback {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	private static MqttHandle mqttHandle;
	@PostConstruct
	private void init(){
		mqttHandle = this;
		rabbitTemplate=mqttHandle.rabbitTemplate;
	}

	/**
	 * 收到消息
	 * @param topic
	 * @param message
	 */
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		String string = bytesToHexString(message.getPayload());
		if (ObjectUtil.isNotEmpty(string)&&string.startsWith("0103")){
				String exchageName="direct_mqqt_exchage";
				String routingKey="mqtt";
				mqttHandle.rabbitTemplate.convertAndSend(exchageName,routingKey,string);//存储消息
		}
	}
3.25 对rabbitmq中的消息持久到数据库

在@RabbitListener注解中 填写配置的队列即可消费消息,开启手动应当需手动确认消息

public class MessagesConsumer {
	@Autowired
	private MessageMapper mapper;
	private static MessagesConsumer messagesConsumer;
	@PostConstruct
	private void init(){
		messagesConsumer = this;
		mapper=messagesConsumer.mapper;
	}
	@RabbitListener(queues = "mqtt.direct.queue")
	public void processMessage(String msg,Message message, Channel channel) throws IOException {
//		获得消息内容
		String messageId = message.getMessageProperties().getMessageId();
		String s = new String(message.getBody(), StandardCharsets.UTF_8);
		try {
			/*
			 *   业务代码,出现 异常则放回丢列不进行消费
			 * */
			int count=0;
				count=messagesConsumer.mapper.insert(s);
			if (count >0){
				System.out.println("插入成功");
				//确认消息
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			}else{
				System.out.println("插入失败");
				//退回消息,将消息重新放回队列
				channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
			}
		} catch (Exception e) {
			e.printStackTrace();
			//退回消息,将消息重新放回队列
			channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
		}
	}
}
3.3 使用mqtt写入单个保持寄存器
3.31 写入单个寄存器指令组成如下,如果写入成功则返回发送的指令
从机地址 功能码 寄存器地址高位 寄存器地址低位 数据高位 数据低位 CRC高位 CRC低位
01 06 00 01 00 03 98 0B

1、从机地址和功能码不表为0106,寄存器高低位地址和数据高低位地址需要前台传参,crc校验通过计算获得从而可以获得完整的写入指令;

2、使用mqtt发送写入指令消息给对应的dtu,并使用当前用户userId作为key,当前时间设为value存入redis;

3、发送成功会返回发送的指令,在mqtt消息回调方法处将由0106开头的消息+topic主题作为key,当前时间设为value存入redis;

4、发送完消息后每隔1.5秒执行一次判断key为0106开头的消息+topic是否大于key为userId的时间,执行三次来判断是否写入成功;

5、如果为写入成功则查询一次修改保持寄存器的数据进行数据更新 ;

3.32 代码实现

注意发送消息的topic与回调时放入到redis作为 key的topic不相同

@RestController
@RequestMapping("/mqtt")
@Slf4j
public class PublishController {
	@Autowired
	private MqttInitialized mqttInitialized;

	@Autowired
	private RedisTemplate redisTemplate;
	/**
	 * 写入保持寄存器
	 * @param addr
	 * @param data
	 * @param topic
	 * @return
	 * @throws InterruptedException
	 * @throws MqttException
	 */
	@PostMapping("/writeSingle")
	public boolean writeSingle(@RequestParam  String addr, @RequestParam String data, @RequestParam String topic) throws InterruptedException, MqttException {
		if (ObjectUtil.isEmpty(addr)||ObjectUtil.isEmpty(data)||ObjectUtil.isEmpty(topic))return false;
		ValueOperations<String, Long> valueOperations = redisTemplate.opsForValue();
		String  message="0106"+addr+data+ModbusCRCUtils.getCRC("0106"+addr+data);
		valueOperations.set("userId",System.currentTimeMillis());
		mqttInitialized.publish(topic,message);
		for (int i = 0; i < 3; i++) {
			if (ObjectUtil.isNotEmpty(valueOperations.get(message+"dtu/in"))&&valueOperations.get(message+"dtu/in")>(valueOperations.get("userId"))){
				mqttInitialized.publish(topic,"0103012B007C35DF");//修改成功后查询
				return true;
			}
			Thread.sleep(1500);
		}
		return false;
	}

}
3.33 CRC校验码计算原理
ModBus 通信协议的 CRC (冗余循环校验码含2个字节, 即 16 位二进制数)。
CRC 码由发送设备计算, 放置于所发送信息帧的尾部。
接收信息设备再重新计算所接收信息 (除 CRC 之外的部分)的 CRC,
比较计算得到的 CRC 是否与接收到CRC相符, 如果两者不相符, 则认为数据出错。
1) 预置 1 个 16 位的寄存器为十六进制FFFF(即全为 1) , 称此寄存器为 CRC寄存器。
2) 把第一个 8 位二进制数据 (通信信息帧的第一个字节) 与 16 位的 CRC寄存器的低 8 位相异或, 把结果放于 CRC寄存器。
3) 把 CRC 寄存器的内容右移一位(朝低位)用 0 填补最高位, 并检查右移后的移出位。
4) 如果移出位为 0, 重复第 3 步 (再次右移一位); 如果移出位为 1, CRC 寄存器与多项式A001 ( 1010 0000 0000 0001) 进行异或。
5) 重复步骤 3 和步骤 4, 直到右移 8 次,这样整个8位数据全部进行了处理。
6) 重复步骤 2 到步骤 5, 进行通信信息帧下一个字节的处理。
7) 将该通信信息帧所有字节按上述步骤计算完成后,得到的16位CRC寄存器的高、低字节进行交换。
8) 最后得到的 CRC寄存器内容即为 CRC码。
/**
 * <b>计算CRC16校验码工具类</b>
 *
 * @author Lixubo
 * @date 2023/6/2
 */
public class ModbusCRCUtils {
   /**
    * 计算CRC16校验码
    * @param str16
    * @return
    */
   public static String getCRC(String str16) {
      byte[] bytes = HexStringToByte(str16);
      int CRC = 0x0000ffff;
      int POLYNOMIAL = 0x0000a001;
      int i, j;
      for (i = 0; i < bytes.length; i++) {
         CRC ^= (int) bytes[i];
         for (j = 0; j < 8; j++) {
            if ((CRC & 0x00000001) == 1) {
               CRC >>= 1;
               CRC ^= POLYNOMIAL;
            } else {
               CRC >>= 1;
            }
         }
      }
      //高低位转换,看情况使用
      CRC = ( (CRC & 0x0000FF00) >> 8) | ( (CRC & 0x000000FF ) << 8);
      return Integer.toHexString(CRC);
   }
   /**
    * 16进制字符串转换为Byte型数组16进制源字符串
    *
    * @param
    * @return Byte类型数组
    */
   public static byte[] HexStringToByte(String hexString) {
      hexString = hexString.replace(" ", "");
      int len = hexString.length();
      if (len % 2 != 0)
         return null;
      byte[] bufD = new byte[len / 2];
      byte[] tmpBuf = hexString.getBytes();
      int i = 0, j = 0;
      for (i = 0; i < len; i++) {
         if (tmpBuf[i] >= 0x30 && tmpBuf[i] <= 0x39)
            tmpBuf[i] -= 0x30;
         else if (tmpBuf[i] >= 0x41 && tmpBuf[i] <= 0x46)
            tmpBuf[i] -= 0x37;
         else if (tmpBuf[i] >= 0x61 && tmpBuf[i] <= 0x66)
            tmpBuf[i] -= 0x57;
         else
            tmpBuf[i] = 0xF;
      }
      for (i = 0, j = 0; i < len; i += 2, j++) {
         bufD[j] = (byte) ((tmpBuf[i] << 4) | tmpBuf[i + 1]);
      }
      return bufD;
   }

}
3.34 调整mqtt消息回调代码加入redis
/**
 * <b></b>
 *
 * @author Lixubo
 * @date 2022/11/15
 */
@Component
@Slf4j
public class MqttHandle implements MqttCallback {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	@Autowired
	private RedisTemplate redisTemplate;
	private static MqttHandle mqttHandle;
	@PostConstruct
	private void init(){
		mqttHandle = this;
		rabbitTemplate=mqttHandle.rabbitTemplate;
		redisTemplate=mqttHandle.redisTemplate;

	/**
	 * 收到消息
	 * @param topic
	 * @param message
	 */
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		ValueOperations<String, Long> valueOperations = mqttHandle.redisTemplate.opsForValue();
		String string = bytesToHexString(message.getPayload());
		if (ObjectUtil.isNotEmpty(string)&&string.startsWith("0106")){
			valueOperations.set(string.concat(topic),System.currentTimeMillis());
			System.out.println("=========="+string+topic+"==========");
		}
	}

	/**
	 * byte[] ->string
	 * @param src
	 * @return
	 */
	public static String bytesToHexString(byte[] src){
	   StringBuilder stringBuilder = new StringBuilder("");
	    if (src == null || src.length <= 0) {
				return null;
		     }
	     for (int i = 0; i < src.length; i++) {
		        int v = src[i] & 0xFF;
		        String hv = Integer.toHexString(v);
		        if (hv.length() < 2) {
			             stringBuilder.append(0);
			         }
		         stringBuilder.append(hv);
		     }
	    return stringBuilder.toString();
	}
}

3.4 根据clientId获取客户端连接状态(emqx官方提供的api)
3.41 在emqx管理后台添加api密钥进行访问授权

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2sBqFGXe-1686193465745)(C:\Users\Lixubo\AppData\Roaming\Typora\typora-user-images\image-20230606173903317.png)]

3.42 添加pom依赖
    <dependency>
      <groupId>com.squareup.okhttp3</groupId>
      <artifactId>okhttp</artifactId>
      <version>3.8.1</version>
    </dependency>
public static boolean getClientStstus(String clientId) {
//     clientId="175-4";
      try {
         /*
         服务端生成
          */
         String username = "1f408e6ac4af426a";
         String password = "qEEdNrgr4FI6q289B5wJy9BXOir25eoKdG2ygxsEu5onA";

         OkHttpClient client = new OkHttpClient();

         Request request = new Request.Builder()
               .url("http://localhost:18083/api/v5/clients/"+clientId)
               .header("Content-Type", "application/json")
               .header("Authorization", Credentials.basic(username, password))
               .build();

         Response response = client.newCall(request).execute();
         JSONObject jsonObject=JSONObject.parseObject(response.body().string());
         System.out.println(jsonObject.toString());
         if (ObjectUtil.isNotEmpty(jsonObject.getBoolean("connected"))&&jsonObject.getBoolean("connected")){
            return true;
         }
      } catch (IOException e) {
         e.printStackTrace();
      }
      return false;
   }

三、数据层使用mysql数据持久,redis作为缓存;

四、 其它注意事项

1、mqtt发布消息查询数据如果为同一个寄存器时注意消息间隔时间必须大于Modbus 超时时间
2、Modbus 超时时间计算:

如果通讯速率为9600时,按照常规的Modbus RTU,8个数据位、1个停止位、偶校验方式,每传输1个字节数据需要的时间为:(8+1+1)/9600=1.04ms/Byte

因此,主站发出响应到从站返回数据的时间周期为:(8+5+2*n)*1.04+T1+T2,其中n为寄存器个数,T1为从站的响应时间(如果是PLC,则为PLC的扫描时间),T2为通讯余量,一般为20~50ms。如果读取10个字的数据,从站响应时间为50ms,则整个周期为:(8+5+2x10)x1.04+50+50=134.32ms。因此,超时时间必须大于134.32ms,可以设置为150ms以上。

如果超时时间太短,响应不能完全返回,通讯会报错。

如果超时时间设置太长,按照上述例子,一共10个从站,每个从站读取10个字的数据,

如果超时时间设置为1s,如果有1个从站出现故障,则整个轮询周期为:1000+9*134.32=2208.88 ms。

同理,如果超时时间设为150ms,则整个轮询周期为:150+9*134.32=1358.88 ms。

因此,可以看出正确的超时时间设置可缩短整个轮询周期,不正确的设置,将导致通讯出错或整个通讯周期过长。


3、mqtt多主题订阅通配符规则
3.1 主题层级分隔符/

/ 被用来分割主题树的每一层,并给主题空间提供分等级的结构。当两个通配符在一个主题中出现的时候,主题层次分隔符的使用是很重要的。

3.2 多层通配符#

#是一个匹配主题中任意层次数的通配符。比如说,如果你订阅了finance/stock/ibm/#,你就可以接收到以下这些主题的消息。
finance/stock/ibm
finance/stock/ibm/closingprice
finance/stock/ibm/currentprice
多层通配符有可以表示大于等于0的层次。因此,finance/#也可以匹配到单独的finance,在这种情况下#代表0层。在这种语境下主题层次分隔符/就没有意义了。因为没有可以分的层次。

多层通配符只可以确定当前层或者下一层。因此,#和finance/#都是有效的,但是finance#不是有效的。多层通配符一定要是主题树的最后一个字符。比如说,finance/#是有效的,但是finance/#/closingprice是无效的。

3.3 单层通配符+

+只匹配主题的一层。比如说,finance/stock/+匹配finance/stock/ibm和finance/stock/xyz,但是不匹配finance/stock/ibm/closingprice。另外,因为单层通配符只匹配1层,finance/+不匹配finance。
单层通配符可以被用于主题树的任意层级,连带多层通配符。它必须被用在主题层级分隔符/的右边,除非它是指定自己。因此,+和finance/+都是有效的,但是finance+无效。单层通配符可以用在主题树的末端,也可以用在中间。比如说,finance/+和finance/+/ibm都是有效的。

3.4 主题语法和用法

当你建立一个应用,设计主题树的时候应该考虑以下的主题名字的语法和语义:

主题至少有一个字符长。
主题名字是大小写敏感的。比如说,ACCOUNTS和Accounts是两个不同的主题。
主题名字可以包含空格。比如,Accounts payable是一个有效的主题。
以/开头会产生一个不同的主题。比如说,/finnace与finance不同。/finance匹配"+/+"和/+,但不匹配+
不要在任何主题中包含null(Unicode \x0000)字符。
以下的原则应用于主题树的建造和内容

在主题树中,长度被限制于64k内但是在这以内没有限制层级的数目 。
可以有任意数目的根节点;也就是说,可以有任意数目的主题树。文章来源地址https://www.toymoban.com/news/detail-776095.html


层。在这种语境下主题层次分隔符/就没有意义了。因为没有可以分的层次。

多层通配符只可以确定当前层或者下一层。因此,#和finance/#都是有效的,但是finance#不是有效的。多层通配符一定要是主题树的最后一个字符。比如说,finance/#是有效的,但是finance/#/closingprice是无效的。

3.3 单层通配符+

+只匹配主题的一层。比如说,finance/stock/+匹配finance/stock/ibm和finance/stock/xyz,但是不匹配finance/stock/ibm/closingprice。另外,因为单层通配符只匹配1层,finance/+不匹配finance。
单层通配符可以被用于主题树的任意层级,连带多层通配符。它必须被用在主题层级分隔符/的右边,除非它是指定自己。因此,+和finance/+都是有效的,但是finance+无效。单层通配符可以用在主题树的末端,也可以用在中间。比如说,finance/+和finance/+/ibm都是有效的。

3.4 主题语法和用法

当你建立一个应用,设计主题树的时候应该考虑以下的主题名字的语法和语义:

主题至少有一个字符长。
主题名字是大小写敏感的。比如说,ACCOUNTS和Accounts是两个不同的主题。
主题名字可以包含空格。比如,Accounts payable是一个有效的主题。
以/开头会产生一个不同的主题。比如说,/finnace与finance不同。/finance匹配"+/+"和/+,但不匹配+
不要在任何主题中包含null(Unicode \x0000)字符。
以下的原则应用于主题树的建造和内容

在主题树中,长度被限制于64k内但是在这以内没有限制层级的数目 。
可以有任意数目的根节点;也就是说,可以有任意数目的主题树。


到了这里,关于java集成mqtt、rabbitmq服务远程连接数dtu实现寄存器rtu数据读写的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 手机、电脑mqtt客户端通过腾讯云服务器远程连接ESP32

            本文将实现:         1、esp32与腾讯云物联网服务器通过mqtt协议通信         2、电脑和手机客户端通过mqtt与腾讯云相通信         3、腾讯云服务器内部消息转发,将手机、电脑发布的主题转发给esp32订阅,实现手机、电脑与esp32的远程通信。      

    2024年02月11日
    浏览(61)
  • Java基于RabbitMQ实现MQTT

    要想使用MQ的MQTT服务需要先开启MQTT服务,因为RabbitMQ的MQTT默认是关闭的,具体启动方法可以参考:rabbitmq使用mqtt协议_panda_225400的博客-CSDN博客_rabbitmq mqtt 下面具体实现我就直接贴代码吧,一切说明都在代码里面,方便直接 POM依赖 配置类 HTTP处理相关类 网关类 控制层测试类

    2024年02月12日
    浏览(42)
  • java实现连接远程服务器,并可以执行shell命令

    你可以使用Java中的SSH库来连接远程服务器并执行shell命令。下面是一个简单的示例代码: 请注意替换 your_host , your_username , your_password 和 your_shell_command 为实际的远程服务器信息和要执行的shell命令。该示例代码使用JSch库来建立SSH连接并执行命令。

    2024年01月20日
    浏览(64)
  • 内网穿透-外远程连接中的RabbitMQ服务

    RabbitMQ是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。 由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出,里面存放的内容

    2024年02月12日
    浏览(44)
  • 5G DTU实现燃气管道数据采集远程管理

    随着物联网技术与智慧城市的不断发展,燃气管道户外组网的需求逐渐浮现。在户外组网应用中5G DTU(Data Terminal Unit)发挥着至关重要的作用。5G DTU可用于数据采集、传输与远程管理,能够实现燃气数据的单点或多点采集和传输,为燃气管道系统的远程管理提供了数据传输和

    2024年02月21日
    浏览(40)
  • DTU和MQTT网关优缺点

    目前市面上有两种设备实现Modbus转MQTT网关。网关式、DTU式。 钡铼技术网关内部进行转换 网关式 优点: 1、通讯模块和MCU分开,通讯模块只做通讯功能,协议转换有单独主控MCU,“硬转换”; 2、数据点是通过映射到主控ARM芯片,配置更方便,点表多更加无压力; 3、网关式,

    2024年02月07日
    浏览(46)
  • 【物联网开发】-微信小程序之MQTT连接,基于MQTT实现设备-服务器-小程序的消息传输

    想要开发微信小程序,首先要有一些基础知识:html、cs、js、json等,小程序中要用到的知识框架大体相同,一个页面包括js、json、wxml、wxss格式的文件。 由于本人此前从未接触过小程序开发,本篇文章将会以新手小白的角度一步步剖析如何使用微信小程序通过MQTT服务器连接设

    2023年04月24日
    浏览(58)
  • Linux本地部署Mosquitto MQTT协议消息服务端并实现远程访问【内网穿透】

    Mosquitto是一个开源的消息代理,它实现了MQTT协议版本3.1和3.1.1。它可以在不同的平台上运行,包括Windows、Linux、macOS等。mosquitto可以用于物联网、传感器、移动应用程序等场景,提供了一种轻量级的、可靠的、基于发布/订阅模式的消息传递机制。 MQTT协议远程访问的好处在于

    2024年02月05日
    浏览(42)
  • esp8266模块--MQTT协议连接服务器实现数据接收和发送+源码

    首先推荐中国移动的代码,我觉得中国移动的代码更为合理:(但是有一些其他的模块在里面) OneNET开发板代码、资料--2020-09-27--标准板、Mini板bug修复 - 开发板专区 - OneNET设备云论坛 (10086.cn) 以及这位b站up做的视频:(wifi模块在p9节) 【挽救小白第一季】STM32+8266+小程序智能

    2024年02月08日
    浏览(63)
  • ESP32的MQTT AT固件烧录+STM32以ESP32的MQTT AT固件的AT指令连接EMQX下mqtt服务器实现消息订阅和发布

    目录 写在前面 三种方案(利用ESP32连接EMQX下的MQTT) 步骤 ESP32烧录固件并AT指令进行测试。 下载固件  烧录工具下载 烧录固件(选择ESP32)  关于AT 指令与MQTT服务器断开后自动重连MQTT服务器 关于AT指令设置上电自动连接WIFI 关于AT指令设置断开后自动重新连接WIFI STM32对接E

    2023年04月12日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包