并发-Executor框架笔记

这篇具有很好参考价值的文章主要介绍了并发-Executor框架笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Executor框架

jdk5开始,把工作单元与执行机制分离开来,工作单元包括Runable和Callable,执行机制由Executor框架来提供。

Executor框架简介

Executor框架的两级调度模型
  • Java线程被一对一映射为本地操作系统线程
    • java线程启动会创建一个本地操作系统线程
    • java线程终止操作系统线程也会被回收
    • 操作系统会调度所有线程并将它们分配给可用的cpu
  • 在上层,java多线程程序通常把应用分解为若干任务,然后使用用户级调度器将这些任务映射为固定数量的线程。在底层,操作系统内核将这些线程映射到硬件处理器上
  • 应用通过Executor框架控制上层调度,下层由操作系统内核控制,下层调度不受应用程序控制。

并发-Executor框架笔记,# java并发编程,笔记,并发

Executor框架的结构与成员
Executor框架结构
  • 主要由任务,任务执行和异步计算结果组成

    • 任务:包括被执行任务需要实现的接口:Runnable和Callable
    • 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorServie接口。(ThreadPoolExecutor和ScheduledThreadPoolExecutor两个关键类实现了)
    • 异步计算的结果:包括接口Future和实现Future接口的FutureTask类
  • 类和接口

  • 并发-Executor框架笔记,# java并发编程,笔记,并发

    • Executor是一个接口,是Executor框架的基础,将任务的提交与任务的执行分离开来
    • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务
    • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。比Timer更灵活,功能更强大。
    • Future接口和实现Future接口的FutureTask类,代表异步计算的结果
    • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行
  • 使用:

    • 主线程创建实现Runnable或Callable接口的任务对象,工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object result))
    • 可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command)),或者吧Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task))
    • 执行ExecutorService.submit(…),将会返回一个实现Future接口的对象,也可以创建FutureTask然后直接交给ExecutorService执行
    • 主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel来取消此任务
Executor框架的成员
  • ThreadPoolExecutor:通常使用工厂类Executors创建

    • FixedThreadPool:创建固定线程数的FixedThreadPool

      • 适用于为了满足资源管理的需求,需要限制当前线程数量的应用场景,适用于负载比较重的服务器

      • public static ExecutorService newFixedThreadPool(int nThreads)
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
        
    • SingleThreadExecutor:创建单个线程

      • 适用于需要保证顺序地执行各个任务,并且在任意时间点,不会有多个线程是活动的应用场景

        public static ExecutorService newSingleThreadExecutor()
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
        
    • CachedThreadPool:创建一个会根据需要创建新线程

      • 大小无界的线程池,适用于执行很多短期异步任务的小程序,或负载教轻的服务器

      • public static ExecutorService newCachedThreadPool()
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
        
  • ScheduledThreadPoolExecutor:通常使用工厂类Executors创建

    • 创建ScheduledThreadPoolExecutor:包含若干线程的ScheduledThreadPoolExecutor

      • 适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求需要限制后台线程数量的应用场景

      • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
        
    • 创建SingleThreadScheduledExecutor:包含一个线程的ScheduledThreadPoolExecutor

      • 适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景

      • public static ScheduledExecutorService newSingleThreadScheduledExecutor(int corePoolSize)
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(int corePoolSize, ThreadFactory threadFactory)
        
  • Future接口

    • Future接口和Future接口的FutureTask类用来表示异步计算的结果

    • 把Runnable接口或Callable接口的实现类提交给ThreadPoolExecutors或ScheduledThreadPoolExecutors时,会返回给一个FutureTask对象

    • <T> Future<T> submit(Callable<T> task)
      <T> Future<T> submit(Runnable task, T result)
      Future<> submit(Runnable task)
      
  • Runnable接口和Callable接口

    • Runnable不会返回结果,Callable会返回结果

    • 可以把Runnable包装成Callable

    • public static Callable<Object> callable(Runnable task)
      
    • 可以把Runnable和一个待返回的结果包装成一个Callable

    • public static <T> Callable<T> callable(Runnable task, T result)
      
    • 把callable对象提交给执行时,submit会返回一个FutureTask对象

      • 执行get方法等待任务执行完成,任务执行完成后get方法将返回该任务的结果

ThreadPoolExecutor详解

构成组件

  • corePool:核心线程池大小
  • maximumPool:最大线程池大小
  • BlockingQueue:用来暂时保存任务的工作队列
  • RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或者饱和时,execute方法将要调用的Handler

创建类型

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool
FixedThreadPool详解

固定线程数线程池

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
}
  • corePoolSize 和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数
  • 当啊线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止
  • keepAliveTime设置为0,多余空闲线程会被立即终止

流程

  1. 如果当前运行线程数少于corePoolSize,则创建新线程来执行任务
  2. 在线程池完成预热后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务执行
  • 使用无界队列LinkedBlockingQueue作为线程池工作队列,使用无界队列作为工作队列会对线程池带来影响
    • 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
    • 由于上条,使用无界队列时,maximumPoolSize将是一个无效参数
    • 由于上条和上上条,使用无界队列时keepAliveTIme将是一个无效参数
    • 由于使用无界队列,运行中的FixedThreadPool不会拒绝任务
SingleThreadExecutor详解

使用单个worker线程的Executor

public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
}
  • corePoolSize 和maximumPoolSize都被设置为1
  • keepAliveTime设置为0,多余空闲线程会被立即终止
  • 使用无界队列LinkedBlockingQueue作为线程池的工作队列

工作流程

  1. 如果当前运行的线程数少于corePoolSize,则创建一个新线程来执行任务
  2. 在线程池完成预热后,将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行
CachedThreadPool详解

根据需要创建新线程的线程池

public static ExecutorService newCachedThreadPool(int nThreads){
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L,TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())
}
  • corePoolSize设置为0
  • maximumPoolSize被设置为Integer.MAX_VALUE
  • keepAliveTime设置为60L,线程池中的空闲线程等待新任务的最长时间是60秒,空闲线程超过60秒将会被终止
  • 使用没有容量的SynchronousQueue作为线程池的工作队列
  • 如果主线程提交的任务速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程,极端情况下,会因为创建过多线程为耗尽cpu和内存资源

流程:

  1. 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正则执行SynchronousQueue.poll,那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行步骤2
  2. 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll.这种情况下,步骤1 将失败,此时CachedThreadPool会创建一个新线程执行任务,execute方法执行完成
  3. 在步骤2中新创建将任务执行完后,会执行SynchronousQueue.poll,这个poll操作会让空闲线程最多在SynchronousQueue中等待60s,如果60s内主线程提交了一个新任务,那么这个空闲线程将执行主线程提交的新任务,否则这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

ScheduledThreadPoolExecutor详解

  • 继承自ThreadPoolExecutor
  • 主要用来在给定的延迟之后运行任务,会定期执行任务。
  • 可以在构造函数中指定多个对应的后台线程数
ScheduledThreadPoolExecutor运行机制
  • DelayQueue是一个无界队列。

主要两部分

  • 当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法或者scheduleWithFixedDelay方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask
  • 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下修改

  • 使用DelayQueue作为任务队列
  • 获取任务的方式不同
  • 执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor的实现

ScheduledFutureTask成员变量:

  • long time:表示这个任务将要被执行的具体时间
  • long sequenceNumber:表示这个任务被添加到ScheduledThreadPoolExecutor中的序号
  • long period:表示任务执行的间隔周期

DelayQueue封装了一个优先级队列,这个优先级队列会将队列的任务根据time排列,小的在前,如果time相同,比较sequenceNumber,小的在前

ScheduledThreadPoolExecutor执行某个周期任务步骤

  • 线程从DelayQueue中获取已到期的ScheduledFutureTask,到期任务是指ScheduledFutureTask的time大于等于当前时间
    • 获取Lock
    • 获取周期任务
      • 如果PriorityQueue为空,当前线程到Condition中等待
      • 如果PriorityQueue的头元素的time时间比当前时间大,到condition中等待到time时间
      • 获取PriorityQueue的头元素,如果不为空,则唤醒condition中等待的所有线程
    • 释放Lock
  • 线程执行ScheduledFutureTask
  • 线程修改ScheduledFutureTask的time后边变为下次将要被执行的时间
  • 线程把这个修改time之后的ScheduledFutureTask放回到DelayQueue中
    • 获取Lock
    • 添加任务
      • 向PriorityQueue添加任务
      • 如果添加的任务是头元素,唤醒Condition中等待的所有线程
    • 释放Lock

FutureTask详解

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

简介
  • FutureTask除了实现Future接口外,还实现了Runnable接口。
  • FutureTask可以交给Executor执行,也可以由调用线程直接执行
  • FutureTask可以处于3种状态
    • 未启动:run方法还没有被执行前
      • 执行FutureTask.get将导致调用线程阻塞
      • 执行FutureTask.cancel将导致此任务永远不会被执行
    • 已启动:run方法被执行过程中
      • 执行FutureTask.get将导致调用线程阻塞
      • 执行FutureTask.cancel(true)将以中断执行此任务线程的方式来试图停止任务
      • 执行FutureTask.cancel(false)将不会对正在执行此任务的线程产生影响
    • 已完成:run方法执行完后正常结束,或被取消,或执行run方法时抛出异常而异常结束
      • 执行FutureTask.get将导致调用线程立即返回结果或抛出异常
      • 执行FutureTask.cancel()返回false
使用
  • 可以把FutureTask交给Executor执行
  • 可以通过ExecutorService.submit返回一个FutureTask,然后执行FutureTask.get或cancel方法
  • 也可以单独使用FutureTask

当一个线程需要等待另一个线程把某个任务执行完后才能执行,此时可以使用FutureTask

实现
  • 基于AQS实现,包含两种类型操作

    • 至少一个acquire操作,阻塞调用线程,除非直到AQS状态允许这线程继续执行,FutureTask的acquire操作为get方法调用
    • 至少一个release操作,这个操作改变AQS的状态,改变后的状态可以允许一个或多个阻塞线程被解除阻塞,FutureTask的release操作包括run和cancel
  • Sync是FutureTask的内部私有类,继承自AQS,FutureTask的所有公有方法都直接委托给了内部私有Sync

  • FutureTask.get方法会用AQS的acquireSharedInterruptibly方法,执行过程

    • 调用AQS的acquireSharedInterryptibly方法,
      • 回调在子类Sync中实现的tryAcquireShared()方法来判断acquire操作是否可以成功
        • 成功条件:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null
    • 如果成功则get方法立即返回,如果失败则到线程等待队列中去等待其他线程执行release操作
    • 当其他线程执行release操作唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程
    • 最后返回计算的结果或抛出异常

    FutureTask.run方法文章来源地址https://www.toymoban.com/news/detail-705271.html

    • 执行在构造函数中指定的任务(Callable.call)
    • 以原子方式来更新同步,如果这个原则操作成功,就设置代表计算结果的变量result的值为callable.call的返回值,然后调用AQS.releaseShared
    • AQS.releaseShared首先会回调在子类Sync中实现的tryReleaseShared来执行release操作,AQS.releaseShared,然后唤醒线程等待队列中的第一个线程
    • 调用FutureTask.done1

到了这里,关于并发-Executor框架笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java并发编程学习笔记(一)线程的入门与创建

    认识 程序由指令和数据组成,简单来说,进程可以视为程序的一个实例 大部分程序可以同时运行多个实例进程,例如记事本、画图、浏览器等 少部分程序只能同时运行一个实例进程,例如QQ音乐、网易云音乐等 一个进程可以分为多个线程,线程为最小调度单位,进程则是作

    2024年02月16日
    浏览(54)
  • 【并发编程】无锁环形队列Disruptor并发框架使用

    Disruptor 是苹国外厂本易公司LMAX开发的一个高件能列,研发的初夷是解决内存队列的延识问顾在性能测试中发现竟然与10操作处于同样的数量级),基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCn演讲后,获得了业界关注,201年,企业应用软件专家Martin Fower专门撰

    2024年02月14日
    浏览(39)
  • 《C++并发编程实战》读书笔记(3):并发操作的同步

    当线程需要等待特定事件发生、或是某个条件成立时,可以使用条件变量 std::condition_variable ,它在标准库头文件 condition_variable 内声明。 wait() 会先在内部调用lambda函数判断条件是否成立,若条件成立则 wait() 返回,否则解锁互斥并让当前线程进入等待状态。当其它线程调用

    2024年02月10日
    浏览(36)
  • Python异步编程之web框架 异步vs同步 Redis并发对比

    主题: 比较异步框架和同步框架在RedisIO操作的性能差异 python版本 :python 3.8 数据库 :redis 5.0.7 压测工具 :locust web框架 :同步:flask 异步:starlette 请求并发量 : 模拟10个用户 服务器配置 : Intel(R) i7-12700F 客户端配置 :Intel(R) i7-8700 3.20GHz flask是python中轻量级web框架,特点是灵

    2024年02月10日
    浏览(46)
  • 【Linux网络编程】高并发服务器框架 线程池介绍+线程池封装

    前言 一、线程池介绍 💻线程池基本概念 💻线程池组成部分 💻线程池工作原理  二、线程池代码封装 🌈main.cpp 🌈ThreadPool.h 🌈ThreadPool.cpp 🌈ChildTask.h  🌈ChildTask.cpp 🌈BaseTask.h 🌈BaseTask.cpp 三、测试效果 四、总结 📌创建线程池的好处 本文主要学习 Linux内核编程 ,结合

    2024年01月16日
    浏览(95)
  • JUC并发编程学习笔记(十六)Volatile

    保证可见性 使用了volatile,即可保证它本身可被其他线程的工作内存感知,即变化时也会被同步变化。 不保证原子性 原子性:不可分割 线程A在执行任务时是不可被打扰的,也不能被分割,要么同时成功,要么同时失败。 每次结果也不一样。 如果不加Lock加synchronize

    2024年02月05日
    浏览(98)
  • JUC并发编程学习笔记(十五)JMM

    请你谈谈对Volatile的理解 Volatile是java虚拟机提供的 轻量级的同步机制 1、保证可见性 2、不保证原子性 3、禁止指令重排 什么是JMM JVM-java虚拟机 JMM-java内存模型,不存在的东西,概念!约定 关于JMM的一些同步的约定: 线程解锁前,必须把共享变量 立刻 刷回主存 线程加锁前,

    2024年02月05日
    浏览(50)
  • JUC并发编程学习笔记(八)读写锁

    ReadWriteLock ReadWriteLock只存在一个实现类那就是ReentrantReadWriteLock,他可以对锁实现更加细粒化的控制 读的时候可以有多个阅读器线程同时参与,写的时候只希望写入线程是独占的 Demo:

    2024年02月06日
    浏览(53)
  • JUC并发编程学习笔记(十九)原子引用

    带版本号的原子操作! 解决ABA问题,引入原子引用(乐观锁思想) AtomicStampedReference类解决ABA问题 所有相同类型的包装类对象之间值的比较全部使用equals方法比较 Integer使用了对象缓存机制,默认范围是-128至127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为v

    2024年02月05日
    浏览(56)
  • JUC并发编程与源码分析笔记-目录

    视频学习地址:尚硅谷JUC并发编程,感谢阳哥,投币支持,不过学到后面,前面的好多又忘了,还是学的不够深刻哇!

    2024年02月07日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包