ThreadPoolExecutor线程池内部处理浅析

这篇具有很好参考价值的文章主要介绍了ThreadPoolExecutor线程池内部处理浅析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

我们知道如果程序中并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束时,会因为频繁创建线程而大大降低系统的效率,因此出现了线程池的使用方式,它可以提前创建好线程来执行任务。本文主要通过java的ThreadPoolExecutor来查看线程池的内部处理过程。

1 ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,下面我们来看一下ThreadPoolExecutor类的部分实现源码。

1.1 构造方法

ThreadPoolExecutor类提供了如下4个构造方法

// 设置线程池时指定核心线程数、最大线程数、线程存活时间及等待队列。
// 线程创建工厂和拒绝策略使用默认的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

// 设置线程池时指定核心线程数、最大线程数、线程存活时间、等待队列及线程创建工厂 
// 拒绝策略使用默认的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

// 设置线程池时指定核心线程数、最大线程数、线程存活时间、等待队列及拒绝策略
// 线程创建工厂使用默认的
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
   }
// 设置线程池时指定核心线程数、最大线程数、线程存活时间、等待队列、线程创建工厂及拒绝策略
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

通过观察上述每个构造器的源码实现,我们可以发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

下面解释一下构造器中各个参数的含义:

  • corePoolSize:核心池的线程个数上线,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
  • unit:参数keepAliveTime的时间单位。
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响;
  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略。有以下四种取值:ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

1.2 核心方法

在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法。

 public void execute(Runnable command) {
        // 判断提交的任务command是否为null,若是null,则抛出空指针异常;
        if (command == null)
            throw new NullPointerException();
        // 获取线程池中当前线程数
        int c = ctl.get();
        // 如果线程池中当前线程数小于核心池大小,进入if语句块
        if (workerCountOf(c) < corePoolSize) {
            // 如果以给定的命令启动一个核心线程执行任务成功,直接返回
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果线程池不处于运行状态并且移除刚加入的任务成功则执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果当前线程数为0,则在线程池里增加一个线程,保证队列里的任务不会没有线程执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } 
        // 尝试启动核心线程之外的线程,如果不满足,则执行对应的拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

主要方法addWorker。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果线程池状态大于SHUTDOWN或者线程池状态等于SHUTDOWN,firstTask不等于null
            // 或者线程池状态等于SHUTDOWN,任务队列等于空时,直接返回false结束。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 如果线程数量大于等于最大数量或者大于等于上限
                //(入参core传true,取核心线程数,否则取最大线程数),直接返回false结束。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false
                // CAS操作给工作线程数加1,成功则跳到retry处,不再进入循环。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果线程池状态与刚进入时不一致,则跳到retry处,再次进入循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 新建一个线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {

                    int rs = runStateOf(ctl.get());
                    // 如果线程池状态在SHUTDOWN之前或者
                    // 线程池状态等于SHUTDOWN并且firstTask等于null时,进入处理。
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果要执行的线程正在运行,则抛异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果线程添加失败,则将新增的对应信息删除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

1.3 任务执行run方法

在上述addWorker中,当调用线程的start方法启动线程后,会执行其中的run方法。

public void run() {
            runWorker(this);
        }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 如果任务不为空或者新获取到的任务不为空
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 当线程池状态,大于等于 STOP 时,保证工作线程都有中断标志。
                // 当线程池状态,小于STOP时,保证工作线程都没有中断标志。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

2 整体处理过程

通过上述源码分析,我们可以得出线程池处理任务的过程如下:

3 总结

本文从源码层面主要分析了线程池的创建、运行过程,通过上述的分析,可以看出当线程池中的线程数量超过核心线程数后,会先将任务放入等待队列,队列放满后当最大线程数大于核心线程数时,才会创建新的线程执行。

作者:京东物流 管碧强

来源:京东云开发者社区 自猿其说Tech 转载请注明来源文章来源地址https://www.toymoban.com/news/detail-747789.html

到了这里,关于ThreadPoolExecutor线程池内部处理浅析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • java线程池ThreadPoolExecutor使用

    在开发服务端软件项目时,软件经常需要处理执行时间很短而数目巨大的请求,如果为每一个请求创建一个新的线程,则会导致性能上的瓶颈。因为JVM需要频繁的处理线程对象的创建与销毁,如果请求的执行时间很短,则有可能花在创建和销毁线程对象上的时间大于真正执行

    2024年02月06日
    浏览(54)
  • 线程池ThreadPoolExecutor底层原理源码分析

    ThreadPoolExecutor中提供了两种执行任务的方法: void execute(Runnable command) Future? submit(Runnable task) 实际上submit中最终还是调用的execute()方法,只不过会返回⼀个Future对象,用来获取任务执行结果: execute(Runnable command)方法执行时会分为三步: 注意:提交⼀个Runnable时,不管当前线程

    2024年02月06日
    浏览(64)
  • Java多线程 - 创建线程池的方法 - ThreadPoolExecutor和Executors

    线程池介绍 什么是线程池 ? 线程池就是一个可以复用线程的技术。 不使用线程池的问题 : 如果用户每发起一个请求,后台就创建一个新线程来处理,下次新任务来了又要创建新线程,而创建新线程的开销是很大的,这样会严重影响系统的性能。 线程池工作原理 : 例如线程池

    2023年04月16日
    浏览(49)
  • 全网最详细的线程池 ThreadPoolExecutor 详解,建议收藏!

    五种状态: 线程池的 shutdown() 方法,将线程池由 RUNNING(运行状态)转换为 SHUTDOWN状态 线程池的 shutdownNow() 方法,将线程池由RUNNING 或 SHUTDOWN 状态转换为 STOP 状态。 注: SHUTDOWN 状态 和 STOP 状态 先会转变为 TIDYING 状态,最终都会变为 TERMINATED ThreadPoolExecutor 继承自 AbstractExe

    2024年02月03日
    浏览(47)
  • 线程池高手进阶:揭秘ThreadPoolExecutor的小妙招!

    ThreadPoolExecutor 是 Java 中用于创建和管理线程池的接口,当线程池中的任务队列已满,并且线程池中的线程数量已经达到最大时,如果再有新的任务提交,就需要一个策略来处理这些无法执行的任务。它 提供了四种拒绝策略,都是 RejectedExecutionHandler 接口的实现,如下: Abor

    2024年01月25日
    浏览(42)
  • Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装

    在进行复杂的计算或处理大量数据时,可以通过创建多个线程来同时执行多个任务,从而提高程序的执行效率。这种技术称为多线程编程。 1.1 线程简介 线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一

    2024年02月12日
    浏览(32)
  • ThreadPoolExecutor——高效处理并发任务的必备良器

      ThreadPoolExecutor是Java concurrent中用于管理线程池的类,它是Executor框架的一个实现。线程池是一种提高应用程序性能和可靠性的技术,它将多个任务分配给多个线程执行,从而实现并发处理。ThreadPoolExecutor提供了一种灵活的方式来管理线程池,可以控制线程池的大小、阻塞

    2024年02月02日
    浏览(30)
  • MySQL内部机制:SQL语句的执行过程浅析

    目录 1. 连接与认证 2. 解析与优化 3. 查询缓存 4. 打开表和读取数据 5. 执行 6. 返回结果 7. 日志记录 8. 关闭连接 当我们在MySQL中执行一个SQL语句时,背后发生了一系列的操作和步骤。下面是一个简化的概述,描述了SQL语句在MySQL中的执行过程或原理: 客户端(如应用程序或命

    2024年03月12日
    浏览(55)
  • 多线程系列(十八) -AQS原理浅析

    在之前的文章中,我们介绍了 ReentrantLock、ReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor 等并发工具类的使用方式,它们在请求共享资源的时候,都能实现线程同步的效果。 在使用方式上稍有不同,有的是独占式,多个线程竞争时只有一个线程能执行方法,比

    2024年03月13日
    浏览(51)
  • UE4 多线程源码浅析(3——TaskGraph)

    本文章只是我个人在学习虚幻引擎过程中的一些理解,不一定正确,若有说的不对的地方,欢迎指正。 前两篇我们分别讲了虚幻多线程的基础线程系统( FRunnable )和异步任务系统( AsyncTask ),本篇我们来讲讲虚幻多线程的最后一员大将—— TaskGraph 。 TaskGraph 字面意思是任

    2024年01月17日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包