根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8)

这篇具有很好参考价值的文章主要介绍了根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、网络通讯协议设计

1.1、交互模型

1.2、自定义应用层协议

1.2.1、请求和响应格式约定

​编辑

1.2.2、参数说明

1.2.3、具体例子

1.2.4、特殊栗子

1.3、实现 BrokerServer

1.3.1、属性和构造

1.3.2、启动 BrokerServer

1.3.3、停止 BrokerServer

1.3.4、处理每一个客户端连接

1.3.5、读取请求和写响应

1.3.6、根据请求计算响应

1.3.7、清除 channel


一、网络通讯协议设计


1.1、交互模型

目前我们需要考虑的交互模型:生产者消费者都是客户端,都需要通过 网络 和 BrokerServer 进行通信

根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8),RabbitMQ,rabbitmq,分布式

此处我们使⽤ TCP 协议, 来作为通信的底层协议. 同时在这个基础上⾃定义应⽤层协议, 完成客⼾端对服 务器这边功能的远程调⽤.

TCP 是有连接的(Connection),创建 / 断开 TCP 连接成本还是挺高的(需要三次握手啥的),那么这里就是用 Channel 来表示 Connection 内部的 “逻辑上” 的连接,使得 “一个管道,多个网线传输” 的效果,使得 TCP连接得到复用

Ps:要远程调用的功能就是在 VirtualHost 中 public 的方法.

1.2、自定义应用层协议

1.2.1、请求和响应格式约定

之前我们定义的 Message 对象,本体就是二进制的数据,因此这里不方便使用 JSON 这种文本协议 / 格式.

因此这里使用 二进制 的方式来设定协议.

请求如下:

/**
 * 表示一个网络通信中的请求对象,按照自定义协议的格式展开
 */
public class Request {

    private int type;
    private int length;
    private byte[] payload;

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getPayload() {
        return payload;
    }

    public void setPayload(byte[] payload) {
        this.payload = payload;
    }
}

根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8),RabbitMQ,rabbitmq,分布式

响应如下:

/**
 * 这个对象表示一个响应,是根据自定义应用层协议来的
 */
public class Response {

    private int type;
    private int length;
    private byte[] payload;

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getPayload() {
        return payload;
    }

    public void setPayload(byte[] payload) {
        this.payload = payload;
    }
}

根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8),RabbitMQ,rabbitmq,分布式

1.2.2、参数说明

1)type是一个整形,用来表示当前这个请求和响应是用来干啥的(对应 VirtualHost 中的核心 API),取值如下:

  • 0x1 创建 channel
  • 0x2 关闭 channel
  • 0x3 创建 exchange
  • 0x4 销毁 exchange
  • 0x5 创建 queue
  • 0x6 销毁 queue
  • 0x7 创建 binding
  • 0x8 销毁 binding
  • 0x9 发送 message
  • 0xa 订阅 message
  • 0xb 返回 ack
  • 0xc 服务器给客⼾端推送的消息.(被订阅的消息) 响应独有的.

2)length 就是用来描述 payload 长度(防止粘包问题)

3)payload 就是具体要传输的二进制数据。数据具体是什么,会根据当前是请求还是响应,以及当前的 type 的不同取值来确定。

比如 type 是 0x3(创建交换机),同时当前是一个请求,此时 payload 里的内容,就相当于 exchangeDeclare 的 参数 的序列化的结果.

比如 type 是 0x3(创建交换机),同时当前是一个响应,此时 payload 里的内容,就是 exchangDeclare 的 返回结果 的序列化内容.

根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8),RabbitMQ,rabbitmq,分布式

1.2.3、具体例子

栗子如下:

1)请求

当前需要远程调用 exchangeDeclare 方法,那么我们就需要传递核心 API 以下参数

根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8),RabbitMQ,rabbitmq,分布式

使用一个公共的父类包装每次 请求 中公共(每个请求都要传输)的参数

/**
 * 这个类用来表示方法的公共参数/辅助字段
 * 后续每个方法会有一些不同的参数,不同的参数再用不同的子类来表示
 */
public class BasicArguments implements Serializable {

    // 表示一次 请求/响应 的身份标识,让请求和响应能对的上
    protected String rid;
    // 表示这次通信使用的 channel 的身份标识
    protected String channelId;

    public String getRid() {
        return rid;
    }

    public void setRid(String rid) {
        this.rid = rid;
    }

    public String getChannelId() {
        return channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }
    
}

创建 ExchangeDeclareArguments 类(当前这个类将来会被序列化成 request 类中的 payload),继承 BasicArguments(公共参数),实现 Serializable 接口(避免序列化问题),要传递的参数如下:

public class ExchangeDeclareArguments extends BasicArguments implements Serializable {

    private String exchangeName;
    private ExchangeType exchangeType;
    private boolean durable;
    private boolean autoDelete;
    private Map<String, Object> arguments;

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public ExchangeType getExchangeType() {
        return exchangeType;
    }

    public void setExchangeType(ExchangeType exchangeType) {
        this.exchangeType = exchangeType;
    }

    public boolean isDurable() {
        return durable;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public boolean isAutoDelete() {
        return autoDelete;
    }

    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    public Map<String, Object> getArguments() {
        return arguments;
    }

    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}

2)响应

当前 VirtualHost 中的核心 API 返回值都是 Boolean 类型,因此我们使用一个公共类来封装响应(当前这个类将来会被序列化成 response 类中的 payload 参数)

public class BasicReturns implements Serializable {

    //用来标识唯一的请求和响应
    protected String rid;
    //标识一个 channel
    protected String channelId;
    //标识当前这个远程调用方法的返回值
    protected boolean ok;

    public String getRid() {
        return rid;
    }

    public void setRid(String rid) {
        this.rid = rid;
    }

    public String getChannelId() {
        return channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public boolean isOk() {
        return ok;
    }

    public void setOk(boolean ok) {
        this.ok = ok;
    }
}

Ps:其他核心 API 自定义应用层协议也一样

1.2.4、特殊栗子

0xa 订阅 message ,这个核心 API 比较特殊,参数中有回调函数

根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8),RabbitMQ,rabbitmq,分布式

 1)请求

创建 BasicConsumeArguments 类(当前这个类将来会被序列化成 request 类中的 payload) 表示要传递的参数,需要注意的是 Consumer 这个回调,在发送的请求中不需要携带这个参数(实际上也携带不了)

Ps:因为服务器收到这个订阅消息请求之后,就直接取拿队列中的消息,接着直接反馈给客户端,客户端拿到消息后才执行回调方法(要拿这个消息干什么事)。

这就类似于你去商店订阅报纸,接着拿到报纸以后,你要对这个报纸做什么,商店是不知道的~~

public class BasicConsumeArguments extends BasicArguments implements Serializable {

    private String consumerTag;
    private String queueName;
    private boolean autoAck;

    //注意! 这里的 Consumer 回调函数不用发送给服务器(实际上也发送不了)
    //因为服务器收到这个订阅消息请求之后,就直接取拿队列中的消息,接着直接反馈给客户端
    //客户端拿到消息后才执行回调方法
    //这就类似于你去商店订阅报纸,接着拿到报纸以后,你要对这个报纸做什么,商店是不知道的~~


    public String getConsumerTag() {
        return consumerTag;
    }

    public void setConsumerTag(String consumerTag) {
        this.consumerTag = consumerTag;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public boolean isAutoAck() {
        return autoAck;
    }

    public void setAutoAck(boolean autoAck) {
        this.autoAck = autoAck;
    }
}

2)响应

创建 SubScribeReturns 类(当前这个类将来会被序列化成 response 类中的 payload 参数) 来描述响应, 这个响应中不光要携带 BasicReturns (返回的公共响应参数),还需要带上回调中消息的参数,如下:

public class SubScribeReturns extends BasicReturns implements Serializable {

    private String consumerTag;
    private BasicProperties basicProperties;
    private byte[] body;

    public String getConsumerTag() {
        return consumerTag;
    }

    public void setConsumerTag(String consumerTag) {
        this.consumerTag = consumerTag;
    }

    public BasicProperties getBasicProperties() {
        return basicProperties;
    }

    public void setBasicProperties(BasicProperties basicProperties) {
        this.basicProperties = basicProperties;
    }

    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }
}

1.3、实现 BrokerServer

这里的写法就和以前写过的 TCP 回显服务器很类似了,只是根据请求计算响应的方式不同

1.3.1、属性和构造

    private ServerSocket serverSocket = null;

    //当前考虑一个 BrokerServer 上只有一个 虚拟主机
    private VirtualHost virtualHost = new VirtualHost("default");
    //使用 哈希表 来标识当前所有会话(哪个客户端正在和服务器进行通信)
    //key 是 channelId, value 为对应的 Socket 对象
    private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();
    //用线程池来处理多个客户端请求
    private ExecutorService executorService = null;
    //引入一个 Boolean 变量控制服务器是否继续运行
    private volatile boolean runnable = true;

    public BrokerServer(int port) throws IOException {
        serverSocket = new ServerSocket(port);
    }

1.3.2、启动 BrokerServer

    public void start() throws IOException {
        System.out.println("[BrokerServer] 启动!");
        executorService = Executors.newCachedThreadPool();
        while(runnable) {
            Socket clientSocket = serverSocket.accept();
            //处理连接的逻辑给线程池
            executorService.submit(() -> {
                processConnection(clientSocket);
            });
        }
    }

1.3.3、停止 BrokerServer

    /**
     * 停止服务器,一般是直接 kill 就可以了
     * 此处这个单独的方法,主要是为了后续的单元测试
     */
    public void stop() throws IOException {
        runnable = false;
        //放弃线程池中的任务,并销毁线程
        executorService.shutdown();
        serverSocket.close();
    }

1.3.4、处理每一个客户端连接

    private void processConnection(Socket clientSocket) {
        try (InputStream inputStream = clientSocket.getInputStream();
             OutputStream outputStream = clientSocket.getOutputStream()) {
            // 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStream
            try (DataInputStream dataInputStream = new DataInputStream(inputStream);
                 DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                while (true) {
                    // 1. 读取请求并解析.
                    Request request = readRequest(dataInputStream);
                    // 2. 根据请求计算响应
                    Response response = process(request, clientSocket);
                    // 3. 把响应写回给客户端
                    writeResponse(dataOutputStream, response);
                }
            }
        } catch (EOFException | SocketException e) {
            // 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.
            // 需要借助这个异常来结束循环
            System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()
                    + ":" + clientSocket.getPort());
        } catch (IOException | ClassNotFoundException | MqException e) {
            System.out.println("[BrokerServer] connection 出现异常!");
            e.printStackTrace();
        } finally {
            try {
                // 当连接处理完了, 就需要记得关闭 socket
                clientSocket.close();
                // 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.
                clearClosedSession(clientSocket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

1.3.5、读取请求和写响应

    private Request readRequest(DataInputStream dataInputStream) throws IOException {
        Request request = new Request();
        request.setType(dataInputStream.readInt());
        request.setLength(dataInputStream.readInt());
        byte[] body = new byte[request.getLength()];
        int n = dataInputStream.read(body);
        if(n != request.getLength()) {
            throw new IOException("读出请求格式出错!");
        }
        request.setPayload(body);
        return request;
    }

    private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
        dataOutputStream.write(response.getType());
        dataOutputStream.write(response.getLength());
        dataOutputStream.write(response.getPayload());
        dataOutputStream.flush();
    }

1.3.6、根据请求计算响应

这里就是根据不同的 type 类型,来远程调用 VirtualHost 中不同的核心 API(需要特别注意订阅消息功能的回调函数)

    private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
        //1.将 request 初步解析成 BasicArguments
        BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
        System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId() +
                ", type=" + request.getType() + ", length=" + request.getLength());
        //2.根据 type 的值,进一步区分接下来要干什么
        boolean ok = true;
        if (request.getType() == 0x1) {
            //创建 channel
            sessions.put(basicArguments.getChannelId(), clientSocket);
            System.out.println("[BrokerServer] 创建 channel 完成!channelId=" + basicArguments.getChannelId());
        } else if(request.getType() == 0x2) {
            //销毁 channel
            sessions.remove(basicArguments.getChannelId());
            System.out.println("[BrokerServer] 销毁 channel 完成!channelId=" + basicArguments.getChannelId());
        } else if(request.getType() == 0x3) {
            //创建交换机,此时 payLoad 就是 ExchangDeclareArguments 了
            ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
            ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),
                    arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());
        } else if(request.getType() == 0x4) {
            ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
            ok = virtualHost.exchangeDelete(arguments.getExchangeName());
        } else if(request.getType() == 0x5) {
            QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
            ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),
                    arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());
        } else if(request.getType() == 0x6) {
            QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
            ok = virtualHost.queueDelete(arguments.getQueueName());
        } else if(request.getType() == 0x7) {
            QueueBindArguments arguments = (QueueBindArguments) basicArguments;
            ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
        } else if(request.getType() == 0x8) {
            QueueUnBindArguments arguments = (QueueUnBindArguments) basicArguments;
            ok = virtualHost.queueUnBind(arguments.getQueueName(), arguments.getExchangeName());
        } else if(request.getType() == 0x9) {
            BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
            ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(), arguments.getBasicProperties(), arguments.getBody());
        } else if(request.getType() == 0xa) {
            BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
            ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {
                //这个回调函数要做的就是,把服务器收到的消息可以直接推送回对应的消费者客户端
                @Override
                public void handlerDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                    //首先需要知道收到的消息要发给哪个客户端
                    //此处 consumerTag 其实就是 channelId,根据 channelId 去 sessions 中查询,既可以得到对应的
                    //socket 对象了,从而往里面发送数据
                    //1.根据 channelId 找到 socket 对象
                    Socket clientSocket = sessions.get(consumerTag);
                    if(clientSocket == null || clientSocket.isClosed()) {
                        throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
                    }
                    //2.构造响应数据
                    SubScribeReturns subScribeReturns = new SubScribeReturns();
                    subScribeReturns.setChannelId(consumerTag);
                    subScribeReturns.setRid("");//由于这里只有响应,没有请求,不需要去对应,rid 暂时不需要
                    subScribeReturns.setOk(true);
                    subScribeReturns.setConsumerTag(consumerTag);
                    subScribeReturns.setBasicProperties(basicProperties);
                    subScribeReturns.setBody(body);
                    byte[] payload = BinaryTool.toBytes(subScribeReturns);
                    Response response = new Response();
                    // 0xc 表示服务器给消费者客户端推送的消息数据
                    response.setType(0xc);
                    response.setLength(payload.length);
                    response.setPayload(payload);
                    //3.把数据写回给客户端
                    //  注意!此处的 dataOutputStream 这个对象不能 close
                    //  如果把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了
                    //  此时就无法继续往 socket 中写后续的数据了
                    DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
                    writeResponse(dataOutputStream, response);
                }
            });
        } else if(request.getType() == 0xb) {
            //调用 basicAck 确认消息
            BasicAckArguments arguments = (BasicAckArguments) basicArguments;
            ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
        } else {
            throw new MqException("[BrokerServer] 未知 type!type=" + request.getType());
        }
        //构造响应
        BasicReturns basicReturns = new BasicReturns();
        basicReturns.setRid(basicArguments.getRid());
        basicReturns.setChannelId(basicArguments.getChannelId());
        basicReturns.setOk(ok);
        byte[] payload = BinaryTool.toBytes(basicReturns);
        Response response = new Response();
        response.setType(request.getType());
        response.setLength(request.getLength());
        response.setPayload(payload);
        System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
                + ", type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }

1.3.7、清除 channel

清理 map 中对应的(clientSocket) session 信息

    private void clearClosedSession(Socket clientSocket) {
        List<String> toDeleteChannelId = new ArrayList<>();
        for(Map.Entry<String, Socket> entry : sessions.entrySet()) {
            if(entry.getValue() == clientSocket) { //这里一个 key 可能对应多个相同的 Socket
                //在集合类中不能一边用迭代器一边删除,会破坏迭代器结构的!
                //sessions.remove(entry.getKey());
                //因此这里先记录下 key
                toDeleteChannelId.add(entry.getKey());
            }
        }
        for(String channelId : toDeleteChannelId) {
            sessions.remove(channelId);
        }
        System.out.println("[BrokerServer] 清理 session 完毕!channelId=" + toDeleteChannelId);
    }

根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8),RabbitMQ,rabbitmq,分布式文章来源地址https://www.toymoban.com/news/detail-683214.html

到了这里,关于根据源码,模拟实现 RabbitMQ - 网络通讯设计,自定义应用层协议,实现 BrokerServer (8)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 根据源码,模拟实现 RabbitMQ - 从需求分析到实现核心类(1)

    目录 一、需求分析 1.1、对 Message Queue 的认识 1.2、消息队列核心概念 1.3、Broker Server 内部关键概念 1.4、Broker Server 核心 API (重点实现) 1.5、交换机类型 Direct 直接交换机 Fanout 扇出交换机 Topic 主题交换机 1.6、持久化 1.7、网络通信 通信流程 远程调用设计思想 1.8、模块设计图

    2024年02月12日
    浏览(31)
  • 根据源码,模拟实现 RabbitMQ - 虚拟主机 + Consume设计 (7)

    目录 一、虚拟主机 + Consume设计 1.1、承接问题 1.2、具体实现 1.2.1、消费者订阅消息实现思路 1.2.2、消费者描述自己执行任务方式实现思路 1.2.3、消息推送给消费者实现思路 1.2.4、消息确认 前面已经实现了虚拟主机大部分功能以及转发规则的判定,也就是说,现在消息已经可

    2024年02月11日
    浏览(27)
  • 根据源码,模拟实现 RabbitMQ - 实现消息持久化,统一硬盘操作(3)

    目录 一、实现消息持久化 1.1、消息的存储设定 1.1.1、存储方式 1.1.2、存储格式约定 1.1.3、queue_data.txt 文件内容  1.1.4、queue_stat.txt 文件内容 1.2、实现 MessageFileManager 类 1.2.1、设计目录结构和文件格式 1.2.2、实现消息的写入 1.2.3、实现消息的删除(随机访问文件) 1.2.4、获取队

    2024年02月12日
    浏览(38)
  • C++ 简单实现RPC网络通讯

            RPC是远程调用系统简称,它允许程序调用运行在另一台计算机上的过程,就像调用本地的过程一样。RPC 实现了网络编程的“过程调用”模型,让程序员可以像调用本地函数一样调用远程函数。最近在做的也是远程调用过程,所以通过重新梳理RPC来整理总结一下。  

    2023年04月08日
    浏览(34)
  • 根据源码,模拟实现 RabbitMQ - 通过 SQLite + MyBatis 设计数据库(2)

    目录 一、数据库设计 1.1、数据库选择 1.2、环境配置 1.3、建库建表接口实现 1.4、封装数据库操作 1.5、针对 DataBaseManager 进行单元测试 1.6、心得 MySQL 是我们最熟悉的数据库,但是这里我们选择使用 SQLite,原因如下: SQLite 比 MySQL 更轻量:一个完整的 SQLite 数据库,只有一个单

    2024年02月13日
    浏览(36)
  • [Qt网络编程]之UDP通讯的简单编程实现

    hello!欢迎大家来到我的Qt学习系列之 网络编程之UDP通讯的简单编程实现。 希望这篇文章能对你有所帮助!!! 本篇文章的相关知识请看我的上篇文章: 目录 UDP通讯  基于主窗口的实现  基于线程的实现          UDP数据报协议是一个面向无连接的传输层报文协议 ,它简

    2024年04月25日
    浏览(50)
  • 物联网网络通讯知识

    RTU英文全称Remote Terminal Units,中文全称为远程终端单元。远程终端设备(RTU)是安装在远程现场的 电子设备 ,用来监视和测量安装在远程现场的传感器和设备。通俗理解就是能够编程的还可以将数据传输到服务器的工具。RTU内部是包含 通讯模块 的,RTU仪表配置服务器后,就可

    2024年02月05日
    浏览(45)
  • 网络通讯组件性能优化之路

    BIO为同步阻塞IO,blocking queue的简写,也就是说多线程情况下只有一个线程操作内核的queue,当前线程操作完queue后,才能给下一个线程操作; 问题 在BIO下,一个连接就对应一个线程,如果连接特别多的情况下,就会有特别多的线程,很费线程;在早期的时候,世界上的计算机

    2024年02月02日
    浏览(36)
  • 网络通讯录服务器

    简易版本 服务端完整版本 客户端完整版本 Protobuf还常⽤于通讯协议、服务端数据交换场景。那么在这个⽰例中,我们将实现⼀个⽹络版本的 通讯录,模拟实现客⼾端与服务端的交互,通过Protobuf来实现各端之间的协议序列化。 需求如下: 客⼾端可以选择对通讯录进⾏以下操

    2024年02月12日
    浏览(37)
  • 【Linux Day15 TCP网络通讯】

    接口介绍 socket()方法是用来创建一个套接字 ,有了套接字就可以通过网络进行数据的收发。创建套接字时要指定使用的服务类型,使用 TCP 协议选择流式服务(SOCK_STREAM)。 **bind()方法是用来指定套接字使用的 IP 地址和端口。**IP 地址就是自己主机的地址,测试程序时可以使

    2024年02月19日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包