Netty中NioEventLoop介绍

这篇具有很好参考价值的文章主要介绍了Netty中NioEventLoop介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Netty基本介绍

        Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

        Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

        本文主要介绍Netty中的核心类之一的:NioEventLoop类。

二、NioEventLoop继承体系

Netty中NioEventLoop介绍

 三、EventLoop相关接口

        我们先看右边的接口, 部分接口我们在上一篇以及介绍过了,可以看Netty中NioEventLoopGroup介绍,我们从OrderedEventExecutor开始往下看。

一、OrderedEventExecutor

public interface OrderedEventExecutor extends EventExecutor {
}

         OrderedEventExecutor继承了EventExecutor,即拥有线程相关的操作声明。

二、EventLoop

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();
}

        EventLoop继承了OrderedEventExecutor和EventLoopGroup接口,即拥有线程相关操作即事件循环组的相关行为声明。但并未声明新的接口。

四、EventLoop相关实现

        在介绍EventLoop的实现之前,我们需要了解一些内容:

  1. EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。
  2. EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。
  3. Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。
  4. 一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。

         上一篇EventLoopGroup的介绍里写到了MultithreadEventExecutorGroup的初始化和创建EventExecutor

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
	if (nThreads <= 0) {
		throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
	}
 
	if (executor == null) {
		executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
	}
 
	// 初始化线程组
	children = new EventExecutor[nThreads];
 
	for (int i = 0; i < nThreads; i ++) {
		boolean success = false;
		try {
			// 将线程和传入的executor做一个绑定
			// 注意:这里线程组每个元素都绑定了同一个executor
            // newChild是一个抽象方法,依赖子类实现
			children[i] = newChild(executor, args);
			success = true;
		} catch (Exception e) {
			// TODO: Think about if this is a good exception type
			throw new IllegalStateException("failed to create a child event loop", e);
		} finally {
			// 失败执行策略
			if (!success) {
				for (int j = 0; j < i; j ++) {
					children[j].shutdownGracefully();
				}
 
				for (int j = 0; j < i; j ++) {
					EventExecutor e = children[j];
					try {
						while (!e.isTerminated()) {
							e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
						}
					} catch (InterruptedException interrupted) {
						// Let the caller handle the interruption.
						Thread.currentThread().interrupt();
						break;
					}
				}
			}
		}
	}
 
	// 初始化一个EventExecutor选择工厂,轮询获取EventExecutor,chooserFactory的默认实现是DefaultEventExecutorChooserFactory
	// next()方法依赖chooser实现
	chooser = chooserFactory.newChooser(children);
 
	// 声明线程终止的监听器
	final FutureListener<Object> terminationListener = new FutureListener<Object>() {
		@Override
		public void operationComplete(Future<Object> future) throws Exception {
			if (terminatedChildren.incrementAndGet() == children.length) {
				terminationFuture.setSuccess(null);
			}
		}
	};
 
	// 将监听器绑定到线程组的每个线程中
	for (EventExecutor e: children) {
		e.terminationFuture().addListener(terminationListener);
	}
 
	// 初始化线程集合(只读)
	Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
	Collections.addAll(childrenSet, children);
	readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

        创建EventExecutor

// 创建EventLoop对象,并绑定executor
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
	return new NioEventLoop(this, executor, (SelectorProvider) args[0],
		((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

         我们在这里接着往下跟NioEventLoop

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
	// 初始化
	super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
	if (selectorProvider == null) {
		throw new NullPointerException("selectorProvider");
	}
	if (strategy == null) {
		throw new NullPointerException("selectStrategy");
	}
	provider = selectorProvider;
	// 创建一个selector的二元组
	final SelectorTuple selectorTuple = openSelector();
	selector = selectorTuple.selector;
	unwrappedSelector = selectorTuple.unwrappedSelector;
	selectStrategy = strategy;
}

        进入super方法

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
	super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
	// 创建收尾队列
	tailTasks = newTaskQueue(maxPendingTasks);
}

        接着进入super

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
	super(parent);
	this.addTaskWakesUp = addTaskWakesUp;
	this.maxPendingTasks = Math.max(16, maxPendingTasks);
	// 初始化子线程
	this.executor = ThreadExecutorMap.apply(executor, this);
	taskQueue = newTaskQueue(this.maxPendingTasks);
	rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

        进入ThreadExecutorMap.apply方法

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
	ObjectUtil.checkNotNull(executor, "executor");
	ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
	return new Executor() {
		@Override
		public void execute(final Runnable command) {
			// 这里调用了NioEventLoopGroup所包含的executor的execute()
			executor.execute(apply(command, eventExecutor));
		}
	};
}

        这里我们关注apply()方法及executor.execute()方法,先跟进去apply()方法:

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
	ObjectUtil.checkNotNull(command, "command");
	ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
	// 这里包装了一个runnable,记录当前执行线程,并在执行完成后删除
	return new Runnable() {
		@Override
		public void run() {
            // 做了线程隔离
			setCurrentEventExecutor(eventExecutor);
			try {
				command.run();
			} finally {
				setCurrentEventExecutor(null);
			}
		}
	};
}

        我们再进去executor.execute(apply(command, eventExecutor))的execute方法,这里的实现是ThreadPerTaskExecutor类,跟进去:

@Override
public void execute(Runnable command) {
    // 创建并启动一个线程
	threadFactory.newThread(command).start();
}

        这里就完成了线程的创建

        我们再回去看看NioEventLoop的openSelector()方法,我们先了解下SelectorTuple类

private static final class SelectorTuple {
	final Selector unwrappedSelector;
	final Selector selector;

	SelectorTuple(Selector unwrappedSelector) {
		this.unwrappedSelector = unwrappedSelector;
		this.selector = unwrappedSelector;
	}

	SelectorTuple(Selector unwrappedSelector, Selector selector) {
		this.unwrappedSelector = unwrappedSelector;
		this.selector = selector;
	}
}

        SelectorTuple 只是一个包含两个 Selector 的内部类,用于封装优化前后的 Selector。而 openSelector() 方法就是为了返回 Selector 并且根据配置判断是否需要优化当前 Selector 。下面看具体代码:

private SelectorTuple openSelector() {
	final Selector unwrappedSelector;
	try {
		// 根据provider创建出个NIo的原生selector
		unwrappedSelector = provider.openSelector();
	} catch (IOException e) {
		throw new ChannelException("failed to open a new selector", e);
	}
	// 若禁用了keyset优化功能,则直接返回NIo原生的selector,优化就是将selector中的三个set集合变为三个数组
	// 因为数组是顺序存放的,要比随机存放的集合执行效率高
	if (DISABLE_KEY_SET_OPTIMIZATION) {
		return new SelectorTuple(unwrappedSelector);
	}

	// 此处优化逻辑省略...
}

        NioEventLoop的父类是一个Executor,所以我们在看看execute()方法:

@Override
public void execute(Runnable task) {
	if (task == null) {
		throw new NullPointerException("task");
	}
	// 判断当前线程是不是EventLoop中成员变量的executor线程
	boolean inEventLoop = inEventLoop();
	// 将任务添加到队列
	addTask(task);
	if (!inEventLoop) {
		// 启动线程(成员变量中的execute)
		startThread();
		if (isShutdown()) {
			boolean reject = false;
			try {
				if (removeTask(task)) {
					reject = true;
				}
			} catch (UnsupportedOperationException e) {
				// The task queue does not support removal so the best thing we can do is to just move on and
				// hope we will be able to pick-up the task before its completely terminated.
				// In worst case we will log on termination.
			}
			if (reject) {
				reject();
			}
		}
	}

	if (!addTaskWakesUp && wakesUpForTask(task)) {
		wakeup(inEventLoop);
	}
}

总结:

        NioEventLoopGroup创建CUP核心数两倍的EventLoop数组,NioEventLoopGroup内部还包含了一个Executor成员变量。

        随后对EventLoop数组进行初始化,传入NioEventLoopGroup的executor成员变量,EventLoop内部也有一个executor成员变量,EventLoop对内部的executor变量进行初始化,并在其executor的execute()方法调用NioEventLoopGroup的成员变量executor的execute()方法。

        也就是说EventLoop的executor调用execute()方法的时候,会调用NioEventLoopGroup的Executor的execute方法来执行具体的操作。

        文章来源地址https://www.toymoban.com/news/detail-475315.html

到了这里,关于Netty中NioEventLoop介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【netty系列-01】深入理解网络通信基本原理和tcp/ip协议

    Netty系列整体栏目 内容 链接地址 【一】深入理解网络通信基本原理和tcp/ip协议 https://zhenghuisheng.blog.csdn.net/article/details/136359640 【二】深入理解Socket本质和BIO https://zhenghuisheng.blog.csdn.net/article/details/136549478 在最初的网络中,是借鉴于这个OSI七层网络模型,而在实际开发应用中

    2024年03月17日
    浏览(59)
  • 从零开始学习Netty - 学习笔记 - NIO基础 - ByteBuffer: 简介和基本操作

    1.1. Channel Buffer Channel 在Java NIO(New I/O)中,“Channel”(通道)是一个重要的概念,用于 在非阻塞I/O操作中进行数据的传输 。Java NIO提供了一种更为灵活和高效的I/O处理方式,相比于传统的I/O,它具有更好的性能和可扩展性。 常见的Java NIO中的通道类型: FileChannel(文件通道

    2024年02月20日
    浏览(45)
  • 网络编程的无冕之王-Netty入门和核心组件介绍

    最近我在研究Netty,之前只是经常听说,并没有实际做过研究,为什么突然要好好研究一下它,主要是因为前段时间,我在看RocketMQ底层原理的时候发现它的底层的网络通信都是基于Netty,然后网上一查,果然,大家太多的耳熟能详的工具组件,都是基于Netty做的开发。大家看

    2024年02月10日
    浏览(49)
  • JBoss安装并部署war包

    JBOSS是一个免费的开放源代码的Web应用服务器,技术先进、性能稳定,而且免费,将Tomcat内核作为其Servlet容器引擎,并加以审核和调优。 解决Tomcat的一些欠缺: 如活动连接支持、静态内容、大文件和HTTPS等。除了性能问题, Tomcat是受限的集成平台,仅能运行Java应用程序。在

    2024年02月04日
    浏览(30)
  • IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)

    本系列教程包括: IOT云平台 simple(0)IOT云平台简介 IOT云平台 simple(1)netty入门 IOT云平台 simple(2)springboot入门 IOT云平台 simple(3)springboot netty实现TCP Server IOT云平台 simple(4)springboot netty实现简单的mqtt broker IOT云平台 simple(5)springboot netty实现modbus TCP Master IOT云平台 si

    2023年04月09日
    浏览(32)
  • Jboss(CVE-2017-12149)反序列化命令执行漏洞

    该漏洞为 Java反序列化错误类型,存在于 Jboss 的 HttpInvoker 组件中的 ReadOnlyAccessFilter 过滤器中。该过滤器在没有进行任何安全检查的情况下尝试将来自客户端的数据流进行反序列化,从而导致了漏洞。 受影响系统及应用版本    Jboss AS 5.x、Jboss AS 6.x 环境搭建 使用vulhub靶场

    2024年02月15日
    浏览(42)
  • Jboss历史漏洞利用 - JMX Console 未授权访问漏洞

    docker搭建操作指南 进行访问,出现以下界面即可搭建成功 Jboss的webUI界面 http://ip:port/jmx-console 未授权访问(或默认密码 admin/admin ),可导致JBoss的部署管理的信息泄露,攻击者也可以直接上传木马获取 webshell 访问 如果可以直接访问或者通过默认账号密码登录则存在对应漏洞 找

    2024年02月13日
    浏览(41)
  • 人工智能是由谁领导开发的?

    作者:禅与计算机程序设计艺术 近几年,人工智能领域经历了从“小机器人”到“AlphaGo”再到“华为在手臂上拍了一张照片之后还可以识别人的生理特征”等重大突破性进展。但是,在这项科技浪潮中,究竟存在哪些团队和个人在推动其发展,又是怎样的角色扮演者参与其

    2024年02月06日
    浏览(40)
  • Jboss反序列化远程代码执行漏洞(CVE-2017-12149)

    :Jboss文件下的httplnvoker组件中的Readonly Accessfilter过滤器中的dofilter没有限制来自客户端的序列化数据而进行反序列化。 漏洞影响版本:Jboss5.x Jboss6.x 利用vulhub搭建Jboss cd vulhub cd jboss cd CVE-2017-12149 docker-compose build (若出现镜像问题,直接执行下面命令) docker-compose up -d 访问:h

    2024年02月07日
    浏览(52)
  • 水库大坝安全监测系统是由什么组成的?

    水库大坝是防洪抗灾的重要设施,它们的安全性直接关系到人民群众的生命财产安全。因此,水库大坝的安全监测必不可少。水库大坝安全监测系统是一种集成了数据采集、传输、处理和分析的技术平台,能够实时、准确地监测大坝的状态,及时发现异常情况,提供科学的依

    2024年02月16日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包