SpringBoot线程池和Java线程池的实现原理

这篇具有很好参考价值的文章主要介绍了SpringBoot线程池和Java线程池的实现原理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

@

目录
  • 使用默认的线程池
    • 方式一:通过@Async注解调用
    • 方式二:直接注入 ThreadPoolTaskExecutor
    • 线程池默认配置信息
  • SpringBoot 线程池的实现原理
    • 覆盖默认的线程池
    • 管理多个线程池
  • JAVA常用的四种线程池
    • newCachedThreadPool
    • newFixedThreadPool
    • newScheduledThreadPool
    • newSingleThreadExecutor
  • Java 线程池中的四种拒绝策略
    • CallerRunsPolicy
    • AbortPolicy
    • DiscardPolicy
    • DiscardOldestPolicy
  • java 线程复用的原理

使用默认的线程池

方式一:通过@Async注解调用

public class AsyncTest {
    @Async
    public void async(String name) throws InterruptedException {
        System.out.println("async" + name + " " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }
}

启动类上需要添加@EnableAsync注解,否则不会生效。

@SpringBootApplication
//@EnableAsync
public class Test1Application {
   public static void main(String[] args) throws InterruptedException {
      ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
      AsyncTest bean = run.getBean(AsyncTest.class);
      for(int index = 0; index <= 10; ++index){
         bean.async(String.valueOf(index));
      }
   }
}

注意:

	a) 注解的方法必须是public方法。
	b) 注解的方法不要定义为static
	c) 方法一定要从另一个类中调用,也就是从类的外部调用,类的内部调用是无效的。

Spring 基于AOP 实现了异步,因此,需要获取到容器中的代理对象才能具有异步功能。假定原对象叫obj,容器中的代理对象叫做 objproxy,在执行 fun() 会同时执行aop带来的其他方法。但是如果在fun() 中调用了 fun2(),那 fun2() 是原始的方法,而不是经过aop代理后的方法,就不会具有异步的功能。

// objproxy.fun()
before()
fun.invoke(obj);
after()

因此可以重新从容器中获取代理对象来调用,也就是使用全局的 ApplicationContext.getBean()(在fun1()中使用代理对象来调用fun2()使fun2()具有异步功能) 。

方式二:直接注入 ThreadPoolTaskExecutor

此时可不加 @EnableAsync注解

@SpringBootTest
class Test1ApplicationTests {

   @Resource
   ThreadPoolTaskExecutor threadPoolTaskExecutor;

   @Test
   void contextLoads() {
      Runnable runnable = () -> {
         System.out.println(Thread.currentThread().getName());
      };

      for(int index = 0; index <= 10; ++index){
         threadPoolTaskExecutor.submit(runnable);
      }
   }

}

线程池默认配置信息

SpringBoot线程池的常见配置:

spring:
  task:
    execution:
      pool:
        core-size: 8
        max-size: 16                          # 默认是 Integer.MAX_VALUE
        keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
        allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true
        queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE
      shutdown:
        await-termination: false              # 线程关闭等待
      thread-name-prefix: task-               # 线程名称的前缀

SpringBoot 线程池的实现原理

TaskExecutionAutoConfiguration 类中定义了 ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的 ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中 RejectedExecutionHandler 被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

注意在TaskExecutionAutoConfiguration 类中,ThreadPoolTaskExecutor类的bean的名称为: applicationTaskExecutortaskExecutor

// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
      AsyncAnnotationBeanPostProcessor.DEFAUL
          T_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
   return builder.build();
}
// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(
      ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

   BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

   ThreadPoolExecutor executor;
   if (this.taskDecorator != null) {
      executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler) {
         @Override
         public void execute(Runnable command) {
            Runnable decorated = taskDecorator.decorate(command);
            if (decorated != command) {
               decoratedTaskMap.put(decorated, command);
            }
            super.execute(decorated);
         }
      };
   }
   else {
      executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler);

   }

   if (this.allowCoreThreadTimeOut) {
      executor.allowCoreThreadTimeOut(true);
   }

   this.threadPoolExecutor = executor;
   return executor;
}
// ExecutorConfigurationSupport#initialize()
public void initialize() {
   if (logger.isInfoEnabled()) {
      logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
   }
   if (!this.threadNamePrefixSet && this.beanName != null) {
      setThreadNamePrefix(this.beanName + "-");
   }
   this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}

覆盖默认的线程池

覆盖默认的 taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

@Configuration
public class ThreadPoolConfiguration {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }
}

管理多个线程池

如果出现了多个线程池,例如再定义一个线程池 taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    //设置线程池参数信息
    taskExecutor.setCorePoolSize(10);
    taskExecutor.setMaxPoolSize(50);
    taskExecutor.setQueueCapacity(200);
    taskExecutor.setKeepAliveSeconds(60);
    taskExecutor.setThreadNamePrefix("myExecutor2--");
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setAwaitTerminationSeconds(60);
    //修改拒绝策略为使用当前线程执行
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //初始化线程池
    taskExecutor.initialize();
    return taskExecutor;
}

引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

@Resource
ThreadPoolTaskExecutor taskExecutor2;

对于使用@Async注解的多线程则在注解中指定bean的名字即可。

    @Async("taskExecutor2")
    public void async(String name) throws InterruptedException {
        System.out.println("async" + name + " " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

线程池的四种拒绝策略

JAVA常用的四种线程池

ThreadPoolExecutor 类的构造函数如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

newCachedThreadPool

不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>());

newFixedThreadPool

定长线程池,超出线程数的任务会在队列中等待。

return new ThreadPoolExecutor(nThreads, nThreads,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>());

newScheduledThreadPool

类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

周期执行:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(()->{
   System.out.println("rate");
}, 1, 1, TimeUnit.SECONDS);

延时执行:

scheduledThreadPool.schedule(()->{
   System.out.println("delay 3 seconds");
}, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

单线程线程池,可以实现线程的顺序执行。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

Java 线程池中的四种拒绝策略

  • CallerRunsPolicy:线程池让调用者去执行。

  • AbortPolicy:如果线程池拒绝了任务,直接报错。

  • DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

  • DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

CallerRunsPolicy

直接在主线程中执行了run方法。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
 
    public CallerRunsPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

效果类似于:

Runnable thread = ()->{
   System.out.println(Thread.currentThread().getName());
   try {
      Thread.sleep(0);
   } catch (InterruptedException e) {
      throw new RuntimeException(e);
   }
};

thread.run();

AbortPolicy

直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

public static class AbortPolicy implements RejectedExecutionHandler {
 
    public AbortPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

DiscardPolicy

什么也不做。

public static class DiscardPolicy implements RejectedExecutionHandler {
 
    public DiscardPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

  • e.getQueue().poll() : 取出队列最旧的任务。

  • e.execute(r) : 当前任务入队。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 
    public DiscardOldestPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

java 线程复用的原理

java的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

work对象的执行依赖于 runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。文章来源地址https://www.toymoban.com/news/detail-410310.html

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

到了这里,关于SpringBoot线程池和Java线程池的实现原理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Springboot结合线程池的使用

    配置文件 配置类 方式一:线程池结合CompletableFuture来实现 配置线程池类 CompletableFuture使用线程池进行调用 任务类 方式二:使用@EnableAsync和@Async方式实现 在启动类上加@EnableAsync注解 编写线程池配置 使用 任务类 方式三:重写springboot默认的线程池配置 在启动类上加@EnableAsy

    2024年02月01日
    浏览(42)
  • Springboot中使用线程池的三种方式

    前言 多线程是每个程序员的噩梦,用得好可以提升效率很爽,用得不好就是埋汰的火葬场。 这里不深入介绍,主要是讲解一些标准用法,熟读唐诗三百首,不会作诗也会吟。 这里就介绍一下springboot中的多线程的使用,使用线程连接池去异步执行业务方法。 由于代码中包含详

    2024年02月08日
    浏览(36)
  • 线程池的介绍、原理、监控运维、框架使用场景案例

    线程池是一种线程复用的技术,它可以有效地控制线程的数量,处理过程中将任务添加到队列,然后在线程创建后启动这些任务。主要作用有: 重用线程,减少线程创建和销毁带来的开销。 可以有效控制线程的数量,方便线程管理。 提高系统响应速度,当有新的任务到来时,无需等待

    2024年02月03日
    浏览(28)
  • JAVA基础:线程池的使用

    目录 1.概述 2.线程池的优势​​​​​​​ 2.1.线程池为什么使用自定义方式? 2.2.封装的线程池工具类有什么好处? 3.线程池的七大参数 3.线程池的创建 3.1. 固定数量的线程池 3.2. 带缓存的线程池 3.3. 执⾏定时任务 3.4. 定时任务单线程 3.5. 单线程线程池 3.6. 根据当前CPU⽣成线

    2024年02月11日
    浏览(27)
  • Java线程池的类型和使用

    在并发编程中,线程池是一种非常重要的工具,它可以实现线程的复用,避免频繁地创建新线程,从而提高程序的性能和效率。Java的并发库提供了丰富的线程池功能,本文将介绍Java线程池的类型和使用。 线程池是一种执行多个任务的并发模型。它由一个线程队列和一组可重

    2024年02月13日
    浏览(33)
  • Java并发编程学习16-线程池的使用(中)

    上篇分析了在使用任务执行框架时需要注意的各种情况,并简单介绍了如何正确调整线程池大小。 本篇将继续介绍对线程池进行配置与调优的一些方法,详细如下: ThreadPoolExecutor 为 Executors 中的 newCachedThreadPool 、 newFixedThreadPool 和 newScheduledThreadExecutor 等工厂方法返回的 Exe

    2024年02月10日
    浏览(28)
  • Java并发编程学习18-线程池的使用(下)

    上篇介绍了 ThreadPoolExecutor 配置和扩展相关的信息,本篇开始将介绍递归算法的并行化。 还记得我们在《Java并发编程学习11-任务执行演示》中,对页面绘制程序进行一系列改进,这些改进大大地提供了页面绘制的并行性。 我们简单回顾下相关的改进过程: 第一次新增时,页

    2024年02月12日
    浏览(26)
  • Java基础-多线程&JUC-线程池和自定义线程池

    主要核心原理 不推荐Executors创建没有上线的线程池,建议使用自定义的线程池; Java工具类创建线程池; 当只有3个任务时,直接上处理机运行; 当有6个任务时,任务1-3上处理机运行,任务4-6进入阻塞队列等待; 当有9个任务时,任务1-3上处理机运行,任务4-6进入阻塞队列等

    2024年02月12日
    浏览(26)
  • 线程池和使用

    tip: 作为程序员一定学习编程之道,一定要对代码的编写有追求,不能实现就完事了。我们应该让自己写的代码更加优雅,即使这会费时费力。 推荐: 体系化学习Java(Java面试专题) 线程池是一种用于管理和复用线程的机制。在多线程应用程序中,线程的创建和销毁需要消耗

    2024年02月08日
    浏览(29)
  • 面试题:线程池的底层工作原理

    线程池的几个重要的参数: 1、corePoolSize:线程池的核心线程数(也是默认线程数) 2、maximumPoolSize:最大线程数 3、keepAliveTime:允许的线程最大空闲时间(单位/秒) 线程池内部是通过队列+线程实现的,当我们利用线程池执行任务时: 如果此时线程池中的线程数量小于core

    2024年02月12日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包