Java实现Modbus读写数据

这篇具有很好参考价值的文章主要介绍了Java实现Modbus读写数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

由于当时项目周期赶,引入了一个PLC4X组件,上手快。接下来就是使用这个组件遇到的一些问题:

  • 关闭连接NioEventLoop没有释放导致oom
  • 设计思想是一个设备一个连接,而不是一个网关一个连接
  • 连接断开后客户端无从感知
    前两个问题解决方案参考上篇文章,最后一个问题虽然可以通过isConnect()方法获取到状态,但是连接断开后这个状态并没有更新,只能代码实现失败重连。
    所以为了解决以上问题,我打算重新封装一个Modbus组件。

步骤

代码如下所示,目前只分享modbus-core相关的代码。

  • modbus-core:实现设备读写指令的下发以及应答。
  • modbus-app:实现通用的可灵活配置的modbus设备接入层,通过更新配置信息即可快速引入新设备,无需手写代码重启应用。

Java实现Modbus读写数据,Modbus
为了快速实现modbus组件封装,这里引入了Vertx框架(基于事件+异步)官网链接,而不是原生的Netty框架。

引入架包

<!-- 目前我这里引入最新的版本(4.4.4) -->
<dependency>
     <groupId>io.vertx</groupId>
     <artifactId>vertx-core</artifactId>
     <version>${vertx.version}</version>
 </dependency>

工具类

ByteUtil

package com.bho.modbus.utils;

import java.nio.ByteBuffer;

public class ByteUtil {

    /**
     * 字节数组转字符串
     * @param bytes
     * @return
     */
    public static  String bytesToHexString(byte[] bytes) {
        StringBuffer sb = new StringBuffer(bytes.length);
        String sTemp;
        for (int i = 0; i < bytes.length; i++) {
            sTemp = Integer.toHexString(0xFF & bytes[i]);
            if (sTemp.length() < 2) {
                sb.append(0);
            }
            sb.append(sTemp.toUpperCase());
        }
        return sb.toString();
    }

    /**
     * int整型转字节数组
     * @param data
     * @param offset
     * @param len
     * @return
     */
    public static byte[] intToBytes(int data, int offset, int len) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(data);
        byte[] bytes = buffer.array();
        if (len - offset == 4) {
            return bytes;
        }
        byte[] dest = new byte[len];
        System.arraycopy(bytes, offset, dest, 0, len);
        return dest;
    }

    /**
     * 字节数组转int整型
     * @param bytes
     * @param offset
     * @param len
     * @return
     */
    public static int bytesToInt(byte[] bytes, int offset, int len) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        for (int i = len; i < 4; i ++) {
            buffer.put((byte) 0x00);
        }
        for (int i = offset; i < offset + len; i++) {
            buffer.put(bytes[i]);
        }
        buffer.flip();
        return buffer.getInt();
    }



}

Crc16

package com.bho.modbus.utils;


public class Crc16 {

    /**
     * 获取CRC16校验码
     * @param arr_buff
     * @return
     */
    public static byte[] getCrc16(byte[] arr_buff) {
        int len = arr_buff.length;

        // 预置 1 个 16 位的寄存器为十六进制FFFF, 称此寄存器为 CRC寄存器。
        int crc = 0xFFFF;
        int i, j;
        for (i = 0; i < len; i++) {
            // 把第一个 8 位二进制数据 与 16 位的 CRC寄存器的低 8 位相异或, 把结果放于 CRC寄存器
            crc = ((crc & 0xFF00) | (crc & 0x00FF) ^ (arr_buff[i] & 0xFF));
            for (j = 0; j < 8; j++) {
                // 把 CRC 寄存器的内容右移一位( 朝低位)用 0 填补最高位, 并检查右移后的移出位
                if ((crc & 0x0001) > 0) {
                    // 如果移出位为 1, CRC寄存器与多项式A001进行异或
                    crc = crc >> 1;
                    crc = crc ^ 0xA001;
                } else
                    // 如果移出位为 0,再次右移一位
                    crc = crc >> 1;
            }
        }
        return intToBytes(crc);
    }


    private static byte[] intToBytes(int value) {
        byte[] src = new byte[2];
        src[1] = (byte) ((value >> 8) & 0xFF);
        src[0] = (byte) (value & 0xFF);
        return src;
    }



}

实体类

ModbusMode

目前只实现了以下两种通信方式,可根据自己需求加入其它通信方式。

package com.bho.modbus.model;

import com.bho.modbus.utils.ByteUtil;
import com.bho.modbus.utils.Crc16;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.log4j.Log4j2;

import java.nio.ByteOrder;

@Log4j2
public enum ModbusMode {


    /**
     * 【事务ID(2) + 协议标识(2) + 数据长度(2)】 + 从机地址(1) + 功能码(1) + 数据区(N)
     */
    TCP,
    /**
     * 从机地址(1) + 功能码(1) + 数据区(N) + 【校验码(2)】
     *
     */
    RTU,
    ;
    public ByteToMessageDecoder getDecoder() {
        if (this == ModbusMode.TCP) {
            return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 4,
                    2, 0, 6, true);
        }
        if (this == ModbusMode.RTU){
            return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 2,
                    1, 2, 0, true);
        }
        return null;
    }

    public byte[] readData(byte[] bytes) {
        int len = bytes.length;
        if (this == ModbusMode.RTU) {
            byte[] tempArr = new byte[len - 2];
            System.arraycopy(bytes, 0, tempArr, 0, tempArr.length);
            byte[] crc16 = Crc16.getCrc16(tempArr);
            if (crc16[0] != bytes[len -2] || crc16[1] != bytes[len - 1]) {
                log.error("Modbus receive illegal data:{}", ByteUtil.bytesToHexString(bytes));
                return null;
            }
            if (log.isDebugEnabled()) {
                log.debug("read data:{}", ByteUtil.bytesToHexString(tempArr));
            }
            return tempArr;
        }
        if (this == ModbusMode.TCP) {
            if (log.isDebugEnabled()) {
                log.debug("read data:{}", ByteUtil.bytesToHexString(bytes));
            }
            return bytes;
        }
        return null;
    }

    public byte[] writeData(byte[] bytes) {
        if (log.isDebugEnabled()) {
            log.debug("write data:{}",ByteUtil.bytesToHexString(bytes));
        }
        int len = bytes.length;
        if (this == ModbusMode.RTU) {
            byte[] crc16 = Crc16.getCrc16(bytes);
            byte[] tempArr = new byte[len + 2];
            System.arraycopy(bytes, 0, tempArr, 0, len);
            tempArr[len] = crc16[0];
            tempArr[len + 1] = crc16[1];
            return tempArr;
        }
        if (this == ModbusMode.TCP) {
            byte[] tempArr = new byte[len + 6];
            tempArr[1] = 0x01;
            byte[] lenBytes = ByteUtil.intToBytes(len, 2, 2);
            tempArr[4] = lenBytes[0];
            tempArr[5] = lenBytes[1];
            System.arraycopy(bytes, 0, tempArr, 6, len);
            return tempArr;
        }
        return null;
    }


}

ModbusFunc

功能码

package com.bho.modbus.model;


/**
 * Modbus常见功能码
 */
public enum ModbusFunc {

    /**
     * 错误代码
     * 01:非法的功能码
     * 02:非法的寄存器地址
     * 03:非法的数据值
     * 04:从机故障
     */


    /**
     * 请求:
     *      功能代码:1字节 0x01
     *      起始地址:2字节 0x0000-0xffff
     *      线圈数量:2字节 0x0001-0x07d0(2000)
     *
     * 正确响应:
     *      功能代码:1字节 0x01
     *        字节数:1字节 N(读线圈个数/8,余数不为0则加1)
     *      线圈状态:N字节
     *
     * 错误响应:
     *      功能代码:1字节 0x81
     *      错误代码:1字节 0x01-0x04
     */
    READ_COILS((byte)0x01),//读连续线圈状态
    READ_DISCRETE_COILS((byte)0x02),//读离散线圈状态 同上


    /**
     * 请求:
     *       功能代码:1字节 0x03
     *       起始地址:2字节 0x0000-0xffff
     *     寄存器数量:2字节 0x0001-0x007d(125)
     *
     * 正确响应:
     *       功能代码:1字节 0x03
     *         字节数:1字节 2N(N为寄存器数量)
     *      寄存器数量:2N字节
     *
     * 错误响应:
     *      功能代码:1字节 0x83
     *      错误代码:1字节 0x01-0x04
     */
    READ_HOLDING_REGISTERS((byte)0x03),//读保持寄存器值
    READ_INPUT_REGISTERS((byte)0x04),//读输入寄存器值 同上

    /**
     * 请求:
     *      功能代码:1字节 0x05
     *      起始地址:2字节 0x0000-0xffff
     *      线圈状态:2字节 0x0000/0xff00
     *
     * 正确响应:
     *      功能代码:1字节 0x05
     *      起始地址:2字节 0x0000-0xffff
     *      线圈状态:2字节 0x0000/0xff00
     *
     * 错误响应:
     *      功能代码:1字节 0x85
     *      错误代码:1字节 0x01-0x04
     */
    WRITE_SINGLE_COILS((byte)0x05),//写单个线圈
    /**
     * 请求:
     *      功能代码:1字节 0x06
     *      起始地址:2字节 0x0000-0xffff
     *      寄存器值:2字节 0x0000-0xffff
     *
     * 正确响应:
     *      功能代码:1字节 0x06
     *      起始地址:2字节 0x0000-0xffff
     *      寄存器值:2字节 0x0000-0xffff
     *
     * 错误响应:
     *      功能代码:1字节 0x86
     *      错误代码:1字节 0x01-0x04
     */
    WRITE_SINGLE_HOLDING_REGISTERS((byte)0x06),//写单个保持寄存器
    /**
     * 请求:
     *      功能代码:1字节 0x10
     *      起始地址:2字节 0x0000-0xffff
     * 写入寄存器个数:2字节 0x0001-0x007b(123)
     *    写入字节数:1字节 2N(N为寄存器个数)
     *     寄存器值:2N字节 0x0000-0xffff
     *
     * 正确响应:
     *      功能代码:1字节 0x10
     *      起始地址:2字节 0x0000-0xffff
     * 写入寄存器个数:2字节 0x0001-0x007b(123)
     *
     * 错误响应:
     *      功能代码:1字节 0x90
     *      错误代码:1字节 0x01-0x04
     */
    WRITE_MULTI_HOLDING_REGISTERS((byte)0x10),//写多个保持寄存器
    /**
     * 请求:
     *      功能代码:1字节 0x0F
     *      起始地址:2字节 0x0000-0xffff
     *   写入线圈个数:2字节 0x0001-0x07b0(1968)
     *    写入字节数:1字节 N(N为线圈个数/8,余数不为0则加1)
     *     线圈状态:N字节
     *
     * 正确响应:
     *      功能代码:1字节 0x0F
     *      起始地址:2字节 0x0000-0xffff
     *   写入线圈个数:2字节 0x0001-0x07b0(1968)
     *
     * 错误响应:
     *      功能代码:1字节 0x8F
     *      错误代码:1字节 0x01-0x04
     */
    WRITE_MULTI_COILS((byte)0x0F),//写多个线圈

    ;

    private byte func;

    ModbusFunc(byte func) {
        this.func = func;
    }



    public byte getFunc() {
        return func;
    }
}

ModbusParamConfig

下发指令参数配置信息

package com.bho.modbus.model;


import lombok.Data;


@Data
public class ModbusParamConfig {

    private RegisterType registerType;//寄存器类型
    private int registerAddress;//寄存器地址
    private String name;//指标名称
    private DataType dataType;//指标数据类型
    private int numberSplit;//(除)倍数

    public enum RegisterType {
        COIL,
        HOLDING_REGISTER,
        INPUT_REGISTER;
    }

    public enum DataType {
        BOOL,
        FLOAT,
        INT;
    }

}

SendCmdTask

下发指令任务

package com.bho.modbus.model;

import com.alibaba.fastjson.JSONObject;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.Data;

import java.util.List;


@Data
public class SendCmdTask {


    private List<ModbusParamConfig> paramConfigs;//参数列表
    private JSONObject reqParam;//请求参数 写数据必填
    private Boolean isWrite;//是否是写数据
    private Integer slaveId;//从机ID
    private Integer reqTimeout;//请求超时时间(秒)
    private Promise<JSONObject> promise;
    private Long timerId;


    public SendCmdTask(Vertx vertx, List<ModbusParamConfig> paramConfigs, JSONObject reqParam, Boolean isWrite, Integer slaveId, Integer reqTimeout) {
        this.paramConfigs = paramConfigs;
        this.reqParam = reqParam;
        this.isWrite = isWrite;
        this.slaveId = slaveId;
        this.reqTimeout = Math.max(reqTimeout, 5);
        Promise<JSONObject> promise = Promise.promise();
        this.promise = promise;
        this.timerId = vertx.setTimer(reqTimeout * 1000, hh -> promise.tryFail("Request timeout"));
    }
}

核心类

package com.bho.modbus.core;

import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.SendCmdTask;
import com.bho.modbus.model.ModbusFunc;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.model.ModbusParamConfig;

import com.bho.modbus.utils.ByteUtil;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;
import lombok.extern.log4j.Log4j2;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@Log4j2
public class ModbusConnection {
    private String ip;//从机IP
    private Integer port;//从机端口
    private AtomicBoolean isAlive;//从机是否在线
    private ModbusMode mode;//通讯模式
    private NetSocket netSocket;//客户端连接
    private boolean isInitiativeClose;//是否是主动关闭连接
    private Long failRetryTimerId;//失败重试定时器ID
    private Integer failRetryIntervalSecond;//连接断开后重连间隔时间
    private Integer reqTimeoutSecond = 1;//请求超时时间
    private Long queueTimerId;//队列定时器
    private ConcurrentLinkedQueue<SendCmdTask> writeQueue;//写队列 优先写
    private ConcurrentLinkedQueue<SendCmdTask> readQueue;//读队列
    private Map<String, Promise<byte[]>> promiseMap;

    private Vertx vertx;

    public ModbusConnection(Vertx vertx, String ip, Integer port, Integer failRetryIntervalSecond, ModbusMode mode) {
        this.vertx = vertx;
        this.ip = ip;
        this.port = port;
        this.failRetryIntervalSecond = failRetryIntervalSecond;
        this.mode = mode;
        this.isAlive = new AtomicBoolean(false);
        this.writeQueue = new ConcurrentLinkedQueue<>();
        this.readQueue = new ConcurrentLinkedQueue<>();
        this.promiseMap = new ConcurrentHashMap<>();
        consumerTaskQueue(true);
    }

    /**
     * 建立连接
     * @return
     */
    public Future<Boolean> connect(){
        NetClient netClient = vertx.createNetClient();
        return vertx.executeBlocking(b -> {
            netClient.connect(port, ip)
                    .onSuccess(socket -> {
                        log.info("Modbus connect success, ip:{}, port:{}", ip, port);
                        netSocket  = socket;
                        isAlive.set(true);
                        b.tryComplete(true);
                        NetSocketImpl netSocketImpl = (NetSocketImpl) socket;
                        netSocketImpl.channelHandlerContext().pipeline().addFirst(mode.getDecoder());
                        socket.handler(buf -> {
                            byte[] bytes = mode.readData(buf.getBytes());
                            if (bytes == null) {
                                return;
                            }
                            int slaveId = ByteUtil.bytesToInt(bytes, 0, 1);
                            int funcNo = ByteUtil.bytesToInt(bytes, 1, 1);
                            int errFuncNo = funcNo - 128;
                            String key = String.format("%s_%s", slaveId, funcNo);
                            String errKey = String.format("%s_%s", slaveId, errFuncNo);
                            if (promiseMap.containsKey(key)) {
                                Promise<byte[]> promise = promiseMap.get(key);
                                byte[] content = new byte[bytes.length - 2];
                                System.arraycopy(bytes, 2, content, 0, content.length);
                                promise.tryComplete(content);
                            } else if (promiseMap.containsKey(errKey)) {
                                Promise<byte[]> promise = promiseMap.get(errKey);
                                int data = ByteUtil.bytesToInt(bytes, 2, 1);
                                switch (data) {
                                    case 1:
                                        promise.tryFail("Illegal function code");
                                        break;
                                    case 2:
                                        promise.tryFail("Illegal register address");
                                        break;
                                    case 3:
                                        promise.tryFail("Illegal data value");
                                        break;
                                    case 4:
                                        promise.tryFail("Slave fault");
                                        break;
                                }
                            }
                        });
                        socket.closeHandler(h -> {
                            if (!isInitiativeClose) {
                                log.error("Modbus connect close, ip:{}, port:{}", ip, port);
                                failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, hh -> connect());
                            } else {
                                log.info("Modbus connect close, ip:{}, port:{}", ip, port);
                            }
                        });
                    }).onFailure(err -> {
                        log.error("Modbus connect fail, ip:{}, port:{}, msg:{}", ip, port, err.getMessage());
                        isAlive.set(false);
                        b.fail(err.getMessage());
                        failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, h -> connect());
                    });
        });
    }

    /**
     * 是否在线
     * @return
     */
    public boolean isActive() {
        return isAlive.get();
    }

    /**
     * 断开连接
     */
    public void close() {
        isInitiativeClose = true;
        if (failRetryTimerId != null) {
            vertx.cancelTimer(failRetryTimerId);
        }
        if (queueTimerId != null) {
            vertx.cancelTimer(queueTimerId);
        }
        if (netSocket != null) {
            netSocket.close();
        }
    }

    /**
     * 下发读写任务(串行 优先写任务)
     * 若并行可直接调用executeTask执行任务,无需排队等候一个个消费任务
     * @param task 读写任务
     * @return
     */
    public Promise<JSONObject> offerTask(SendCmdTask task) {
        if (task.getIsWrite()) {
            writeQueue.offer(task);
        } else {
            readQueue.offer(task);
        }
        return task.getPromise();
    }

    /**
     * 消费任务队列 500毫秒轮询一次 优先消费写任务
     * @param delayFlag
     */
    private void consumerTaskQueue(boolean delayFlag){
        if(delayFlag){
            queueTimerId = vertx.setTimer(500,id->{
                consumerTaskQueue(false);
            });
            return;
        }
        if(writeQueue.isEmpty() && readQueue.isEmpty()){
            consumerTaskQueue(true);
            return;
        }
        if(!writeQueue.isEmpty()){
            SendCmdTask sendCmdTask = writeQueue.poll();
            sendCmdTask.getPromise().future().onComplete(h->{
                consumerTaskQueue(false);
            });
            executeTask(sendCmdTask);
            return;
        }
        if(!readQueue.isEmpty()){
            SendCmdTask sendCmdTask = readQueue.poll();
            sendCmdTask.getPromise().future().onComplete(h->{
                consumerTaskQueue(false);
            });
            executeTask(sendCmdTask);
        }
    }

    private Future<Void> executeTask(SendCmdTask sendCmdTask){
        vertx.cancelTimer(sendCmdTask.getTimerId());
        Future<JSONObject> future;
        if (sendCmdTask.getIsWrite()) {
            future = executeWrite(sendCmdTask.getReqParam(), sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());
        } else {
            future = executeQuery(sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());
        }
        return future.onSuccess(res -> sendCmdTask.getPromise().tryComplete(res))
                .onFailure(err -> sendCmdTask.getPromise().tryFail(err)).map(o -> null);
    }

    /**
     * 写数据
     * @param reqParam 下发参数
     * @param paramConfigs 参数配置列表
     * @param slaveId 从机ID
     * @return
     */
    private Future<JSONObject> executeWrite(JSONObject reqParam, List<ModbusParamConfig> paramConfigs, Integer slaveId) {
        if (!isActive()) {
            return Future.failedFuture("Gateway offline");
        }
        boolean isMerge = isMergeSendCmd(paramConfigs);
        if (isMerge) {
            int registerAddress = paramConfigs.get(0).getRegisterAddress();
            ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();
            Promise<byte[]> promise = Promise.promise();
            List<String> keyList = paramConfigs.stream().map(ModbusParamConfig::getName).collect(Collectors.toList());
            return vertx.executeBlocking(h -> {
                Buffer buffer = getWriteCmd(registerAddress, slaveId, reqParam, keyList, registerType, promise);
                netSocket.write(buffer);
                promise.future().onSuccess(buf -> {
                    h.complete(reqParam);
                }).onFailure(err -> {
                    log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());
                    h.tryFail(err.getMessage());
                });
            });
        }

        List<Future<Object>> futures = new ArrayList<>();
        Future blockingFuture = Future.succeededFuture();
        for (int i = 0; i < paramConfigs.size(); i++) {
            ModbusParamConfig paramConfig = paramConfigs.get(i);
            ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();
            Promise<byte[]> promise = Promise.promise();
            blockingFuture = blockingFuture.compose(suc -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig),
                    err -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig));
            futures.add(blockingFuture);
        }
        return commonReplyResult(futures, paramConfigs);

    }

    private Future<Object> singleExecuteWrite(int slaveId, JSONObject reqParam, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {
        return vertx.executeBlocking(h -> {
            Buffer buffer = getWriteCmd(paramConfig.getRegisterAddress(), slaveId, reqParam, Arrays.asList(paramConfig.getName()), registerType, promise);
            netSocket.write(buffer);
            promise.future().onSuccess(buf -> {
                h.tryComplete(reqParam.get(paramConfig.getName()));
            }).onFailure(err -> {
                log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",
                        ip, port, slaveId, paramConfig.getName(), err.getMessage());
                h.tryFail(err.getMessage());
            });
        });
    }

    /**
     * 读数据
     * @param paramConfigs 参数配置列表
     * @param slaveId 从机ID
     * @return
     */
    private Future<JSONObject> executeQuery(List<ModbusParamConfig> paramConfigs, Integer slaveId) {
        if (!isActive()) {
            return Future.failedFuture("Gateway offline");
        }
        boolean isMerge = isMergeSendCmd(paramConfigs);
        if (isMerge) {
            int registerAddress = paramConfigs.get(0).getRegisterAddress();
            ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();
            int num = paramConfigs.size();
            Promise<byte[]> promise = Promise.promise();
            Buffer buffer = getQueryCmd(registerAddress, num, slaveId, registerType, promise);
            return vertx.executeBlocking(h -> {
                netSocket.write(buffer);
                promise.future().onSuccess(buf -> {
                    JSONObject jsonObject = new JSONObject();
                    for (int i = 0; i < paramConfigs.size(); i++) {
                        ModbusParamConfig paramConfig = paramConfigs.get(i);
                        switch (registerType) {
                            case COIL:
                                Integer pow = Double.valueOf(Math.pow(2, i % 8)).intValue();
                                jsonObject.put(paramConfig.getName(), (pow & buf[i / 8 + 1]) == pow);
                                break;
                            case INPUT_REGISTER:
                            case HOLDING_REGISTER:
                                jsonObject.put(paramConfig.getName(), getValue(ByteUtil.bytesToInt(buf, i * 2 + 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));
                                break;
                        }
                    }
                    h.complete(jsonObject);
                }).onFailure(err -> {
                    log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());
                    h.tryFail(err.getMessage());
                });
            });
        }
        List<Future<Object>> futures = new ArrayList<>();
        Future blockingFuture = Future.succeededFuture();
        for (int i = 0; i < paramConfigs.size(); i++) {
            ModbusParamConfig paramConfig = paramConfigs.get(i);
            ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();
            Promise<byte[]> promise = Promise.promise();
            blockingFuture = blockingFuture.compose(suc -> singleExecuteQuery(slaveId, promise, registerType, paramConfig),
                    err -> singleExecuteQuery(slaveId, promise, registerType, paramConfig));
            futures.add(blockingFuture);
        }
        return commonReplyResult(futures, paramConfigs);
    }

    private Future<Object> singleExecuteQuery(int slaveId, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {
        return vertx.executeBlocking(h -> {
            Buffer buffer = getQueryCmd(paramConfig.getRegisterAddress(), 1, slaveId, paramConfig.getRegisterType(), promise);
            netSocket.write(buffer);
            promise.future().onSuccess(buf -> {
                switch (registerType) {
                    case COIL:
                        h.complete(Integer.valueOf(buf[1]) == 1);
                        break;
                    case INPUT_REGISTER:
                    case HOLDING_REGISTER:
                        h.complete(getValue(ByteUtil.bytesToInt(buf, 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));
                        break;
                }
            }).onFailure(err -> {
                log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",
                        ip, port, slaveId, paramConfig.getName(), err.getMessage());
                h.tryFail(err.getMessage());
            });
        });
    }

    /**
     * 如果所有参数寄存器类型一致并且地址连续 则合并成一条命令下发
     * @param paramConfigs
     * @return 是否可以合并下发命令
     */
    private boolean isMergeSendCmd(List<ModbusParamConfig> paramConfigs) {
        if (paramConfigs.size() == 1) {
            return false;
        }
        int lastPos = paramConfigs.get(0).getRegisterAddress();
        ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();
        for (int i = 1; i < paramConfigs.size(); i++) {
            int curPos = paramConfigs.get(i).getRegisterAddress();
            if (curPos - lastPos != 1) {
                return false;
            }
            ModbusParamConfig.RegisterType curRegisterType = paramConfigs.get(i).getRegisterType();
            if (registerType != curRegisterType) {
                return false;
            }
            lastPos = curPos;
        }
        return true;
    }

    /**
     * 获取查询数据命令
     * @param startPos 查询地址
     * @param num 查询数量
     * @param slaveId 从机ID
     * @param registerType 寄存器类型
     * @param promise
     * @return
     */
    private Buffer getQueryCmd(int startPos, int num, int slaveId, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {
        byte[] bytes = new byte[6];
        bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];
        switch (registerType) {
            case COIL:
                bytes[1] = ModbusFunc.READ_COILS.getFunc();
                break;
            case HOLDING_REGISTER:
                bytes[1] = ModbusFunc.READ_HOLDING_REGISTERS.getFunc();
                break;
            case INPUT_REGISTER:
                bytes[1] = ModbusFunc.READ_INPUT_REGISTERS.getFunc();
                break;
        }
        Integer func = ByteUtil.bytesToInt(bytes, 1, 1);
        String key = String.format("%s_%s", slaveId, func);
        byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);
        bytes[2] = startPosBytes[2];
        bytes[3] = startPosBytes[3];
        byte[] numBytes = ByteUtil.intToBytes(num, 0, 4);
        bytes[4] = numBytes[2];
        bytes[5] = numBytes[3];
        Buffer buffer = new BufferImpl();
        buffer.appendBytes(mode.writeData(bytes));
        promiseMap.put(key, promise);
        long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));
        promise.future().onComplete(res -> {
            promiseMap.remove(key);
            vertx.cancelTimer(timeId);
        });
        return buffer;
    }

    /**
     * 获取写数据命令
     * @param startPos 查询地址
     * @param slaveId 从机ID
     * @param reqParam 写参数
     * @param keys  参数列表
     * @param registerType 寄存器类型
     * @param promise
     * @return
     */
    private Buffer getWriteCmd(int startPos, int slaveId, JSONObject reqParam,
                               List<String> keys, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {
        int len = keys.size() == 1 ? 6 : (registerType == ModbusParamConfig.RegisterType.HOLDING_REGISTER ?
                7 + keys.size() * 2 : 7 + Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue());
        byte[] bytes = new byte[len];
        bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];
        byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);
        bytes[2] = startPosBytes[2];
        bytes[3] = startPosBytes[3];
        if (keys.size() == 1) {
            switch (registerType) {
                case COIL:
                    bytes[1] = ModbusFunc.WRITE_SINGLE_COILS.getFunc();
                    boolean value = reqParam.getBoolean(keys.get(0));
                    if (value) {
                        bytes[4] = (byte) 0xFF;
                    } else {
                        bytes[4] = 0x00;
                    }
                    bytes[5] = 0x00;
                    break;
                case HOLDING_REGISTER:
                    bytes[1] = ModbusFunc.WRITE_SINGLE_HOLDING_REGISTERS.getFunc();
                    byte[] dataArr = ByteUtil.intToBytes(reqParam.getInteger(keys.get(0)), 2, 2);
                    bytes[4] = dataArr[0];
                    bytes[5] = dataArr[1];
                    break;
            }
        } else {
            byte[] dataNum = ByteUtil.intToBytes(keys.size(), 2, 2);
            bytes[4] = dataNum[0];
            bytes[5] = dataNum[1];
            switch (registerType) {
                case COIL:
                    bytes[1] = ModbusFunc.WRITE_MULTI_COILS.getFunc();
                    int dataSize = Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue();
                    bytes[6] = ByteUtil.intToBytes(dataSize, 3, 1)[0];
                    for (int i = 0; i < dataSize; i += 2) {
                        int sum = 0;
                        int startIndex = i * 8;
                        int endIndex = (i + 2) * 8;
                        endIndex = endIndex > keys.size() ? keys.size() : endIndex;
                        for (int j = startIndex; j < endIndex; j++) {
                            sum += Double.valueOf(Math.pow(2, j)).intValue() * (reqParam.getBoolean(keys.get(j)) ? 1 : 0);
                        }
                        byte[] sumArr = ByteUtil.intToBytes(sum, 2, 2);
                        if (i + 8 < keys.size()) {
                            bytes[i + 7] = sumArr[0];
                            bytes[i + 8] = sumArr[1];
                        } else {
                            bytes[i + 7] = sumArr[1];
                        }
                    }
                    break;
                case HOLDING_REGISTER:
                    bytes[1] = ModbusFunc.WRITE_MULTI_HOLDING_REGISTERS.getFunc();
                    bytes[6] = ByteUtil.intToBytes(keys.size() * 2, 3, 1)[0];
                    for (int i = 0; i < keys.size(); i++) {
                        String paramKey = keys.get(i);
                        Integer value = reqParam.getInteger(paramKey);
                        byte[] dataArr = ByteUtil.intToBytes(value, 2, 2);
                        bytes[i * 2 + 7] = dataArr[0];
                        bytes[i * 2 + 8] = dataArr[1];
                    }
                    break;
            }
        }
        Integer func = ByteUtil.bytesToInt(bytes, 1, 1);
        String key = String.format("%s_%s", slaveId, func);
        Buffer buffer = new BufferImpl();
        buffer.appendBytes(mode.writeData(bytes));
        promiseMap.put(key, promise);
        long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));
        promise.future().onComplete(res -> {
            promiseMap.remove(key);
            vertx.cancelTimer(timeId);
        });
        return buffer;
    }

    private Future<JSONObject> commonReplyResult(List<Future<Object >> futures, List<ModbusParamConfig> paramConfigs) {
        return vertx.executeBlocking(b -> {
            Future.join(futures).onComplete(h -> {
                JSONObject okJson = new JSONObject();
                JSONObject errJson = new JSONObject();
                for (int i = 0; i < paramConfigs.size(); i++) {
                    ModbusParamConfig paramConfig = paramConfigs.get(i);
                    Future<Object> objectFuture = futures.get(i);
                    if (objectFuture.succeeded()) {
                        okJson.put(paramConfig.getName(), objectFuture.result());
                    } else {
                        errJson.put(paramConfig.getName(), objectFuture.cause().getMessage());
                    }
                }
                if (okJson.size() > 0) {
                    b.tryComplete(okJson);
                } else {
                    b.tryFail(errJson.getString(paramConfigs.get(0).getName()));
                }
            });
        });
    }

    private Object getValue(int value, int numberSplit, ModbusParamConfig.DataType dataType) {
        if (numberSplit == 1) {
            return value;
        }
        Float temp = value * 1f / numberSplit;
        switch (dataType) {
            case INT :
                return Math.round(temp);
            case FLOAT:
                return temp;
        }
        return temp;
    }

}

测试

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.core.ModbusConnection;
import com.bho.modbus.model.ModbusParamConfig;
import com.bho.modbus.model.SendCmdTask;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.extern.log4j.Log4j2;

import java.util.List;

@Log4j2
public class TestModbus {
    public static final String READ_DATA = "[" +
            "        {" +
            "          \"name\": \"a\"," +
            "          \"registerType\": \"HOLDING_REGISTER\"," +
            "          \"registerAddress\": 504," +
            "          \"dataType\": \"FLOAT\"," +
            "          \"numberSplit\": 10" +
            "        }," +
            "        {" +
            "          \"name\": \"b\"," +
            "          \"registerType\": \"HOLDING_REGISTER\"," +
            "          \"registerAddress\": 505," +
            "          \"dataType\": \"FLOAT\"," +
            "          \"numberSplit\": 10" +
            "        }," +
            "        {" +
            "          \"name\": \"c\"," +
            "          \"registerType\": \"HOLDING_REGISTER\"," +
            "          \"registerAddress\": 506," +
            "          \"dataType\": \"FLOAT\"," +
            "          \"numberSplit\": 10" +
            "        }," +
            "        {" +
            "          \"name\": \"d\"," +
            "          \"registerType\": \"HOLDING_REGISTER\"," +
            "          \"registerAddress\": 507," +
            "          \"dataType\": \"INT\"," +
            "          \"numberSplit\": 1" +
            "        }," +
            "        {" +
            "          \"name\": \"e\"," +
            "          \"registerType\": \"HOLDING_REGISTER\"," +
            "          \"registerAddress\": 508," +
            "          \"dataType\": \"INT\"," +
            "          \"numberSplit\": 1" +
            "        }]";

    private static final String WRITE_DATA = "[" +
            "        {" +
            "          \"name\": \"do0\"," +
            "          \"registerType\": \"COIL\"," +
            "          \"registerAddress\": 20," +
            "          \"dataType\": \"BOOL\"," +
            "          \"numberSplit\": 1" +
            "        }" +
            "        ,{" +
            "          \"name\": \"do1\"," +
            "          \"registerType\": \"COIL\"," +
            "          \"registerAddress\": 21," +
            "          \"dataType\": \"BOOL\"," +
            "          \"numberSplit\": 1" +
            "        }" +

            "]";
    public static void main(String[] args) {
        testReadData();
//        testWriteData();;


    }

    private static void testWriteData() {
        Vertx vertx = Vertx.vertx();
        ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);
        Future<Boolean> connectFuture = connection.connect();
        JSONObject reqParam = new JSONObject();
        reqParam.put("do0", false);
        reqParam.put("do1", false);
        List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(WRITE_DATA, ModbusParamConfig.class);
        connectFuture.onComplete(con -> {
            if (connectFuture.succeeded()) {
                SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 21, 10);
                Promise<JSONObject> promise = connection.offerTask(task);
                promise.future().onSuccess(suc -> {
                    log.info("read:"+suc);
                }).onFailure(err -> System.err.println(err.getMessage()));

                SendCmdTask task2 = new SendCmdTask(vertx, modbusParamConfigs, reqParam, true, 21, 10);
                Promise<JSONObject> promise2 = connection.offerTask(task2);
                promise2.future().onSuccess(suc -> {
                    log.info("write:"+suc);
                }).onFailure(err -> System.err.println(err.getMessage()));
            } else {
                System.err.println("gateway offline");
            }
        });

    }

    private static void testReadData() {
        Vertx vertx = Vertx.vertx();
        ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);
        Future<Boolean> connectFuture = connection.connect();
        List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(READ_DATA, ModbusParamConfig.class);
        connectFuture.onComplete(con -> {
            if (connection.isActive()) {
                SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 2, 10);
                Promise<JSONObject> promise = connection.offerTask(task);
                promise.future().onSuccess(suc -> {
                    log.info(suc);
                }).onFailure(err -> System.err.println(err.getMessage()));
            } else {
                System.err.println("gateway offline");
            }
        });

    }
}

运行结果如下:
其实这两个读写示例如果是一个网关可以共用一个Modbus连接。
Java实现Modbus读写数据,Modbus
Java实现Modbus读写数据,Modbus

modbus-app配置参数

格式如下:

{
  "readable": {
    "devType01": {
      "ReportData": [
        {
          "name" : "xxx",
          "registerType" : "COIL",
          "registerAddress" : 1,
          "dataType" : "BOOL",
          "numberSplit" : 1
        }
      ]
    },
    "devType02": {
      "ReportData": [
        {
          "name" : "a",
          "registerType" : "HOLDING_REGISTER",
          "registerAddress" : 1,
          "dataType" : "INT",
          "numberSplit" : 1
        },
        {
          "name" : "b",
          "registerType" : "HOLDING_REGISTER",
          "registerAddress" : 2,
          "dataType" : "INT",
          "numberSplit" : 10
        },
        {
          "name": "c",
          "registerType": "",
          "dataType": "FLOAT",
          "mbScript": "(a*10000+b)/10"
        }
      ]
    }
  },
  "writable": {
    "devType01": {
      "Control": [
        {
          "name": "operation",
          "registerType": "COIL",
          "registerAddress": 21,
          "dataType": "BOOL",
          "numberSplit": 1
        }
      ]
    }
  },
  "readDataPeriods": [
    {
      "period" : 60,
      "deviceTypes": ["devType01"]
    },
    {
      "period" : 600,
      "deviceTypes": ["devType02","devType03"]
    }
  ]
}

具体怎么实现这边就不过多讲解了…

结束

不保证代码正确,我这边只是大概实现了一下,仅供参考。若有问题,请批评指出,我会虚心接受并积极修复问题。文章来源地址https://www.toymoban.com/news/detail-704632.html

到了这里,关于Java实现Modbus读写数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • java集成mqtt、rabbitmq服务远程连接数dtu实现寄存器rtu数据读写

    数据采集及写入流程设计图 一、硬件设备 硬件设备与原有设备保持不变通过配置dtu设备进行mqtt穿透功能进行数据交互 1、dtu配置详解: 1.1 dtu工具 本项目使用塔石TAS-LTE-364支持4G无线dtu模块,下载安装塔石物联网厂家提供的串口测试程序Tool V2.7.1 D20220616.exe 1.2打开程序选择对

    2024年02月03日
    浏览(45)
  • JAVA modbus4j 实现modbus tcp通讯

    1.maven依赖 2.在modbus进行读写之前,需要先建立连接,例如:建立modbus tcp通讯  3.modbus4j 读工具类 4. modbus4j 写工具类

    2024年02月16日
    浏览(52)
  • java项目实现读写分离,项目连接Linux部署的数据库异常javax.net.ssl.SSLHandshakeException: No appropriate protocol

    1、对项目进行优化实现读写分离,项目启动时报错如下: Caused by: javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate) 原因:javax.net.ssl.SSLHandshakeException:没有适当的协议(协议被禁用或密码套件不合适) 2、bug解决:     注意: useSSL=f

    2024年02月04日
    浏览(52)
  • 【2023】java通过modbus4j实现modus TCP通讯

    主要分为三个子协议 RTU ASCII TCP Modbus RTU:——传输的是字节数组(bit[]) 通信:读写 输出:可以读写 输入:只能读 存储区:输出线圈、输入线圈、输出寄存器、输入寄存器 线圈:代表一个布尔量、最小单位是一个布尔(1或者0), 寄存器:一个寄存器代表16个最小单位,主

    2024年02月12日
    浏览(44)
  • Java文件读写数据流

    以下这几个类都是抽象类.并且都有对于文件操作的具体实现类.File+类名就是具体的实现类 1.1.1.InputStream 以二进制方式读.有两个主要方法. 1.read(); 该方法有三个版本 无参: read() 读取一个字节的数据,返回 -1 表示读取结束 一个参数: read(byte[] b) 最多读取 b.length 字节的数据到 b

    2024年02月16日
    浏览(44)
  • Java 中如何实现文件的读写操作?(十六)

    在Java中,文件I/O(输入/输出)操作是一项非常基础的任务。在Java中,可以使用File和FileInputStream、FileOutputStream、BufferedReader、PrintWriter等类来进行文件读写操作。 文件读取 在Java中,可以使用FileInputStream和BufferedReader类来读取文件。 FileInputStream: FileInputStream是一个用于从文件

    2024年02月02日
    浏览(37)
  • Java 基于Apache POI实现Excel读写操作

    Win10 Java JDK1.8 pom.xml配置 代码实现 exmple.xml 补充说明 创建工作簿 POI创建工作簿的API有3种: HSSFWorkbook : 此API用于操作Excel 2003及之前的版本(文件扩展名 .xls ),优点是导出速度快,缺点是导出的行数有局限性,最多为65535行,超出65536条后系统就会报错。对内存消耗比较大,容

    2024年02月15日
    浏览(40)
  • 【Java 编程】文件操作,文件内容的读写—数据流

    平时说的文件一般都是指存储在 硬盘 上的普通文件 形如 txt, jpg, mp4, rar 等这些文件都可以认为是普通文件,它们都是在硬盘上存储的 在计算机中,文件可能是一个 广义的概念 ,就不只是包含普通文件,还可以包含 目录 (把目录称为目录文件) 操作系统中,还会使用文件来描

    2023年04月08日
    浏览(50)
  • S7-1200中通过MODBUS TCP客户端在一次请求中实现从服务器读写一个或多个保持性寄存器的具体方法

    TIA博途V17中增加了MODBUS TCP客户端功能码 23,可以在一次请求作业下实现从服务器读取和写入一个或多个保持性寄存器,这样省去了轮询的编程工作量,提高了工作效率,如下图所示, 使用该指令的前提条件: • TIA Portal V17 及以上版本 • CPU 固件 V4.2 及以上版本 具体操作方

    2024年02月12日
    浏览(45)
  • 数据库:mycat实现读写分离

    目录 一、mycat 1、mycat实现读写分离原理 2、mycat应用场景 3、mycat作用 4、mycat实现读写分离实战 1、mycat实现读写分离原理 ①用户进行读操作则由mycat转给配置的从数据库。 ②用户进行写操作则由mycat转给配置的主数据库。 ③转发规则由mycat配置文件中定义,那台是读那台是写

    2024年02月07日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包