关于如何用好线程池的一些建议

这篇具有很好参考价值的文章主要介绍了关于如何用好线程池的一些建议。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 线程的使用场景

异步任务

简单来说就是某些不需要同步返回业务处理结果的场景,比如:短信、邮件等通知类业务,评论、点赞等互动性业务。

并行计算

就像MapReduce一样,充分利用多线程的并行计算能力,将大任务拆分为多个子任务,最后再将所有子任务计算后的结果进行汇总,ForkJoinPool就是JDK中典型的并行计算框架。

串行任务

很简单,假设某个方法需要经过,A、B、C三个步骤,A步骤耗时1秒,B步骤耗时2秒,C步骤耗时3秒,那么如果是串行处理,则该方法最终需要耗时6秒,但如果A、B、C三个步骤互相之间是没有依赖的,那么就可以利用多线程的方式,同时处理三个步骤,这样该方法只需要等待耗时最长的步骤结束即可。

2. 线程池创建

不要直接使用Executors创建线程池,应通过ThreadPoolExecutor的方式,主动明确线程池的参数,避免产生意外。

每个参数都要显示设置,例如像下面这样:

private static final ExecutorService executor = new ThreadPoolExecutor(
        2,
        4,
        1L,
        TimeUnit.MINUTES,
        new LinkedBlockingQueue<>(100),
        new ThreadFactoryBuilder().setNameFormat("common-pool-%d").build(),
        new ThreadPoolExecutor.CallerRunsPolicy());

3. 参数的配置建议

CorePoolSize(核心线程数)

一般在配置核心线程数的时候,是需要结合线程池将要处理任务的特性来决定的,而任务的性质一般可以划分为:CPU密集型、I/O密集型。

比较通用的配置方式如下

CPU密集型:一般建议线程的核心数与CPU核心数保持一致。
I/O密集型:一般可以设置2倍的CPU核心数的线程数,因为此类任务CPU比较空闲,可以多分配点线程充分利用CPU资源来提高效率。

通过Runtime.getRuntime().availableProcessors()可以获取核心线程数。

另外还有一个公式可以借鉴

线程核心数 = cpu核心数 / (1-阻塞系数)

阻塞系数 = 阻塞时间/(阻塞时间+使用CPU的时间)

实际上大多数线上业务所消耗的时间主要就是I/O等待,因此一般线程数都可以设置的多一点,比如tomcat中默认的线程数就是200,所以最佳的核心线程数是需要根据特定场景,然后通过实际上线上允许结果分析后,再不断的进行调整。

MaximumPoolSize

maximumPoolSize的设置也是看实际应用场景,如果设置的和corePoolSize一样,那就完全依靠阻塞队列和拒绝策略来控制任务的处理情况,如果设置的比corePoolSize稍微大一点,那可能对于一些突然流量的场景更使用。

KeepAliveTime

由maximumPoolSize创建出来的线程,在经过keepAliveTime时间后进行销毁,依旧突发流量持续的时间来决定。

WorkQueue

那么阻塞队列应该设置多大呢?我们知道当线程池中所有的线程都在工作时,如果再有任务进来,就会被放到阻塞队列中等待,如果阻塞队列设置的太小,可能很快队列就满了,导致任务被丢弃或者异常(由拒绝策略决定),如果队列设置的太大,又可能会带来内存资源的紧张,甚至OOM,以及任务延迟时间过长。

所以阻塞队列的大小,又是要结合实际场景来设置的。

一般会根据处理任务的速度与任务产生的速度进行计算得到一个大概的数值。

假设现在有1个线程,每秒钟可以处理10个任务,正常情况下每秒钟产生的任务数小于10,那么此时队列长度为10就足以。
但是如果高峰时期,每秒产生的任务数会达到20,会持续10秒,且任务又不希望丢弃,那么此时队列的长度就需要设置到100。

监控workQueue中等待任务的数量是非常重要的,只有了解实际的情况,才能做出正确的决定。

ThreadFactory

通过threadFactory我们可以自定义线程组的名字,设置合理的名称将有利于你线上进行问题排查。

Handler

最后拒绝策略,这也是要结合实际的业务场景来决定采用什么样的拒绝方式,例如像过程类的数据,可以直接采用DiscardOldestPolicy策略。

常见的拒绝策略

AbortPolicy

JDK自带线程池中默认的拒绝策略,直接拒绝任务并抛出异常。

CallerRunsPolicy

由当前调用调用者继续执行当前任务。

DiscardPolicy

直接丢弃当前任务

DiscardOldestPolicy

丢弃阻塞队列中最早丢进去的任务

其他的拒绝策略

NewThreadRunsPolicy

这是Netty中的拒绝策略,和CallerRunsPolicy有点像,任务不会丢弃,不同的是Netty中是新建了一个线程继续执行当前任务。

AbortPolicyWithReport

dubbo中的拒绝策略,也是抛出异常,不同的时对于日志内容的输出更加丰富,也是为了我们更好的排查问题。

EsAbortPolicy

针对某种特定场景时,做出不同的处理方式,比如在elasticsearch中只有当isForceExecution为true(isForceExecution是用来判定任务是执行还是拒绝的条件),且阻塞队列是SizeBlockingQueue类型时,才会放入当前队列中,否则抛出异常。

4. 线程池的任务处理流程

阻塞队列的设计起到了良好的缓冲作用,当面对突发流量到来时,先将任务丢到队列中,再慢慢来消费,其原理和MQ是类似的,一旦队列也被打满了,则说明消费能力与你的期望对比,已经严重不足了,此时maximumPoolSize参数的设计,又给了你一次处理的机会,你可以选择再开启一部分线程来应对突发状况,当危机接触后,再主动帮你回收这部分线程,或者选择使用拒绝策略。

一个简单的任务处理,考虑各种实际运行中可能遇到的情况,对于线程池的使用者来说,也应了解线程池的任务处理流程,再结合自身的业务场景充分考虑其中的参数设置。

5. 线程的状态

Java中对线程的定义有如下几种状态:RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, NEW, TERMINATED

RUNNABLE

可运行的状态,包含了运行中和准备就绪两种状态,也就是说RUNNABLE状态下线程并不一定已经运行了,可能还在等待CPU资源。

BLOCKED

处于阻塞状态下的线程,并且这个阻塞是因为进入了同步代码块或者方法,需要等待锁的释放。

一旦线上出现blocked状态的线程,是需要排查原因的。

WAITING

处于等待状态下的线程,例如一个线程调用了Object.wait()方法,那么这个线程就会等待另一个线程调用Object.notify()或者Object.notifyAll()。
或者调用thread.join()的线程等待指定线程的终止。

常见的方法有:Object.wait()、Thread.join()、LockSupport.park()

TIMED_WAITING

与WAITING的区别就在于TIMED_WAITING是明确带有具体等待时间的,常见的方法有:Thread.sleep(long)、Object.wait(long)、Thread.join(long)、LockSupport.parkNanos(long)、LockSupport.parkUntil(long)

NEW

创建了一个线程,但还没调用start()方法。

TERMINATED

终止状态,表示线程已经执行完毕。

关于如何用好线程池的一些建议

6. 线程池的监控

线程池自身提供的统计数据

public class ThreadPoolMonitor {

    private final static Logger log = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 0,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
            new ThreadFactoryBuilder().setNameFormat("my_thread_pool_%d").build());

    public static void main(String[] args) {
        log.info("Pool Size: " + threadPool.getPoolSize());
        log.info("Active Thread Count: " + threadPool.getActiveCount());
        log.info("Task Queue Size: " + threadPool.getQueue().size());
        log.info("Completed Task Count: " + threadPool.getCompletedTaskCount());
    }
}

通过micrometer API完成统计,这样就可以接入Prometheus了文章来源地址https://www.toymoban.com/news/detail-474704.html

package com.springboot.micrometer.monitor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.micrometer.core.instrument.Metrics;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

@Component
public class ThreadPoolMonitor {

    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 8, 0,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
            new ThreadFactoryBuilder().setNameFormat("my_thread_pool_%d").build(), new ThreadPoolExecutor.DiscardOldestPolicy());

    /**
     * 活跃线程数
     */
    private AtomicLong activeThreadCount = new AtomicLong(0);

    /**
     * 队列任务数
     */
    private AtomicLong taskQueueSize = new AtomicLong(0);

    /**
     * 完成任务数
     */
    private AtomicLong completedTaskCount = new AtomicLong(0);

    /**
     * 线程池中当前线程的数量
     */
    private AtomicLong poolSize = new AtomicLong(0);

    @PostConstruct
    private void init() {

        /**
         * 通过micrometer API完成统计
         *
         * gauge最典型的使用场景就是统计:list、Map、线程池、连接池等集合类型的数据
         */
        Metrics.gauge("my_thread_pool_active_thread_count", activeThreadCount);
        Metrics.gauge("my_thread_pool_task_queue_size", taskQueueSize);
        Metrics.gauge("my_thread_pool_completed_task_count", completedTaskCount);
        Metrics.gauge("my_thread_pool_size", poolSize);

        // 模拟线程池的使用
        new Thread(this::runTask).start();
    }

    private void runTask() {
        // 每5秒监控一次线程池的使用情况
        monitorThreadPoolState();
        // 模拟任务执行
        IntStream.rangeClosed(0, 500).forEach(i -> {
            // 每500毫秒,执行一个任务
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 每个处理一个任务耗时5秒
            threadPool.submit(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        });
    }

    private void monitorThreadPoolState() {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            activeThreadCount.set(threadPool.getActiveCount());
            taskQueueSize.set(threadPool.getQueue().size());
            poolSize.set(threadPool.getPoolSize());
            completedTaskCount.set(threadPool.getCompletedTaskCount());
        }, 0, 5, TimeUnit.SECONDS);
    }
}

到了这里,关于关于如何用好线程池的一些建议的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 并发编程系列-线程池的正确使用

    在Java语言中,创建线程并不像创建对象一样简单。虽然只需要使用new Thread()即可创建线程,但实际上创建线程比创建对象复杂得多。创建对象只需在JVM的堆中分配内存,而创建线程需要调用操作系统内核的API,并为线程分配一系列资源,这个成本相对较高。因此,线程被视为

    2024年02月11日
    浏览(36)
  • Java线程池的类型和使用

    在并发编程中,线程池是一种非常重要的工具,它可以实现线程的复用,避免频繁地创建新线程,从而提高程序的性能和效率。Java的并发库提供了丰富的线程池功能,本文将介绍Java线程池的类型和使用。 线程池是一种执行多个任务的并发模型。它由一个线程队列和一组可重

    2024年02月13日
    浏览(43)
  • 数据资源丨原始数据哪里找?这些网站要用好!(建议收藏)

    资料搜集是个相当繁琐与累的工作,也是投资入门的基本,良好的信息资料搜集能力有利于我们快速了解投资主体的基本情况,为后续的调研及一手资料的获得打下较好的基础。 目录 一、搜索引擎(重点掌握) 1、搜索的选择 2、搜索技巧 3、搜索引擎推荐 二、数据库

    2024年02月06日
    浏览(67)
  • Spring(19) ThreadPoolTaskExecutor 线程池的使用

    1.1 为什么使用线程池 降低系统资源消耗: 通过重用已存在的线程,降低线程创建和销毁造成的消耗; 提高系统响应速度: 当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行; 方便线程并发数的管控: 因为线程若是无限制的创建,可能会导致内

    2024年01月19日
    浏览(38)
  • Springboot中使用线程池的三种方式

    前言 多线程是每个程序员的噩梦,用得好可以提升效率很爽,用得不好就是埋汰的火葬场。 这里不深入介绍,主要是讲解一些标准用法,熟读唐诗三百首,不会作诗也会吟。 这里就介绍一下springboot中的多线程的使用,使用线程连接池去异步执行业务方法。 由于代码中包含详

    2024年02月08日
    浏览(47)
  • Java中线程池的使用及原理(附带线程池拒绝策略介绍)

    目录 1.使用线程池的目的 2.线程池的使用及原理 3.生动讲解拒绝策略: 4.自己实现一个简单的线程池 5.线程数的决定方法 6.总结 希望各位大佬能多多点赞关注收藏,有了你们的支持,我也能更有动力的向你们学习不是(手动狗头)!!! 在知道线程池之前,每当我们需要一

    2024年04月17日
    浏览(37)
  • Java并发编程学习16-线程池的使用(中)

    上篇分析了在使用任务执行框架时需要注意的各种情况,并简单介绍了如何正确调整线程池大小。 本篇将继续介绍对线程池进行配置与调优的一些方法,详细如下: ThreadPoolExecutor 为 Executors 中的 newCachedThreadPool 、 newFixedThreadPool 和 newScheduledThreadExecutor 等工厂方法返回的 Exe

    2024年02月10日
    浏览(44)
  • Java并发编程学习18-线程池的使用(下)

    上篇介绍了 ThreadPoolExecutor 配置和扩展相关的信息,本篇开始将介绍递归算法的并行化。 还记得我们在《Java并发编程学习11-任务执行演示》中,对页面绘制程序进行一系列改进,这些改进大大地提供了页面绘制的并行性。 我们简单回顾下相关的改进过程: 第一次新增时,页

    2024年02月12日
    浏览(38)
  • 线程池的介绍、原理、监控运维、框架使用场景案例

    线程池是一种线程复用的技术,它可以有效地控制线程的数量,处理过程中将任务添加到队列,然后在线程创建后启动这些任务。主要作用有: 重用线程,减少线程创建和销毁带来的开销。 可以有效控制线程的数量,方便线程管理。 提高系统响应速度,当有新的任务到来时,无需等待

    2024年02月03日
    浏览(38)
  • 第一次使用ThreadPoolTaskExecutor实现线程池的经历,反复修改了多次代码才正常使用

    1、前言   在一个向第三方平台推送消息的场景中,为了提高程序的执行效率,每次发送消息,都创建一个新的线程来完成发送消息的任务,为了提供线程的使用性能,我选择了ThreadPoolTaskExecutor线程池,结果在使用的过程中,出现了较多的问题,这里记录一下避免以后再出

    2024年02月08日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包