Java - ThreadPoolExecutor线程池分析

这篇具有很好参考价值的文章主要介绍了Java - ThreadPoolExecutor线程池分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Java - ThreadPoolExecutor源码分析


 1. 为什么要自定义线程池

首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。

但是如果直接采用JDK提供的方式去构建,可见设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己创建自定义线程池。

自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。

ThreadPoolExecutor 七大核心参数:

public ThreadPoolExecutor(int corePoolSize,                         //  核心工作线程(当前任务执行结束后,不会销毁)
                          int maximumPoolSize,                      //  最大工作线程(代表当前线程池中一共可以有多少工作线程)
                          long keepAliveTime,                       //  非核心工作线程在阻塞队列位置等待时间
                          TimeUnit unit,                            //  非核心工作线程在阻塞队列位置等待时间的单位
                          BlockingQueue<Runnable> workQueue,        //  任务在没有核心工作线程处理时,任务先到阻塞队列中
                          ThreadFactory threadFactory,              //  构建线程的线程工厂,可以自定义thread信息
                          RejectedExecutionHandler handler)         //  当线程池无法处理处理任务时,执行拒绝策略

2.ThreadPoolExecutor应用

JDK提供的几种拒绝策略:

  • AbortPolicy: 当前拒绝策略会在无法执行任务时,直接抛出一个异常
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
  • CallerRunsPolicy: 当前拒绝策略会在无法执行任务时,将任务交给调用者处理
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
  • DiscardPolicy:当前拒绝策略会在无法执行任务时,直接将任务丢弃
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
  • DiscardOldestPolicy: 当前拒绝策略会在无法执行任务时,将阻塞队列中最早的任务丢弃,将当前任务再次交接线程池处理
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
  • 当然也可以自定义拒绝策略,根据自己业务修改实现逻辑, 只需实现   RejectedExecutionHandler 类中的 rejectedExecution 方法。

 3. ThreadPoolExecutor的核心属性

线程池的核心属性就是ctl,它会基于ctl拿到线程池的状态以及工作线程个数。

//  当前线程的核心属性
//  当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子操作
//  ctl表示线程池的两个核心属性
//  线程池的状态: ctl的高3位,表示线程池状态
//  工作线程的数量: ctl的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//  Integer.SIZE: 获取Integer的bit位个数
//  声明一个常量: COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//  CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//  线程池状态表示
//  当前五个状态中,只有RUNNING状态表示线程池正常,可以正常接收任务处理
//  111: 代表RUNNING状态,RUNNING可以处理任务,并且处理阻塞队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS;
//  000: 代表SHUTDOWN状态,不会接收新任务,正在处理的任务正常进行,阻塞队列中的任务也会处理完
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//  001: 代表STOP状态,不在接收新任务,正在处理的任务会被中断,阻塞队列中的任务不在处理
private static final int STOP       =  1 << COUNT_BITS;
//  010: 代表TIDYING状态,这个状态是SHUTDOWN或者STOP转换过来的,代表线程池马上关闭,过度状态
private static final int TIDYING    =  2 << COUNT_BITS;
//  011: 代表TERMINATED状态,这个状态是TIDYING转换过来的,转换过来需要执行terminated方法
private static final int TERMINATED =  3 << COUNT_BITS;

//  下面方法是帮助运算ctl值的,需要传入ctl
//  基于&运算的特点,保证获取ctl的高3位的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//  基于&运算的特点,保证获取ctl的低29位的值
private static int workerCountOf(int c)  { return c & CAPACITY; }
//  runStateOf 和 workerCountOf 方法都是拆包
//  基于|运算的特点,对线程池状态rs和线程个数wc进行封装
private static int ctlOf(int rs, int wc) { return rs | wc; }

 线程池的转换方式:

Java - ThreadPoolExecutor线程池分析

ThreadPoolExecutor中的execute方法

execute方法是提交任务到线程池的核心方法。

execute源码解析:

//  提交执行任务
//  command 就是提交过来的任务
public void execute(Runnable command) {
    //  提交的任务不能为null  健壮性判断
    if (command == null)
        throw new NullPointerException();
    //  获取核心属性ctl值,用于后续判断
    int c = ctl.get();
    //  如果工作线程个数小于核心线程数
    //  满足要求,添加核心工作线程
    if (workerCountOf(c) < corePoolSize) {
        //  addWorker(任务,是否是核心线程) ture: 核心线程,false:非核心线程
        //  addWorker返回true: 代表添加工作线程成功
        //  addWorker返回false: 代表添加工作线程失败
        //  addWorker中会基于线程池状态,以及工作线程个数判断,查看能否添加工作线程
        if (addWorker(command, true))
            //  工作线程构建出来了,任务也交给command去处理了
            return;
        //  说明线程池状态或者是工作线程个数发生了变化,导致添加失败,需要重新获取ctl值
        c = ctl.get();
    }
    //  添加核心工作线程失败后
    //  先判断线程池状态是否是RUNNING状态,如果是正常基于阻塞队列的offer方法,将任务添加到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        //  如果任务添加到阻塞队列成功,走if内部
        //  如果任务在丢到阻塞队列之前,线程池状态发生改变了
        //  重新获取ctl
        int recheck = ctl.get();
        //  如果线程池的状态不是RUNNING状态,将任务从阻塞队列中移除
        if (! isRunning(recheck) && remove(command))
            //  并且直接拒绝策略
            reject(command);
        //  在这,说明阻塞队列有我刚放进去的任务
        //  查看一下工作线程数是不是0个
        //  如果工作线程为0个,需要添加一个非核心工作线程去处理阻塞队列中的任务
        //  发生这种情况有两种:
        //  1. 构建线程池时,核心线程数可以是0个
        //  2. 即使有核心线程,可以设置核心线程也允许超时,设置allowCoreThreadTimeOut(默认false)为ture
        else if (workerCountOf(recheck) == 0)
            //  为了避免阻塞队列中的任务堆积,添加一个非核心线程去处理
            addWorker(null, false);
    }
    //  任务添加到阻塞队列失败
    //  构建一个非核心工作线程
    //  如果添加非核心工作线程成功,直接完成
    else if (!addWorker(command, false))
        //  添加失败,执行拒绝策略
        reject(command);
}

execute方法流程图:

Java - ThreadPoolExecutor线程池分析

 ThreadPoolExecutor中的addWorker方法

addWorker方法中主要分为两大块:

  • 第一块:校验线程池的状态以及工作线程个数
  • 第二块:添加工作线程并且启动工作线程
//  校验和添加启动工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
    //  =======================第一块====================
    //  外层for循环在校验线程池的状态
    //  内层for循环是在校验工作线程的个数

    //  retry是给外层for循环添加的一个标记,为了方便在内层for循环跳出到外层for循环
    retry:
    for (;;) {
        //  获取ctl
        int c = ctl.get();
        //  拿到ctl的高3位的值
        int rs = runStateOf(c);

        // =====================线程池状态判断==========================
        //  如果线程池状态是SHUTDOWN,并且此时阻塞队列有任务,工作线程为0,则添加一个工作线程去处理阻塞队列的任务

        //  判断线程池的状态是否大于等于SHUTDOWN,满足则说明线程不是RUNNING状态
        if (rs >= SHUTDOWN &&
            //  如果这三个条件都满足,就代表是要添加非核心工作线程去处理阻塞队列中任务
            //  如果三个条件有一个不满足,返回false配合!,就不需要添加
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            //  不需要添加工作线程
            return false;


        for (;;) {
            // ===============工作线程个数判断==========================
            //  基于ctl拿到低29位的值,代表工作线程的个数
            int wc = workerCountOf(c);
            //  如果工作线程个数大于最大工作线程数CAPACITY值,就不可以添加,返回false
            if (wc >= CAPACITY ||
                //  基于core来判断添加的是否是核心工作线程
                //  如果是核心: 基于corePoolSize去判断
                //  如果是非核心: 基于maximumPoolSize去判断
                wc >= (core ? corePoolSize : maximumPoolSize))
                //  代表不能添加,工作线程个数不满足要求
                return false;
            //  针对ctl + 1 , 采用CAS操作
            if (compareAndIncrementWorkerCount(c))
                //  CAS成功后,直接退出外层循环,代表可以进行执行添加工作线程操作了
                break retry;
            //  重新获取一次ctl值
            c = ctl.get();  // Re-read ctl
            //  判断重新获取的ctl中,线程池的状态与之前是否有区别
            //  如果状态发生改变,需要重新去判断线程状态
            if (runStateOf(c) != rs)
                //  重新进入外层for循环
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //  =======================第二块====================
    //  添加工作线程以及启动工作线程
    //  声明了三个变量
    //  workerStarted 表示工作线程启动状态,默认false
    boolean workerStarted = false;
    //  workerAdded 表示工作线程添加状态,默认false
    boolean workerAdded = false;
    //  w 表示工作线程
    Worker w = null;
    try {
        //  构建工作线程,并且将任务传递进去
        w = new Worker(firstTask);
        //  获取worker中的thread对象
        final Thread t = w.thread;
        //  判断thread是否不为null, 在new worker时,内部会通过给予的threadFactory去构造thread交给worker
        //  一般如果为null,代表ThreadFactory有问题
        if (t != null) {
            //  加锁,保证使用worker成员变量以及对largestPoolSize赋值时,保证线程安全
            final ReentrantLock mainLock = this.mainLock;
            //  加锁, 因为后续要操作HashSet是线程不安全的
            mainLock.lock();
            try {
                //  再次获取线程池状态
                int rs = runStateOf(ctl.get());
                //  再次判断
                //  如果满足 rs < SHUTDOWN,说明线程池是RUNNING状态,可以继续执行
                //  如果线程池状态为SHUTDOWN,并且firstTask为null,添加非核心线程处理阻塞队列任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //  进入这里,就可以添加工作线程
                    //  将threadFactory构建线程后,不能直接启动线程,如果启动则抛出异常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    /**
                    * 包含池中所有工作线程的集合。只有在获得mainLock时才能访问
                    * private final HashSet<Worker> workers = new HashSet<Worker>();
                    * 将创建好的worker放入工作线程集合中
                    */ 
                    workers.add(w);
                    //  获取工作线程集合的大小,拿到工作线程个数
                    int s = workers.size();
                    //  largestPoolSize在记录最大线程个数的记录
                    //  如果当前工作线程个数,大于最大线程个数的记录,就赋值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //  设置工作线程添加成功
                    workerAdded = true;
                }
            } finally {
                //  释放锁
                mainLock.unlock();
            }
            //  如果工作线程添加成功
            if (workerAdded) {
                //  直接启动worker中的线程
                t.start();
                //  设置启动工作线程成功
                workerStarted = true;
            }
        }
    } finally {
        //  做补偿的操作,如果工作线程启动失败,将这个添加失败的工作线程处理掉
        if (! workerStarted)
            //  从工作线程的集合中移除掉  
            addWorkerFailed(w);
    }
    //  返回工作线程释放启动成功
    return workerStarted;
}

addWorker方法中的Worker对象

线程的执行是交接Worker对象来做的,Worker对象中比较核心的就是基于AQS来实现的线程中断

 文章来源地址https://www.toymoban.com/news/detail-685580.html

private final class Worker
    extends AbstractQueuedSynchronizer      //  基于AQS来实现线程中断
    implements Runnable                     //  存储需要执行的任务
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** 工作线程的Thread对象,初始化时构建出来 */
    final Thread thread;
    /** 需要执行的任务 */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        //  刚初始化的工作线程是不允许被中断的
        setState(-1); // inhibit interrupts until runWorker
        //  第一次new的时候,会将任务赋值给firstTask
        this.firstTask = firstTask;
        //  给Worker构建Thread对象
        this.thread = getThreadFactory().newThread(this);
    }

    /** 调用t.start(), 执行当前的run方法  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    // 这里它自己实现了一套锁机制  (不可重入锁)
    // 中断线程不是立刻让线程停止,只是将thread的中断标识设置为true
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker对象中调用的runWorker方法

线程池添加线程成功就会调用 t.start() 方法启动Worker对象中线程,会去执行Worker对象中的run方法,run方法实际执行的是 runWorker(this) 方法。

执行任务的流程,并且做了中断线程相关的lock操作。

//  执行任务
final void runWorker(Worker w) {
    //  获取当前工作线程
    Thread wt = Thread.currentThread();
    //  获取Worker对象中封装的任务
    Runnable task = w.firstTask;
    //  将Worker的firstTask置为null
    w.firstTask = null;
    //  将Worker的state置为0,代表可以被中断
    w.unlock(); // allow interrupts
    //  任务执行时,勾子函数中是否出现异常标识位
    boolean completedAbruptly = true;
    try {
        //  获取任务的第一个方式,就是执行execute方法、submit方法,传入的任务直接处理
        //  获取任务的第二个方式,就是从工作队列中获取任务执行
        while (task != null || (task = getTask()) != null) {
            //  加锁,在SHUTDOWN状态下,当前线程不允许被中断
            //  并且Worker内部实现的锁,并不是重入锁,因为在中断时,也需要对Worker进行加锁,不能获取就代表当前线程正在执行任务
            w.lock();
            //  如果线程池状态发生了改变,变为了STOP状态,必需将当前线程中断
            //  第一个判断条件:判断当前线程池状态是否是STOP状态
            if ((runStateAtLeast(ctl.get(), STOP) 
                //  第二个判断条件: 查看中断标记位,并归位,如果为false,说明不是STOP,反之true,需要再次查看是否是并发操作导致线程池为STOP
                || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
            //  第三个判断条件: 查询当前线程的中断标记位,如果是false,就执行中断
            && !wt.isInterrupted())
                //  将中断标志置为true
                wt.interrupt();
            try {
                //  执行任务的勾子函数,(前置增强 注:不是动态代理)  
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //  执行任务  
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //  执行任务的勾子函数,(后置增强 注:不是动态代理) 
                    afterExecute(task, thrown);
                }
            } finally {
                //  将task置为null  
                task = null;
                //  执行任务的任务个数加一
                w.completedTasks++;
                //  将state标识置为0
                w.unlock();
            }
        }
        //  如果勾子函数有异常是不会走到这里的
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法中的getTask方法

如何从工作队列workQueue中获取任务的。

//  从工作队列中获取任务
private Runnable getTask() {
    //  标识(非核心线程可以干掉)
    boolean timedOut = false; // Did the last poll() time out?
    //  死循环
    for (;;) {
        //  ==================判断线程池状态=======================

        //  获取ctl标识  
        int c = ctl.get();
        //  获取线程池的状态
        int rs = runStateOf(c);

        // 如果进入if,需要干掉当前工作线程
        // 线程池状态为 SHUTDOWN、 STOP
        // 如果线程池状态大于等于STOP,需要移除掉当前工作线程
        // 如果线程池状态为SHUTDOWN,并且工作线程为空,需要移除掉当前工作线程
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //  当前工作线程计数器减一
            decrementWorkerCount();
            //  返回null,交接processWorkerExit移除当前工作线程
            return null;
        }

        //  ==================判断工作线程数量======================
        //  拿到工作线程数
        int wc = workerCountOf(c);

        // allowCoreThreadTimeOut: 是否允许核心线程超时(默认false)
        // 工作线程是否大于核心线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //  工作线程是否已经大于最大线程数
        //  工作线程大于核心线程数,并且当前线程已超时
        //  尝试干掉当前线程
        if ((wc > maximumPoolSize || (timed && timedOut))
            //  工作线程数大于1,或者工作队列为空
            //  如果工作线程队列,就干掉自己
            //  如果工作线程数大于1,就干掉自己
            && (wc > 1 || workQueue.isEmpty())) {
            //  基于CAS的方法移除当前线程,只有一个线程会CAS成功
            if (compareAndDecrementWorkerCount(c))
                //  返回null,交接processWorkerExit移除当前工作线程
                return null;
            continue;
        }

        //  =================从工作队列获取任务======================
        try {
            Runnable r = timed ?
                //  阻塞一定时间从工作队列获取任务(可以理解为非核心走这个方法)
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                //  一直阻塞(可以理解为核心走这个方法)
                workQueue.take();
            if (r != null)
                //  拿到任务直接返回
                return r;
            //  从队列获取任务时,超时了(达到了当前工作线程的最大生成时间)
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

runWorker方法中processWorkerExit方法

//  移除当前工作线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /*
     * 如果执行processWorkerExit方法的操作不是getTask中的操作,
     * 是直接因为异常引起的。(一般是勾子函数抛出异常导致的)
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        //  因为执行方式“不合法”,手动扣减工作线程数
        decrementWorkerCount();

    //  加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //  记录当前线程池一共处理多少任务
        completedTaskCount += w.completedTasks;
        //  移除工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //  尝试将线程池关闭(到TIDYING过度状态 - TERMINATED销毁状态)
    tryTerminate();

    //  重新获取ctl
    int c = ctl.get();
    //  判断当前状态是否小于STOP, 能进入说明是RUNNING、 SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        //  如果是正常状态移除当前工作线程
        if (!completedAbruptly) {
            //  核心线程数最小值 ,允许核心线程超时就是0
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //  如果工作队列中的任务不为空,设置工作线程的最小值为1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //  还有工作线程在线程池中
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //  1. 说明是不正常的方式移除当前工作线程,那需要在添加一个非核心线程
        //  2. 线程池工作队列不为空,并且没有工作线程,需要在添加一个非核心线程
        addWorker(null, false);
    }
}

  

线程池为啥要构建空任务的非核心线程?

  • 第一个:在 execute 方法中有个判断工作线程是否为0,是就添加一个空任务的非核心线程;
else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
  • 第二个:在工作线程 Worker 启动后,工作线程会运行 runWorker 方法,该方法中有个操作,当工作线程结束之后会执行 processWorkerExit 方法,在这个方法内部又有添加一个空任务的非核心线程;
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

综合上诉,Java它有一个这样的场景:

在初始化线程池的时候可能设置线程池的核心线程数为0 或者 设置allowCoreThreadTimeOut(默认false)为ture导致核心线程超时释放,存在没有核心线程的情况。

当我们把任务添加到阻塞队列之后,没有工作线程导致阻塞队列任务堆积,直到后续有新任务加入才会去创建工作线程。

Java - ThreadPoolExecutor线程池分析

/**
 * If false (default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting
 * for work.
 * 如果为false(默认值),核心线程即使在空闲时也会保持活动状态。
 * 如果为true,核心线程将使用keepAliveTime超时等待工作
 */
private volatile boolean allowCoreThreadTimeOut;

综上所述,因此线程池需要构建空任务的非核心线程去处理这种情况。

线程池使用完为什么必须执行shutdown方法或者shutdownNow方法?

  • 第一点:在线程池 addWorker 方法中我们可以看到,

线程池启动线程也是基于 Thread 对象去进行的一个 start 方法启动的,像这种它会占用jvm的栈,

所以属于GC Roots 通过垃圾回收的可达性分析算法,这种线程就不能被回收,会一直占用jvm的资源,

因此不能及时的调用 shutdown 或者 shutdownNow 方法,就可能造成内存泄漏问题!!!

  • 第二点:线程池启动对象是基于你 Worker 对象内部的 Thread 对象启动的,

当执行Thread对象的 start方法时,它会执行 Worker对象的 run 方法,

该方法中的runWorker 方法传入的是 this 就是当前的Worker对象,

就会导致启动的线程还指向了Worker对象,这个Worker对象是不能回收的,

又因为Worker对象属于线程池的内部类,

导致整个 ThreadPoolExecutor 线程池对象也不会被回收!!!

综上所述,当使用完线程池对象后,没有及时的调用关闭方法,会导致堆内存资源消耗很严重,最后会导致内存泄漏问题!

 线程池的核心参数该如何设置?

主要的难点在于任务类型无法控制,比如:

cpu密集型: cpu不断的处理任务,大量的计算等操作。

IO密集型: 不需要cpu一直调度,大多数时间都是等待结果的,如:调用第三方服务等待网络响应、等待IO响应、查询数据库等待数据库响应等等。

混合型:上面两种都会有。

大多数情况都需自己去测试,调试!没有绝对固定的一个公式。可以参考:

M thread = N cpu * U cpu * ( 1 + W / C )

线程数 = cpu的个数 * cpu的利用率 * ( 1 + 等待时间 / 计算时间 ) 注:W/C 是程序运行时 等待时间和计算时间的比值

1 * 100% * (1 + 50% / 50% )= 2

公式只是给定一个调试的初始值,需要自己后续测试调试!

 

以上可能还有不足,仅供参考!!!

 

到了这里,关于Java - ThreadPoolExecutor线程池分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 自定义线程池 ThreadPoolExecutor

    Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要由下列4个组件构成。 ·corePool:核心线程池的大小。 ·maximumPool:最大线程池的大小。 ·BlockingQueue:用来暂时保存任务的工作队列。 ·RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时

    2024年02月05日
    浏览(33)
  • 进程与线程、线程创建、线程周期、多线程安全和线程池(ThreadPoolExecutor)

    什么进程? 进程是 资源(CPU、内存等)分配 的基本单位,它是程序执行时的一个实例。程序运行时系统就会创建一个进程,并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候就会为它分配CPU时间,程序开始真正运行。 什么是线程? 线程是操作系

    2024年02月14日
    浏览(38)
  • ThreadPoolExecutor线程池内部处理浅析

    我们知道如果程序中并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束时,会因为频繁创建线程而大大降低系统的效率,因此出现了线程池的使用方式,它可以提前创建好线程来执行任务。本文主要通过java的ThreadPoolExecutor来查看线程池的内部处理过程。

    2024年02月05日
    浏览(42)
  • 线程池高手进阶:揭秘ThreadPoolExecutor的小妙招!

    ThreadPoolExecutor 是 Java 中用于创建和管理线程池的接口,当线程池中的任务队列已满,并且线程池中的线程数量已经达到最大时,如果再有新的任务提交,就需要一个策略来处理这些无法执行的任务。它 提供了四种拒绝策略,都是 RejectedExecutionHandler 接口的实现,如下: Abor

    2024年01月25日
    浏览(39)
  • 全网最详细的线程池 ThreadPoolExecutor 详解,建议收藏!

    五种状态: 线程池的 shutdown() 方法,将线程池由 RUNNING(运行状态)转换为 SHUTDOWN状态 线程池的 shutdownNow() 方法,将线程池由RUNNING 或 SHUTDOWN 状态转换为 STOP 状态。 注: SHUTDOWN 状态 和 STOP 状态 先会转变为 TIDYING 状态,最终都会变为 TERMINATED ThreadPoolExecutor 继承自 AbstractExe

    2024年02月03日
    浏览(44)
  • Java进阶(ConcurrentHashMap)——面试时ConcurrentHashMap常见问题解读 & 结合源码分析 & 多线程CAS比较并交换 初识

    List、Set、HashMap作为Java中常用的集合,需要深入认识其原理和特性。 本篇博客介绍常见的关于Java中线程安全的ConcurrentHashMap集合的面试问题,结合源码分析题目背后的知识点。 关于List的博客文章如下: Java进阶(List)——面试时List常见问题解读 结合源码分析 关于的Set的博

    2024年02月06日
    浏览(59)
  • Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装

    在进行复杂的计算或处理大量数据时,可以通过创建多个线程来同时执行多个任务,从而提高程序的执行效率。这种技术称为多线程编程。 1.1 线程简介 线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一

    2024年02月12日
    浏览(30)
  • ThreadPoolExecutor源码学习

    继承结构如图所示: ThreadPoolExecutor - AbstractExecutorService - ExecutorService - Executor 线程池状态参数 线程池状态转移图如下所示 RUNNING: 线程池创建后进入的状态 SHUTDOWN: 调用 shutdown 方法进入该状态,该方法主要包含如下操作 更新线程池状态为 SHUTDOWN 中断空闲线程 interruptIdleWorker

    2023年04月11日
    浏览(27)
  • 从源码分析 Go 语言使用 cgo 导致的线程增长

    TDengine Go 连接器 https://github.com/taosdata/driver-go 使用 cgo 调用 taos.so 中的 API,使用过程中发现线程数不断增长,本文从一个 cgo 调用开始解析 Go 源码,分析造成线程增长的原因。 对 driver-go/wrapper/taosc.go 进行转换 go tool cgo taosc.go 执行后生成 _obj 文件夹 以 taosc.cgo1.go 中 TaosResetC

    2024年02月07日
    浏览(53)
  • openxr runtime Monado 源码解析 源码分析:整体介绍 模块架构 模块作用 进程 线程模型 整体流程

    monado系列文章索引汇总: openxr runtime Monado 源码解析 源码分析:源码编译 准备工作说明 hello_xr解读 openxr runtime Monado 源码解析 源码分析:整体介绍 模块架构 模块作用 进程 线程模型 整体流程 openxr runtime Monado 源码解析 源码分析:CreateInstance流程(设备系统和合成器系统)C

    2024年02月11日
    浏览(81)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包