并发编程 | CompletionService - 如何优雅地处理批量异步任务

这篇具有很好参考价值的文章主要介绍了并发编程 | CompletionService - 如何优雅地处理批量异步任务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

引言

上一篇文章中,我们详细地介绍了 CompletableFuture,它是一种强大的并发工具,能帮助我们以声明式的方式处理异步任务。虽然 CompletableFuture 很强大,但它并不总是最适合所有场景的解决方案。
在这篇文章中,我们将介绍 Java 的 CompletionService,这是一种能处理批量异步任务并在完成时获取结果的并发工具。
CompletionService CompletableFuture 在很多方面都相似。它们都用于处理异步任务,并且都提供了获取任务完成结果的机制。然而,CompletionService 采用了更传统并发模型,它将生产者和消费者的角色更明确地分离开来。

回顾我们在上一篇文章:并发编程 | 从Future到CompletableFuture 中讨论的需求,我们需要查找并计算一系列旅行套餐的价格。我们使用 CompletableFuture 实现了这个需求,并且代码看起来很简洁明了。然而,事情都有两面性。有些人并不习惯这种写法,觉得CompletableFuture 的实现中存在大量的嵌套,会让代码难以阅读和理解。另外,我们的代码中有大量的函数式编程,这在一定程度上增加了对代码阅读的门槛,如果你不熟悉这种编程范式,代码可能会看起来很混乱。

有没有一种方法,既简洁的同时,又不回到Future的回调地狱陷阱中去?有,CompletionService 。来看下CompletionService 是怎么解决问题。


使用CompletionService 解决问题

如果我们用 CompletionService 来实现这个需求,会是什么样呢?我们来看下代码:

public List<TravelPackage> searchTravelPackages(SearchCondition searchCondition) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletionService<List<TravelPackage>> completionService = new ExecutorCompletionService<>(executorService);

    List<Flight> flights = searchFlights(searchCondition);
    for (Flight flight : flights) {
        // 提交所有的任务
        completionService.submit(() -> {
            List<TravelPackage> travelPackagesForFlight = new ArrayList<>();
            List<Hotel> hotels = searchHotels(flight);
            for (Hotel hotel : hotels) {
                TravelPackage travelPackage = calculatePrice(flight, hotel);
                travelPackagesForFlight.add(travelPackage);
            }
            return travelPackagesForFlight;
        });
    }

    List<TravelPackage> allTravelPackages = new ArrayList<>();
    for (int i = 0; i < flights.size(); i++) {
        // 等待它们的完成
        Future<List<TravelPackage>> future = completionService.take();
        // 如果没完成,这里会阻塞
        List<TravelPackage> travelPackagesForFlight = future.get();
        allTravelPackages.addAll(travelPackagesForFlight);
    }
    executorService.shutdown();
    allTravelPackages.sort(Comparator.comparing(TravelPackage::getPrice));

    return allTravelPackages;
}

通过上面的代码,我们可以看到 CompletionService 提供了一个更传统的并发模型来处理异步任务。相比CompletableFuture 而言,我们的代码中没有复杂的嵌套,代码更加直观。

对初学者来说,这个模型会更容易理解,特别是对于那些不熟悉函数式编程的读者来说。
当然,作为老手的你(假如你弄懂了上篇文章,并实践完),如果你在使用CompletableFuture 过程中发现它嵌套太深太复杂,CompletionService 可能也是个不错的选择。


基于上述代码抽取CompletionService

我们把关键代码抽取出来并简化,就可以得到下面这段代码:

ExecutorService executor = Executors.newFixedThreadPool(4);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

long start = System.currentTimeMillis();
// 提交3个任务
completionService.submit(() -> {
    // 业务返回的实践可能不一样,模拟不一样的任务执行时间
    Thread.sleep(5000);
    return "任务1完成";
});
completionService.submit(() -> {
    // 业务返回的实践可能不一样,模拟不一样的任务执行时间
    Thread.sleep(3000);
    return "任务2完成";
});
completionService.submit(() -> {
    // 业务返回的实践可能不一样,模拟不一样的任务执行时间
    Thread.sleep(500);
    return "任务3完成";
});
completionService.submit(() -> {
    // 业务返回的实践可能不一样,模拟不一样的任务执行时间
    Thread.sleep(500);
    return "任务4完成";
});

// 获取结果
for (int i = 0; i < 4; i++) {
    try {
        Future<String> future = completionService.take();
        // 如果没完成,这里会阻塞
        System.out.println(future.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
executor.shutdown();
long end = System.currentTimeMillis();
System.out.println("任务花费时间: " + (end - start) + " ms");

结合文中代码注释,我把它总结为一句口诀:批量提交,快速获取。

批量我知道啊,就是遍历呗,但是提交到那里去?快速获取是什么意思?别急,我们接着往下看。


使用ExecutorService 实现需求

在回答这个问题之前,我们先来看一下代码。我们先sumbit()一下…然后get()拿到数据…
嗯?这不是和之前ExecutorService 差不多吗?好像可以用它实现啊,你看代码:

public List<TravelPackage> searchTravelPackages(SearchCondition searchCondition) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    List<Flight> flights = searchFlights(searchCondition);
    List<Future<List<TravelPackage>>> futureList = new ArrayList<>();
    for (Flight flight : flights) {
        Future<List<TravelPackage>> future = executorService.submit(() -> {
            List<TravelPackage> travelPackagesForFlight = new ArrayList<>();
            List<Hotel> hotels = searchHotels(flight);
            for (Hotel hotel : hotels) {
                TravelPackage travelPackage = calculatePrice(flight, hotel);
                travelPackagesForFlight.add(travelPackage);
            }
            return travelPackagesForFlight;
        });
        futureList.add(future);
    }

    List<TravelPackage> allTravelPackages = new ArrayList<>();
    for (Future<List<TravelPackage>> future : futureList) {
        List<TravelPackage> travelPackagesForFlight = future.get();
        allTravelPackages.addAll(travelPackagesForFlight);
    }

    executorService.shutdown();
    allTravelPackages.sort(Comparator.comparing(TravelPackage::getPrice));
    return allTravelPackages;
}

看,是不是可以实现了。那CompletionService这玩意存在的意义是啥?我们继续往下看。


提交先后顺序 VS 任务完成快慢顺序

我们先把上面抽取出来的代码执行,结果如下:

任务3完成
任务4完成
任务2完成
任务1完成
任务花费时间: 5012 ms
Disconnected from the target VM, address: '127.0.0.1:10373', transport: 'socket'

Process finished with exit code 0

然后,我们换成ExecutorService 执行,抽取的ExecutorService 代码如下:

ExecutorService executor = Executors.newFixedThreadPool(3);
ArrayList<Future<String>> futures = new ArrayList<>();
long start = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(4);

futures.add(executor.submit(() -> {
    // 业务返回的实践可能不一样,模拟不一样的任务执行时间
    Thread.sleep(5000);
    latch.countDown();
    return "任务1完成";
}));
futures.add(executor.submit(() -> {
    // 业务返回的实践可能不一样,模拟不一样的任务执行时间
    Thread.sleep(3000);
    latch.countDown();
    return "任务2完成";
}));
futures.add(executor.submit(() -> {
    // 业务返回的实践可能不一样,模拟不一样的任务执行时间
    Thread.sleep(500);
    latch.countDown();
    return "任务3完成";
}));
futures.add(executor.submit(() -> {
    // 业务返回的实践可能不一样,模拟不一样的任务执行时间
    Thread.sleep(500);
    latch.countDown();
    return "任务4完成";
}));

for (Future<String> future : futures) {
    try {
        // 如果没完成,这里会阻塞
        System.out.println(future.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
latch.await();
executor.shutdown();
long end = System.currentTimeMillis();
System.out.println("任务花费时间: " + (end - start) + " ms");

执行结果如下:

任务1完成
任务2完成
任务3完成
任务4完成
任务花费时间: 5007 ms
Disconnected from the target VM, address: '127.0.0.1:14882', transport: 'socket'

Process finished with exit code 0

细心的你肯定可以看到它们执行结果上的差异。CompletionService 是按照任务时间的顺序消费的。好,搞懂了这个,我们就可以回答上面其中一个问题:

快速获取是什么?

CompletionService是按照任务的快慢,谁先执行完谁就先返回。可以看到上面示例代码的结果,任务3只需要500ms,所以任务3先返回。


CompletionService 的适用场景

既然CompletionService 可以按照任务快慢顺序来返回,我们来看下它适合哪些场景:

执行一组任务并处理结果

上面就是很好的例子,我们可以在任何任务完成后立即获取并处理其结果,以实现快速响应。提高程序的吞吐量(先执行完任务,就有多的线程空闲,可以响应更多任务)。

生产者-消费者模式

我们在最早的开篇说过,CompletionService可以天然地实现生产者-消费者模式。这个模式中,生产者线程负责批量提交任务,消费者线程负责获取并处理任务的结果,而且它也可以安全地在多个线程之间共享


新的问题又出现了,为什么又可以在多个线程之间共享?提交到那里去?快速获取是怎么做到的?以问题为导向,我们来分析下源码。

CompletionService源码分析

提交到那里去?为什么可以在多线程之间共享?

我们先看下构造函数中做了什么:

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

ExecutorCompletionService使用了一个BlockingQueue来存储已完成的任务。因为,任务的提交ExecutorBlockingQueue都是线程安全的。所以多线程共享的数据竞争问题已经在内部解决了。

快速获取是怎么做到的?

我们可以看下submit()方法是怎么实现的。当你提交一个任务时,这个任务被封装在一个QueueingFuture对象中:

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

QueueingFuture重写了done()方法。当任务完成时,done()方法会被调用,QueueingFuture会将自己添加到completionQueue中:

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); } //当任务完成时,将任务添加到队列中
    private final Future<V> task;
}

这样似乎就可以解释,快速获取的机制。完成的任务优先被放入BlockingQueue中按照完成顺序排队。
现在,我换一种表述,你看下是否正确:快的任务在消费的时候就会被排在队列前面先被消费,这样就形成一个任务完成快慢的顺序,第一个被消费到的任务一定是最快的。


第一个被消费到的任务一定是最快的吗?

从上面的代码测试示例结果来看, 确实如此。但是,我很遗憾的告诉你,这句话是错误的。
这句话的正确性是建立在任务数等于线程数的前提下。这就显得很鸡肋了,在在生产中很难达到这个效果,因为资源是稀缺的。当然,我们还是拿代码说话:

ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
        long start = System.currentTimeMillis();

        completionService.submit(() -> {
            // 业务返回的实践可能不一样,模拟不一样的任务执行时间
            Thread.sleep(5000);
            return "任务1完成";
        });
        completionService.submit(() -> {
            // 业务返回的实践可能不一样,模拟不一样的任务执行时间
            Thread.sleep(3000);
            return "任务2完成";
        });
        completionService.submit(() -> {
            // 业务返回的实践可能不一样,模拟不一样的任务执行时间
            Thread.sleep(6000);
            return "任务3完成";
        });
        completionService.submit(() -> {
            // 业务返回的实践可能不一样,模拟不一样的任务执行时间
            Thread.sleep(500);
            return "任务4完成";
        });

        for (int i = 0; i < 4; i++) {
            try {
                System.out.println(completionService.take().get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
        long end = System.currentTimeMillis();
        System.out.println("任务花费时间: " + (end - start) + " ms");

假如遵循执行快慢顺序,理想的状态应该是:4 -> 2 -> 1 -> 3;而结果却是:

Connected to the target VM, address: '127.0.0.1:5068', transport: 'socket'
任务2完成
任务4完成
任务1完成
任务3完成
任务花费时间: 6020 ms
Disconnected from the target VM, address: '127.0.0.1:5068', transport: 'socket'

这个结果也是意料之外,但在情理之中。因为线程总共只有3个,在1,2,3之间排序,任务顺序应该是2,1,3;然后当2执行完之后,1和3依然未执行完;这个时候4正好执行完。于是就插队到任务中。最终得到2,4,1,3的结果。
因此,我们可以说:在生产环境中,这个顺序是不可控的,除非你把线程设置为1;


CompletionService相关面试题

如何使用CompletionService处理一组任务并获取结果?
比较ExecutorService和CompletionService,它们有什么相同之处和不同之处?
在何种情况下,你会选择使用CompletionService而不是ExecutorService?
解释CompletionService是如何保证按任务完成顺序获取结果的
当一个任务被提交到CompletionService后,它的生命周期是怎样的?在任务执行过程中,CompletionService内部都发生了什么?
在使用CompletionService处理任务时,如果某个任务执行异常,应该如何处理?
如果我想取消CompletionService中的所有任务,应该如何做?
谈谈你对Java中的Executor,ExecutorService,CompletionService和Future之间关系的理解

看完上面的文章,你可以试着来回答了吗?


参考文献

  1. Java并发编程小册

总结

让我们一起回顾今天所学。首先,我引导你使用了CompletionService和ExecutorService来实现了先前复杂的需求。相较于CompletableFuture,它们可能显得更为传统,但也更易理解。然后,我们一起探索了CompletionService的存在意义。我们试图解答,既然ExecutorService已经足够应对需求,为什么还要有CompletionService这样的设计。为了揭示这个疑惑,我们深入到源码中,同时也纠正了一个错误观点,以帮助你对CompletionService有更深刻的理解。最后,我们通过面试题形式,来巩固和复习我们所学的知识。文章来源地址https://www.toymoban.com/news/detail-609877.html

到了这里,关于并发编程 | CompletionService - 如何优雅地处理批量异步任务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • JavaScript 生成器函数详解:优雅处理异步任务流

    目录 1. 生成器函数的定义和使用 2. 暂停和恢复执行 3. 与其他语言特性的配合使用 Iterator Protocol 迭代器协议  解构赋值  生成器和 Promise 的组合使用          使用 Promise:         使用 async/await: 委托给另外一个Generator函数         Generators 是 JavaScript 中的一种

    2024年02月12日
    浏览(45)
  • Rust基础拾遗--并发和异步编程

       通过 Rust程序设计-第二版 笔记的形式对Rust相关 重点知识 进行汇总,读者通读此系列文章就可以轻松的把该语言基础捡起来。 为什么一些看似正确的多线程惯用法却根本不起作用? 与“内存模型”有关 你最终会找到一种自己用起来顺手且不会经常出错的并发惯用法。

    2024年02月19日
    浏览(26)
  • Unity 中的 async/await:优雅处理异步任务与协程

    内容将会持续更新,有错误的地方欢迎指正,谢谢!   Unity 中的 async/await:优雅处理异步任务与协程Coroutine       TechX 坚持将创新的科技带给世界! 拥有更好的学习体验 —— 不断努力,不断进步,不断探索 TechX —— 心探索、心进取! 助力快速掌握 async/await 异步等待 为初

    2024年02月06日
    浏览(42)
  • JUC并发编程学习笔记(十四)异步回调

    Future设计的初衷:对将来的某个事件的结果进行建模 在Future类的子类中可以找到CompletableFuture,在介绍中可以看到这是为非异步的请求使用一些异步的方法来处理 点进具体实现类中,查看方法,可以看到CompletableFuture中的异步内部类,里面是实现的异步方法 以及一些异步方法

    2024年02月05日
    浏览(33)
  • Python异步编程之web框架 异步vs同步 Redis并发对比

    主题: 比较异步框架和同步框架在RedisIO操作的性能差异 python版本 :python 3.8 数据库 :redis 5.0.7 压测工具 :locust web框架 :同步:flask 异步:starlette 请求并发量 : 模拟10个用户 服务器配置 : Intel(R) i7-12700F 客户端配置 :Intel(R) i7-8700 3.20GHz flask是python中轻量级web框架,特点是灵

    2024年02月10日
    浏览(32)
  • 【文末送书】Python高并发编程:探索异步IO和多线程并发

    欢迎关注博主 Mindtechnist 或加入【智能科技社区】一起学习和分享Linux、C、C++、Python、Matlab,机器人运动控制、多机器人协作,智能优化算法,滤波估计、多传感器信息融合,机器学习,人工智能等相关领域的知识和技术。搜索关注公粽号 《机器和智能》 发送“刷题宝

    2024年02月15日
    浏览(32)
  • 掌握C#中异步魔法:同步方法如何优雅调用异步方法

      概述: 上述C#示例演示了如何在同步方法中调用异步方法。通过使用`async`和`await`,实现了同步方法对异步方法的调用。建议使用`await`而不是`Result`来避免潜在的死锁问题。这种模式在处理异步任务时能够提高代码的可读性和性能。 在C#中,从同步方法调用异步方法

    2024年03月25日
    浏览(31)
  • SpringBoot如何优雅的实现异步调用?

    Spring Boot 提供了多种方式来实现异步任务,这里介绍三种主要实现方式。 Spring Boot 提供了多种方式来实现异步任务,这里介绍三种实现方式。 @Async 注解是 Spring 提供的一种轻量级异步方法实现方式,它可以标记在方法上,用来告诉 Spring 这个方法是一个异步方法,Spring 会将

    2024年02月07日
    浏览(27)
  • 并发编程 | 从Future到CompletableFuture - 简化 Java 中的异步编程

    在并发编程中,我们经常需要处理多线程的任务,这些任务往往具有依赖性,异步性,且需要在所有任务完成后获取结果。Java 8 引入了 CompletableFuture 类,它带来了一种新的编程模式,让我们能够以函数式编程的方式处理并发任务,显著提升了代码的可读性和简洁性。 在这篇

    2024年02月13日
    浏览(35)
  • 【SpringBoot系列】如何优雅地实现异步调用

    1.前言 在现代的应用程序开发中,异步调用是提高系统性能和响应能力的重要手段之一。 Spring Boot作为一个快速开发框架,提供了多种方式来实现异步调用,使得开发者能够更加优雅地处理并发和异步任务。 本文将介绍如何在Spring Boot中实现异步调用的方法和技巧,帮助开发

    2024年02月07日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包