手写RPC框架--5.Netty业务逻辑

这篇具有很好参考价值的文章主要介绍了手写RPC框架--5.Netty业务逻辑。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)

5.Netty业务逻辑

a.加入基础的Netty代码

1.在DcyRpcBootstrap类的start()方法中加入netty代码 (待完善)

/**
 * 启动netty服务
 */
public void start() {
    // 1.创建EventLoopGroup,老板只负责处理请求,之后会将请求分发给worker,1比2的比例
    NioEventLoopGroup boss = new NioEventLoopGroup(2);
    NioEventLoopGroup worker = new NioEventLoopGroup(10);

    try{
        // 2.服务器端启动辅助对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // 3.配置服务器
        serverBootstrap = serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // TODO 核心内容,需要添加很多入栈和出栈的handler
                        socketChannel.pipeline().addLast(null);
                    }
                });

        // 4.绑定端口
        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

        // 5.阻塞操作
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            boss.shutdownGracefully().sync();
            worker.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2.在ReferenceConfig类的get()方法中加入netty代码 (待完善)

/**
 * 代理设计模式,生成一个API接口的代理对象
 * @return 代理对象
 */
public T get() {
    // 使用动态代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class[] classes = new Class[]{interfaceRef};

    // 使用动态代理生成代理对象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 调用sayHi()方法,事实上会走进这个代码段当中
            // 已经知道method(具体的方法),args(参数列表)
            log.info("method-->{}", method.getName());
            log.info("args-->{}", args);

            // 1.发现服务,从注册中心,寻找一个可用的服务
            // 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
            InetSocketAddress address = registry.lookup(interfaceRef.getName());
            if (log.isInfoEnabled()){
                log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
            }
            // 2.使用netty连接服务器,发送 调用的 服务名字+方法名字+参数列表,得到结果
            // 定义线程池 EventLoopGroup
            NioEventLoopGroup group = new NioEventLoopGroup();
            // 启动一个客户端需要一个辅助类 bootstrap
            Bootstrap bootstrap = new Bootstrap();

            try {
                bootstrap = bootstrap.group(group)
                        .remoteAddress(address)
                        // 选择初始化一个什么样的channel
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(null);
                            }
                        });


                // 3.连接到远程节点;等待连接完成
                ChannelFuture channelFuture = bootstrap.connect().sync();
                // 4.获取channel并且写数据,发送消息到服务器端
                channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes(StandardCharsets.UTF_8)));
                // 5.阻塞程序,等待接收消息
                channelFuture.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    group.shutdownGracefully().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            return null;
        }
    });

    return (T) helloProxy;
}

b.对通道channel进行缓存

每次启动程序都会建立一个新的Netty连接,显示是对不合适的
解决方案:缓存channel,尝试从缓存中获取channel。如果为空,则创建新的连接并进行缓存

1.在DcyRpcBootstrap类的中添加一个全局的缓存:对通道进行缓存

// Netty的连接缓存
public static final Map<InetSocketAddress, Channel> CHANNEL_CACHE = new ConcurrentHashMap<>();

2.在ReferenceConfig类的get()方法中进行修改:查询缓存是否存在通道(address),若未命中,则建立新的channel并进行缓存

/**
 * 代理设计模式,生成一个API接口的代理对象
 * @return 代理对象
 */
public T get() {
    // 使用动态代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class[] classes = new Class[]{interfaceRef};

    // 使用动态代理生成代理对象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 调用sayHi()方法,事实上会走进这个代码段当中
            // 已经知道method(具体的方法),args(参数列表)
            log.info("method-->{}", method.getName());
            log.info("args-->{}", args);

            // 1.发现服务,从注册中心,寻找一个可用的服务
            // 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
            InetSocketAddress address = registry.lookup(interfaceRef.getName());
            if (log.isInfoEnabled()){
                log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
            }
            // 2.使用netty连接服务器,发送 调用的 服务名字+方法名字+参数列表,得到结果

            // 每次在这都会建立一个新的连接,对程序不合适
            // 解决方案:缓存channel,尝试从缓存中获取channel。如果为空,则创建新的连接并进行缓存
            // 1.从全局缓存中获取一个通道
            Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);

            if (channel == null) {
                // 建立新的channel
                // 定义线程池 EventLoopGroup
                NioEventLoopGroup group = new NioEventLoopGroup();
                // 启动一个客户端需要一个辅助类 bootstrap
                Bootstrap bootstrap = new Bootstrap();

                try {
                    bootstrap = bootstrap.group(group)
                            .remoteAddress(address)
                            // 选择初始化一个什么样的channel
                            .channel(NioSocketChannel.class)
                            .handler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel socketChannel) throws Exception {
                                    socketChannel.pipeline().addLast(null);
                                }
                            });


                    // 3.尝试连接服务器
                    channel = bootstrap.connect().sync().channel();
                    // 缓存
                    DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            if (channel == null){
                throw new NetworkException("获取通道channel发生了异常。");
            }
            ChannelFuture channelFuture = channel.writeAndFlush(new Object());

            return null;
        }
    });

    return (T) helloProxy;
}

c.对代码进行重构优化

1.在com.dcyrpc.discovery下创建NettyBootstrapInitializer类:提供Bootstrap的单例

/**
 * 提供Bootstrap的单例
 */
public class NettyBootstrapInitializer {

    private static final Bootstrap bootstrap = new Bootstrap();
    
    static {
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                // 选择初始化一个什么样的channel
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(null);
                    }
                });
    }

    private NettyBootstrapInitializer() {
    }

    public static Bootstrap getBootstrap() {
        return bootstrap;
    }
}

2.在ReferenceConfig类的get()方法中进行代码的优化

/**
 * 代理设计模式,生成一个API接口的代理对象
 * @return 代理对象
 */
public T get() {
    // 使用动态代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class[] classes = new Class[]{interfaceRef};

    // 使用动态代理生成代理对象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 调用sayHi()方法,事实上会走进这个代码段当中
            // 已经知道method(具体的方法),args(参数列表)
            log.info("method-->{}", method.getName());
            log.info("args-->{}", args);

            // 1.发现服务,从注册中心,寻找一个可用的服务
            // 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
            InetSocketAddress address = registry.lookup(interfaceRef.getName());
            if (log.isInfoEnabled()){
                log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
            }
            // 2.使用netty连接服务器,发送 调用的 服务名字+方法名字+参数列表,得到结果

            // 每次在这都会建立一个新的连接,对程序不合适
            // 解决方案:缓存channel,尝试从缓存中获取channel。如果为空,则创建新的连接并进行缓存
            // 1.从全局缓存中获取一个通道
            Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);

            if (channel == null) {
                // await()方法会阻塞,会等待连接成功再返回
                // sync和await都是阻塞当前线程,获取返回值。因为连接过程和发送数据过程是异步的
                // 如果发生了异常,sync会主动在主线程抛出异常,await不会,异常在子线程中处理,需要使用future处理
//                    channel = NettyBootstrapInitializer.getBootstrap().connect(address).await().channel();

                // 使用addListener执行异步操作
                CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
                NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
                    if (promise.isDone()) {
                        // 异步的,已经完成
                        log.info("已经和【{}】成功建立连接。", address);
                        channelFuture.complete(promise.channel());
                    } else if (!promise.isSuccess()) {
                        channelFuture.completeExceptionally(promise.cause());
                    }
                });

                // 阻塞获取channel
                channel = channelFuture.get(3, TimeUnit.SECONDS);

                // 缓存channel
                DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
            }
            if (channel == null){
                throw new NetworkException("获取通道channel发生了异常。");
            }

            /**
             * ---------------------------同步策略---------------------------
             */
//                ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
//                // get()阻塞获取结果
//                // getNow()获取当前的结果,如果未处理完成,返回null
//                if (channelFuture.isDone()) {
//                    Object object = channelFuture.getNow();
//                } else if (!channelFuture.isSuccess()) {
//                    // 发生问题,需要捕获异常。
//                    // 子线程可以捕获异步任务的异常
//                    Throwable cause = channelFuture.cause();
//                    throw new RuntimeException(cause);
//                }

            /**
             * ---------------------------异步策略---------------------------
             */
            CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            // TODO 需要将completableFuture暴露出去
            channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
                // 当前的promise返回的结果是,writeAndFlush的返回结果
                // 一旦数据被写出去,这个promise也就结束了
//                    if (promise.isDone()) {
//                        completableFuture.complete(promise.getNow());
//                    }

                // 只需要处理异常
                if (!promise.isSuccess()) {
                    completableFuture.completeExceptionally(promise.cause());
                }
            });

            return completableFuture.get(3, TimeUnit.SECONDS);
        }
    });

    return (T) helloProxy;
}

d.完成基础通信

1.在DcyRpcBootstrap类的start()方法中添加 handler:SimpleChannelInboundHandler

/**
 * 启动netty服务
 */
public void start() {
    // 1.创建EventLoopGroup,老板只负责处理请求,之后会将请求分发给worker,1比2的比例
    NioEventLoopGroup boss = new NioEventLoopGroup(2);
    NioEventLoopGroup worker = new NioEventLoopGroup(10);

    try{
        // 2.服务器端启动辅助对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // 3.配置服务器
        serverBootstrap = serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // TODO 核心内容,需要添加很多入栈和出栈的handler
                        socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                log.info("byteBuf --> {}", byteBuf.toString(Charset.defaultCharset()));

                                channelHandlerContext.channel().writeAndFlush(Unpooled.copiedBuffer("dcyrpc--hello".getBytes()));
                            }
                        });
                    }
                });

        // 4.绑定端口
        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

        // 5.阻塞操作
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            boss.shutdownGracefully().sync();
            worker.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2.在NettyBootstrapInitializer类的初始化Netty的静态代码块中添加 handler:SimpleChannelInboundHandler

static {
    NioEventLoopGroup group = new NioEventLoopGroup();
    bootstrap.group(group)
            // 选择初始化一个什么样的channel
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                            log.info("msg --> {}", msg.toString(Charset.defaultCharset()));
                        }
                    });
                }
            });
}

e.异步获取服务器的返回结果

1.在DcyRpcBootstrap类的中添加一个全局的对外挂起的 completableFuture

// 定义全局的对外挂起的 completableFuture
public static final Map<Long, CompletableFuture<Object>> PENDING_REQUEST = new HashMap<>(128);

2.在ReferenceConfig类中的get()方法完成对,completableFuture暴露出去

/**
 * 代理设计模式,生成一个API接口的代理对象
 * @return 代理对象
 */
public T get() {
    // 使用动态代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class[] classes = new Class[]{interfaceRef};

    // 使用动态代理生成代理对象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // 调用sayHi()方法,事实上会走进这个代码段当中
            // 已经知道method(具体的方法),args(参数列表)
            log.info("method-->{}", method.getName());
            log.info("args-->{}", args);

            // 1.发现服务,从注册中心,寻找一个可用的服务
            // 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
            InetSocketAddress address = registry.lookup(interfaceRef.getName());
            if (log.isInfoEnabled()){
                log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
            }
            
            // 1.从全局缓存中获取一个通道
            Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);

            if (channel == null) {
                // 使用addListener执行异步操作
                CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
                NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
                    if (promise.isDone()) {
                        // 异步的,已经完成
                        log.info("已经和【{}】成功建立连接。", address);
                        channelFuture.complete(promise.channel());
                    } else if (!promise.isSuccess()) {
                        channelFuture.completeExceptionally(promise.cause());
                    }
                });

                // 阻塞获取channel
                channel = channelFuture.get(3, TimeUnit.SECONDS);

                // 缓存channel
                DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
            }
            if (channel == null){
                log.error("获取或建立与【{}】通道时发生了异常。", address);
                throw new NetworkException("获取通道时发生了异常。");
            }

            CompletableFuture<Object> completableFuture = new CompletableFuture<>();
            // TODO 需要将completableFuture暴露出去
            DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);

            channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {

                // 只需要处理异常
                if (!promise.isSuccess()) {
                    completableFuture.completeExceptionally(promise.cause());
                }
            });

            // 如果没有地方处理这个completableFuture,这里会阻塞等待 complete 方法的执行
            // 在Netty的pipeline中最终的handler的处理结果 调用complete
            return completableFuture.get(10, TimeUnit.SECONDS);
        }
    });

    return (T) helloProxy;
}

3.在NettyBootstrapInitializer类的初始化Netty的静态代码块中:寻找与之匹配的待处理 completeFuture

tatic {
    NioEventLoopGroup group = new NioEventLoopGroup();
    bootstrap.group(group)
            // 选择初始化一个什么样的channel
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                            // 异步
                            // 服务提供方,给予的结果
                            String result = msg.toString(Charset.defaultCharset());
                            // 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
                            CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
                            completableFuture.complete(result);
                        }
                    });
                }
            });
}

f.调整代码

在core模块com.dcyrpc下创建proxy.handler

在handler包下创建RpcConsumerInvocationHandler类,实现InvocationHandler接口

  • ReferenceConfig类下的InvocationHandler匿名内部类拷贝到该RpcConsumerInvocationHandler类中
/**
 * 该类封装了客户端通信的基础逻辑,每一个代理对象的远程调用过程都封装在invoke方法中
 * 1.发现可用服务
 * 2.建立连接
 * 3.发送请求
 * 4.得到结果
 */
@Slf4j
public class RpcConsumerInvocationHandler implements InvocationHandler {

    // 接口
    private Class<?> interfaceRef;

    // 注册中心
    private Registry registry;

    public RpcConsumerInvocationHandler(Class<?> interfaceRef, Registry registry) {
        this.interfaceRef = interfaceRef;
        this.registry = registry;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        // 1.发现服务,从注册中心,寻找一个可用的服务
        //  - 传入服务的名字,返回ip+端口 (InetSocketAddress可以封装端口/ip/host name)
        InetSocketAddress address = registry.lookup(interfaceRef.getName());
        if (log.isInfoEnabled()){
            log.info("服务调用方,发现了服务{}的可用主机{}", interfaceRef.getName(), address);
        }

        // 2.尝试获取一个可用的通道
        Channel channel = getAvailableChannel(address);
        if (log.isInfoEnabled()){
            log.info("获取了和【{}】建立的连接通道,准备发送数据", address);
        }

        /**
         * ---------------------------封装报文---------------------------
         */
        // 3.封装报文

        /**
         * ---------------------------同步策略---------------------------
         */
//                ChannelFuture channelFuture = channel.writeAndFlush(new Object()).await();
//                // get()阻塞获取结果
//                // getNow()获取当前的结果,如果未处理完成,返回null
//                if (channelFuture.isDone()) {
//                    Object object = channelFuture.getNow();
//                } else if (!channelFuture.isSuccess()) {
//                    // 发生问题,需要捕获异常。
//                    // 子线程可以捕获异步任务的异常
//                    Throwable cause = channelFuture.cause();
//                    throw new RuntimeException(cause);
//                }

        /**
         * ---------------------------异步策略---------------------------
         */

        // 4.写出报文
        CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        // 将completableFuture暴露出去
        DcyRpcBootstrap.PENDING_REQUEST.put(1L, completableFuture);

        channel.writeAndFlush(Unpooled.copiedBuffer("hello".getBytes())).addListener((ChannelFutureListener) promise -> {
            // 需要处理异常
            if (!promise.isSuccess()) {
                completableFuture.completeExceptionally(promise.cause());
            }
        });

        // 如果没有地方处理这个completableFuture,这里会阻塞等待 complete 方法的执行
        // 在Netty的pipeline中最终的handler的处理结果 调用complete
        // 5.获得响应的结果
        return completableFuture.get(10, TimeUnit.SECONDS);
    }

    /**
     * 根据地址获取一个可用的通道
     * @param address
     * @return
     */
    private Channel getAvailableChannel(InetSocketAddress address) {
        // 1.尝试从缓存中获取通道
        Channel channel = DcyRpcBootstrap.CHANNEL_CACHE.get(address);

        // 2.拿不到就建立新连接
        if (channel == null) {

            // 使用addListener执行异步操作
            CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
            NettyBootstrapInitializer.getBootstrap().connect(address).addListener((ChannelFutureListener) promise -> {
                if (promise.isDone()) {
                    // 异步的,已经完成
                    log.info("已经和【{}】成功建立连接。", address);
                    channelFuture.complete(promise.channel());
                } else if (!promise.isSuccess()) {
                    channelFuture.completeExceptionally(promise.cause());
                }
            });

            // 阻塞获取channel
            try {
                channel = channelFuture.get(3, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.error("获取通道时发生异常。{}", e);
                throw new DiscoveryException(e);
            }

            // 缓存channel
            DcyRpcBootstrap.CHANNEL_CACHE.put(address, channel);
        }

        // 3.建立连接失败
        if (channel == null){
            log.error("获取或建立与【{}】通道时发生了异常。", address);
            throw new NetworkException("获取通道时发生了异常。");
        }

        // 4.返回通道
        return channel;
    }
}

ReferenceConfig类的get()方法被修改为:让整个代码可读性更高,更简洁

/**
 * 代理设计模式,生成一个API接口的代理对象
 * @return 代理对象
 */
public T get() {
    // 使用动态代理完成工作
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    Class<T>[] classes = new Class[]{interfaceRef};
    InvocationHandler handler = new RpcConsumerInvocationHandler(interfaceRef, registry);

    // 使用动态代理生成代理对象
    Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);

    return (T) helloProxy;
}

g.处理handler (优化)

在core模块com.dcyrpc下创建channelhandler.handler

channelhandler.handler包下创建MySimpleChannelInboundHandler类:处理响应结果

继承 SimpleChannelInboundHandler<ByteBuf>,重写read0方法

拷贝NettyBootstrapInitializer静态代码块中的匿名内部类SimpleChannelInboundHandler的代码

public class MySimpleChannelInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        // 异步
        // 服务提供方,给予的结果
        String result = msg.toString(Charset.defaultCharset());
        // 从全局的挂起的请求中,寻找与之匹配的待处理 completeFuture
        CompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);
        completableFuture.complete(result);
    }
}

channelhandler包下创建ConsumerChannelInitializer,继承 ChannelInitializer<SocketChannel>,重写initChannel方法

拷贝NettyBootstrapInitializer静态代码块中的匿名内部类ChannelInitializer的代码

public class ConsumerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new MySimpleChannelInboundHandler());
    }
}

NettyBootstrapInitializer类的初始化Netty的静态代码块中:优化handler的匿名内部类文章来源地址https://www.toymoban.com/news/detail-703330.html

static {
    NioEventLoopGroup group = new NioEventLoopGroup();
    bootstrap.group(group)
            // 选择初始化一个什么样的channel
            .channel(NioSocketChannel.class)
            .handler(new ConsumerChannelInitializer());
}

到了这里,关于手写RPC框架--5.Netty业务逻辑的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Java框架】RPC远程调用

    RPC(Remote Procedure Call)叫作远程过程调用,它是利用网络从远程计算机上请求服务,可以理解为把程序的一部分放在其他远程计算机上执行。通过网络通信将调用请求发送至远程计算机后,利用远程计算机的系统资源执行这部分程序,最终返回远程计算机上的执行结果。 RP

    2024年02月15日
    浏览(37)
  • 10 - 网络通信优化之通信协议:如何优化RPC网络通信?

    微服务框架中 SpringCloud 和 Dubbo 的使用最为广泛,行业内也一直存在着对两者的比较,很多技术人会为这两个框架哪个更好而争辩。 我记得我们部门在搭建微服务框架时,也在技术选型上纠结良久,还曾一度有过激烈的讨论。当前 SpringCloud 炙手可热,具备完整的微服务生态,

    2024年02月11日
    浏览(32)
  • Netty优化-rpc

    1.3 RPC 框架 1)准备工作 这些代码可以认为是现成的,无需从头编写练习 为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息 请求消息 响应消息 服务器架子 服务器 handler 客户端架子 客户端handler 服务器端的 service 获取 相关配置 application.properties 业务类 计数器

    2024年02月08日
    浏览(50)
  • RPC分布式网络通信框架(二)—— moduo网络解析

    网络部分,包括寻找rpc服务主机,发起rpc调用请求和响应rpc调用结果,使用muduo网络和zookeeper 服务配置中心 (专门做服务发现) 其中MprpcApplication类负责框架的一些初始化操作,注意去除类拷贝构造和移动构造函数(实现单例模式)。其中项目还构建了MprpcConfig类负责读取服

    2024年02月17日
    浏览(47)
  • 基于netty的rpc远程调用

    🚀🚀🚀这是一个手写RPC项目,用于实现远程过程调用(RPC)通信🚀🚀🚀 欢迎star串门 : https://github.com/red-velet/ 🚀Q-PRC 简单的RPC框架的实现 :该RPC框架实现了基本的远程过程调用功能,允许客户端通过网络调用远程服务的方法,实现分布式系统之间的通信和协作。 基于

    2024年02月14日
    浏览(34)
  • 【Flink网络通讯(一)】Flink RPC框架的整体设计

    我们从整体的角度看一下Flink RPC通信框架的设计与实现,了解其底层Akka通信框架的基础概念及二者之间的关系。   Akka是使用Scala语言编写的库,用于在JVM上简化编写具有可容错、高可伸缩性的Java或Scala的Actor模型。Akka基于Actor模型,提供了一个用于构建可扩展、弹性、快速响

    2024年02月21日
    浏览(39)
  • RPC分布式网络通信框架(一)—— protobuf的使用

    常见序列化和反序列化协议有XML、JSON、protobuf,相比于其他protobuf更有优势: 1、protobuf是二进制存储的,xml和json都是文本存储的。故protobuf占用带宽较低 2、protobuf不需要存储额外的信息。 json如何存储数据?键值对。例:Name:”zhang san”, pwd: “12345”。 protobuf存储数据的方式

    2024年02月16日
    浏览(52)
  • 如何手写一个RPC?

    在学习 RPC 框架之前,我们先来手写一个RPC。 我们在学习的过程中,一定要做到知其然,还要知其所以然。 单体架构 要知道,在以前单体架构的时候,会将所有的应用功能都集中在一个服务当中。 单体架构初始开发简单,所有的功能都在一个项目中,容易理解整个应用的业

    2024年01月17日
    浏览(39)
  • 手写简单的RPC

    RPC(Remote Procedure Call,远程过程调用)是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布

    2024年04月22日
    浏览(45)
  • 手写rpc和redis

    rpc框架搭建 consumer 消费者应用 provider 提供的服务 Provider-common 公共类模块 rpc 架构 service-Registration 服务发现 nacos nacos配置中心 load-balancing 负载均衡 redis-trench 手写redis实现和链接 rpc框架核心代码 相关的gitub仓库地址:(https://github.com/zhaoyiwen-wuxian/RpcTrench.git) master分支,进行切

    2024年01月24日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包