深入理解Java线程池ThreadPoolExcutor实现原理、数据结构和算法(源码解析)

这篇具有很好参考价值的文章主要介绍了深入理解Java线程池ThreadPoolExcutor实现原理、数据结构和算法(源码解析)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

什么是线程池?

        线程池主要是为了解决执行新任务执行时,应用程序为减少为任务创建一个新线程和任务执行完毕时销毁线程所带来的开销。通过线程池,可以在项目初始化时就创建一个线程集合,然后在需要执行新任务时重用这些线程而不是每次都新建一个线程,一旦任务已经完成了,线程回到线程池中并等待下一次分配任务,达到资源复用的效果。

线程池主要优势?

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

如何创建线程?

通过Executors

创建线程池

newSingleThreadExecutor:创建一个只有一个线程的线程池,串行执行所有任务,即使空闲时也不会被关闭。可以保证所有任务的执行顺序按照任务的提交顺序执行。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。

适用场景:需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程活动的应用场景。

newFixedThreadPool:创建一个固定线程数量的线程池(corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列)。初始化时线程数量为零,之后每次提交一个任务就创建一个线程,直到线程达到线程池的最大容量。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

适用场景:为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

newCachedThreadPool:创建一个可缓存的线程池,线程的最大数量为Integer.MAX_VALUE。空闲线程会临时缓存下来,线程会等待60s还是没有任务加入的话就会被关闭。

适用场景:适用于执行很多的短时间异步任务的小程序,或者是负载较轻的服务器。

newScheduledThreadPool:创建一个支持执行延迟任务或者周期性执行任务的线程池。

ThreadPoolExecutor

阿里巴巴开发手册并发编程有一条规定:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这是为什么呢?主要是因为这样的可以避免资源耗尽的风险,因为使用Executors返回线程池对象的弊端有:

FixedThreadPool 和 SingleThreadPool 允许的阻塞队列长度为 Integer.MAX_VALUE,这样会导致堆积大量的请求,从而导致OOM;

CachedThreadPool 允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

所以创建线程池,最好是根据线程池的用途,然后自己创建线程池。

目录

一、线程池系列相关文章

二、继承实现关系图

三、低层数据存储结构

3.1 核心属性

3.1.1 说明

3.1.2 线程池五种状态

3.2 构造器

四、工作原理

五、源码解析

5.1 核心方法execute()

5.2 添加任务addWork()

5.3 执行任务runWork()

5.4 阻塞队列取任务getTask()

5.5 Worker无任务最后处理processWorkerExit()

5.6 关闭线程池

5.6.1 关闭线程池shutdown()

5.6.2 关闭线程池shutdownNow()

5.6.3 尝试结束线程池tryTerminate()

5.6.4 判断线程池是否关闭awaitTermination()


一、线程池系列相关文章

        1.1 Java线程池ThreadPoolExcutor01-参数说明

        1.2 Java线程池ThreadPoolExcutor02-阻塞队列之ArrayBlockingQueue

        1.3 Java线程池ThreadPoolExcutor03-阻塞队列之LinkedBlockingQueue

        1.4 Java线程池ThreadPoolExcutor04-阻塞队列之PriorityBlockingQueue原理及扩容机制详解

        1.5 Java线程池ThreadPoolExcutor05-阻塞队列之DelayQueue原理及扩容机制详解

        1.6 Java线程池ThreadPoolExcutor06-阻塞队列之SynchronousQueue

        1.7 Java线程池ThreadPoolExcutor07-阻塞队列之LinkedTransferQueue

        1.8 Java线程池ThreadPoolExcutor07-阻塞队列之LinkedBlockingDeque

        1.9 Java线程池ThreadPoolExcutor08-4种拒绝策略

二、继承实现关系图

java线程池threadpool,线程池,多线程,java,数据结构,算法,面试

三、低层数据存储结构

3.1 核心属性

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    private final BlockingQueue<Runnable> workQueue;
    private final ReentrantLock mainLock = new ReentrantLock();
    private final HashSet<Worker> workers = new HashSet<>();
    private final Condition termination = mainLock.newCondition();
    private int largestPoolSize;
    private long completedTaskCount;
    private volatile ThreadFactory threadFactory;
    private volatile RejectedExecutionHandler handler;
    private volatile long keepAliveTime;
    private volatile boolean allowCoreThreadTimeOut;
    private volatile int corePoolSize;
    private volatile int maximumPoolSize;
...
}
3.1.1 说明
属性名 说明
COUNT_BITS 用于计算线程池的状态值、容量
workQueue

阻塞队列。七个:

ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。详见1.2链接。

LinkedBlockingQueue: 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene。详见1.3链接。

PriorityBlockingQueue:具有优先级的无界阻塞队列。详见1.4链接。

DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。详见1.5链接。

SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。详见1.6链接。

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。详见1.7链接。

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。详见1.8链接。

threadFactory
线程工厂
handler

拒绝策略处理类。四种:AbortPolicy策略(默认)、DiscardPolicy策略、DiscardOldestPolicy策略 和 CallerRunsPolicy策略。详见1.9链接。

keepAliveTime
为多余的空闲线程等待新任务的最长时间, 超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余 的空闲线程会被立即终止。
allowCoreThreadTimeOut

如果为true,核心线程使用keepAliveTime超时等待工作。

如果为false(默认),核心线程即使在空闲时也保持活动。

corePoolSize 核心线程数
maximumPoolSize 最大线程数
3.1.2 线程池五种状态

属性ctl 是ThreadPoolExecutor内部一个用来进行技术和状态控制的控制变量,它使用了一个原子整形字段来实现两个方面的管理:

  • 低29位记录线程池的线程数,
  • 高3位记录线程池的工作状态

五种状态:

  • RUNNING线程池一旦被创建处于RUNNING状态。线程池处于RUNNING状态时,能够接收新任务以及对已添加的任务进行处理。
  • SHUTDOWN线程池处于SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。调用线程池的shutdown()接口时,线程池由RUNNING状态转变为SHUTDOWN状态。
  • STOP: 线程池处于STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由RUNNING状态或者SHUTDOWN状态变为STOP状态。
  • TIDYING: 当所有的任务已终止,ctl记录的任务数为0,线程池的状态会变为TIDYING状态。当线程池状态为SHUTDOWN时,阻塞队列为空并且线程池中执行的任务也为空时,就会由SHUTDOWN状态变为 TIDYING状态;当线程池为STOP时,线程池中执行的任务为空时,就会又STOP状态变为 TIDYING状态。
  • TERMINATED: 线程池彻底终止,就会变成TERMINATED状态。线程池处于TIDYING状态时,调用terminated()就会由TIDYING状态变为TERMINATED状态。

状态转换如下图:

java线程池threadpool,线程池,多线程,java,数据结构,算法,面试

                                                                图3.1.2-1

3.2 构造器

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    ...
}
参数名 说明
corePoolSize 核心线程数
maximumPoolSize 最大线程数
keepAliveTime 为多余的空闲线程等待新任务的最长时间, 超过这个时间后多余的线程将被终止。
unit keepAliveTime的单位
workQueue 阻塞队列
threadFactory 线程工厂
handler 拒绝策略处理类

详见1.1链接。

四、工作原理

java线程池threadpool,线程池,多线程,java,数据结构,算法,面试

  • 如果任务null直接退出,否则执行步骤2;
  • 若工作线程数小于核心线程数,执行步骤3创建工作线程并执行任务,否则执行步骤4。
  • 若阻塞队列已满
    • 工作线程数小于最大线程数,执行步骤6创建工作线程并执行任务,否则执行步骤10 拒绝任务(执行拒绝策略)
  • 若阻塞队列未满,添加任务到阻塞队列,若线程状态不为运行中,则任务从队列中取出,并执行步骤10 拒绝任务(执行拒绝策略)

五、源码解析

5.1 核心方法execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    
   
    int c = ctl.get();
    //通过位运算,获取当前线程数,如果线程数小于corePoolSize,则执行addWorker(),创建新线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // 若添加工作任务执行任务不成功,尝试添加到阻塞队列中
        c = ctl.get();
    }
    // 线程数超过核心线程数,任务添加到阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 添加到阻塞队列,但若线程池状态不是运行中,则从队列中取出
        if (! isRunning(recheck) && remove(command))
            // 并且执行拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 阻塞队列已满,则若工作线程数小于最大线程数 创建线程执行任务
    else if (!addWorker(command, false))
        // 若线程数已达最大线程数,否则执行拒绝策略
        reject(command);
}

从源码中可以清晰看出线程池执行任务的逻辑与“四、工作原理”所述一致。

5.2 添加任务addWork()

THreadPoolExecutor内部类Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;
    //当前Worker所处于的线程
    final Thread thread;
    //待执行的任务
    Runnable firstTask;
    //任务计数器
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}

它是对Runnable进行了封装,主要功能是对待执行的任务进行中断处理和状态监控。Worker还继承了AQS,在每个任务执行时进行了加锁的处理。可以将Worker简单理解为可中断的、可进行锁处理的Runnable。

创建工作线程并执行是addWork()方法,源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 自旋,判断线程池状态,并对线程数量执行原子+1操作
    retry:
    for (;;) {
        int c = ctl.get();
        // 获取线程池状态
        int rs = runStateOf(c);
       	// 如果线程池已经关闭,则直接返回false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 判断线程数是否已达上限,根据传入参数core的不同,判断corePoolSize或者maximumPoolSize。
            // 如果线程数已达上限,直接返回false
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 执行原子操作,对线程数+1
            if (compareAndIncrementWorkerCount(c))
                // 执行原子操作成功,则退出自旋 并 创建工作线程执行任务
                break retry;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    // ctl变量操作成功,执行Worker相关逻辑
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建一个新的Worker,传入待执行的任务
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 加锁后,再次判断线程池状态
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果创建了新的Worker,则调用其start方法立即执行
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWork()方法首先在一个自旋中做三个判断:

  • 线程是否关闭,若关闭则直接返回false退出
  • 通过参数core来确定工作线程数与核心线程数比较 还是 与最大线程数比较,若工作线程数大,则返回false退出
  • CAS尝试将线程数加1,若成功则创建一个辨析的Worker并立即执行其start()方法执行该任务。

5.3 执行任务runWork()

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 指向线程池调execute添加的任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 首先释放锁,允许中断
    w.unlock(); 
    boolean completedAbruptly = true;
    
    try {
        // 从worker中取第1个任务,若任务为空则从阻塞队列中取任务,直到返回null,这里达到线程复用的效果,实现线程处理多个任务。
        while (task != null || (task = getTask()) != null) {
            // 执行任务前先加锁
            w.lock();
            // 如果线程池已经终止,则中断该线程。保存了线程池在STOP状态下线程中断的,非STOP状态下线程没有被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 记录正在运行的任务
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务(调任务的run方法)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行完任务,清除记录信息
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 当前Worker计数器+1,统计worker执行了多少任务,最后累加进completedTaskCount变量,可以调用相应方法返回一些统计信息。
                w.completedTasks++;
                // 释放锁
                w.unlock();
            }
        }
        // 表示worker是否异常终止,执行到这里代表执行正常,后续的方法需要这个变量
        completedAbruptly = false;
    } finally {
        // completedTasks累加到completedTaskCount变量中
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker()的主要逻辑就是进行线程池的关闭检查,然后执行任务,并将计数器+1。

注意这行代码 while (task != null || (task = getTask()) != null) ,当task = w.firstTask 的值为null时执行task = getTask(), getTask是从任务列队是取任务。也就是说,Worker在执行完提交给自己的任务后,会执行任务队列中的任务。

5.4 阻塞队列取任务getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

说明:

  • if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) 这行代码体现出了 SHUTDOWN状态和 STOP状态的区别。若线程池状态为 SHUDOWN 状态,则条件为 false,取任务执行;而如果线程池的状态为 STOP 状态,则条件为 true,不管队列是否还有任务,不再处理了。
  • timed后在的判断逻辑有点复杂,以下几种情况为true,CAS尝试将线程数减1
    • 工作线程数大于最大线程数(后面wc>1||workQueue.isEmpty()应该自然满足)(可能是在运行中调用setMaximumPoolSize)
    • 设置了allowCoreThreadTimeOut为true且队列中取的任务为null,说明没任务了
    • 工作线程数大于核心线程数 且队列中取的任务为null(后面wc>1||workQueue.isEmpty()应该自然满足)
  • try后面逻辑
    • 延时取任务:allowCoreThreadTimeOut为true 或者 wc > corePoolSize
    • 直接取任务(若没任务则阻塞等待):allowCoreThreadTimeOut为false 或者 wc <= corePoolSize

结论:

  • allowCoreThreadTimeOut设置为true时,工作线程数达最大之后,因无新任务而线程减少,工作线程总数最小值可以为0
  • allowCoreThreadTimeOut设置为false时,只有wc大于核心线程数,才去做CAS减线程数操作,所以工作线程数达到最大之后,因无新任务而线程减少,工作线程总数最小为核心线程数

5.5 Worker无任务最后处理processWorkerExit()

private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
   
    tryTerminate();

    int c = ctl.get();
    // 判断状态是否小于STOP
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

说明:

  • decrementWorkerCount():如果为异常结束,则工作线程数减1
  • try 逻辑:加锁累加完成任务数
  • tryTerminate(): 尝试终止线程池
  • 判断状态是否小于STOP为true
    • allowCoreThreadTimeOut设置为true
      • 若队列不为空:至少保留一个worker
      • 若队列为空:直接退出,线程池的worker数减少,最终可能为0
    • allowCoreThreadTimeOut设置为false: 则保持worker数不少于corePoolSize(若线程数小于corePoolSize,则添加 null任务的worker

总结worker:线程池启动后,worker在池内创建,包装了提交的Runnable任务并执行,执行完就等待下一个任务,不再需要时就结束。

5.6 关闭线程池

从图3.1.2-1看出有两种方法关闭线程池:

  • shutdown: 不能再提交任务,已经提交的任务可继续执行;
  • shutdownNow: 不能再提交任务,已经提交的任务未执行的任务不再执行,正在执行的任务可继续执行,但会中断,返回已提交未执行的任务
5.6.1 关闭线程池shutdown()
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

说明:

  • checkShutdownAccess(): 安装策略机构
  • advanceRunState(SHUTDOWN): 线程池状态切换到SHUTDOWN状态
  • interruptIdleWorkers(): 中断所有空闲的worker
  •  tryTerminate(): 尝试结束线程池
5.6.2 关闭线程池shutdownNow()
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

说明:

  • checkShutdownAccess(): 安装策略机构
  • advanceRunState(STOP): 线程池状态切换到STOP状态
  • interruptWorkers(): 中断所有空闲的worker
  • drainQueue(): 取出等待队列里未执行的任务
  • tryTerminate(): 尝试结束线程池
5.6.3 尝试结束线程池tryTerminate()
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

说明:

  • ctl.get():获取ctl值判断线程池状态,以下状态不处理直接return
    • RUNNING状态,正在运行状态肯定不能停
    • TIDYING或TERMINATED状态,已经没有正在运行的worker了
    • SHUTDOWN状态且阻塞队列不为空,执行完才能停
    • 工作线程数不为0。又调了一次interruptIdleWorkers(ONLY_ONE),可能疑惑在调tryTerminate之前时已经调用过了,为什么又调用,而且每次只中断一个空闲worker?我们需要知道,shutdown时worker可能在执行中,执行完阻塞在队列的take,不知道要结束,所有要补充调用interruptIdleWorkers。每次只中断一个是因为processWorkerExit时,还会执行tryTerminate,自动中断下一个空闲的worker。
  • try逻辑:加锁CAS尝试将线程池状态切换成TIDYING,再切换成TERMINATED状态,terminated是空方法供子类来实现。
5.6.4 判断线程池是否关闭awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

说明:接收人timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。

ps: 以上是研读源码加上翻阅许多文献理解的总结,如有错误或不足的地方,欢迎指出,欢迎留言交流。我会继续努力学习和分享更多有干货的内容。文章来源地址https://www.toymoban.com/news/detail-728213.html

到了这里,关于深入理解Java线程池ThreadPoolExcutor实现原理、数据结构和算法(源码解析)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解Java多线程编程

            Java的多线程编程在现代应用程序中扮演着重要的角色。它可以提高应用程序的性能、并发处理能力和响应性。然而,多线程编程也带来了一些挑战,如线程安全、死锁和资源竞争等问题。本文将深入探讨Java多线程编程的基本概念和最佳实践。 1. 理解线程和进程

    2024年02月08日
    浏览(35)
  • “深入理解Java的多线程编程“

    多线程编程是指在一个程序中同时运行多个线程,以提高程序的并发性和性能。Java是一门支持多线程编程的强大编程语言,提供了丰富的多线程相关类和接口。 在Java中,可以通过以下方式实现多线程编程: 继承Thread类:创建一个继承自Thread类的子类,并重写run()方法,在

    2024年02月13日
    浏览(40)
  • Java-多线程-深入理解ConcurrentHashMap

        ConcurrentHashMap(Concurrent: 并存的,同时发生的 ;)     ConcurrentHashMap是Java中的一个线程安全的哈希表实现,它可以在多线程环境下高效地进行并发操作。     HashMap线程不安全,在多线程操作下可能会导致数据错乱     使用HashMap和ConcurrentHashMap分别实

    2024年02月14日
    浏览(25)
  • 【多线程系列-03】深入理解java中线程的生命周期,任务调度

    多线程系列整体栏目 内容 链接地址 【一】深入理解进程、线程和CPU之间的关系 https://blog.csdn.net/zhenghuishengq/article/details/131714191 【二】java创建线程的方式到底有几种?(详解) https://blog.csdn.net/zhenghuishengq/article/details/127968166 【三】深入理解java中线程的生命周期,任务调度 ht

    2024年02月17日
    浏览(37)
  • 深入理解 Java 多线程、Lambda 表达式及线程安全最佳实践

    线程使程序能够通过同时执行多个任务而更有效地运行。 线程可用于在不中断主程序的情况下在后台执行复杂的任务。 创建线程 有两种创建线程的方式。 扩展Thread类 可以通过扩展Thread类并覆盖其run()方法来创建线程: 实现Runnable接口 另一种创建线程的方式是实现Runnable接口

    2024年03月15日
    浏览(43)
  • 深入理解数据结构:队列的实现及其应用场景

    队列(Queue)是一种具有先进先出(FIFO)特性的数据结构。在队列中,数据的插入和删除操作分别在队列的两端进行。插入操作在队列的尾部进行,而删除操作则在队列的头部进行。这种特性使得队列在很多实际应用中非常有用,比如任务调度、缓冲区管理等。 线性表是一种

    2024年04月28日
    浏览(34)
  • Java开发 - 深入理解Redis哨兵机制原理

    Redis的主从、哨兵模式、集群模式,在前文中都已经有了详细的搭建流程,可谓是手把手教程,也得到了很多朋友的喜欢。由于前文偏向于应用方面,就导致了理论知识的匮乏,我们可能会用了,但却不明所以,所以今天,博主就通过接下里的几篇博客给大家分别讲解Redis哨兵

    2024年02月17日
    浏览(28)
  • 华为云出品《深入理解高并发编程:Java线程池核心技术》电子书发布

    系统拆解线程池核心源码的开源小册 透过源码看清线程池背后的设计和思路 详细解析AQS并发工具类 点击下方链接进入官网,右上角搜索框搜索“《深入理解高并发编程:Java线程池核心技术》” 即可获取下载。 https://auth.huaweicloud.com/authui/login.html?locale=zh-cnservice=https%3A%2F%2F

    2024年02月16日
    浏览(26)
  • Java开发 - 深入理解Redis Cluster的工作原理

    前面我们讲过Redis Cluster的搭建方式,也是本着应用优先的原则,所以对其基础概念和原理几乎没有涉及,但当学会了Redis集群的搭建方式之后,对于其原来我们还是要知道一些的,所以这篇博客,我们将一起来学习Redis Cluster的一些相关知识。 在开始Redis Cluster的讲解之前,还

    2024年02月15日
    浏览(36)
  • “深入理解Java虚拟机(JVM):背后的工作原理解析“

    标题:深入理解Java虚拟机(JVM):背后的工作原理解析 摘要:本文将深入探讨Java虚拟机(JVM)的工作原理,包括内存管理、垃圾回收、即时编译器等关键概念,以及如何优化代码以提高性能。通过示例代码和详细解释,读者将对JVM的底层原理有更深入的理解。 正文: 一、

    2024年02月12日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包