Netty源码解读

这篇具有很好参考价值的文章主要介绍了Netty源码解读。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Netty源码解读

Netty线程模型

Netty源码解读
1、定义了两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写
2、BossGroup和WorkerGroup类型都是NioEventLoopGroup,Group中维护了多个事件循环线程NioEventLoop,每个NioEventLoop维护了一个Selector和TaskQueue
3、每个Boss NioEventLoop线程内部循环执行的步骤有 3 步
3.1、处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
3.2、将NioSocketChannel注册到某个worker NIOEventLoop上的selector
3.3、runAllTasks处理任务队列TaskQueue的任务
4、 每个worker NioEventLoop线程循环执行的步骤
4.1、轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
4.2、处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
4.3、runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在pipeline中的流动处理
4.4、处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler处理器用来处理 channel 中的数据

Netty服务启动示例

// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                //对workerGroup的SocketChannel设置handler处理器
                ch.pipeline().addLast(new NettyServerHandler());
            }
        });
// 启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(9099).sync();

Netty源码解读

Netty源码分析

从bootstrap.bind作为入口分析启动流程,进入后可以看到会调用AbstractBootstrap#doBind,最终会调用initAndRegister()方法,主要逻辑都在前三步中实现,本次也主要分析这三个步骤

# AbstractBootstrap类
// 1、创建一个服务端Channel,即NioServerSocketChannel
channel = channelFactory.newChannel();
// 2、初始化NioServerSocketChannel,在pipeline中添加一些处理器hander
init(channel);
// 3、进行注册
ChannelFuture regFuture = config().group().register(channel);
// 把NioServerSocketChannel绑定到指定端口
channel.bind(localAddress, promise);

channelFactory.newChannel();

bootstrap.channel(NioServerSocketChannel.class) 会将serverChannel绑定到ReflectiveChannelFactory上

# AbstractBootstrap类

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

channelFactory.newChannel()会调用ReflectiveChannelFactory的newChannel方法,进而调用constructor.newInstance(),而该constructor正好是NioServerSocketChannel类;所以new的对象就是NioServerSocketChannel
服务端NioServerSocketChannel进行初始化
1、设置感兴趣事件为连接事件OP_ACCEPT
2、设置channel为非阻塞
3、初始化服务端pipeline

# NioServerSocketChannel类

public NioServerSocketChannel(ServerSocketChannel channel) {
    // 将感兴趣的事件设置为连接事件OP_ACCEPT
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// 父类初始化方法 ch 即为NioServerSocketChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    // 设置为非阻塞
    ch.configureBlocking(false);
}

// 父类的父类中初始化pipeline,此时只有HeadContext和TailContext
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

Netty源码解读

init(channel)

调用ServerBootstrap.init方法,向服务端NioServerSocketChannel的pipeline中添加hander处理器ChannelInitializer;此时服务端pipeline链表中的hander如下
Netty源码解读

# ServerBootstrap 类

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    //向 pipeline中添加hander处理器ChannelInitializer
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

Netty源码解读

config().group().register(channel)

bootstrap.group(bossGroup, workerGroup)构造时,将group设置为bossGroup,childGroup设置为workerGroup; config().group().register(channel)会调用bossGroup的register方法,从bossGroup的MultithreadEventLoopGroup线程组中取一个线程SingleThreadEventLoop进行调用register方法

register注册逻辑

服务端的NioServerSocketChannel和客户端的NioSocketChannel都会调用此方法进行注册
1、服务启动时,NioServerSocketChannel注册到selector上,对客户端OP_ACCEPT操作感兴趣
2、当有客户端连接时,通过NioServerSocketChannel的accept()得到每个客户端的NioSocketChannel,将其注册到selector上,对客户端OP_READ操作感兴趣

# SingleThreadEventLoop extends SingleThreadEventExecutor 类

public ChannelFuture register(final ChannelPromise promise) {
    promise.channel().unsafe().register(this, promise);
    return promise;
}

调用AbstractChannel的register方法,创建一个注册的task交给EventLoop线程处理

# AbstractChannel 类

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    .......
    AbstractChannel.this.eventLoop = eventLoop;
    .......
    // 1、处理连接事件时,用的是bossGroup里的NioEventLoop
    // 2、处理读写事件时,用的是workGroup里的NioEventLoop
    eventLoop.execute(new Runnable() {
        @Override
        public void run() {
            register0(promise);
        }
    });
}

private void register0(ChannelPromise promise) {
    doRegister();
    // 1、NioServerSocketChannel 处理逻辑
        // 调用NioServerSocketChannel服务端pipeline中hander的handlerAdded方法
        // 此时会调用到ChannelInitializer的handlerAdded,然后调用其initChannel,该方法中
        // 会向服务端pipeline中加入ServerBootstrapAcceptor
        // 调用服务端pipeline中hander的channelRegistered方法
        // 调用服务端pipeline中hander的ChannelActive方法
    // 2、NioSocketChannel 处理逻辑 调用我们自定义hander中的方法
        // 调用客户端pipeline中hander的handlerAdded方法
        // 调用客户端pipeline中hander的channelRegistered方法
        // 调用客户端pipeline中hander的ChannelActive方法,我们自定义hander的ChannelActive在此调用
    pipeline.invokeHandlerAddedIfNeeded();
    pipeline.fireChannelRegistered();
    pipeline.fireChannelActive();

}
// doRegister()逻辑由子类AbstractNioChannel实现
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 将channel注册到Selector上
            // 1、NioServerSocketChannel注册到Selector上
            // 2、NioSocketChannel注册到Selector上
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
        }
    }
}

Netty源码解读

eventLoop.execute就是调用SingleThreadEventExecutor#execute

# SingleThreadEventExecutor 类
@Override
public void execute(Runnable task) {
    // 将注册register0逻辑加入队列taskQueue
    addTask(task);
    // 开启线程循环监听事件,会调用SingleThreadEventExecutor.run方法
	// 最终调用子类NioEventLoop的run()方法
    startThread();
}

死循环执行 selector.select方法,直到监听到事件或者超时,才会执行processSelectedKeys逻辑
1、服务端启动后,NioServerSocketChannel若监听到客户端OP_ACCEPT操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环
2、客户端连接成功后,NioSocketChannel若监听到客户端OP_READ操作,则会执行processSelectedKeys逻辑,若超时,则继续下一次循环

# NioEventLoop 类
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    ....
                    case SelectStrategy.SELECT:
                        // 该方法监听到事件(OP_ACCEPT|OP_READ)时才会返回
                        select(wakenUp.getAndSet(false));
                    default:
                }
            } catch (IOException e) {
            	.....
            }
            // 监听到事件执行
            try {
                // 1、获取SelectionKey处理事件
                processSelectedKeys();
            } finally {
                // 2、执行taskQueue中其他的注册方法register0
                runAllTasks();
            }
            
        }
   }
}   

private void select(boolean oldWakenUp) throws IOException {
    // 一直循环遍历
    int selectCnt = 0;
    for (;;) {
        // 根据注册的定时任务,获取本次select的阻塞时间
        long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    	// 没有监听到事件或没有超时,则一直阻塞(会让出cpu资源)
        int selectedKeys = selector.select(timeoutMillis);
        selectCnt ++;
        if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
	        // 正常场景
            // 当有连接|读写操作或者selector被唤醒了,则直接返回
            break;
        }

        long time = System.nanoTime();
        if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
            // 正常场景
            // 说明没有监听到事件,而是超时了,则重置selectCnt
            selectCnt = 1;
        } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // 异常场景  select 空轮询bug修复
            // 若空轮询次数超过SELECTOR_AUTO_REBUILD_THRESHOLD配置
            // 则关闭老的select,建立新的select
            selector = selectRebuildSelector(selectCnt);
            selectCnt = 1;
            break;
        }
        currentTimeNanos = time;
    }
}

private void processSelectedKeysOptimized() {
    // 遍历所有的selectedKeys进行处理
    for (int i = 0; i < selectedKeys.size; ++i) {
         processSelectedKey(k, (AbstractNioChannel) a);
    }
}
            
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        // 连接|读写操作会调用该方法
        // 1、连接操作调用NioMessageUnsafe的read方法
        // 2、读写操作调用NioByteUnsafe的read方法
        unsafe.read();
    }
}

Netty源码解读

OP_ACCEPT连接操作处理
1、为每个客户端创建NioSocketChannel,并进行初始化
1.1、设置感兴趣事件为OP_READ
1.2、设置channel为非阻塞
1.3、初始化客户端pipeline
2、调用服务端NioServerSocketChannel的pipeline,将客户端的NioSocketChannel作为参数传过去,最终会调用到ServerBootstrapAcceptor,将NioSocketChannel注册到workGroup上

# NioMessageUnsafe 类
public void read() {
    final ChannelPipeline pipeline = pipeline();
    // 创建每个客户端的NioSocketChannel
    doReadMessages(readBuf);
    int size = readBuf.size();
    // readBuf为NioSocketChannel,遍历客户端所有的NioSocketChannel
    // 执行服务端NioServerSocketChannel的pipeline,循环执行fireChannelRead,
    // 最终会调用服务端hander的ChannelRead方法,此处会调用到ServerBootstrapAcceptor的ChannelRead方法
    for (int i = 0; i < size; i ++) {
        readPending = false;
        pipeline.fireChannelRead(readBuf.get(i));
    }
    // 调用服务端pipeline的读完成方法
    pipeline.fireChannelReadComplete();

}

protected abstract int doReadMessages(List<Object> buf) throws Exception;
// 调用子类NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
    // 获取客户端的连接得到SocketChannel,每个客户端在服务端都有一个对应的SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // NioSocketChannel处理方式同NioServerSocketChannel
            // 1、设置感兴趣事件为连接事件OP_READ
            // 2、设置channel为非阻塞
            // 3、初始化客户端NioSocketChannel的pipeline
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {

    }
    return 0;
}

Netty源码解读

将我们自定义的hander添加到NioSocketChannel的pipeline上,然后将NioSocketChannel注册到workGroup上,此时客户端pipeline链表中的hander如下
Netty源码解读

# ServerBootstrapAcceptor 类

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 传过来的NioSocketChannel
    final Channel child = (Channel) msg;
    // 将我们手动添加的Hander添加到pipeline
    child.pipeline().addLast(childHandler);
    try {
        // 将NioSocketChannel注册workGroup的一个线程的selector上,
        // 方式同NioServerSocketChannel,执行register注册逻辑
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                .....
            }
        });
    } catch (Throwable t) {
    }
}

Netty源码解读

OP_READ操作处理
进行数据读写,并执行pipeline中自定义的hander

# NioByteUnsafe类

// 接受到客户端OP_READ事件时调用
public void read() {
    // 获取客户端NioSocketChannel的pipeline
    final ChannelPipeline pipeline = pipeline();
    do {
        // 数据读写
        // 调用pipeline.fireChannelRead时会依次调用pipeline中hander的ChannelRead方法
        // 我们自定义的hander的ChannelRead方法就会在此处调用
        byteBuf = allocHandle.allocate(allocator);
        allocHandle.lastBytesRead(doReadBytes(byteBuf));
        pipeline.fireChannelRead(byteBuf);
    } while (allocHandle.continueReading());
    allocHandle.readComplete();
    // 调用pipeline的读完成方法
    pipeline.fireChannelReadComplete();
}

Netty源码解读文章来源地址https://www.toymoban.com/news/detail-422522.html

到了这里,关于Netty源码解读的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 34.Netty源码之Netty如何处理网络请求

    通过前面两节源码课程的学习,我们知道 Netty 在服务端启动时会为创建 NioServerSocketChannel,当客户端新连接接入时又会创建 NioSocketChannel,不管是服务端还是客户端 Channel,在创建时都会初始化自己的 ChannelPipeline。 如果把 Netty 比作成一个生产车间,那么 Reactor 线程无疑是车间

    2024年02月11日
    浏览(34)
  • 13.Netty源码之Netty中的类与API

    Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中ServerBootstrap 是服务端启动引导类。 java //泛型 AbstractBootstrapB extends AbstractBootstrapB, C, C extends Channel ​ ServerBootstrap extends AbstractBootstrapServerBootstrap, ServerChannel ​

    2024年02月15日
    浏览(52)
  • [Netty源码] Netty轻量级对象池实现分析 (十三)

    1.对象池技术介绍 对象池其实就是缓存一些对象从而避免大量创建同一个类型的对象, 类似线程池。对象池缓存了一些已经创建好的对象, 避免需要的时候创建。同时限制了实例的个数。 池化技术最终要的就是重复的使用池内已经创建的对象。 创建对象的开销大 会创建大量的

    2023年04月18日
    浏览(44)
  • 《Netty》从零开始学netty源码(五十四)之PoolThreadLocalCache

    前面讲到 PoolThreadCache ,它为线程提供内存缓存,当线程需要分配内存时可快速从其中获取,在Netty中用 PoolThreadLocalCache 来管理 PoolThreadCache ,它的数据结构如下: PoolThreadLocalCache 相当于java的 ThreadLocal ,我们知道 ThreadLocal 中维护的是 ThreadLocalMap ,使用hashcode来做下标,而N

    2024年02月03日
    浏览(32)
  • 《Netty》从零开始学netty源码(五十九)之ServerBootstrapAcceptor

    前面初始化channel的过程中向pipeline中添加了一个channelHandler,即 ServerBootstrapAcceptor ,它的作用主要是将worker组的channel进行注册,它的数据结构如下: 它的属性主要是通过 ServerBootstrap 启动类设置的,它的方法主要是 channelRead() 方法,其过程如下: 在第五十八篇中,当EventLo

    2024年02月05日
    浏览(56)
  • 《Netty》从零开始学netty源码(四十九)之PoolArena

    Netty中分配内存是委托给PoolArena来管理的,它主要有两个实现类: 默认情况下使用的DirectArena,它的数据结构如下: 从属性中我们看到PoolArena主要分配三种类型的内存,小于32KB的分配small类型的PoolSubpage,存储在smallSubpagePools,32KB~4MB的分配normal类型的PoolChunk,根据其利用率的

    2024年02月02日
    浏览(33)
  • 《Netty》从零开始学netty源码(四十六)之PooledByteBuf

    Netty中一大块内存块 PoolChunk 默认大小为4MB,为了尽可能充分利用内存会将它切成很多块 PooledByteBuf , PooledByteBuf 的类关系图如下: PooledUnsafeDirectByteBuf 与 PooledUnsafeHeapByteBuf 直接暴露对象的底层地址。 PooledByteBuf 的创建过程开销很大,高并发情况下进行网络I/O时会创建大量的

    2024年02月01日
    浏览(86)
  • netty源码阅读--服务启动

    netty是一个非常成熟的NIO框架,众多apache的顶级项目底层通信框架都是用的是netty,本系列博客主要是记录自己复习netty源码的过程,重在理解netty的关键如:如何启动,如何接受网络数据、netty的内存管理机制以及编解码器等,废话不多说,直接跟着netty源码中的MQTT的官方示例

    2023年04月22日
    浏览(34)
  • 4.netty源码分析

    1.pipeline调用handler的源码 //pipeline得到双向链表的头,next到尾部, 2.心跳源码 主要分析IdleStateHandler3个定时任务内部类 //考虑了网络传输慢导致出站慢的情况 //超时重新发送,然后关闭 ReadTimeoutHandler(继承IdleStateHandler 直接关闭连接)和WriteTimeoutHandler(继承ChannelOutboundHandlerAdapter 使用

    2024年02月14日
    浏览(29)
  • 《Netty》从零开始学netty源码(四十二)之PoolChunk.runsAvailMap

    PoolChunk 中的 runsAvailMap 属性用于存放可用的run的信息, PoolChunk 中每一次分配内存都会更新 runsAvailMap 中可用的run的起始信息及末尾信息,先看下它的数据结构: 我们看下它的构造函数是如何赋值的: PoolChunk 的默认大小为4MB,对应sizeClasses表格中的31,所以array的初始长度为

    2023年04月25日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包