Netty核心技术十一--用Netty 自己 实现 dubbo RPC

这篇具有很好参考价值的文章主要介绍了Netty核心技术十一--用Netty 自己 实现 dubbo RPC。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. RPC基本介绍

  1. RPC(Remote Procedure Call):远程 过程调用,是一个计算机 通信协议。该协议允许运 行于一台计算机的程序调 用另一台计算机的子程序, 而程序员无需额外地为这 个交互作用编程

  2. 两个或多个应用程序都分 布在不同的服务器上,它 们之间的调用都像是本地 方法调用一样(如图)

    Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&Netty,dubbo,rpc,网络协议

  3. 常见的 RPC 框架有: 比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift, Spring 旗下的 Spring Cloud。

2. RPC调用流程图

术语说明:在RPC中,

  • Client叫服务消费者
  • Server叫服务提供者

Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&Netty,dubbo,rpc,网络协议

3. PRC调用流程说明

  1. **服务消费方(client)**以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

4. 自己实现 dubbo RPC(基于Netty)

  • 需求说明

    1. dubbo 底层使用了 Netty 作为网络通讯框架,要求用Netty 实现一个简单的RPC框架
    2. 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty4.1.20
  • 设计说明

    Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&Netty,dubbo,rpc,网络协议

    1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
    2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
    3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用Netty请求提供者返回数据

4.1 公共接口 publicinterface包

4.1.1 HelloService

package site.zhourui.nioAndNetty.netty.dubborpc.publicinterface;

//这个是接口,是服务提供方和 服务消费方都需要
public interface HelloService {
    String hello(String msg);
}

4.2 远程调用netty包

本质上就是客户端访问服务端

4.2.1 NettyClientHandler

  1. 我们实现了Callable方法
  2. setPara(String para)方法: 设置要发给服务器端的信息
  3. 我们将ctx在channelActive时抽取为全局对象context,方便我们在其他方法也能使用(这里就是call方法)
  4. call方法:
    • 开启子线程向服务端发送消息
    • 发送完成后该子线程进行wait,等待服务提供方处理并返回数据(被唤醒)
    • 唤醒后打印服务端返回数据全局变量result中的数据
  5. channelRead方法:
    • 收到服务器的返回数据后,将返回数据放在全局变量result中
    • 唤醒等待的线程
    • 因为channelRead和call方法是有同步关系的所有要加上synchronized加锁
  6. 小结: 代码执行流程
    1. channelActive()
    2. setPara()设置需要发送的数据
    3. call(wait之前代码)被代理对象调用, 发送数据给服务器-> wait
    4. 等待被唤醒(channelRead)->notify
    5. call(wait之后代码)
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;//上下文
    private String result; //返回的结果
    private String para; //客户端调用方法时,传入的参数

    //与服务器的连接创建后,就会被调用, 这个方法是第一个被调用(1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被调用  ");
        context = ctx; //因为我们在其它方法会使用到 ctx
    }

    //收到服务器的数据后,调用方法 (4)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被调用  ");
        result = msg.toString();
        notify(); //唤醒等待的线程
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //被代理对象调用, 发送数据给服务器,-> wait -> 等待被唤醒(channelRead) -> 返回结果 (3)-》5
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被调用  ");
        context.writeAndFlush(para);
        //进行wait
        wait(); //等待channelRead 方法获取到服务器的结果后,唤醒
        System.out.println(" call2 被调用  ");
        return  result; //服务方返回的结果
    }

    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}

4.2.2 NettyClient

说明:

  1. 创建线程池executor

  2. initClient():

    • 初始化NettyClientHandler 设为全局对象client
    • 创建客户端并连接客户端
      • StringDecoder():字符串编码器
      • StringEncoder():字符串解码器
      • pipeline.addLast(client):将加入自定义handler-client
  3. 编写方法getBean使用代理模式,获取一个代理对象

    public Object getBean(final Class<?> serivceClass, final String providerName) 
    
    • serivceClass: 需要代理的Class对象
    • providerName: 协议以及需要发送的数据
    • 如果client为空就初始化initClient
    • client.setPara():使用自定义handler的全局对象client设置需要发送的数据
    • executor.submit(client): 将我们的自定义handler提交给异步线程池,因为NettyClientHandler 实现了Callable方法,会自动调用call方法
      • .get():异步任务执行完成后获取返回结果
    • 将返回结果return
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {
    //创建线程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    //编写方法使用代理模式,获取一个代理对象

    public Object getBean(final Class<?> serivceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) 进入...." + (++count) + " 次");
                    //{}  部分的代码,客户端每调用一次 hello, 就会进入到该代码
                    if (client == null) {
                        initClient();
                    }

                    //设置要发给服务器端的信息
                    //providerName 协议头 args[0] 就是客户端调用api hello(???), 参数
                    client.setPara(providerName + args[0]);

                    //
                    return executor.submit(client).get();

                });
    }

    //初始化客户端
    private static void initClient() {
        client = new NettyClientHandler();
        //创建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );

        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.2.3 NettyServerHandler

  • 当通道发生读事件时
    • 获取客户端发送的消息,并调用服务
    • 按照协议规则取出数据(HelloService#hello#)
      • HelloService# 为协议头
      • hello为数据
    • 回复客户端调用结果
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import site.zhourui.nioAndNetty.netty.dubborpc.customer.ClientBootstrap;
import site.zhourui.nioAndNetty.netty.dubborpc.provider.HelloServiceImpl;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送的消息,并调用服务
        System.out.println("msg=" + msg);
        //客户端在调用服务器的api 时,我们需要定义一个协议
        //比如我们要求 每次发消息是都必须以某个字符串开头 "HelloService#hello#你好"
        if(msg.toString().startsWith(ClientBootstrap.providerName)) {

            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

4.2.4 NettyServer

  • 启动客户端
    • StringDecoder
    • StringEncoder
    • NettyServerHandler
package site.zhourui.nioAndNetty.netty.dubborpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {
    public static void startServer(String hostName, int port) {
        startServer0(hostName,port);
    }

    //编写一个方法,完成对NettyServer的初始化和启动

    private static void startServer0(String hostname, int port) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          ChannelPipeline pipeline = ch.pipeline();
                                          pipeline.addLast(new StringDecoder());
                                          pipeline.addLast(new StringEncoder());
                                          pipeline.addLast(new NettyServerHandler()); //业务处理器

                                      }
                                  }

                    );

            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            System.out.println("服务提供方开始提供服务~~");
            channelFuture.channel().closeFuture().sync();

        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

4.3 customer 包

4.3.1 ClientBootstrap

  • 设置providerName:我们发送的数据(协议+数据)
  • 创建一个消费者
  • 创建代理对象
  • 通过代理对象调用服务提供者的方法(服务)
package site.zhourui.nioAndNetty.netty.dubborpc.customer;

import site.zhourui.nioAndNetty.netty.dubborpc.netty.NettyClient;
import site.zhourui.nioAndNetty.netty.dubborpc.publicinterface.HelloService;

public class ClientBootstrap {
    //这里定义协议头
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws  Exception{

        //创建一个消费者
        NettyClient customer = new NettyClient();

        //创建代理对象
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        for (;; ) {
            Thread.sleep(2 * 1000);
            //通过代理对象调用服务提供者的方法(服务)
            String res = service.hello("你好 dubbo~");
            System.out.println("调用的结果 res= " + res);
        }
    }
}

4.4 provider 包

4.4.1 HelloServiceImpl

服务端提供方的实现,远程真正被调用的方法

package site.zhourui.nioAndNetty.netty.dubborpc.provider;

import site.zhourui.nioAndNetty.netty.dubborpc.publicinterface.HelloService;

public class HelloServiceImpl implements HelloService {
    private static int count = 0;
    //当有消费方调用该方法时, 就返回一个结果
    @Override
    public String hello(String msg) {
        System.out.println("收到客户端消息=" + msg);
        //根据mes 返回不同的结果
        if(msg != null) {
            return "你好客户端, 我已经收到你的消息 [" + msg + "] 第" + (++count) + " 次";
        } else {
            return "你好客户端, 我已经收到你的消息 ";
        }
    }
}

4.4.1 ServerBootstrap

ServerBootstrap 会启动一个服务提供者,就是 NettyServer

package site.zhourui.nioAndNetty.netty.dubborpc.provider;

import site.zhourui.nioAndNetty.netty.dubborpc.netty.NettyServer;

//ServerBootstrap 会启动一个服务提供者,就是 NettyServer
public class ServerBootstrap {
    public static void main(String[] args) {

        //代码代填..
        NettyServer.startServer("127.0.0.1", 7000);
    }
}

4.5 测试

  1. 启动ServerBootstrap

    Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&amp;Netty,dubbo,rpc,网络协议

  2. 启动ClientBootstrap

    Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&amp;Netty,dubbo,rpc,网络协议

    Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&amp;Netty,dubbo,rpc,网络协议

4.5.1 debug看一下ClientBootstrap启动

首先还是先启动服务端ServerBootstrap

  1. debug启动ClientBootstrap

    Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&amp;Netty,dubbo,rpc,网络协议

  2. NettyClient(),此时只是初始化了全局属性

  3. getBean:创建代理对象

    • 先看看入参是什么数据

      Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&amp;Netty,dubbo,rpc,网络协议

    • 如果client没有被初始化就初始化

      Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&amp;Netty,dubbo,rpc,网络协议

    • 设置要发给服务器端的信息

      Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&amp;Netty,dubbo,rpc,网络协议

    • executor.submit:提交异步任务就会来到NettyClientHandler的call方法

      Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&amp;Netty,dubbo,rpc,网络协议

    • call方法执行到wait()方法后,channelRead不久后就会收到服务端的调用结果然后唤醒call方法的子线程继续执行

      Netty核心技术十一--用Netty 自己 实现 dubbo RPC,NIO&amp;Netty,dubbo,rpc,网络协议文章来源地址https://www.toymoban.com/news/detail-568677.html

到了这里,关于Netty核心技术十一--用Netty 自己 实现 dubbo RPC的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 微服务学习 | Springboot整合Dubbo+Nacos实现RPC调用

    🏷️ 个人主页 :鼠鼠我捏,要死了捏的主页  🏷️ 系列专栏 :Golang全栈-专栏 🏷️ 个人学习笔记,若有缺误,欢迎评论区指正   前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站AI学习网站。 目录 前言 快速上手

    2024年02月19日
    浏览(41)
  • 【Dubbo3云原生微服务开发实战】「Dubbo前奏导学」 RPC服务的底层原理和实现

    Dubbo是一款高效而强大的RPC服务框架,它旨在解决微服务架构下的服务监控和通信问题。该框架提供了Java、Golang等多语言的SDK,使得使用者可以轻松构建和开发微服务。Dubbo具备远程地址发现和通信能力,可通过Dubbo独有的身临其境的服务治理特验为主导,以提高开发人员的功

    2024年02月05日
    浏览(49)
  • 【SpringBoot集成Nacos+Dubbo】企业级项目集成微服务组件,实现RPC远程调用

    在日益增长的业务需求中,一开始使用的是每个项目独立开发,虽然都是前后端分离的项目,但是每一个项目之间互不干扰。后来,因为某种需求,需要几个项目的数据相互交错获取。 最开始的想法就是集成多个数据源。 举例 有A、B、C三个项目,对应着数据库DBa、DBb、DBc、

    2024年02月04日
    浏览(58)
  • Springboot3.X整合Dubbo3.XSpringCloudAlibaba微服务 2022.0 + Springboot3.X 集成 Dubbo实现对外调用http内部调用RPC

    近期自己新开了一套SpringCloud Alibaba微服务项目,接口使用了对外HTTP,内部RPC的设计,具体点说就是外部用户或客户端通过Nginx访问到Gateway网关再分发到各个服务,内部各个服务之间统一使用Dubbo RPC进行通信。下面是Springboot3.x集成Dubbo的分享: 1. 需要的关键依赖 2. 启动程序入

    2024年02月15日
    浏览(37)
  • 17游刃有余:动手实现自己的RPC框架(三)

    这篇文章我们来实现跨语言的网络通信。 跨语言RPC框架的必要性主要体现在以下几个方面: 解决不同语言之间的互操作性。不同语言使用的数据类型和序列化方式可能不同,跨语言 RPC 框架可以提供通用的编解码库和语言适配器,以便将不同语言的数据转换为通用的格式进行

    2024年02月14日
    浏览(38)
  • Dubbo源码浅析(一)—RPC框架与Dubbo

    RPC,Remote Procedure Call 即远程过程调用,与之相对的是本地服务调用,即LPC(Local Procedure Call)。本地服务调用比较常用,像我们应用内部程序 (注意此处是程序而不是方法,程序包含方法) 互相调用即为本地过程调用,而远程过程调用是指在本地调取远程过程进行使用。 而 RPC框

    2024年02月08日
    浏览(41)
  • Netty优化-rpc

    1.3 RPC 框架 1)准备工作 这些代码可以认为是现成的,无需从头编写练习 为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息 请求消息 响应消息 服务器架子 服务器 handler 客户端架子 客户端handler 服务器端的 service 获取 相关配置 application.properties 业务类 计数器

    2024年02月08日
    浏览(50)
  • 自定义Dubbo RPC通信协议

    Dubbo 协议层的核心SPI接口是 org.apache.dubbo.rpc.Protocol ,通过扩展该接口和围绕的相关接口,就可以让 Dubbo 使用我们自定义的协议来通信。默认的协议是 dubbo,本文提供一个 Grpc 协议的实现。 Google 提供了 Java 的 Grpc 实现,所以我们站在巨人的肩膀上即可,就不用重复造轮子了。

    2024年01月19日
    浏览(54)
  • 基于netty的rpc远程调用

    🚀🚀🚀这是一个手写RPC项目,用于实现远程过程调用(RPC)通信🚀🚀🚀 欢迎star串门 : https://github.com/red-velet/ 🚀Q-PRC 简单的RPC框架的实现 :该RPC框架实现了基本的远程过程调用功能,允许客户端通过网络调用远程服务的方法,实现分布式系统之间的通信和协作。 基于

    2024年02月14日
    浏览(36)
  • 分布式RPC框架Dubbo详解

    目录   1.架构演进 1.1 单体架构 1.2  垂直架构 1.3 分布式架构 1.4 SOA架构 1.5 微服务架构 2.RPC框架 2.1 RPC基本概念介绍 2.1.1 RPC协议 2.1.2 RPC框架 2.1.3 RPC与HTTP、TCP/ UDP、Socket的区别 2.1.4 RPC的运行流程  2.1.5 为什么需要RPC 2.2 Dubbo  2.2.1 Dubbo 概述 2.2.2 Dubbo实战   架构演进如下图: 这

    2024年02月07日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包