Netty Reactor 模式解析

这篇具有很好参考价值的文章主要介绍了Netty Reactor 模式解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

Reactor 模式        

具体流程

配置 

初始化

NioEventLoop 

ServerBootstrapAcceptor 分发


Reactor 模式        

在刚学 Netty 的时候,我们肯定都很熟悉下面这张图,它就是单Reactor多线程模型。

Netty Reactor 模式解析,java,开发语言

在写Netty 服务端代码的时候,下面的代码时必不可少的,这是为什么呢?

public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(4);
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .childHandler(new NettyServerHandler(), new NettyServerHandler2());
    System.out.println("netty server start...");
    bootstrap.bind(9000);
}

        在 Netty 里,EventLoopGroup 就是线程池,不论 bossGroup 还是 workerGroup,它们里面的线程都叫 EventLoop。

        EventLoopGroup 就是一个线程池,bossGroup 叫连接线程池,它一般只有一个线程,workerGroup 叫工作线程池,它一般会有多个线程。bossGroup 线程池里的线程专门监听客户端连接事件,监听是否有 SelectionKey.OP_ACCEPT 事件被触发,所以 1 个线程就够用了,当它监听到有客户端请求连接时,它会把这个连接交给 workerGroup 里的一个线程去处理,这个过程叫分发,这个工作线程会为这个客户端建立一个 NIOSocketChannel,并注册到这个工作线程绑定的IO多路复用选择器 Selector 里,一个Selector可以接受多个 NIOSocketChannel 的注册,所以一个工作线程可以处理多个客户端。   这就是Reactor 模式,一个工作线程可以处理多个客户端,比 Java 传统的一个客户端对应一个工作线程节约了很多线程,减少了大量线程创建,线程切换,线程销毁的开销,所以Netty 性能很好。

        上面短短的服务端代码做了很多工作,当它刚启动还没有客户端请求连接时,bossGroup 连接线程池里的一个线程 EventLoop 会初始化一个 NioServerSocketChannel ,并把这个Channel注册到这个EventLoop 持有的IO多路复用选择器Selector里,Selector 会监听Channel里的 SelectionKey.OP_ACCEPT 事件,一旦有客户端连接过来,它会通过下面代码获取到一个

SocketChannel ch = javaChannel().accept();

NioSocketChannel,并把这个 NioSocketChannel 注册到 workerGroup 工作线程池里的一个EventLoop 里,它使用了一个叫 ServerBootstrapAcceptor 的 ChannelInboundHandler接口类去完成这个过程,连接完成后,后续这个客户端和服务端的交互和数据读写都在这个 EventLoop 完成。

具体流程

        下面我们看一下代码,Netty 代码中使用了很多继承,在继承中可以把子类相同的部分代码提到父类去完成,很多子类生成初始化的时候,它会调用父类的构造方法去完成,这个要注意。

配置 

        下面的代码主要做一些启动器的配置,group(bossGroup, workerGroup) 会设置连接线程池和工作线程池,后面有连接事件或读事件过来要处理时,它会从这些线程池里取线程去执行;channel(NioServerSocketChannel.class) 指定要生成服务端Channel,它只会监听SelectionKey.OP_ACCEPT 事件,childHandler(new NettyServerHandler(), new NettyServerHandler2()) 是我们业务处理的逻辑。

bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new NettyServerHandler(), new NettyServerHandler2());

初始化

        bind() 会把 ServerBootstrapAcceptor 添加到 NioServerSocketChannel 的 pipeline ,它会处理连接;获取一个 bossGroup 线程池里的 EventLoop并和 NioServerSocketChannel  进行绑定。

bootstrap.bind(9000)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> {
    public void bind(int inetPort) {
        doBind(new InetSocketAddress(inetPort));
    } 
    // bind 流程
    private void doBind(final SocketAddress localAddress) {
        initAndRegister();
        // 让channel绑定的线程处理
        channel.eventLoop().execute(()->{
            // 绑定指定端口
            channel.bind(localAddress);
        });
    } 
    // 初始化和注册
    final void initAndRegister() {
        init(channel);
        // 把 NioServerSocketChannel 注册到一个复杂连接事件的 EventLoop 的 Selector 里 
        group.register(channel);
    } 
    // 把 ServerBootstrapAcceptor 添加到 NioServerSocketChannel 的 pipeline 里
    abstract void init(Channel channel);
}

NioEventLoop 

        现在要说一下 NioEventLoop,它拥有一个IO多路复用选择器 Selector,这个线程会在一个死循环里工作,永远也会停止;这个线程它会先执行一下 selector.select(1000),阻塞监听1秒,看看有没有Channel有事件过来,有就去处理任务,没有就等待1秒钟再超时放弃,再看看自己的任务队列有没有可执行的任务,有就去处理任务,没有就继续进行死循环,继续执行 selector.select(1000)。无论是连接线程还是工作线程都这样处理,因为它们共用了这套逻辑。

public class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void run() {
        for (;;) {
            try {
                select();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                # 处理事件
                processSelectedKeys();
            } finally {
                runAllTasks();
            }
        }
    }
    private void select() throws IOException {
        // 拿到多路复用器
        Selector selector = this.selector;
        for (;;) {
            // 等待,简化固定1秒
            int selectedKeys = selector.select(1000);
            // 如果有事件发生或当前有任务跳出循环
            if (selectedKeys != 0 || hasTasks()) {
                break;
            }
        }
    }
}

像下面这种 channel.eventLoop().execute(Runnable), 它也只是把 Runnable 加入到任务处理队列,稍后执行。

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> { 
    private void doBind(final SocketAddress localAddress) {
        ...
        channel.eventLoop().execute(()->{ 
            channel.bind(localAddress);
        });
    }  
}
public abstract class SingleThreadEventExecutor implements Executor {    
    // 待执行任务队列
    private final Queue<Runnable> taskQueue;   
    @Override
    public void execute(Runnable task) {
        // 把任务添加到 EventLoop 的任务队列,EventLoop 是 SingleThreadEventExecutor 的子类
        addTask(task);
        // 执行 EventLoop 的 run 逻辑
        startThread();
    }
}

        当 NioServerSocketChannel.accept() 监听到一个客户端连接,它会把这个 NIOSocketChannel 通过 pipeline 处理,最终被 ServerBootstrapAcceptor 所处理,

public class NioServerSocketChannel extends AbstractNioMessageChannel {
    @Override
    protected int doReadMessages(List<Object> buf) {
        SocketChannel ch = null;
        try {
            ch = javaChannel().accept();
        } catch (IOException e) {
        }
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
        return 0;
    }
}
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    @Override
    public void read() {
        final ChannelPipeline pipeline = pipeline();
        doReadMessages(readBuf);
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
    }

    protected abstract int doReadMessages(List<Object> buf);
}

ServerBootstrapAcceptor 分发

        ServerBootstrapAcceptor 管理 workerGroup 里的所有工作线程和所有的业务处理代码 ChannelHandler,ServerBootstrapAcceptor 会把所有的 ChannelHandler 放到刚刚监听得到的 NIOSocketChannel 里的 pipeline 里,并从 workerGroup 里选择一个 EventLoop 工作线程把NIOSocketChannel 注册到该 EventLoop 拥有的IO多路复用选择器 Selector 里去,这就完成了分发,它已经处理了连接,后续这个 NIOSocketChannel 里的所有读写事件都会被 Selector 监听到,并被该 EventLoop 工作线程所处理。

private static class ServerBootstrapAcceptor implements ChannelInboundHandler {
        // 工作线程池,即 workerGroup 
        private final EventLoopGroup childGroup;
        // 业务操作 Handler
        private final ChannelHandler[] childHandlers;

        private ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler[] childHandlers) {
            this.childGroup = childGroup;
            this.childHandlers = childHandlers;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final Channel child = (Channel) msg;
            // 完成 pipeline 责任链模式的组装
            for (ChannelHandler childHandler : childHandlers) {
                child.pipeline().addLast(childHandler);
            }
            // 把Channel 注册到 Selector
            childGroup.register(child);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 略
        }
    }
 

文章来源地址https://www.toymoban.com/news/detail-824206.html

到了这里,关于Netty Reactor 模式解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Netty实战专栏 | Java网络编程深入解析

    ✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: Netty实战专栏 ✨特色专栏: MySQL学习 🥭本文内容:Netty实战专栏 | Java网络编程深入解析 🖥️个人小站 :个人博客,欢迎大家访问 📚个人知识

    2024年02月06日
    浏览(54)
  • Redis源码篇 - Reactor设计模式 和 Redis Reactor设计模式

    Reactor :反应器模式或者应答者模式,它是一种基于事件驱动的设计模式。拥有一个或者多个输入源,通过反应器分发给多个worker线程处理,实现并发场景下事件处理。        此图网上找的,画的很好:

    2024年02月16日
    浏览(40)
  • 服务器IO复用reactor模式

    调试: Linux下nc命令作为客户端: nc 127.0.0.1 7777

    2024年02月10日
    浏览(48)
  • Springboot使用Netty连接Tcp接口(c语言二进制字节码转java字符串)

    一、引入netty的jar包 io.netty netty-all 二、使用netty框架 1、创建客户端 package com.iflytek.digtal.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel

    2024年02月20日
    浏览(50)
  • 【深入解析spring cloud gateway】08 Reactor 知识扫盲

    1.1 背景知识 为了应对高并发服务器端开发场景,在2009 年,微软提出了一个更优雅地实现异步编程的方式——Reactive Programming,我们称之为响应式编程。随后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技术,使得Java 平台也有了能够实现响应式编程的框架。 在2017 年9 月

    2024年02月09日
    浏览(39)
  • Reactor和Proactor两种高效的事件处理模式

             服务器程序通常需要处理三类事件: I/O 事件、信号及定时事件 。有两种高效的事件处理模式: Reactor 和 Proactor ,同步 I/O 模型通常用于实现 Reactor 模式,异步 I/O 模型通常用于实现 Proactor 模式。 Reactor 模式         要求主线程(I/O 处理单元)只负责监听文

    2024年01月18日
    浏览(45)
  • 【Linux】高级IO --- Reactor网络IO设计模式

    人其实很难抵制诱惑,人只能远离诱惑,所以千万不要高看自己的定力。 1. 多路转接接口select poll epoll所做的工作其实都是事件通知,只向上层通知事件到来,处理就绪事件的工作并不由这些API来完成,这些接口在进行事件通知时,有没有自己的策略呢? 其实是有的,在网络

    2024年02月09日
    浏览(53)
  • 架构篇19:单服务器高性能模式-Reactor与Proactor

    上篇介绍了单服务器高性能的 PPC 和 TPC 模式,它们的优点是实现简单,缺点是都无法支撑高并发的场景,尤其是互联网发展到现在,各种海量用户业务的出现,PPC 和 TPC 完全无能为力。今天我将介绍可以应对高并发场景的单服务器高性能架构模式:Reactor 和 Proactor。 PPC 模式

    2024年01月25日
    浏览(39)
  • 重读Java设计模式: 适配器模式解析

    在软件开发中,经常会遇到不同接口之间的兼容性问题。当需要使用一个已有的类,但其接口与我们所需的不兼容时,我们可以通过适配器模式来解决这一问题。适配器模式是一种结构型设计模式,它允许接口不兼容的类之间进行合作。本文将深入探讨适配器模式的概念、应

    2024年04月09日
    浏览(78)
  • Java解析器设计模式

    解析器模式是一种行为型设计模式。其思想是:给定一个语言, 定义它的文法的一种表示,并定义一个解释器,该解释器使用该表示来解释语言中的句子。 抽象表达式(Expression):声明一个所有的具体表达式的抽象接口,包含一个interpret()方法,称做解释操作。 终结符表达式

    2024年02月16日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包