JAVA-- 在Java8 Parallel Stream中如何自定义线程池?

这篇具有很好参考价值的文章主要介绍了JAVA-- 在Java8 Parallel Stream中如何自定义线程池?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用Parallel Stream时,在适当的环境中,通过适当地使用并行度级别,可以在某些情况下获得性能提升。
如果程序创建一个自定义ThreadPool,必须记住调用它的shutdown()方法来避免内存泄漏。

Parallel Stream默认使用的线程池

如下代码示例,Parallel Stream并行处理使用的线程池是ForkJoinPool.commonPool(),这个线程池是由整个应用程序共享的线程池

@Test
    public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
        List<Long> aList = new ArrayList<>();
        Stream<Long> parallelStream = aList.parallelStream();

        assertTrue(parallelStream.isParallel());
    }

如何自定义线程池

简单示例

如下代码示例说明如下:

  • 使用的ForkJoinPool构造函数的并行级别为4。为了确定不同环境下的最佳值,需要进行一些实验,但一个好的经验法则是根据CPU的核数选择数值。
  • 接下来,处理并行流的内容,在reduce调用中对它们进行汇总。

这个简单的示例可能不能充分说明使用自定义线程池的用处,但是在不希望将公共线程池与长时间运行的任务绑定在一起(例如处理来自网络源的数据)或应用程序中的其他组件正在使用公共线程池的情况下,其好处就很明显了。

    @Test
    public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {

        long firstNum = 1;
        long lastNum = 1_000_000;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());

        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        long actualTotal = customThreadPool.submit(
            () -> aList.parallelStream().reduce(0L, Long::sum)).get();

        assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
    }

复杂点的示例

通过查看日志,可以看到使用了自定义的线程池,提高了并发处理的效率

	@Test
    public void testCustomThreadPool() throws ExecutionException, InterruptedException {
        List<Long> firstRange = LongStream.rangeClosed(1, 10).boxed()
            .collect(Collectors.toList());

        List<Long> secondRange = LongStream.rangeClosed(5000, 6000).boxed()
            .collect(Collectors.toList());

        ForkJoinPool forkJoinPool = new ForkJoinPool(3);
        Future<Long> future = forkJoinPool.submit(() -> {
            return firstRange.parallelStream().map((number) -> {
                try {
                    print(Thread.currentThread().getName() +" 正在处理 "+number);
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                }finally {
                    return number;
                }
            }).reduce(0L, Long::sum);
        });
        assertEquals((1 + 10) * 10 / 2, future.get());

        forkJoinPool.shutdown();

        ForkJoinPool forkJoinPool2 = new ForkJoinPool(10);

        forkJoinPool2.submit(() -> {
            secondRange.parallelStream().forEach((number) -> {
                try {
                    print(Thread.currentThread().getName() +" 正在处理 "+number);
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                }
            });
        });
        forkJoinPool2.shutdown();
        TimeUnit.SECONDS.sleep(2);
    }
    
 	private static void print(String msg){
        System.out.println(msg);
    }

求质数

实现方案有如下2种:

  • 将Parallel task 直接提交给自定义的ForkJoinPool中
  • 将自定义线程池传递到完整的future.supplyAsync方法中
public class StreamTest {
    @Test
    public void testCompletableFuture()throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        CompletableFuture<List<Long>> primes = CompletableFuture.supplyAsync(() ->
                //parallel task here, for example
                range(1, 1_000_000).parallel().filter(StreamTest::isPrime).boxed().collect(toList()),
            forkJoinPool
        );
         forkJoinPool.shutdown();
        System.out.println(primes.get());
    }

    @Test
    public void testCustomForkJoinPool() throws InterruptedException {
        final int parallelism = 4;
        ForkJoinPool forkJoinPool = null;
        try {
            forkJoinPool = new ForkJoinPool(parallelism);
            final List<Integer> primes = forkJoinPool.submit(() ->
                // Parallel task here, for example
                IntStream.range(1, 1_000_000).parallel()
                    .filter(StreamTest::isPrime)
                    .boxed().collect(Collectors.toList())
            ).get();
            System.out.println(primes);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }

    private static void print(String msg){
        System.out.println(msg);
    }
}

注意事项:小心内存泄漏【Memory Leak】

正如前面所讨论的,整个应用程序默认使用公共线程池。公共线程池是一个静态ThreadPool实例。
因此,如果使用默认线程池,就不会发生内存泄漏。

但是针对使用自定义线程池的场景下,customThreadPool对象不会被解引用和垃圾收集——相反,它将等待分配新任务【the customThreadPool object won’t be dereferenced and garbage collected — instead, it will be waiting for new tasks to be assigned】
也就是说,每次调用测试方法时,都会创建一个新的customThreadPool对象,并且它不会被释放。

解决这个问题很简单:在执行了这个方法之后关闭customThreadPool对象:文章来源地址https://www.toymoban.com/news/detail-488929.html

@Test
    public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {

        long firstNum = 1;
        long lastNum = 1_000_000;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        try {
            long actualTotal = customThreadPool.submit(
                () -> aList.parallelStream().reduce(0L, Long::sum)).get();

            assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
        }finally {
            customThreadPool.shutdown();
        }
    }

到了这里,关于JAVA-- 在Java8 Parallel Stream中如何自定义线程池?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java Stream对象并行处理方法parallel()

            Stream.parallel() 方法是 Java 8 中 Stream API 提供的一种并行处理方式。在处理大量数据或者耗时操作时,使用 Stream.parallel() 方法可以充分利用多核 CPU 的优势,提高程序的性能。本文将从以下几个方面对 Stream.parallel() 进行详解。 什么是 Stream.parallel() 方法         

    2024年02月16日
    浏览(66)
  • 使用Java8 Stream流中的Collectors.collectingAndThen()方法去重

    Collectors.collectingAndThen() 根据对象属性进行去重操作 Collectors.collectingAndThen()方法属于java8 Stream流中的 java.util.stream.Collectors ,此类实现了 java.util.stream.Collector 接口,还提供了大量的方法对Stream流中的元素进行 map 和 reduce 操作 在获取任务的时候,会出现id重复的状况,利用 Co

    2024年02月09日
    浏览(63)
  • 【Java基础】Java8 使用 stream().filter()过滤List对象(查找符合条件的对象集合)

    本篇主要说明在Java8及以上版本中,使用stream().filter()来过滤List对象,查找符合条件的集合。 集合对象以学生类(Student)为例,有学生的基本信息,包括:姓名,性别,年龄,身高,生日几项。 我的学生类代码如下: 下面来添加一些测试用的数据,代码如下: 添加过滤条件

    2024年02月12日
    浏览(68)
  • java8新特性Stream流中anyMatch和allMatch和noneMatch的使用!!!

    判断数据列表中是否存在任意一个元素符合设置的predicate条件,如果是就返回true,否则返回false。 接口定义: boolean anyMatch(Predicate? super T predicate); 方法描述: 在anyMatch 接口定义中是接收 Predicate 类型参数,在Lamdba表达式中 PredicateT 是接收一个T类型参数,然后经过逻辑验证返

    2024年02月08日
    浏览(59)
  • Java8使用stream流给List<Map<String,Object>>分组(多字段key)

    Java8使用 stream流 给ListMapString,Object根据字段key 分组 一、项目场景: 从已得到的List集合中,根据某一元素(这里指map的key)进行分组,筛选出需要的数据。 如果是SQL的话则使用 group by 直接实现,代码的方式则如下: 使用到stream流的 Collectors.groupingBy() 方法。 二、代码实现 1、首

    2024年02月02日
    浏览(65)
  • 使用java8 新特性stream流对List<Map<String, Object>>集合进行遍历、过滤、查询、去重、排序、分组

    对于一个ListMapString, Object类型的数据,可以使用Java 8的新特性stream流来进行遍历、过滤、查询、去重、排序、分组等操作。 遍历: 过滤: 查询: 去重: 排序: 分组:

    2024年02月10日
    浏览(69)
  • Java8 Stream流的合并

    最近的需求里有这样一个场景,要校验一个集合中每个对象的多个Id的有效性。比如一个Customer对象,有3个Id: id1 , id2 , id3 ,要把这些Id全部取出来,然后去数据库里查询它们是否存在。 通常情况下,我们都是从集合中取出对象的某一个字段,像这样: 现在要取3个字段,

    2024年02月02日
    浏览(54)
  • Java8 函数式编程stream流

    Java 8 中新增的特性旨在帮助程序员写出更好的代码,其中对核心类库的改进是很关键的一部分,也是本章的主要内容。对核心类库的改进主要包括集合类的 API 和新引入的流(Stream),流使程序员得以站在更高的抽象层次上对集合进行操作。下面将介绍stream流的用法。 ​场景

    2024年02月15日
    浏览(37)
  • Java8中Stream详细用法大全

    Java 8 是一个非常成功的版本,这个版本新增的Stream,配合同版本出现的Lambda ,给我们操作集合(Collection)提供了极大的便利。Stream流是JDK8新增的成员,允许以声明性方式处理数据集合,可以把Stream流看作是遍历数据集合的一个高级迭代器。Stream 是 Java8 中处理集合的关键抽

    2023年04月08日
    浏览(80)
  • Java8的Stream流的学习

    Stream可以由数组或集合创建,对流的操作分为两种: 中间操作,每次返回一个新的流,可以有多个。 终端操作,每个流只能进行一次终端操作,终端操作结束后流无法再次使用。终端操作会产生一个新的集合或值。 stream和parallelStream的简单区分: stream是顺序流,由主线程按

    2024年02月07日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包