目录
Reactor 模式
具体流程
配置
初始化
NioEventLoop
ServerBootstrapAcceptor 分发
Reactor 模式
在刚学 Netty 的时候,我们肯定都很熟悉下面这张图,它就是单Reactor多线程模型。
在写Netty 服务端代码的时候,下面的代码时必不可少的,这是为什么呢?
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new NettyServerHandler(), new NettyServerHandler2());
System.out.println("netty server start...");
bootstrap.bind(9000);
}
在 Netty 里,EventLoopGroup 就是线程池,不论 bossGroup 还是 workerGroup,它们里面的线程都叫 EventLoop。
EventLoopGroup 就是一个线程池,bossGroup 叫连接线程池,它一般只有一个线程,workerGroup 叫工作线程池,它一般会有多个线程。bossGroup 线程池里的线程专门监听客户端连接事件,监听是否有 SelectionKey.OP_ACCEPT 事件被触发,所以 1 个线程就够用了,当它监听到有客户端请求连接时,它会把这个连接交给 workerGroup 里的一个线程去处理,这个过程叫分发,这个工作线程会为这个客户端建立一个 NIOSocketChannel,并注册到这个工作线程绑定的IO多路复用选择器 Selector 里,一个Selector可以接受多个 NIOSocketChannel 的注册,所以一个工作线程可以处理多个客户端。 这就是Reactor 模式,一个工作线程可以处理多个客户端,比 Java 传统的一个客户端对应一个工作线程节约了很多线程,减少了大量线程创建,线程切换,线程销毁的开销,所以Netty 性能很好。
上面短短的服务端代码做了很多工作,当它刚启动还没有客户端请求连接时,bossGroup 连接线程池里的一个线程 EventLoop 会初始化一个 NioServerSocketChannel ,并把这个Channel注册到这个EventLoop 持有的IO多路复用选择器Selector里,Selector 会监听Channel里的 SelectionKey.OP_ACCEPT 事件,一旦有客户端连接过来,它会通过下面代码获取到一个
SocketChannel ch = javaChannel().accept();
NioSocketChannel,并把这个 NioSocketChannel 注册到 workerGroup 工作线程池里的一个EventLoop 里,它使用了一个叫 ServerBootstrapAcceptor 的 ChannelInboundHandler接口类去完成这个过程,连接完成后,后续这个客户端和服务端的交互和数据读写都在这个 EventLoop 完成。
具体流程
下面我们看一下代码,Netty 代码中使用了很多继承,在继承中可以把子类相同的部分代码提到父类去完成,很多子类生成初始化的时候,它会调用父类的构造方法去完成,这个要注意。
配置
下面的代码主要做一些启动器的配置,group(bossGroup, workerGroup) 会设置连接线程池和工作线程池,后面有连接事件或读事件过来要处理时,它会从这些线程池里取线程去执行;channel(NioServerSocketChannel.class) 指定要生成服务端Channel,它只会监听SelectionKey.OP_ACCEPT 事件,childHandler(new NettyServerHandler(), new NettyServerHandler2()) 是我们业务处理的逻辑。
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new NettyServerHandler(), new NettyServerHandler2());
初始化
bind() 会把 ServerBootstrapAcceptor 添加到 NioServerSocketChannel 的 pipeline ,它会处理连接;获取一个 bossGroup 线程池里的 EventLoop并和 NioServerSocketChannel 进行绑定。
bootstrap.bind(9000)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> {
public void bind(int inetPort) {
doBind(new InetSocketAddress(inetPort));
}
// bind 流程
private void doBind(final SocketAddress localAddress) {
initAndRegister();
// 让channel绑定的线程处理
channel.eventLoop().execute(()->{
// 绑定指定端口
channel.bind(localAddress);
});
}
// 初始化和注册
final void initAndRegister() {
init(channel);
// 把 NioServerSocketChannel 注册到一个复杂连接事件的 EventLoop 的 Selector 里
group.register(channel);
}
// 把 ServerBootstrapAcceptor 添加到 NioServerSocketChannel 的 pipeline 里
abstract void init(Channel channel);
}
NioEventLoop
现在要说一下 NioEventLoop,它拥有一个IO多路复用选择器 Selector,这个线程会在一个死循环里工作,永远也会停止;这个线程它会先执行一下 selector.select(1000),阻塞监听1秒,看看有没有Channel有事件过来,有就去处理任务,没有就等待1秒钟再超时放弃,再看看自己的任务队列有没有可执行的任务,有就去处理任务,没有就继续进行死循环,继续执行 selector.select(1000)。无论是连接线程还是工作线程都这样处理,因为它们共用了这套逻辑。
public class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
try {
select();
} catch (IOException e) {
e.printStackTrace();
}
try {
# 处理事件
processSelectedKeys();
} finally {
runAllTasks();
}
}
}
private void select() throws IOException {
// 拿到多路复用器
Selector selector = this.selector;
for (;;) {
// 等待,简化固定1秒
int selectedKeys = selector.select(1000);
// 如果有事件发生或当前有任务跳出循环
if (selectedKeys != 0 || hasTasks()) {
break;
}
}
}
}
像下面这种 channel.eventLoop().execute(Runnable), 它也只是把 Runnable 加入到任务处理队列,稍后执行。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> {
private void doBind(final SocketAddress localAddress) {
...
channel.eventLoop().execute(()->{
channel.bind(localAddress);
});
}
}
public abstract class SingleThreadEventExecutor implements Executor {
// 待执行任务队列
private final Queue<Runnable> taskQueue;
@Override
public void execute(Runnable task) {
// 把任务添加到 EventLoop 的任务队列,EventLoop 是 SingleThreadEventExecutor 的子类
addTask(task);
// 执行 EventLoop 的 run 逻辑
startThread();
}
}
当 NioServerSocketChannel.accept() 监听到一个客户端连接,它会把这个 NIOSocketChannel 通过 pipeline 处理,最终被 ServerBootstrapAcceptor 所处理,
public class NioServerSocketChannel extends AbstractNioMessageChannel {
@Override
protected int doReadMessages(List<Object> buf) {
SocketChannel ch = null;
try {
ch = javaChannel().accept();
} catch (IOException e) {
}
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}
}
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
@Override
public void read() {
final ChannelPipeline pipeline = pipeline();
doReadMessages(readBuf);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
}
protected abstract int doReadMessages(List<Object> buf);
}
ServerBootstrapAcceptor 分发
ServerBootstrapAcceptor 管理 workerGroup 里的所有工作线程和所有的业务处理代码 ChannelHandler,ServerBootstrapAcceptor 会把所有的 ChannelHandler 放到刚刚监听得到的 NIOSocketChannel 里的 pipeline 里,并从 workerGroup 里选择一个 EventLoop 工作线程把NIOSocketChannel 注册到该 EventLoop 拥有的IO多路复用选择器 Selector 里去,这就完成了分发,它已经处理了连接,后续这个 NIOSocketChannel 里的所有读写事件都会被 Selector 监听到,并被该 EventLoop 工作线程所处理。文章来源:https://www.toymoban.com/news/detail-824206.html
private static class ServerBootstrapAcceptor implements ChannelInboundHandler {
// 工作线程池,即 workerGroup
private final EventLoopGroup childGroup;
// 业务操作 Handler
private final ChannelHandler[] childHandlers;
private ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler[] childHandlers) {
this.childGroup = childGroup;
this.childHandlers = childHandlers;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
final Channel child = (Channel) msg;
// 完成 pipeline 责任链模式的组装
for (ChannelHandler childHandler : childHandlers) {
child.pipeline().addLast(childHandler);
}
// 把Channel 注册到 Selector
childGroup.register(child);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 略
}
}
,文章来源地址https://www.toymoban.com/news/detail-824206.html
到了这里,关于Netty Reactor 模式解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!