【Java 】从源码全面解析Java 线程池

这篇具有很好参考价值的文章主要介绍了【Java 】从源码全面解析Java 线程池。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、引言

线程池技术在互联网技术使用如此广泛,几乎所有的后端技术面试官都要在线程池技术的使用和原理方面对小伙伴们进行 360° 的刁难。

作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。

希望能帮助各位读者以后面试势如破竹,对面试官进行 360° 的反击,吊打问你的面试官,让一同面试的同僚瞠目结舌,疯狂收割大厂 Offer!

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马。

二、使用

我想大部分人应该都使用过线程池,我们的 JDK 中也提供了一些包装好的线程池使用,比如:

  • newFixedThreadPool:返回一个核心线程数为 nThreads 的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
  • newSingleThreadExecutor:返回一个核心线程数为 1 的线程池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
  • newCachedThreadPool:大同小异
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

通过上面 JDK 提供的我们可以发现一个共识,他们其实都是调用了 ThreadPoolExecutor 的构造方法来进行线程池的创建。

这时候,我们不免有疑问,我们难道不可以直接使用 ThreadPoolExecutor 的构造方法去进行创建嘛

是的,阿里巴巴Java开发手册中明确指出,『不允许』使用Executors创建线程池

所以,我们在生产中,一般使用 ThreadPoolExecutor 的构造方法自定义去创建线程池,比如:

public class ThreadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,      // 核心线程数
                5,  // 最大线程数
                200,   // 非核心工作线程在阻塞队列位置等待的时间
                TimeUnit.SECONDS,  // 非核心工作线程在阻塞队列位置等待的单位
                new LinkedBlockingQueue<>(), // 阻塞队列,存放任务的地方
                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略:这里有四种
        );

        for (int i = 0; i < 10; i++) {
            MyTask task = new MyTask();
            executor.execute(task);
        }

        // 关闭线程
        executor.shutdown();

    }
}

class MyTask implements Runnable {
    @Override
    public void run() {
        System.out.println("我被执行了....");
    }
}

三、源码

整体的流程如下:
【Java 】从源码全面解析Java 线程池

1、初始化

聊源码不从初始化聊的,都是不讲道理的

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

// 执行ThreadPoolExecutor的构造方法进行初始化
// corePoolSize: 核心线程数
// maximumPoolSize: 最大线程数
// keepAliveTime: 非核心工作线程在阻塞队列位置等待的时间
// unit: 非核心工作线程在阻塞队列位置等待的时间单位
// workQueue: 存放任务的阻塞队列
// threadFactory: 线程工厂(生产线程的地方)
// RejectedExecutionHandler: 拒绝策略
public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 核心线程数可以为0
    // 最大线程数不为0
    // 最大线程数 大于 核心线程数
    // 等待时间大于等于0
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    // 将当前的入参赋值给成员变量
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

我们上面初始化的过程主要对入参做了一些校验,然后将方法的入参赋予给成员变量

1.1 拒绝策略

1.1.1 AbortPolicy

简单粗暴,直接抛出异常

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
    }
}

1.1.2 CallerRunsPolicy

当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果当前的
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

1.1.3 DiscardOldestPolicy

如果当前的阻塞队列满了,弹出时间最久的

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // 获取阻塞队列,弹出一个时间最久的
            e.getQueue().poll();
            // 执行当前的
            e.execute(r);
        }
    }
}

1.1.4 DiscardPolicy

简单粗暴,不做任何操作

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

1.1.5 自定义拒绝策略

自己写的业务逻辑,可以将拒绝的任务放至数据库等存储,等后续在执行
public static class MyRejectedExecution implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("这是我自己的拒绝策略");
    }
}

1.2 其余变量

// 该数值代表两个意思:
// 高3位表示当前线程池的状态
// 低29位表示当前线程池工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//  COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 111:代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务。
private static final int RUNNING    = -1 << COUNT_BITS;
// 000:代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001:代表STOP状态,不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管。
private static final int STOP       =  1 << COUNT_BITS;
// 010:代表TIDYING状态,这个状态是否SHUTDOWN或者STOP转换过来的,代表当前线程池马上关闭,就是过渡状态。
private static final int TIDYING    =  2 << COUNT_BITS;
// 011:代表TERMINATED状态,这个状态是TIDYING状态转换过来的,转换过来只需要执行一个terminated方法。
private static final int TERMINATED =  3 << COUNT_BITS;

// 基于&运算的特点,保证只会拿到ctl高三位的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 基于&运算的特点,保证只会拿到ctl低29位的值
private static int workerCountOf(int c)  { return c & CAPACITY; }

线程池的状态变化流程图:

【Java 】从源码全面解析Java 线程池

2、线程池的execute方法

  • Step1:当前的线程池个数低于核心线程数,直接添加核心线程即可
  • Step2:当前的线程池个数大于核心线程数,将任务添加至阻塞队列中
  • Step3:如果添加阻塞队列失败,则需要添加非核心线程数处理任务
  • Step4:如果添加非核心线程数失败(满了),执行拒绝策略
public void execute(Runnable command) {
    // 如果当前传过来的任务是null,直接抛出异常即可
    if (command == null)
        throw new NullPointerException();
    // 获取当前的数据值
    int c = ctl.get();
    
//==========================线程池第一阶段:启动核心线程数开始==================================================
    // Step1:获取ctl低29位的数值,与我们的核心线程数相比
    if (workerCountOf(c) < corePoolSize) {
        // Step2:添加一个核心线程
        if (addWorker(command, true)){
            return;
        }
        // 更新一下当前值
        c = ctl.get();
    }
//==========================线程池第一阶段:启动核心线程数结束==================================================
    
    // 如果走到下面会有两种情况:
    // 1、核心线程数满了,需要往阻塞队列里面扔任务
    // 2、核心线程数满了,阻塞队列也满了,执行拒绝策略
    
//==========================线程池第二阶段:任务放至阻塞队列开始==================================================
    // 判断当前的状态是不是Running的状态(RUNNING可以处理任务,并且处理阻塞队列中的任务)
    // 如果是Running的状态,则可以将任务放至阻塞队列中
    // 这里如果放阻塞队列失败了,证明阻塞队列满了
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次更新数值
        int recheck = ctl.get();
        // 再次校验当前的线程池状态是不是Running
        // 如果线程池状态不是Running的话,需要删除掉刚刚放的任务
        if (!isRunning(recheck) && remove(command)){
            // 执行拒绝策略
            reject(command);
        } 
        // 如果到这里,说明上面阻塞队列中已经有数据了
        // 如果线程池的个数为0的话,需要创建一个非核心工作线程去执行该任务
        // 不能让人家堵塞着
        else if (workerCountOf(recheck) == 0){
            addWorker(null, false);
        }
    }
//==========================线程池第二阶段:任务放至阻塞队列结束==================================================

    // 如果走到这里的逻辑,证明上面的逻辑没走通,有以下两种情况:
    // 1、线程池的状态不是Running
    //    1.1 如果是这种情况,下面的添加非核心工作线程失败执行拒绝策略,但这个并不是这个逻辑的重点
    // 2、阻塞队列添加任务失败(阻塞队列满了)
    //    2.1 这种情况才是我们需要关心的
    //    2.2 阻塞队列满了,添加非核心工作线程
    //    2.3 若添加非核心工作线程失败,证明已经到达maximumPoolSize的限制,执行拒绝策略
//==========================线程池第三阶段:启动非核心线程数开始==================================================
    // 添加一个非核心工作线程
    else if (!addWorker(command, false))
        // 工作队列中添加任务失败,执行拒绝策略
        reject(command);
//==========================线程池第三阶段:启动非核心线程数结束==================================================
}

流程图如下:
【Java 】从源码全面解析Java 线程池

3、线程池的addWorker方法

3.1 校验

  • 校验当前线程池的状态
  • 校验当前线程池工作线程的个数(核心线程数、最大工作线程数)
private boolean addWorker(Runnable firstTask, boolean core) {
    // 这里主要是为了结束整个循环
    retry:
    for (;;) {
        // 获取当前线程池的数值(ctl)
        int c = ctl.get();
        
        // runStateOf:基于&运算的特点,保证只会拿到ctl高三位的值
        int rs = runStateOf(c);

//==========================线程池状态判断=============================================================
        // rs >= SHUTDOWN:代表当前线程池状态为:SHUTDOWN、STOP、TIDYING、TERMINATED,线程池状态异常
        // 但这里SHUTDOWN状态稍许不同(不会接收新任务,正在处理的任务正常进行,阻塞队列的任务也会做完)
        // 如果当前的状态是SHUTDOWN状态并且阻塞队列任务不为空且新任务为空
        // 需要新起一个非核心工作线程去执行任务
        // 如果不是前面的,直接返回false即可
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
            return false;
        }

//==========================工作线程个数判断==========================================================
        for (;;) {
            // 获取当前线程池中线程的个数
            int wc = workerCountOf(c);
            // 1、如果线程池线程的个数是否超过了工作线程的最大个数
            // 2、core=true(核心线程)=false(工作线程)
            // 2.1 根据当前core判断创建的是核心线程数(corePoolSize)还是非核心线程数(maximumPoolSize)
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                return false;
            }
            // 尝试将线程池线程加一
            if (compareAndIncrementWorkerCount(c)){
                // CAS成功后,直接退出外层循环,代表可以执行添加工作线程操作了。
                break retry;
            }
            
            // 获取当前线程池的数值(ctl)
            c = ctl.get(); 
            // 获取当前线程池的状态
            // 判断当前线程池的状态等不等于我们上面的rs
            // 我们线程池的状态被人更改了,需要重新跑整个for循环判断逻辑
            if (runStateOf(c) != rs){
                continue retry;
            }
        }
    }
    // 省略下面的代码
}

3.2 添加线程

{
    // 省略校验的步骤
    
    // 两个标记
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 将当前的任务封装成Worker
        w = new Worker(firstTask);
        // 拿到当前Worker的线程
        final Thread t = w.thread;
        // 线程不为空
        if (t != null) {
            // 上锁,保证线程安全(workers、largestPoolSize)
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                
                // 获取线程池的状态
                int rs = runStateOf(ctl.get());

                // 1、rs < SHUTDOWN:保证当前线程池的状态一定是RUNNING状态
                // 2、rs == SHUTDOWN && firstTask == null:如果当前线程池是SHUTDOWN状态且新任务为空
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                    // 判断线程是否还活着
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // private final HashSet<Worker> workers = new HashSet<Worker>();
                    // 添加到我们的work队列中
                    workers.add(w);
                    // 获取works的大小
                    int s = workers.size();
                    // largestPoolSize在记录最大线程个数的记录
                    // 如果当前的大小比最大的还要打,替换即可
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // worker添加成功
                    workerAdded = true;
                }
            } finally {
                // 解锁
                mainLock.unlock();
            }
            // 如果添加成功
            if (workerAdded) {
                // 启动线程
                t.start();
                // 线程启动标志位
                workerStarted = true;
            }
        }
    } finally {
        // 如果线程没有启动成功,从workers集合中删除掉该worker
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // 返回线程是否启动成功
    return workerStarted;
}

// Worker的初始化
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 从线程工厂里面拿一个线程出来
    this.thread = getThreadFactory().newThread(this);
}

// 从workers集合中删除掉该worker
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

4、线程池的 worker 源码

// Worker继承了AQS,目的就是为了控制工作线程的中断。
// Worker实现了Runnable,内部的Thread对象,在执行start时,必然要执行Worker中断额一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
    private static final long serialVersionUID = 6138294804551838833L;
    // 当前线程工厂创建的线程(也是执行任务使用的线程)
    final Thread thread;
    
    // 当前的第一个任务
    Runnable firstTask;
  
    // 记录执行了多少个任务
    volatile long completedTasks;

    // 构造方法
    Worker(Runnable firstTask) {
        // 将State设置为-1,代表当前不允许中断线程
        setState(-1); 
        // 设置任务
        this.firstTask = firstTask;
        // 设置线程
        this.thread = getThreadFactory().newThread(this);
    }

    // 线程启动执行的方法
    public void run() {
        runWorker(this);
    }
    
    // =======================Worker管理中断================================   
    // 当前方法是中断工作线程时,执行的方法
    void interruptIfStarted() {
        Thread t;
        // 只有Worker中的state >= 0的时候,可以中断工作线程
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                // 如果状态正常,并且线程未中断,这边就中断线程
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    } 
}

5、线程池的 runWorker 方法

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    // 拿到当前的线程
    Thread wt = Thread.currentThread();
    // 拿到当前Worker的第一个任务(如果携带的话)
    Runnable task = w.firstTask;
    // 置空
    w.firstTask = null;
    // 解锁
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        // 如果任务不等于空 或者 从阻塞队列中拿到的任务不等于空
        while (task != null || (task = getTask()) != null) {
            // 加锁
            w.lock();
            // 如果线程池状态 >= STOP,确保线程中断了
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 在任务执行前做一些操作,自己实现的钩子
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 任务执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后一些操作:自己实现的钩子
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务置空
                task = null;
                // 执行任务+1
                w.completedTasks++;
                // 解锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 删除线程的方法
        processWorkerExit(w, completedAbruptly);
    }
}

6、线程池的 getTask 方法

private Runnable getTask() {
    // 超时的标记
    boolean timedOut = false; 

    // 死循环拿数据
    for (;;) {
        // 拿到当前的ctl
        int c = ctl.get();
        // 获取其线程池状态
        int rs = runStateOf(c);

        // 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null
        // 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 线程池中的线程个数减一
            decrementWorkerCount();
            return null;
        }

        // 当前线程池中线程个数
        int wc = workerCountOf(c);

        // 这里是个重点
        // allowCoreThreadTimeOut:是否允许核心线程数超时(开启这个之后),核心线程数也会执行下面超时的逻辑
        // wc > corePoolSize:当前线程池中的线程个数大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // wc > maximumPoolSize:基本不存在
        // timed && timedOut:第一次肯定是失败的(超时标记为false)
        if ((wc > maximumPoolSize || (timed && timedOut))
            // 1、线程个数为1
            // 2、阻塞队列是空的
            && (wc > 1 || workQueue.isEmpty())) {
            // 线程池的线程个数减一
            if (compareAndDecrementWorkerCount(c)){
                return null;
            }
            continue;
        }

        try {
            // 根据我们前面的timed的值(当前线程池中的线程个数是否大于核心线程数)
            // 如果大于,执行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)带有时间的等待,超过时间无任务,会返回null
            // 如果小于,执行workQueue.take(),死等任务,不会返回null
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null){
                return r;
            }
            // 到这里,说明上面的等待超时了
            // 这里要注意一下,如果这里超时后,我们上面 if ((wc > maximumPoolSize || (timed && timedOut)) 这个判断要起作用了
        	// (timed && timedOut) true
            // wc > 1 || workQueue.isEmpty():当线程大于1或者阻塞队列无数据,直接返回null,让外部循环删除
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

7、线程池的 processWorkerExit 方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 正常退出的:completedAbruptly=false
    // 不是正常退出的:completedAbruptly=true
    if (completedAbruptly) 
        decrementWorkerCount();

    // 加锁——上锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将当前worker的执行任务数累加到线程池中
        completedTaskCount += w.completedTasks;
        // 线程池删除该工作线程
        workers.remove(w);
    } finally {
        // 解锁
        mainLock.unlock();
    }

    tryTerminate();

    // 获取ctl的数据
    int c = ctl.get();
    // 这里只有SHUTDOWN、RUNNING会进入判断
    if (runStateLessThan(c, STOP)) {
        // 正常退出的
        if (!completedAbruptly) {
            // 是否允许超时
            // 允许:0
            // 不允许:核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果min=0并且阻塞队列不为空
            // 将min设置成1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 当前线程池的大小大于最小值,直接返回即可
            if (workerCountOf(c) >= min){
                return;
            }
        }
        // 如果没有的话,说明线程池中没有线程了,并且还有阻塞任务
        // 只能添加一个非核心线程去处理这些任务
        addWorker(null, false);
    }
}

8、线程池的关闭方法

8.1 shutdownNow 方法

  • 将线程池状态修改为Stop(不会接收新任务,正在处理任务的线程会被中断,阻塞队列的任务一个不管)
  • 将线程池中的线程全部中断
  • 删除当前线程池所有的工作线程
  • 将线程池的状态从:Stop --> TIDYING --> TERMINATED,正式标记线程池的结束(唤醒一下等待的主线程)
public List<Runnable> shutdownNow() {
    // 声明返回结果
    List<Runnable> tasks;
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 将线程池状态修改为STOP
        advanceRunState(STOP);
        // 将线程池中的线程全部中断
        interruptWorkers();
        // 删除当前所有的工作线程
        tasks = drainQueue();
    } finally {
        // 解锁
        mainLock.unlock();
    }
    // 查看当前线程池是否可以变为TERMINATED状态
    // 从 Stop 状态修改为 TIDYING,在修改为 TERMINATED
    tryTerminate();
    return tasks;
}

// targetState = STOP
// 作用:将当前线程池的状态修改为Stop
private void advanceRunState(int targetState) {
    // 进来直接死循环
    for (;;) {
        // 拿到当前的ctl
        int c = ctl.get();
        // runStateAtLeast(c, targetState):当前的c是不是大于STOP(如果大于Stop的话,说明线程池状态已经G了
        // 基于CAS,将ctl从c修改为Stop状态,不修改工作线程个数,仅仅将状态修改为Stop
        // 如果可以修改成功,直接退出即可
        if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

// 将线程池中的线程全部中断
private void interruptWorkers() {
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 循环遍历线程组
        for (Worker w : workers)
            // 中断线程
           	// 这里会给线程打一个中断的标记,具体什么时候中断线程,需要我们自己去控制
            w.interruptIfStarted();
    } finally {
        // 解锁
        mainLock.unlock();
    }
}

// 删除当前所有的工作线程
private List<Runnable> drainQueue() {
    // 存放工作线程的队列
    BlockingQueue<Runnable> q = workQueue;
    // 返回的结果
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    // 清空阻塞队列并将数据放入taskList中
    q.drainTo(taskList);
    // 校验当前的数据是够真的清空
    if (!q.isEmpty()) {
        // 如果确实有遗漏的,毕竟这哥们也没上锁
        // 手动的将线程从workQueue删除掉并且放到taskList中
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    // 最终返回即可
    return taskList;
}

final void tryTerminate() {
    for (;;) {
        // 拿到ctl
        int c = ctl.get();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS将当前的ctl设置成TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 该方法是一个钩子函数,我们自己定义,在线程池销毁之前做最后的处理
                    terminated();
                } finally {
                    // 将ctl设置成TERMINATED标志着线程池的正式结束
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。
                    // 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作
                    // 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程
                    // 简单来说,当时等待线程池返回的主线程,由于线程池已经销毁了,他们也必须要唤醒
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 解锁
            mainLock.unlock();
        }
    }
}

8.2 shutdown 方法

public void shutdown() {
    // 加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 将线程池状态修改为SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 将线程池中的线程全部中断
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 查看当前线程池是否可以变为TERMINATED状态
    // 从 SHUTDOWN 状态修改为 TIDYING,在修改为 TERMINATED
    tryTerminate();
}

四、流程图

【Java 】从源码全面解析Java 线程池

五、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。文章来源地址https://www.toymoban.com/news/detail-438624.html

到了这里,关于【Java 】从源码全面解析Java 线程池的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 2023Java岗面试,进互联网大厂必备Java面试八股文真题解析

    前言 一般技术面试官都会通过自己的方式去考察程序员的技术功底与基础理论知识。 很多时候,面试官问的问题会和自己准备的“题库”中的问题不太一样,即使做了复盘,下次面试还是不知道该从何处下手。 为此鄙人软磨硬泡才把阿里P8专门归纳整理的 《Java进阶知识典藏

    2023年04月10日
    浏览(55)
  • 2023Java 岗面试,进互联网大厂必备 Java 面试八股文真题解析

    前言 一般技术面试官都会通过自己的方式去考察程序员的技术功底与基础理论知识。 很多时候,面试官问的问题会和自己准备的“题库”中的问题不太一样,即使做了复盘,下次面试还是不知道该从何处下手。 为此鄙人软磨硬泡才把阿里 P8 专门归纳整理的 《Java 进阶知识典

    2024年02月15日
    浏览(46)
  • Spring Cloud智慧工地源码,利用计算机技术、互联网、物联网、云计算、大数据等新一代信息技术开发,微服务架构

    智慧工地系统 充分利用计算机技术、互联网、物联网、云计算、大数据等新一代信息技术,以PC端,移动端,设备端三位一体的管控方式为企业现场工程管理提供了先进的技术手段。让劳务、设备、物料、安全、环境、能源、资料、计划、质量、视频监控等十大管理环节变得

    2024年02月05日
    浏览(53)
  • Java语言开发的AI智慧导诊系统源码springboot+redis 3D互联网智导诊系统源码

    Java 语言开发的AI智慧导诊系统源码 springboot+redis 3D 互联网智导诊系统源码 智慧导诊解决盲目就诊问题,减轻分诊工作压力。降低挂错号比例,优化就诊流程,有效提高线上线下医疗机构接诊效率。可通过人体画像选择症状部位,了解对应病症信息和推荐就医科室。 智慧导诊

    2024年04月23日
    浏览(44)
  • 利用移动互联、物联网、智能算法、地理信息系统、大数据分析等信息技术开发的智慧工地云平台源码

    智慧工地是指利用移动互联、物联网、智能算法、地理信息系统、大数据挖掘分析等信息技术,提高项目现场的“人•机•料•法•环•安”等施工要素信息化管理水平,实现工程施工可视化智能管理,并逐步实现绿色生态建造。 技术架构: 微服务 + Java+Spring Cloud +UniApp +M

    2024年02月06日
    浏览(57)
  • 2023互联网大厂最全Java面试八股文(附大厂 P5-P8 技术栈)

    为什么感觉 Java 面试变难了? 几年前,你只需要简单的 ssm 框架 ,就能轻松找到一份 Java 的工作,但现在不一样了,随着涌入这个行业的人越来越多,同一个岗位需要筛选掉更多人,要求自然水涨船高, 这也就是现在越来越多 Java 程序员抱怨行业越来越卷的原因 ,当然这个

    2024年02月15日
    浏览(48)
  • 智慧工地源码,互联网+建筑工地,基于微服务+Java+Spring Cloud +Vue+UniApp开发

    基于微服务+Java+Spring Cloud +Vue+UniApp +MySql开发的智慧工地云平台源码 智慧工地就是互联网+建筑工地,是将互联网+的理念和技术引入建筑工地,然后以物联网、移动互联网技术为基础,充分应用BIM、大数据、人工智能、移动通讯、云计算、物联网等信息技术,通过人机交互、感

    2024年02月13日
    浏览(46)
  • 互联网编程之多线程/线程池TCP服务器端程序设计

    目录 需求 多线程TCP服务器 线程池TCP服务器 测试 日志模块 多线程TCP服务器(30分): 设计编写一个TCP服务器端程序,需使用多线程处理客户端的连接请求。客户端与服务器端之间的通信内容,以及服务器端的处理功能等可自由设计拓展,无特别限制和要求。 线程池TCP服务器

    2024年02月11日
    浏览(40)
  • DNS:解析互联网的“导航系统”

    引言: 在互联网时代,我们每天都在使用各种网站和服务,但很少有人真正了解这些网站和服务是如何被找到和访问的。这背后有一个被称为DNS(域名系统)的“导航系统”,它负责将人类可读的域名转换为计算机可识别的IP地址。本文将详细介绍DNS的工作原理、功能和重要

    2024年01月25日
    浏览(38)
  • 互联网医院源码|互联网医院软件体现智慧医疗的优势

    现在大家看病一般都会直接在互联网医院平台上去就诊,每次大家需要看病时,可以在手机上直接去预约指定的医生,同城周边的所有医院都是可以去直接选择的,这样也可以去帮助大家节省很多的看病时间,在互联网医院软件中所具备的功能一般都是比较齐全的,那么互联

    2023年04月18日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包