Python modbus_tk 库源码分析

这篇具有很好参考价值的文章主要介绍了Python modbus_tk 库源码分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Python modbus_tk 库源码分析

前言

modbus_tcp 协议是工业项目中常见的一种基于 TCP/IP 协议的设备数据交互协议。

作为 TCP/IP 协议的上层协议,modbus_tcp 协议涉及到两个概念:client 和 server。但更标准的叫法应该是 master 和 slave。

  • Slave:TCP/IP 协议中的 server 方
  • Master:TCP/IP 协议中的 client 方

而 modbus_tk 库作为 Python 中著名的 modbus 协议封装模块,其源码值得深入研究。

特别是在对并发量等方面有一定要求的情况下,如果需要在 modbus_tk 模块的基础上进行更进一步的开发,就更应该仔细研究其源代码和实现逻辑。

因此,我写下了这篇文章,希望对你有所帮助。

实例化 TcpMaster 对象

导入 TcpMaster 类:

from modbus_tk.modbus_tcp import TcpMaster

TcpMaster 继承于 Master,在其实例化的时候什么也没做。

class TcpMaster(Master):
    def __init__(self, host="127.0.0.1", port=502, timeout_in_sec=5.0):
        super(TcpMaster, self).__init__(timeout_in_sec)
        self._host = host
        self._port = port
        self._sock = None

Master 的 __init__() 方法中也没有做什么:

class Master(object):
    def __init__(self, timeout_in_sec, hooks=None):
        self._timeout = timeout_in_sec
        self._verbose = False
        self._is_opened = False   # 记住 _is_opened 现在为 False

建立 socket 链接

TcpMaster 的父类 Master 提供了 execute 方法,该方法提供以下参数:

self,
slave,
function_code,
starting_address,
quantity_of_x=0,
output_value=0,
data_format="",
expected_length=-1,
write_starting_address_fc23=0,
number_file=None,
pdu="",
returns_raw=False

此方法基本上算该模块的核心,无论是读写线圈、还是读写寄存器等都是调用该方法。

接下来其代码体的具体实现,我们将开始进行逐行分析:

is_read_function = False
nb_of_digits = 0
if number_file is None:
    number_file = tuple()

self.open()

is_read_function 这里赋值为 False、代表后续在 Master.execute() 方法真正执行前,作者会先认为使用者调用的是 write 方法而非 read 方法。

接下来代码中又调用了 self.open() 方法。 由于实例化 TcpMaster 类时什么也没做, 所以 TCP 链接在此时是还没有建立的,而 self.open() 方法就是创建一个 TCP 的 client 端。

def open(self):
    if not self._is_opened:  # 在初始化方法中,它默认是 False
        self._do_open()
        self._is_opened = True

这里执行的 self._do_open() 方法由 TcpMaster 实现:

def _do_open(self):
    if self._sock:  # 如果 self._sock 不是 None、就将 socket 对象关闭
        self._sock.close()

    # 创建一个 socket 对象,AF_INET 为 IPV4 地址家族
    # SOCK_STREAM 即为基于流的协议,也就是 TCP 协议
    self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 设置超时时间,即实例化 TcpMaster 传入的值,默认参数为 5
    self.set_timeout(self.get_timeout())

    # 允许重用地址(解决端口占用问题)
    self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    call_hooks("modbus_tcp.TcpMaster.before_connect", (self, ))
    # 进行链接
    self._sock.connect((self._host, self._port))
    call_hooks("modbus_tcp.TcpMaster.after_connect", (self, ))

这里的 self.set_timeout 由 TcpMaster 实现:

def set_timeout(self, timeout_in_sec):
    super(TcpMaster, self).set_timeout(timeout_in_sec)
    if self._sock:
        # 注意! 这里如果 timeout_in_sec 等于 0
        # 那么该 sock 对象就是一个链接时非阻塞的
        # 可用于 I/O 多路复用
        self._sock.setblocking(timeout_in_sec > 0)

        # 如果 timeout_in_sec 为 0,则设置为阻塞的 socket 对象
        # timeout 不应该传递负数
        if timeout_in_sec:
            self._sock.settimeout(timeout_in_sec)

看到这里,我们其实不难猜出 modbus_tk 模块中 TcpMaster 的 Master.execute() 方法其实是能支持 self._sock 异常后的无感重联的。

只需要在 slave 方失联后重新调用一次 TcpMaster._do_open() 方法即可,即可实现无感知的重新链接。

写入多个寄存器

接下来 Master.execute() 方法基本是对 TCP 协议的解包、组包代码,我将具体的组包等过程代码都先给注释掉了:

@threadsafe_function
def execute(
    self, slave, function_code, starting_address, quantity_of_x=0, output_value=0, data_format="",
    expected_length=-1, write_starting_address_fc23=0, number_file=None, pdu="", returns_raw=False
):

    is_read_function = False
    nb_of_digits = 0
    if number_file is None:
        number_file = tuple()

    self.open()

    if function_code == defines.READ_COILS or function_code == defines.READ_DISCRETE_INPUTS:
        pass
    elif function_code == defines.READ_INPUT_REGISTERS or function_code == defines.READ_HOLDING_REGISTERS:
        pass

    elif function_code == defines.READ_FILE_RECORD:
        pass

    elif (function_code == defines.WRITE_SINGLE_COIL) or (function_code == defines.WRITE_SINGLE_REGISTER):
        pass

    elif function_code == defines.WRITE_MULTIPLE_COILS:
        pass

    elif function_code == defines.WRITE_MULTIPLE_REGISTERS:
        pass

    elif function_code == defines.READ_EXCEPTION_STATUS:
        pass

    elif function_code == defines.DIAGNOSTIC:
        pass

    elif function_code == defines.READ_WRITE_MULTIPLE_REGISTERS:
        pass

    elif function_code == defines.RAW:
        pass

    elif function_code == defines.DEVICE_INFO:
        pass

    else:
        raise ModbusFunctionNotSupportedError("The {0} function code is not supported. ".format(function_code))

    query = self._make_query()

    request = query.build_request(pdu, slave)

    retval = call_hooks("modbus.Master.before_send", (self, request))
    if retval is not None:
        request = retval
    if self._verbose:
        LOGGER.debug(get_log_buffer("-> ", request))
    self._send(request)

    call_hooks("modbus.Master.after_send", (self, ))

    if slave != 0:
        pass

为了能够继续向下分析,我们这里先以写入多个寄存器的逻辑入手接着向下看:

READ_WRITE_MULTIPLE_REGISTERS

其代码为:

elif function_code == defines.WRITE_MULTIPLE_REGISTERS:
     # 输出值和 format,如果指定了 format 和输出值,将运行下面的逻辑
    if output_value and data_format:
        byte_count =  struct.calcsize(data_format)

    # 否则先计算整个 byte 的长度
    else:
        byte_count = 2 * len(output_value)

    # 使用 struct 对数据进行转换, 采用大端排列
    pdu = struct.pack(">BHHB", function_code, starting_address, byte_count // 2, byte_count)

     # 输出值和 format,如果指定了 format 和输出值,将运行下面的逻辑
    if output_value and data_format:
        pdu += struct.pack(data_format, *output_value)

    # 一般我们不会指定 data_format,所以直接往下看
    else:
        for j in output_value:
            # 若 j 大于 0 fmt 就是 H 否则是 h
            fmt = "H" if j >= 0 else "h"
            # 继续向 pdu 里加数据
            pdu += struct.pack(">" + fmt, j)
    data_format = ">HH"
    if expected_length < 0:
        expected_length = 8

构建数据包

无论是读取、写入线圈或者寄存器,每一个请求都会包含一个 pdu 数据单元。

在 Master.execute() 方法下面,每一种读写操作都会运行 TcpMaster._make_query() 方法:

query = self._make_query()
request = query.build_request(pdu, slave)

下面是 TcpMaster._make_query 的代码:

def _make_query(self):
    return TcpQuery()

TcpQuery 属于 Query 的派生类,但 Query 实际上是一个 interface 类,故没有实际代码:

class TcpQuery(Query):

    _last_transaction_id = 0  # 记住这个类属性

    def __init__(self):
        super(TcpQuery, self).__init__()
        self._request_mbap = TcpMbap()
        self._response_mbap = TcpMbap()

TcpMbap 类的实例化过程也非常简单,TcpQuery 中实际上就是封装了一个 request 和 response 而已:

class TcpMbap(object):
    def __init__(self):
        self.transaction_id = 0
        self.protocol_id = 0
        self.length = 0
        self.unit_id = 0

TcpQuery.build_request() 的实现:

def build_request(self, pdu, slave):
    if (slave < 0) or (slave > 255):
        raise InvalidArgumentError("{0} Invalid value for slave id".format(slave))

    self._request_mbap.length = len(pdu) + 1  #  pdu 数据单元的长度 + 1
    self._request_mbap.transaction_id = self._get_transaction_id() # 获取一个事务 id
    self._request_mbap.unit_id = slave  # 站号
    mbap = self._request_mbap.pack()    # 打包
    # mbap 和 pdu 数据单元拼接并返回
    # mbap 可以认为是 head 而 pdu 则是 body
    return mbap + pdu

TcpQuery._get_transaction_id() 会在每次收发包时,都让事务号自增 1,当事务号增加到 65535 后,置 0:

@threadsafe_function
def _get_transaction_id(self):
    if TcpQuery._last_transaction_id < 0xffff:  # 65535
        TcpQuery._last_transaction_id += 1
    else:
        TcpQuery._last_transaction_id = 0
    return TcpQuery._last_transaction_id

TcpMbap.pack() 方法会将所有 TcpMbap.__init__() 中的实例属性通过 struct 进行封包:

def pack(self):
    # transaction_id 事务号
    # protocol_id 0
    # length pdu 数据单元的长度 + 1
    # unit_id 设备站号 slave
    return struct.pack(">HHHB", self.transaction_id, self.protocol_id, self.length, self.unit_id)

至此,request 请求已经构建完毕了。

发送请求

让我们接着回到 Master.execute() 方法中:

# call_hooks 实际上是运行钩子函数,在后面会有详细介绍
retval = call_hooks("modbus.Master.before_send", (self, request))
if retval is not None:
    request = retval

# 是否需要打印更多的日志?这个可以通过 Master.set_verbose() 方法进行设置
# 其默认值为 False
if self._verbose:
    LOGGER.debug(get_log_buffer("-> ", request))

# 发送请求
self._send(request)

call_hooks("modbus.Master.after_send", (self, ))

在 TcpMaster._send() 方法中:

def _send(self, request):
    retval = call_hooks("modbus_tcp.TcpMaster.before_send", (self, request))
    if retval is not None:
        request = retval
    try:
        # 刷新 socket 确保链接可用
        flush_socket(self._sock, 3)
    except Exception as msg:
        LOGGER.error('Error while flushing the socket: {0}'.format(msg))
        # 异常后、将再次运行 TcpMaster._do_open() 尝试重联
        self._do_open()

    # 若 flush_socket() 函数运行没有抛出异常,则代表链接是可用的。
    # 这时候才会发送数据
    self._sock.send(request)

flush_socket() 函数非常有趣,它通过 select 模块来不断的轮询监听 sock 对象的可读状态,当可读时会自动读取每一次的 1024 个字节数据并将他们抛弃,这里是为了保持发送数据前的连接状态检测没有异常而做的一步操作:

def flush_socket(socks, lim=0):
    # lim 传入的是 3, 代表最多读 3 次
    input_socks = [socks]  # 做成一个监听列表
    cnt = 0 # 当前读取到的次数
    while True:
        # 放入 可读事件列表、可写事件列表、错误事件列表 及监听对象
        # 它会返回一个列表:
        #  [[r_fd, r_fd], [w_fd, w_fd], [e_fd, e_fd]]
        # 而 [0] 则是指只拿到可读的文件描述符列表
        # 循环事件时间设置的是 0.0 这代表它将一直阻塞在这里,直到 fd 事件被触发
        # 若不为 0,则等待 n 秒,进行下一次的循环
        i_socks = select.select(input_socks, input_socks, input_socks, 0.0)[0]

        # 没有可读的文件描述符,则跳出 while 循环
        if len(i_socks) == 0:
            break

        # 若拿到了,就循环得到 socks 进行 recv
        # 其实这里应该也可以写成 i_socks[0].recv(1024)
        # 因为可读事件文件描述符
        for sock in i_socks:
            sock.recv(1024)

        # 超出了最大读取限制, 这里应该代表的是连接断开了
        if lim > 0:
            cnt += 1
            if cnt >= lim:
                raise Exception("flush_socket: maximum number of iterations reached")

至此、我们一次完整的组包及发送数据的源码分析就走完了。

解析响应

我们接着来看 Master.execute() 方法中关于解析响应信息的代码:

if slave != 0:
    response = self._recv(expected_length)
    pass

首先,如果站号不等于 0 就会执行 TcpMaster._recv() 方法:

def _recv(self, expected_length=-1):
    # to_data 函数会根据 Python 版本来返回不同的内容
    # 若是 Python2 则直接返回 string ''
    # 若是 Python3 则会返回一个 bytearray('', 'ascii')
    response = to_data('')
    length = 255

    # 如果 response 小于 255, 则不断的读取
    while len(response) < length:
        rcv_byte = self._sock.recv(1)
        if rcv_byte:
            response += rcv_byte
            # 在第 6 个字节处、通过 struct.unpack() 进行拆包
            if len(response) == 6:
                to_be_recv_length = struct.unpack(">HHH", response)[2]
                length = to_be_recv_length + 6
        else:
            break
    retval = call_hooks("modbus_tcp.TcpMaster.after_recv", (self, response))
    if retval is not None:
        return retval
    return response

得到 response 后,Master.execute() 方法会开始解析响应信息:

retval = call_hooks("modbus.Master.after_recv", (self, response))
if retval is not None:
    response = retval
if self._verbose:
    LOGGER.debug(get_log_buffer("<- ", response))

response_pdu = query.parse_response(response)

TcpQuery.parse_response() 方法的代码主要将 mbap 和 pdu 进行分离,并且通过 TcpMbap.unpack() 方法将 mbap 解包并通过 TcpMbap.check_response() 进行数据校验:

def parse_response(self, response):
    if len(response) > 6:
        # 分别拿到 mbap 和 pdu
        mbap, pdu = response[:7], response[7:]
        # 解包
        self._response_mbap.unpack(mbap)
        # 校验数据,传入请求的 mbap 以及 pdu 的长度
        self._response_mbap.check_response(self._request_mbap, len(pdu))
        # 返回 pdu
        return pdu
    else:
        raise ModbusInvalidResponseError("Response length is only {0} bytes. ".format(len(response)))

TcpMbap.unpack() 方法代码如下,将 _response_mbap 的事务号协议 id 等信息进行更新:

def unpack(self, value):
    (self.transaction_id, self.protocol_id, self.length, self.unit_id) = struct.unpack(">HHHB", value)

TcpMbap.check_response() 方法代码如下:

def check_response(self, request_mbap, response_pdu_length):
    error_str = self._check_ids(request_mbap)
    error_str += self.check_length(response_pdu_length)

    if len(error_str) > 0:
        raise ModbusInvalidMbapError(error_str)

TcpMbap._check_ids() 方法代码如下:

    def _check_ids(self, request_mbap):
        # self 是响应体, request_mbap 是请求体
        # 对比他们的事务号等信息是否一致,若不一致则会在返回一个 error_str, 该 error_str 会在 TcpMbap.check_response()
        # 中被 raise
        error_str = ""

        if request_mbap.transaction_id != self.transaction_id:
            error_str += "Invalid transaction id: request={0} - response={1}. ".format(
                request_mbap.transaction_id, self.transaction_id)

        if request_mbap.protocol_id != self.protocol_id:
            error_str += "Invalid protocol id: request={0} - response={1}. ".format(
                request_mbap.protocol_id, self.protocol_id
            )

        if request_mbap.unit_id != self.unit_id:
            error_str += "Invalid unit id: request={0} - response={1}. ".format(request_mbap.unit_id, self.unit_id)

        return error_str

TcpMbap.check_length() 方法代码如下:

def check_length(self, pdu_length):
    # 这里思考 pdu 长度为什么 + 1?
    # 因为 response 在 TcpMbap.unpack() 方法中,self.length 是 mbap + pdu 的长度
    # 所以这里 pdu_length 长度 + 1 实际上就是指整个 head + body 的长度
    following_bytes_length = pdu_length+1

    # 判断长度是否相等、若不等可能造成的原因是数据拆包不正确 mbap 长了,或者 pdu 短了
    # 这种时候就直接返回一个字符串
    # TcpMbap.check_response() 中如果 error_str 的长度大于 0, 就会抛出异常了
    if self.length != following_bytes_length:
        return "Response length is {0} while receiving {1} bytes. ".format(self.length, following_bytes_length)
    return ""

至此,TcpQuery().parse_response() 方法就全部执行完毕了。

Master.execute() 方法中就得到了数据单元 pdu。也就是整个数据体。

我们接着往下看 Master.execute() 方法,其实后面已经没有再深层次调用某些内部代码了,也没有新的 I/O 操作了:


response_pdu = query.parse_response(response)

(return_code, byte_2) = struct.unpack(">BB", response_pdu[0:2])

# 如果返回的 code 大于 128,直接报错
if return_code > 0x80:
    # the slave has returned an error
    exception_code = byte_2
    raise ModbusError(exception_code)
else:
    # 下面都是解析出一个 body 和一个 data_format
    # 分别是 读操作、设备信息、写操作
    # 他们所得到的 body 都不一样
    if is_read_function:
        byte_count = byte_2
        data = response_pdu[2:]
        if byte_count != len(data):
            # the byte count in the pdu is invalid
            raise ModbusInvalidResponseError(
                "Byte count is {0} while actual number of bytes is {1}. ".format(byte_count, len(data))
            )
    elif function_code == defines.DEVICE_INFO:
        data = response_pdu[1:]
        data_format = ">" + (len(data) * "B")
    else:
        # returns what is returned by the slave after a writing function
        data = response_pdu[1:]

    # 默认为 False
    if returns_raw:
        return data

    # 解包,通过 读、写、设备信息所得到的 data_format 和 data
    # 对数据进行操作
    result = struct.unpack(data_format, data)

    # 只有 function_code 是 READ_COILS 时,nb_of_digits 才不为 0
    if nb_of_digits > 0:
        digits = []
        for byte_val in result:
            for i in range(8):
                if len(digits) >= nb_of_digits:
                    break
                digits.append(byte_val % 2)
                byte_val = byte_val >> 1
        result = tuple(digits)

    # 如果 function_code 是 READ_FILE_RECORD 读取文件记录,则也需要对 result 进行
    # 再次的修改
    if function_code == defines.READ_FILE_RECORD:
        sub_seq = list()
        ptr = 0
        while ptr < len(result):
            sub_seq += ((ptr + 2, ptr + 2 + result[ptr] // 2), )
            ptr += result[ptr] // 2 + 2
        result = tuple(map(lambda sub_seq_x: result[sub_seq_x[0]:sub_seq_x[1]], sub_seq))

    # 返回 result
    return result

threadsafe 装饰器

threadsafe 是一个装饰器函数,在 Master.execute() 方法头上和 TcpQuery._get_transaction_id() 方法头上都加了这个装饰器。

见名知意,该装饰器的主要目的就是为了保障线程安全(有的设备可能不支持同时对其进行读写操作),但是该装饰器也可能会带来另一些问题。

我们先看它的源码:

def threadsafe_function(fcn):
    # 实例化出了一把递归锁
    lock = threading.RLock()

    def new(*args, **kwargs):
        # 当 Master.execute() 和 TcpQuery._get_transaction_id() 方法没有通过
        # 关键字传参传入 threadsafe=False 时,将默认开启线程安全模式来执行
        # 这 2 个方法
        threadsafe = kwargs.pop('threadsafe', True)
        if threadsafe:
            lock.acquire()
        try:
            ret = fcn(*args, **kwargs)
        except Exception as excpt:
            raise excpt
        finally:
            if threadsafe:
                lock.release()
        return ret
    return new

这个 threading lock 会导致什么问题呢?当 Python 解释器运行到 Master.execute() 方法头上时,就会自动执行该装饰器。

而 lock 变量也就生成了,最后会返回内部闭函数 new()。

可以理解为这个 lock 已经被当成了一个全局变量,后续无论是创建多少个 TcpMaster 的实例对象,lock 变量所指向的锁都是同一个。

通过源码分析我们得知,Master.execute() 方法中会去建立 socket 链接,一旦有 1 个 device 链接时间过长,也将会导致其他的 device 通信或链接阻塞。

因为它们都是用的同一个 lock 锁。所以,一般来说在使用时我们会在 Master.execute() 方法中显式的传递 threadsafe=False 的关键字参数,自己实现 lock 来解决同一 device 不能同时读写的问题。

一些 hooks 钩子函数

在上面分析源码时,我们会看到很多 call_hooks 的运行,他们其实是 modbus_tk 模块所提供的钩子函数。只要实现相应的钩子函数就会在整个 modbus_tcp 的数据传递生命周期中自动运行。

以下是常见的钩子函数:文章来源地址https://www.toymoban.com/news/detail-479008.html

def install_hook(name, fct):
    """
    Install one of the following hook

    modbus_rtu.RtuMaster.before_open((master,))
    modbus_rtu.RtuMaster.after_close((master,)
    modbus_rtu.RtuMaster.before_send((master, request)) returns modified request or None
    modbus_rtu.RtuMaster.after_recv((master, response)) returns modified response or None

    modbus_rtu.RtuServer.before_close((server, ))
    modbus_rtu.RtuServer.after_close((server, ))
    modbus_rtu.RtuServer.before_open((server, ))
    modbus_rtu.RtuServer.after_open(((server, ))
    modbus_rtu.RtuServer.after_read((server, request)) returns modified request or None
    modbus_rtu.RtuServer.before_write((server, response))  returns modified response or None
    modbus_rtu.RtuServer.after_write((server, response))
    modbus_rtu.RtuServer.on_error((server, excpt))

    modbus_tcp.TcpMaster.before_connect((master, ))
    modbus_tcp.TcpMaster.after_connect((master, ))
    modbus_tcp.TcpMaster.before_close((master, ))
    modbus_tcp.TcpMaster.after_close((master, ))
    modbus_tcp.TcpMaster.before_send((master, request))
    modbus_tcp.TcpServer.after_send((master, request))
    modbus_tcp.TcpMaster.after_recv((master, response))


    modbus_tcp.TcpServer.on_connect((server, client, address))
    modbus_tcp.TcpServer.on_disconnect((server, sock))
    modbus_tcp.TcpServer.after_recv((server, sock, request)) returns modified request or None
    modbus_tcp.TcpServer.before_send((server, sock, response)) returns modified response or None
    modbus_tcp.TcpServer.on_error((server, sock, excpt))

    modbus_rtu_over_tcp.RtuOverTcpMaster.after_recv((master, response))

    modbus.Master.before_send((master, request)) returns modified request or None
    modbus.Master.after_send((master))
    modbus.Master.after_recv((master, response)) returns modified response or None

    modbus.Slave.handle_request((slave, request_pdu)) returns modified response or None
    modbus.Slave.handle_write_multiple_coils_request((slave, request_pdu))
    modbus.Slave.handle_write_multiple_registers_request((slave, request_pdu)) returns modified response or None
    modbus.Slave.handle_write_single_register_request((slave, request_pdu)) returns modified response or None
    modbus.Slave.handle_write_single_coil_request((slave, request_pdu)) returns modified response or None
    modbus.Slave.handle_read_input_registers_request((slave, request_pdu)) returns modified response or None
    modbus.Slave.handle_read_holding_registers_request((slave, request_pdu)) returns modified response or None
    modbus.Slave.handle_read_discrete_inputs_request((slave, request_pdu)) returns modified response or None
    modbus.Slave.handle_read_coils_request((slave, request_pdu)) returns modified response or None
    modbus.Slave.handle_read_write_multiple_registers_request((slave, request_pdu)) returns modified response or None
    modbus.Slave.handle_read_exception_status_request((slave, request_pdu)) returns modified response or None

    modbus.Slave.on_handle_broadcast((slave, response_pdu)) returns modified response or None
    modbus.Slave.on_exception((slave, function_code, excpt))


    modbus.Databank.on_error((db, excpt, request_pdu))

    modbus.ModbusBlock.setitem((self, slice, value))

    modbus.Server.before_handle_request((server, request)) returns modified request or None
    modbus.Server.after_handle_request((server, response)) returns modified response or None
    modbus.Server.on_exception((server, excpt))
    """
    with _LOCK:
        try:
            _HOOKS[name].append(fct)
        except KeyError:
            _HOOKS[name] = [fct]

到了这里,关于Python modbus_tk 库源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 工控CTF之协议分析1——Modbus

    流量分析 主要以工控流量和恶意流量为主,难度较低的题目主要考察Wireshark使用和找规律,难度较高的题目主要考察协议定义和特征 简单只能简单得干篇一律,难可以难得五花八门 常见的工控协议有:Modbus、MMS、IEC60870、MQTT、CoAP、COTP、IEC104、IEC61850、S7comm、OMRON等 由于工控

    2024年02月02日
    浏览(32)
  • MODBUS TCP协议实例数据帧详细分析

    Modbus由MODICON公司于1979年开发,是一种工业现场总线协议标准。1996年施耐德公司推出基于以太网TCP/IP的Modbus协议:ModbusTCP。Modbus协议是一项应用层报文传输协议,包括ASCII、RTU、TCP三种报文类型。标准的Modbus协议物理层接口有RS232、RS422、RS485和以太网接口,采用master/slave方式

    2024年02月05日
    浏览(68)
  • 工程师必备串口数据截取工具modbus命令分析串口数据分析

    工程师必备串口数据截取工具modbus命令分析串口数据分析。 主要功能: ·支持监控COM端口类型:标准电脑端口,内核虚拟COM端口,USB转串口等; ·可以实时监控并采集串口数据; ·可以同时监控多个串行端口; ·监控已经被其他应用程序打开的串口(需要服务启动); ·支持

    2024年02月13日
    浏览(33)
  • 最新版Modbus RTU 51单片机从机(源码)

    最新版Modbus RTU 51单片机从机源码,在原源码基础上添加了芯片型号选择,适应的芯片型号多了STC12系列,STC15系列,STC8系列。 支持01,02,03,04,05,06,15,16等常用功能码。 免去小白移植烦恼。 包括以前的版本加更新版本,赠送4个组态触摸屏测试工程文件,下载地址: 51单片机modbu

    2024年02月13日
    浏览(63)
  • Modbus PLC攻击分析:从Modbus Poll Slave到M340_intouch modbustcp 读取 m340

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新网络安全全套学习资料》

    2024年04月28日
    浏览(35)
  • 基于python舆情分析可视化系统+情感分析+爬虫+机器学习(源码)✅

    大数据毕业设计:Python招聘数据采集分析可视化系统✅ 毕业设计:2023-2024年计算机专业毕业设计选题汇总(建议收藏) 毕业设计:2023-2024年最新最全计算机专业毕设选题推荐汇总 🍅 感兴趣的可以先收藏起来,点赞、关注不迷路,大家在毕设选题,项目以及论文编写等相关

    2024年01月20日
    浏览(49)
  • python高校舆情分析系统+可视化+情感分析 舆情分析+Flask框架(源码+文档)✅

    毕业设计:2023-2024年计算机专业毕业设计选题汇总(建议收藏) 毕业设计:2023-2024年最新最全计算机专业毕设选题推荐汇总 🍅 感兴趣的可以先收藏起来,点赞、关注不迷路,大家在毕设选题,项目以及论文编写等相关问题都可以给我留言咨询,希望帮助同学们顺利毕业 。

    2024年02月01日
    浏览(43)
  • python毕设 大数据用户画像分析系统(源码分享)

    Hi,大家好,这里是丹成学长,今天做一个电商销售预测分析,这只是一个demo,尝试对电影数据进行分析,并可视化系统 🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学

    2024年01月17日
    浏览(41)
  • 【python】python旅游网数据抓取分析(源码+论文)【独一无二】

    👉博__主👈:米码收割机 👉技__能👈:C++/Python语言 👉公众号👈:测试开发自动化【获取源码+商业合作】 👉荣__誉👈:阿里云博客专家博主、51CTO技术博主 👉专__注👈:专注主流机器人、人工智能等相关领域的开发、测试技术。 每个爬虫针对特定的目标网站(去哪儿网

    2024年02月04日
    浏览(50)
  • tk-mybatis使用介绍,springboot整合tk-mybatis、PageHelper实现分页查询

    Mybatis-Plus极大简化了我们的开发,作为mybatis的增强版,Mybatis-Plus确实帮我们减少了很多SQL语句的编写,通过其提供的API,可以方便地完成增删查改操作。但是,其实除了Mybatis-Plus以外,还有一个技术tk-mybatis可以达到同样的效果,只不过随着MP的兴起,tk-mybatis已经被渐渐淡忘

    2024年02月12日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包