Netty优化-rpc

这篇具有很好参考价值的文章主要介绍了Netty优化-rpc。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.3 RPC 框架

1)准备工作

这些代码可以认为是现成的,无需从头编写练习

为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

@Data
public abstract class Message implements Serializable {

    // 省略旧的代码

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;

    static {
        // ...
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

}

请求消息

@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

    /**
     * 调用的接口全限定名,服务端根据它找到实现
     */
    private String interfaceName;
    /**
     * 调用接口中的方法名
     */
    private String methodName;
    /**
     * 方法返回类型
     */
    private Class<?> returnType;
    /**
     * 方法参数类型数组
     */
    private Class[] parameterTypes;
    /**
     * 方法参数值数组
     */
    private Object[] parameterValue;

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
        super.setSequenceId(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}

响应消息

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
    /**
     * 返回值
     */
    private Object returnValue;
    /**
     * 异常值
     */
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}

服务器架子

@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        
        // rpc 请求消息处理器,待实现
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

服务器 handler

@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
        RpcResponseMessage response = new RpcResponseMessage();
        response.setSequenceId(message.getSequenceId());
        try {
            // 获取真正的实现对象
            HelloService service = (HelloService)
                    ServicesFactory.getService(Class.forName(message.getInterfaceName()));

            // 获取要调用的方法
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());

            // 调用方法
            Object invoke = method.invoke(service, message.getParameterValue());
            // 调用成功
            response.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            // 调用异常
            response.setExceptionValue(e);
        }
        // 返回结果
        ctx.writeAndFlush(response);
    }

    public static void main(String[] args) throws Exception {
        RpcRequestMessage message = new RpcRequestMessage(
                1,
                "cn.itcast.rpc.service.HelloService",
                "sayHello",
                String.class,
                new Class[]{String.class},
                new Object[]{"张三"}
        );
        // 获取真正的实现对象
        HelloService service = (HelloService)
                ServicesFactory.getService(Class.forName(message.getInterfaceName()));
        Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
        Object invoke = method.invoke(service, message.getParameterValue());
        System.out.println(invoke);
    }
}

客户端架子

@Slf4j
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        
        // rpc 响应消息处理器,待实现
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();

            ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
                    1,
                    "cn.itcast.rpc.service.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            )).addListener(promise -> {
                if (!promise.isSuccess()) {
                    Throwable cause = promise.cause();
                    log.error("error", cause);
                }
            });

            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端handler

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
    }
}

服务器端的 service 获取

public class ServicesFactory {

    static Properties properties;
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
            Set<String> names = properties.stringPropertyNames();
            for (String name : names) {
                if (name.endsWith("Service")) {
                    Class<?> interfaceClass = Class.forName(name);
                    Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    map.put(interfaceClass, instanceClass.newInstance());
                }
            }
        } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static <T> T getService(Class<T> interfaceClass) {
        return (T) map.get(interfaceClass);
    }
}

相关配置 application.properties

serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

业务类

public interface HelloService {
    String sayHello(String name);
}

public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String msg) {
        //int i = 1 / 0;
        return "你好, " + msg;
    }
}

计数器

public abstract class SequenceIdGenerator {
    private static final AtomicInteger id = new AtomicInteger();

    public static int nextId() {
        return id.incrementAndGet();
    }
}

客户端最终代码

@Slf4j
public class RpcClientManager {

    public static void main(String[] args) throws IOException {
        HelloService service = getProxyService(HelloService.class);
        System.out.println(service.sayHello("zhangsan"));
        System.out.println(service.sayHello("lisi"));
    }

    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader classLoader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};
        Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            // 1. 将方法调用转换为 消息对象
            int sequenceId = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceId,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2. 将消息对象发送出去
            getChannel().writeAndFlush(msg);

            // 3. 准备一个空 Promise 对象,来接收结果             指定 promise 对象异步接收结果线程
            DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());
            RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

            log.debug("主线程开始等待...");
            // 4. 阻塞等待 promise 结果
            promise.await();
            log.debug("主线程放行...");
            if (promise.isSuccess()) {
                return promise.getNow();
            } else {
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) o;
    }

    private static Channel channel = null;
    private static final Object LOCK = new Object();

    public static Channel getChannel() {
        if (channel != null) {
            return channel;
        }
        synchronized (LOCK) {
            if (channel != null) {
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    //初始化channel
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}

服务端不变,服务端handler

@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
        RpcResponseMessage response = new RpcResponseMessage();
        response.setSequenceId(message.getSequenceId());
        try {
            HelloService service = (HelloService)
                    ServicesFactory.getService(Class.forName(message.getInterfaceName()));
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            Object invoke = method.invoke(service, message.getParameterValue());
            response.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            String msg = e.getCause().getMessage();
            response.setExceptionValue(new Exception("远程调用出错:" + msg));
        }
        log.error(response.toString());
        ctx.writeAndFlush(response);
    }
}

执行结果文章来源地址https://www.toymoban.com/news/detail-717624.html

16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 1, 228
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(super=Message(sequenceId=1, messageType=102), returnValue=你好, zhangsan, exceptionValue=null)
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
你好, zhangsan
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程开始等待...
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - 16909060, 1, 0, 102, 2, 224
16:42:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - RpcResponseMessage(super=Message(sequenceId=2, messageType=102), returnValue=你好, lisi, exceptionValue=null)
16:42:29 [DEBUG] [main] c.i.r.RpcClientManager - 主线程放行...
你好, lisi

到了这里,关于Netty优化-rpc的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 手写RPC框架--5.Netty业务逻辑

    RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧) RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧) 1.在 DcyRpcBootstrap 类的 start() 方法中加入netty代码 (待完善) 2.在 ReferenceConfig 类的 get() 方法中加入netty代码 (待完善) 每次启动程序都会建立一个新的Netty连接,显示是对不合适的 解决方案

    2024年02月09日
    浏览(46)
  • 【netty系列-01】深入理解网络通信基本原理和tcp/ip协议

    Netty系列整体栏目 内容 链接地址 【一】深入理解网络通信基本原理和tcp/ip协议 https://zhenghuisheng.blog.csdn.net/article/details/136359640 【二】深入理解Socket本质和BIO https://zhenghuisheng.blog.csdn.net/article/details/136549478 在最初的网络中,是借鉴于这个OSI七层网络模型,而在实际开发应用中

    2024年03月17日
    浏览(62)
  • Springboot整合Netty实现RPC服务器

    try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60)); pipeline.addLast(new JsonDecoder()); pipeline.addLast(new JsonEnco

    2024年04月09日
    浏览(43)
  • 【基于netty+zookeeper的rpc远程调用框架】首篇——缘起

    🐼 作者简介:一名大三在校生🎋 空有想法,没有实践 缘起 作为一名即将步入社会的大三学生,我深知一份优秀的简历对于求职的重要性。暑期实习作为大学生涯中的一个重要节点,不仅是锻炼自己、积累经验的宝贵机会,更是向未来雇主展示自己能力和潜力的关键时期。

    2024年04月26日
    浏览(43)
  • Dubbo源码解析第一期:如何使用Netty4构建RPC

            早期学习和使用Dubbo的时候(那时候Dubbo还没成为Apache顶级项目),写过一些源码解读,但随着Dubbo发生了翻天覆地的变化,那些文章早已过时,所以现在计划针对最新的Apache Dubbo源码来进行“阅读理解”,希望和大家一起再探Dubbo的实现。由于能力有限,如果文章

    2024年01月21日
    浏览(37)
  • Netty 系列教程——Netty 与 HTTP 协议

    作者:禅与计算机程序设计艺术 Netty 是由 JBOSS 提供的一个 Java 高性能网络应用程序框架。通过 Netty 的高效的 buffer 池、通道 pipeline、事件驱动机制等技术实现了天生的异步非阻塞特性。为了简化开发难度并提升开发效率,Netty 推出了一系列的开源组件如 Socket 通信模块(ne

    2024年02月06日
    浏览(39)
  • Netty系列(一):Springboot整合Netty,自定义协议实现

    Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单

    2023年04月25日
    浏览(55)
  • 10 - 网络通信优化之通信协议:如何优化RPC网络通信?

    微服务框架中 SpringCloud 和 Dubbo 的使用最为广泛,行业内也一直存在着对两者的比较,很多技术人会为这两个框架哪个更好而争辩。 我记得我们部门在搭建微服务框架时,也在技术选型上纠结良久,还曾一度有过激烈的讨论。当前 SpringCloud 炙手可热,具备完整的微服务生态,

    2024年02月11日
    浏览(38)
  • 从无到有手写一个基于Netty+Kyro+Zookeeper的RPC框架,javaweb面试题目整理

    通过这个简易的轮子,你可以学到 RPC 的底层原理和原理以及各种 Java 编码实践的运用。 你甚至可以把这个当做你的毕设/项目经验的选择,这是非常不错!对比其他求职者的项目经验都是各种系统,造轮子肯定是更加能赢得面试官的青睐。 介绍 这是一款基于 Netty+Kyro+Zookee

    2024年04月15日
    浏览(48)
  • MQTT协议基本讲解(结合netty)

    这里主要讲一下MQTT的结构,另外结合netty来说下具体参数的设置问题。 当然如果你想了解MQTT的所有设计规则,并且该怎么去实现每个规则细节,那么推荐你去 EMQX 官网,这个框架不仅对MQTT进行了深入的,通俗易懂的讲解,更是一个性能非常不错的MQTT broker服务端。 首先基本

    2024年02月08日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包