项目实战 — 消息队列(8){网络通信设计①}

这篇具有很好参考价值的文章主要介绍了项目实战 — 消息队列(8){网络通信设计①}。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、自定义应用层协议

🍅 1、格式定义

🍅 2、准备工作

🎄定义请求和响应 

 🎄 定义BasicArguments

🎄 定义BasicReturns

🍅 2、创建参数类

        🎄 交换机

        🎄 队列

        🎄 绑定

        🎄发布消息

        🎄 订阅消息

        🎄确认应答

        🎄 消息推送

二、服务器设计

 🍅 1、编写实例变量和构造方法

🍅 2、编写启动类和关闭类

🍅 3、编写处理连接的方法:processConnection()

 🍅 4、编写读取请求readRequest()和写回响应writeResponse方法

🍅 5、实现根据请求计算响应:process()方法编写


一、自定义应用层协议

🍅 1、格式定义

本消息队列,是需要通过网络进行通信的。这里主要基于TCP协议,自定义应用层协议。

由于当前交互的Message数据,是二进制数据,由于HTTP和JSON都是文本协议,所以这里就不适用了。使用自定义的应用层协议。

约定自定义应用层协议格式:

        以下是请求和响应的组成部分:

项目实战 — 消息队列(8){网络通信设计①},项目实战 — 消息队列,网络,项目,消息队列,java

 type:

描述当前请求和响应式做什么的,描述当前请求/响应是在调用哪个API(VirtualHost中的核心API)

        以下是type标识请求相应不同的功能,取值如下:

        其中Channel代表的是Connection(TCP的连接)内部的”逻辑上"的连接。此时一个           Connection中可能会含有多个Channel。存在的意义是为了让TCP连接

VirtualHost中的十多个方法:
0x1创建channel
0x2关闭channel
0x3创建exchange
0x4销毁exchange
0x5创建queue
0x6销毁queue
0x7创建binding
0x8销毁binding
0x9发送message
0xa订阅message
0xb返回ack
0xc服务器给客户端推送的消息(被订阅的消息)(响应独有)

length:描述了payload的长度

payload: 会根据当前是请求还是响应,以及当前的type有不同的取值。

比如当前是0x3(创建交换机),

/*
* 表示一个网络通信中的请求对象,按照自定义协议的格式来展开
* */
@Data
public class Request {
    private int type;
    private int length;
    private byte[] payload;
}

当前是一个请求,那么pyload中的内容是exchangeDeclare的参数的序列化的结果;

如果当前是一个响应,那么payload里面的内容就是exchangeDeclare的返回结果的序列化内容。

那么接下来就进行代码设计

以下都是再commen包中创建。

🍅 2、准备工作

🎄定义请求和响应 

/*
* 表示一个网络通信中的请求对象,按照自定义协议的格式来展开
* */
@Data
public class Request {
    private int type;
    private int length;
    private byte[] payload;
}
/*
* 表示一个网络通信中的响应对象,也是根据自定义应用层协议来的
* */
@Data
public class Response {
    private int type;
    private int length;
    private byte[] payload;
}

 🎄 定义BasicArguments

使用这个类表示方法的公共参数/辅助的字段 ,后续的每个方法会有一些不同的参数,不同的参数再使用不同的子类来表示。

rid代表请求的id,和响应的id一样,他们是一对

channel表示的是“逻辑连接”,表示客户端各种模块复用一个TCP连接,

channelId就代表这些连接。

@Data
public class BasicArguments implements Serializable {
//     表示一次请求/响应的身份标识,可以把请求和响应对上
    protected String rid;
//    客户端的身份标识
    protected String channelId;
}

🎄 定义BasicReturns

使用这个类标识各个远程调用的方法的返回值的公共信息

/*
* 标识各个远程调用的方法的返回值的公共信息
* */
@Data
public class BasicReturns implements Serializable {
//    用来标识唯一的请求和响应
    protected String rid;
    protected String channelId;
//    用来表示当前远程调用方法的返回值
    protected boolean ok;
}

🍅 2、创建参数类

根据前面VirtualHost中的十多个方法,每个方法创建一个类,标识该方法中的相关参数。

那么这个参数到底是如何进行传递的?

如下图,以交换机的参数进行举例。

关于我们远程调用的过程:当发起请求时,就把这些参数通过请求传过去,然后调用VirtualHost中的API(就是VirtualHost中的那些创建删除方法),调用完以后再返回响应。

项目实战 — 消息队列(8){网络通信设计①},项目实战 — 消息队列,网络,项目,消息队列,java

以下是有关交换机的请求报文:

项目实战 — 消息队列(8){网络通信设计①},项目实战 — 消息队列,网络,项目,消息队列,java

以下是创建交换机的响应报文:没有请求报文复杂是因为,响应只需要返回请求是否执行远程调用是否成功即可。 

项目实战 — 消息队列(8){网络通信设计①},项目实战 — 消息队列,网络,项目,消息队列,java

以下就创建这些参数类: 

        🎄 交换机

 创建交换机:

@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
    private String ExchangeName;
    private ExchangeType exchangeType;
    private boolean durable;
}

删除交换机:

@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
    private String exchangeName;
}

        🎄 队列

创建队列:

@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {
    private String QueueName;
    private boolean durable;
}

删除队列:

@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {
    private String queueName;
}

        🎄 绑定

创建绑定:

@Data
public class QueueBindArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String queueName;
    private String bindingKey;
}

删除绑定:

@Data
public class QueueUnbindArguments extends BasicArguments implements Serializable {
    private String queueName;
    private String exchangeName;
}

        🎄发布消息

@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String routingKey;
    private BasicProperties basicProperties;
    private byte[] body;
}

        🎄 订阅消息

这个方法参数,还包含一个Consumer consumer。

这是一个回调函数,这个回调函数是不能作为参数进行传输的,因为这个回调函数,是客户端这边的。

比如,这里请求调用一个”订阅队列“的远程方法,

客户端这边:服务器收到了请求,执行了basicConsume方法,并且返回了响应。订阅以后,客户端的消费者就会在后面收到消息,而这个回调函数是在消费者收到消息以后,才会进行逻辑处理,而不是再发送请求时进行传递的。

服务器这边:执行的是一个固定的回调函数:把消息返回给客户端。

项目实战 — 消息队列(8){网络通信设计①},项目实战 — 消息队列,网络,项目,消息队列,java

@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
}

        🎄确认应答

@Data
public class BasicAckArguments extends BasicArguments implements Serializable {
    private String queueName;
    private String messageId;
}

        🎄 消息推送

前面的都是客户端给服务器发送消息,这里是服务器给消费者推送消息。所以要继承BasicReturns。

@Data
public class SubScribeReturns extends BasicReturns implements Serializable {
    private String consumerTag;
    private BasicProperties basicProperties;
    private byte[] body;
}

二、服务器设计

在 mqServer包中创建一个BrokerServer类。

 🍅 1、编写实例变量和构造方法

 private ServerSocket serverSocket = null;

    private VirtualHost virtualHost = new VirtualHost("default");

//    使用这个哈希表,表示当前所有会话(那些客户端在和这个服务器进行通信)
//    此处的key是channelId,value是对应的 socket对象
    private ConcurrentHashMap<String , Socket> sessions = new ConcurrentHashMap<String ,Socket>();

//    引入线程池,处理多个客户端的请求
    private ExecutorService executorService = null;

//    引入boolean变量控制服务器是否运行
    private volatile boolean runnable = true;

     public BrokerServer(int port) throws IOException {
//        端口号
        serverSocket = new ServerSocket(port);
    }

🍅 2、编写启动类和关闭类

 这里利用了线程池,不断的处理连接

public void start() throws IOException {
        System.out.println("[BrokerServer] 启动!");
        executorService = Executors.newCachedThreadPool();
        try {
            while (runnable) {
                Socket clientSocket = serverSocket.accept();
                // 把处理连接的逻辑丢给这个线程池.
                executorService.submit(() -> {
                    processConnection(clientSocket);
                });
            }
        } catch (SocketException e) {
            System.out.println("[BrokerServer] 服务器停止运行!");
            // e.printStackTrace();
        }
    }


    public void stop() throws IOException {
        runnable = false;
//        停止线程池
        executorService.shutdownNow();
        serverSocket.close();
    }


private void processConnection(Socket clientSocket) {
    //TODO
}

🍅 3、编写处理连接的方法:processConnection()

处理一个客户端的连接,主要有以下几步:

        (1)读取请求并且解析

        (2)根据请求计算响应

        (3)把相应协写回给客户端

//    通过该方法,处理一个客户端的连接
//    在一个连接中,可能会涉及到多个连接和请求
    private void processConnection(Socket clientSocket) throws IOException {
//        获取到流对象,读取应用层协议
        try(InputStream inputStream = clientSocket.getInputStream();
            OutputStream outputStream = clientSocket.getOutputStream()){
//                按照特定格式来读取并且解析(转换),此时就需要用到DataInputStream和DataOutputStream
             try (DataInputStream dataInputStream = new DataInputStream(inputStream);
                 DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                while (true) {
//                  1、读取请求并且解析
                    Request request = readRequest(dataInputStream);
//                  2、根据请求计算响应
                    Response response = process(request, clientSocket);
//                  3、把响应写回给客户端
                    writeResponse(dataOutputStream,response);
                }
            }
        }catch (EOFException|SocketException e) {
//                DataInputStream如果读到EOF(文件末尾),会抛出一个EOFException异常
//                视为正常的异常,用或者异常来结束循环
                System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()
                    + ":" + clientSocket.getPort());
        } catch (IOException | ClassNotFoundException | MqException e) {
            System.out.println("[BrokerServer] connection 出现异常!");
            e.printStackTrace();
        } finally {
            try {
            clientSocket.close();
//          一个TCP连接中,可能含有多个channel,需要把当前socket对应的channel也顺便清理掉
            clearClosedSession(clientSocket);
        }catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

 🍅 4、编写读取请求readRequest()和写回响应writeResponse方法

这里就是根据前面设定的报文格式来编写的读取请求和写回响应的方法,这里的payload的具体内容在这里不作解析,在后面的process方法中进行解析

//    读取请求并且解析
    private Request readRequest(DataInputStream dataInputStream) throws IOException {
        Request request = new Request();
 //        读取出请求中4个字节的type
        request.setType(dataInputStream.readInt());
//        读出4个字节的length
        request.setLength(dataInputStream.readInt());
        byte[] payload = new byte[request.getLength()];
        int n = dataInputStream.read(payload);
        if (n != request.getLength()){
            throw new IOException("读取请求格式出错");
        }
        request.setPayload(payload);
        return request;
    }

//    把响应写回给客户端
    private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
        dataOutputStream.writeInt(response.getType());
        dataOutputStream.writeInt(response.getLength());
        dataOutputStream.write(response.getPayload());
//        刷新缓冲区
        dataOutputStream.flush();
    }

🍅 5、实现根据请求计算响应:process()方法编写

这里就要针对具体的payload进行编写了。

当前请求中的payload里面的内容,是根据type来的,如下

VirtualHost中的十多个方法:
0x1创建channel
0x2关闭channel
0x3创建exchange
0x4销毁exchange
0x5创建queue
0x6销毁queue
0x7创建binding
0x8销毁binding
0x9发送message
0xa订阅message
0xb返回ack
0xc服务器给客户端推送的消息(被订阅的消息)(响应独有)

如果是0x3,就是创建交换机对应的参数...... 

主要分为以下几步:

        1、把request中的payload作出一个初步的解析

        2、根据type的值,进一步区分请求要做什么

        3、构造响应

private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
        // 1. 把 request 中的 payload 做一个初步的解析.
        BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
        System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()
                + ", type=" + request.getType() + ", length=" + request.getLength());
        // 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.
        boolean ok = true;
        if (request.getType() == 0x1) {
            // 创建 channel
            sessions.put(basicArguments.getChannelId(), clientSocket);
            System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());
        } else if (request.getType() == 0x2) {
            // 销毁 channel
            sessions.remove(basicArguments.getChannelId());
            System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());
        } else if (request.getType() == 0x3) {
            // 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.
            ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
            ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable());
        } else if (request.getType() == 0x4) {
//        删除交换机
            ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
            ok = virtualHost.exchangeDelete(arguments.getExchangeName());
        } else if (request.getType() == 0x5) {
//            创建队列
            QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
            ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable());
        } else if (request.getType() == 0x6) {
//        删除队列
            QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
            ok = virtualHost.queueDelete((arguments.getQueueName()));
        } else if (request.getType() == 0x7) {
//            创建绑定
            QueueBindArguments arguments = (QueueBindArguments) basicArguments;
            ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());
        } else if (request.getType() == 0x8) {
        //    删除绑定
            QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;
            ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());
        } else if (request.getType() == 0x9) {
            BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
            ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),
                    arguments.getBasicProperties(), arguments.getBody());
        } else if (request.getType() == 0xa) {
            BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
            ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),
                    new Consumer() {
                        // 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端
                        @Override
                        public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                            // 先知道当前这个收到的消息, 要发给哪个客户端.
                            // 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的
                            // socket 对象了, 从而可以往里面发送数据了
                            // 1. 根据 channelId 找到 socket 对象
                            Socket clientSocket = sessions.get(consumerTag);
                            if (clientSocket == null || clientSocket.isClosed()) {
                                throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");
                            }
                            // 2. 构造响应数据
                            SubScribeReturns subScribeReturns = new SubScribeReturns();
                            subScribeReturns.setChannelId(consumerTag);
                            subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.
                            subScribeReturns.setOk(true);
                            subScribeReturns.setConsumerTag(consumerTag);
                            subScribeReturns.setBasicProperties(basicProperties);
                            subScribeReturns.setBody(body);
                            byte[] payload = BinaryTool.toBytes(subScribeReturns);
                            Response response = new Response();
                            // 0xc 表示服务器给消费者客户端推送的消息数据.
                            response.setType(0xc);
                            // response 的 payload 就是一个 SubScribeReturns
                            response.setLength(payload.length);
                            response.setPayload(payload);
                            // 3. 把数据写回给客户端.
                            //    注意! 此处的 dataOutputStream 这个对象不能 close !!!
                            //    如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.
                            //    此时就无法继续往 socket 中写入后续数据了.
                            DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
                            writeResponse(dataOutputStream, response);
                        }
                    });
        } else if (request.getType() == 0xb) {
            // 调用 basicAck 确认消息.
            BasicAckArguments arguments = (BasicAckArguments) basicArguments;
            ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
        } else {
            // 当前的 type 是非法的.
            throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());
        }
        // 3. 构造响应
        BasicReturns basicReturns = new BasicReturns();
        basicReturns.setChannelId(basicArguments.getChannelId());
        basicReturns.setRid(basicArguments.getRid());
        basicReturns.setOk(ok);
        byte[] payload = BinaryTool.toBytes(basicReturns);
        Response response = new Response();
        response.setType(request.getType());
        response.setLength(payload.length);
        response.setPayload(payload);
        System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()
                + ", type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }

🍅 6、清理过期的sessions:clearClosedSession()文章来源地址https://www.toymoban.com/news/detail-646271.html

    //    遍历sessions hash表,把该被关闭的socket对应的键值对都删掉
    private void clearClosedSession(Socket clientSocket) {
        List<String> toDeleteChannelId = new ArrayList<>();
        for(Map.Entry<String,Socket> entry : sessions.entrySet()){
            if(entry.getValue() == clientSocket){
//                使用集合类,不能一边遍历,一边删除
                toDeleteChannelId.add(entry.getKey());
            }
        }
        for (String channelId : toDeleteChannelId){
            sessions.remove(channelId);
        }
        System.out.println("[BrokerServer]清理session完成~ 被清理的channeId = " + toDeleteChannelId);
    }

到了这里,关于项目实战 — 消息队列(8){网络通信设计①}的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Java】基于GUI的网络通信程序设计

      一. 程序内容 二. 要求分析 三. 程序编写 0. 程序结构 1. 服务端程序的GUI设计 2. 服务端业务逻辑的编写 3. 为GUI界面绑定按钮事件 4. 将服务端的源码复制后,进行重构,并加以修改为客户端 四、源代码 这是合工大软件工程专业Java程序设计课程实验二的内容,该实验要求编写

    2023年04月23日
    浏览(80)
  • 消息队列(11) - 通信协议的设计

    对于我们客户端与服务器之间的通信协议我们约定如下: 具体的协议设计: 之后我们传递的参数也是这些 关于 type其实是在描述当前这个请求 、 响应是在调用那个API 约定如下 对于channel ,是tcp链接中的一个逻辑上的链接,一个TCP可以有多个Channel,存在的意义是为了让TCP得到复用

    2024年02月13日
    浏览(30)
  • 【嵌入式学习】网络通信基础-项目篇:简单UDP聊天室

    源码已在GitHub开源:0clock/LearnEmbed-projects/chat 客户端功能: 上线发送登录的用户名[yes] 发送消息和接收消息[yes] quit退出 服务器端功能: 统计用户上线信息,放入链表中[yes] 接收用户信息并给其他用户发送消息[yes] 服务器也支持给所有用户群发消息[yes] 接收下线提醒

    2024年01月25日
    浏览(58)
  • 网络安全实验——安全通信软件safechat的设计

    仅供参考,请勿直接抄袭,抄袭者后果自负。 仓库地址: 后端地址:https://github.com/yijunquan-afk/safechat-server 前端地址: https://github.com/yijunquan-afk/safechat-client CosUpload.java中的COS设置,需要自己配 结合所学安全机制设计实现一个简单的安全通信软件,包含 机密性,消息认证 等基

    2024年01月23日
    浏览(55)
  • windows 达梦数据库服务连接时提示:登录服务器失败,错误号6001,错误消息:网络通信异常 之数据库服务不存在的处理方式

    在windows客户端上连接部署在windows操作系统上的达梦数据库, 使用DM管理工具连接数据库    正确输入用户名与密码之后点击确定按钮之后出现: 登录服务器失败,错误号6001,错误消息:网络通信异常  现象 如下图所示:   在之前也发布了一篇关于此错误的博文: 达梦管

    2024年02月11日
    浏览(52)
  • 项目实战 — 消息队列(4){消息持久化}

    目录  一、消息存储格式设计        🍅 1、queue_data.txt:保存消息的内容         🍅 2、queue_stat.txt:保存消息的统计信息 二、消息序列化 三、自定义异常类 四、创建MessageFileManger类 🍅 1、约定消息文件所在的目录和文件名字  🍅 2、队列的统计信息 🍅 3、创建队列对应

    2024年02月14日
    浏览(38)
  • 项目实战 — 消息队列(1) {需求分析}

    目录 一、什么是消息队列 二、需求分析 🍅1、核心概念  🍅2、核心API 🍅3、交换机类型 🍅 4、持久化 🍅5、网络通信  🍅6、应答模式 三、模块划分 四、测试用例设计 五、总结 消息队列(Message Queue ,MQ),就是将阻塞队列的数据结构,提取成了一个程序,独立进行部署。

    2024年02月15日
    浏览(37)
  • 项目实战 — 消息队列(2){创建核心类}

    目录  一、创建项目 二、创建核心类 🍅 1、 编写交换机类,Exchange 🍅 2、编写存储消息的队列,MSGQueue 🍅 3、编写绑定类,binding 🍅 4、编写消息,Message   代码解释主要在注释上面,注意一下注释 核心类主要是在服务器模块中的。  主要是以下几个类 * 交换机 exchange *

    2024年02月15日
    浏览(33)
  • 项目实战 — 消息队列(5){统一硬盘操作}

    前面已经使用数据库管理了交换机、绑定、队列,然后又使用了数据文件管理了消息。 那么,这里就创建一个类,讲之前的两个部分整合起来,对上层提供统一的一套接口,表示硬盘上存储的所有的类的信息。  创建了这个类,我们就不用去直接调用DataBaseManager和MessageFile

    2024年02月14日
    浏览(49)
  • 树莓派学习:建立socket进行网络通信+tcp+udp+端口+字节序+socketAPI+地址转换API+聊天对话框实战

    目录 socket套接字网络通信学习 数据协议 tcp udp ip地址 端口 字节序 步骤  API介绍 地址转换API 实战  聊天对话框 服务器  运行时后面要传IP地址和端口 客户端   运行时后面要传IP地址和端口 socket是网络通信,通信的数据协议有http、tcp、udp等等,简单来说就是传输数据的格式

    2024年02月05日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包