聊聊分布式架构04——RPC通信原理

这篇具有很好参考价值的文章主要介绍了聊聊分布式架构04——RPC通信原理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

RPC通信的基本原理

RPC结构

手撸简陋版RPC

知识点梳理

1.Socket套接字通信机制

2.通信过程的序列化与反序列化

3.动态代理

4.反射

思维流程梳理

码起来

服务端时序图

服务端—Api与Provider模块

客户端时序图


聊聊分布式架构04——RPC通信原理,分布式架构,分布式,架构,rpc

RPC通信的基本原理

RPC(Remote Procedure Call)是一种远程过程调用协议,用于在分布式系统中进行远程通信,允许一个计算机程序调用另一个地址空间(通常是在不同的机器上)的函数或过程,就像调用本地函数一样。下面是RPC通信的基本原理:

  1. 客户端调用:远程客户端(调用方)希望调用远程服务器(提供方)上的一个或多个远程过程(函数)。客户端在本地创建一个请求,并指定要调用的远程过程的名称以及传递给该过程的参数。

  2. 参数序列化:在将请求发送到远程服务器之前,客户端需要将参数序列化为字节流或其他适合传输的格式。序列化是将数据转换为可以在网络上传输的形式的过程。

  3. 网络传输:客户端通过网络将请求发送到远程服务器。通常,这涉及到将请求数据打包成网络消息,然后通过网络协议(如HTTP、TCP/IP)将消息发送到服务器的地址。

  4. 服务器接收:远程服务器接收到客户端的请求消息,通常通过网络协议(如HTTP服务器、Socket服务器)监听特定的端口。

  5. 参数反序列化:服务器从请求消息中提取参数数据,并将其反序列化为本地数据结构,以便将其传递给远程过程。

  6. 远程过程调用:服务器调用相应的远程过程,将参数传递给该过程并执行相应的操作。远程过程可以位于服务器的本地代码中或远程服务器上的远程服务中。

  7. 结果序列化:远程过程执行完毕后,服务器将结果序列化为字节流或其他适合传输的格式。

  8. 结果传输:服务器通过网络将结果数据打包成响应消息,并将其发送回客户端。

  9. 客户端接收:客户端接收到服务器的响应消息。

  10. 结果反序列化:客户端从响应消息中提取结果数据,并将其反序列化为本地数据结构。

  11. 客户端处理:客户端可以根据远程过程的执行结果采取相应的行动,可能是继续执行本地代码或返回结果给调用方。

  12. 通信完成:一次RPC调用完成后,客户端和服务器之间的通信过程结束。

RPC结构

聊聊分布式架构04——RPC通信原理,分布式架构,分布式,架构,rpc

  1. 客户端模块代理所有远程方法的调用

  2. 将目标服务、目标方法、调用目标方法的参数等必要信息序列化

  3. 序列化之后的数据包进一步压缩,压缩后的数据包通过网络通信传输到目标服务节点

  4. 服务节点将接受到的数据包进行解压

  5. 解压后的数据包反序列化成目标服务、目标方法、目标方法的调用参数

  6. 通过服务端代理调用目标方法获取结果,结果同样需要序列化、压缩然后回传给客户端

手撸简陋版RPC

总觉得文字描述太干了,有点咽不下去,还是手撸下试试吧。毕竟艾瑞莉娅的奶奶总说:

纸上得来终觉浅,绝知此事要躬行。

知识点梳理
1.Socket套接字通信机制

在Java中,Socket是用于网络通信的基础类之一,它提供了一种机制,通过该机制,计算机程序可以在网络上建立连接、发送数据、接收数据和关闭连接。

  • 服务器套接字(ServerSocket)

    • 服务器套接字用于在服务器端监听并接受客户端的连接请求。

    • 使用以下步骤创建和使用服务器套接字:

      • 创建ServerSocket对象,并绑定到一个特定的端口号。

      • 使用ServerSocket对象的accept()方法来等待客户端的连接请求,并接受连接。

      • 一旦接受连接,可以创建新的Socket对象来处理客户端的通信。

      • 完成通信后,关闭SocketServerSocket连接。

    ServerSocket serverSocket = new ServerSocket(port_number);
    Socket clientSocket = serverSocket.accept(); // 等待连接
    InputStream inputStream = clientSocket.getInputStream();
    OutputStream outputStream = clientSocket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    clientSocket.close();
    serverSocket.close();
    ​
  • 客户端套接字(Socket)

    • 客户端套接字用于连接到远程服务器,发送请求并接收响应。

    • 使用以下步骤创建和使用客户端套接字:

      • 创建Socket对象,指定远程服务器的主机名或IP地址以及端口号。

      • 使用Socket对象的输入流和输出流来进行数据的读取和写入。

      • 完成通信后,关闭Socket连接。

    Socket socket = new Socket("server_hostname", port_number);
    InputStream inputStream = socket.getInputStream();
    OutputStream outputStream = socket.getOutputStream();
    // 使用输入流和输出流进行数据读写
    socket.close();
2.通信过程的序列化与反序列化

在Java中通过 JDK 提供了 Java 对象的序列化方式实现对象序列化传输,主要通过输出流java.io.ObjectOutputStream和输入流java.io.ObjectInputStream来实现;

java.io.ObjectOutputStream:表示对象输出流 , 它的 writeObject(Object obj)方法可以对参数指定的 obj 对象进行序列化,把得到的字节序列写到一个目标输出流中;

java.io.ObjectInputStream:表示对象输入流 ,它的 readObject()方法源输入流中读取字节序列,再把它们反序列化成为一个对象,并将其返回;

3.动态代理

动态代理是在运行时动态生成代理类的方式。Java提供了java.lang.reflect.Proxy类和java.lang.reflect.InvocationHandler接口来实现动态代理。JDK中提供了基于接口动态代理的方法:

        // 创建代理对象
         MyInterface proxyObject = (MyInterface) Proxy.newProxyInstance(
                MyInterface.class.getClassLoader(),
                new Class[]{MyInterface.class},
                new MyInvocationHandler(realObject));

使用动态代理的目的:

  1. 透明远程调用: 动态代理可以隐藏底层的远程调用细节,使得远程过程调用看起来像本地方法调用一样简单。客户端无需关心网络通信、数据序列化和反序列化等细节,因为这些都由代理对象处理。

  2. 减少重复性代码: 使用动态代理可以减少编写和维护远程调用代码的工作量。代理类负责处理通用的远程调用逻辑,开发者只需关注具体的业务逻辑。

  3. 集中管理远程调用逻辑: 动态代理将远程调用逻辑集中在一个地方,这样可以更容易地管理和维护,例如添加统一的错误处理、日志记录或性能监控等功能。

4.反射

在RPC(Remote Procedure Call,远程过程调用)中使用反射的主要目的是实现透明的远程调用,即使在客户端和服务器之间存在远程分离,也可以像调用本地方法一样调用远程服务。反射在RPC中的具体目的和用途如下:

  1. 动态代理生成代理对象: 反射机制允许在运行时生成代理对象,这些代理对象可以代替实际的远程服务对象执行方法调用。这样,客户端代码不需要提前知道要调用的具体远程方法和对象,而可以动态生成代理并执行方法。

  2. 动态识别方法和参数: 反射允许在运行时识别远程方法和方法参数的名称、类型和数量。这对于将方法调用信息打包成请求并传递给远程服务器非常有用,服务器可以根据这些信息正确地解析请求并调用相应的方法。

  3. 动态序列化和反序列化: 反射可以用于动态地序列化请求和响应数据。在RPC中,请求和响应数据通常需要以某种格式进行序列化和反序列化,以便在网络上进行传输。反射可以帮助动态识别数据类型,将数据转换为适当的格式,并在服务器端进行反序列化。

思维流程梳理

聊聊分布式架构04——RPC通信原理,分布式架构,分布式,架构,rpc

码起来

假设需求是客户端想要访问服务端(ip:prot/helloService/sayHello)上的helloService调用sayHello()。先定义服务端的实现。

服务端时序图

聊聊分布式架构04——RPC通信原理,分布式架构,分布式,架构,rpc

服务端—Api与Provider模块

服务端作为服务提供者,自然需要具备常规的业务模块:Api与Provider模块

聊聊分布式架构04——RPC通信原理,分布式架构,分布式,架构,rpc

Api模块定义简单的HelloService接口,包含两个方法sayHello(String content)和saveUSer(User user)

public interface IHelloService {
    String sayHello(String content);
    String saveUSer(User user);
}

定义一个简单对象User类

public class User {
    private String name;
    private int age;
​
    // getter和setter
    public String getName() {
        return name;
    }
​
    public void setName(String name) {
        this.name = name;
    }
​
    public int getAge() {
        return age;
    }
​
    public void setAge(int age) {
        this.age = age;
    }
​
    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

Provider模块定义HelloService接口的实现类HelloServiceImpl(rpc-server-provider模块的pom中需要添加rpc-server-api模块的依赖)

public class HelloServiceImpl implements IHelloService {
    @Override
    public String sayHello(String content) {
        System.out.println("request in sayHello:" + content);
        return "Say hello:" + content;
    }
​
    @Override
    public String saveUSer(User user) {
        System.out.println("request in saveUser:" + user);
        return "Save user success";
    }
}

定义RpcServerProxy通过代理的方式使用Socket对外暴露服务接口

public class RpcServerProxy {
    private ExecutorService executorService = Executors.newCachedThreadPool();
​
    public void publisher(Object service, int port) {
        ServerSocket serverSocket = null;
        try {
          serverSocket = new ServerSocket(port);
          while (true) {
              Socket socket = serverSocket.accept(); // 使用accept()等待客户端连接,接收请求
              executorService.execute(new ProcessorHandler(socket, service)); // 每一个socket交给一个processorHandler处理
          }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // 关闭资源,jdk1.7后提供了try-with可以自动关闭
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

需要定义一个线程ProcessorHandler对每次的socket请求进行处理

public class ProcessorHandler implements Runnable{
​
    private Socket socket;
    private Object service;
​
    public ProcessorHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }
​
​
    @Override
    public void run() {
        // 使用ObjectOutputStream和ObjectInputStream配合socket的输入流输出流进行序列化和反序列化
        ObjectOutputStream objectOutputStream = null;
        ObjectInputStream objectInputStream = null;
        try {
            objectInputStream = new ObjectInputStream(socket.getInputStream());
​
            // 客户端传过来的信息:请求哪个类,哪个方法,方法的参数  ——>  封装为RpcRequest类
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); // 拿到客户端通信传递的请求类
            Object result = invoke(rpcRequest); // 反射调用本地服务
​
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush(); // 手动将缓冲区中的数据强制刷新到输出流中,以确保数据被立即写入底层的输出流。
​
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } finally {
            // 关闭关联资源,jdk1.7后提供了try-with可以自动关闭
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            if (objectOutputStream != null) {
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
​
    private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        // 反射调用
        Object[] args = rpcRequest.getParameters(); // 获取客户端传递的RpcRequest中的参数
        Class<?>[] types = new Class[args.length]; // 获得每个参数的类型
        for (int i = 0; i < args.length; i++) {
            types[i] = args.getClass();
        }
        Class clazz = Class.forName(rpcRequest.getClassName()); // 根据请求的类名反射加载类
        Method method = clazz.getMethod(rpcRequest.getMethodName(), types); // 获取类中的方法
        Object result = method.invoke(service, args); // 进行反射调用方法
        return result;
    }
}

定义一个通用的RpcRequest类参与通信过程中的请求处理,这里是需要序列化的

public class RpcRequest implements Serializable {
    private String className;
    private String methodName;
    private Object[] parameters;
    
    // getter and setter
    public String getClassName() {
        return className;
    }
​
    public void setClassName(String className) {
        this.className = className;
    }
​
    public String getMethodName() {
        return methodName;
    }
​
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
​
    public Object[] getParameters() {
        return parameters;
    }
​
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}

Provider的app中发布下服务,运行没有报错,服务端OK

public class App 
{
    public static void main( String[] args )
    {
        IHelloService helloService = new HelloServiceImpl();
        RpcServerProxy rpcServerProxy = new RpcServerProxy();
        rpcServerProxy.publisher(helloService, 8080); // 把服务发布出去
    }
}

聊聊分布式架构04——RPC通信原理,分布式架构,分布式,架构,rpc

客户端时序图

聊聊分布式架构04——RPC通信原理,分布式架构,分布式,架构,rpc

定义动态代理RpcClientProxy,JDK的接口代理方式就是一句话:

public class RpcClientProxy {
    public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {
       return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
    }
}

定义被代理接口RemoteInvocationHandler

public class RemoteInvocationHandler implements InvocationHandler {
    private String host;
    private int port;
​
    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }
​
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 客户端请求会进入这里进行包装
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameters(args);
        // 远程通信交给RpcNetTransport
        RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
        Object result = rpcNetTransport.send(rpcRequest);
        return result;
    }
}

定义通信传输类RpcNetTransport

public class RpcNetTransport {
    private String host;
    private int port;
​
    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }
​
    public Object send(RpcRequest rpcRequest) {
        Socket socket = null;
        Object result = null;
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;
        try {
            socket = new Socket(host, port); // 建立连接
​
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); // 客户端通信写入
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
​
            objectInputStream = new ObjectInputStream(socket.getInputStream()); // 客户端通信输出
            result = objectInputStream.readObject();
            return result;
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            // 关闭关联资源
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            if (objectOutputStream != null) {
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return result;
    }
}

客户端请求远程服务

public class App 
{
    public static void main( String[] args )
    {
        RpcClientProxy rpcClientProxy = new RpcClientProxy();
        IHelloService helloService = rpcClientProxy.clientProxy(IHelloService.class, "localhost", 8080);
        String result = helloService.sayHello("Elaine");
        System.out.println(result);
    }
}

请求成功,客户端获得返回结果

聊聊分布式架构04——RPC通信原理,分布式架构,分布式,架构,rpc

服务端打印了请求日志

聊聊分布式架构04——RPC通信原理,分布式架构,分布式,架构,rpc

到这里,一个简陋且粗糙的RPC通信版本就好了。

别灰心,要记住艾瑞莉娅的奶奶总说

路漫漫其修选兮,吾将上下而求索。文章来源地址https://www.toymoban.com/news/detail-728440.html

到了这里,关于聊聊分布式架构04——RPC通信原理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式系统消息通信技术:MOM与RPC

    中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这

    2024年02月11日
    浏览(41)
  • 聊聊分布式架构02——Http到Https

    目录 HTTP通信协议 请求报文 响应报文 持久连接 状态管理 HTTPS通信协议 安全的HTTPS HTTP到HTTPS的演变 对称加密 非对称加密 混合加密机制 证书机构 SSL到底是什么 HTTPS是身披SSL外壳的HTTP HTTP通信协议 一次HTTP请求的通信流程:客户端浏览器通过域名访问网页资源,由DNS解析得到

    2024年02月07日
    浏览(45)
  • RPC分布式网络通信框架(二)—— moduo网络解析

    网络部分,包括寻找rpc服务主机,发起rpc调用请求和响应rpc调用结果,使用muduo网络和zookeeper 服务配置中心 (专门做服务发现) 其中MprpcApplication类负责框架的一些初始化操作,注意去除类拷贝构造和移动构造函数(实现单例模式)。其中项目还构建了MprpcConfig类负责读取服

    2024年02月17日
    浏览(48)
  • RPC分布式网络通信框架(一)—— protobuf的使用

    常见序列化和反序列化协议有XML、JSON、protobuf,相比于其他protobuf更有优势: 1、protobuf是二进制存储的,xml和json都是文本存储的。故protobuf占用带宽较低 2、protobuf不需要存储额外的信息。 json如何存储数据?键值对。例:Name:”zhang san”, pwd: “12345”。 protobuf存储数据的方式

    2024年02月16日
    浏览(54)
  • 聊聊分布式架构08——SpringBoot开启微服务时代

    目录 微服务架构时代 快速入门 入门详解 SpringBoot的自动配置 石器时代:XML配置bean 青铜时代:SpringConfig 铁器时代:AutoConfigurationImportSelector 手写简单Starter SpringApplication启动原理 微服务架构时代 Spring Boot的出现与微服务架构有关,它是Spring Framework的一部分,旨在简化开发独

    2024年02月06日
    浏览(49)
  • RPC分布式网络通信框架(三)—— 服务配置中心Zookeeper模块

    分布式系统存在的问题: 为了支持高并发,每个客户端都保存了一份服务提供者的 列表 。但是如果 列表 有更新,想要得到最新的URL列表(rpc服务的ip和端口号),必须要手动更新配置文件,很不方便。 如图所示,实例3挂掉了,但是 列表 并没有得到更新。 故需要动态的更

    2024年02月15日
    浏览(48)
  • 聊聊分布式架构06——[NIO入门]简单的Netty NIO示例

    目录 Java NIO和Netty NIO比较 Java NIO: Netty: Netty NIO中的主要模块 Transport(传输层) Buffer(缓冲区) Codec(编解码器) Handler(处理器) EventLoop(事件循环) Bootstrap和Channel(引导和通道) Future和Promise(异步编程) Netty示例 服务端时序图 服务端代码 客户端时序图 客户端代码

    2024年02月07日
    浏览(37)
  • 聊聊Seata分布式解决方案AT模式的实现原理

    Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案。 AT模式目前来看是Seata框架独有的一种模式,其它的分布式框架上并没有此种模式的实现。其是由二阶段提

    2024年02月05日
    浏览(42)
  • 单体架构 微服务架构 分布式 微服务通信方式 网关与nginx

    单体架构 优点:架构简单,维护成本低 缺点:各个模块耦合度太高,当对一个模块进行更新修改时,会影响到其他模块,要一起进行修改。当存在性能瓶颈的时候,需要对整个服务进行扩容,不能有针对性的扩容,如一个程序的主要功能时其中某个服务,要对其增加机器,

    2024年02月10日
    浏览(60)
  • 【分布式技术专题】「分布式技术架构」 探索Tomcat技术架构设计模式的奥秘(Server和Service组件原理分析)

    Tomcat的总体结构从外到内进行分布,最大范围的服务容器是Server组件,Service服务组件(可以有多个同时存在),Connector(连接器)、Container(容器服务),其他组件:Jasper(Jasper解析)、Naming(命名服务)、Session(会话管理)、Logging(日志管理)、JMX(Java 管理器扩展服务

    2024年01月24日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包