并发编程 | 从Future到CompletableFuture - 简化 Java 中的异步编程

这篇具有很好参考价值的文章主要介绍了并发编程 | 从Future到CompletableFuture - 简化 Java 中的异步编程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

引言

在并发编程中,我们经常需要处理多线程的任务,这些任务往往具有依赖性,异步性,且需要在所有任务完成后获取结果。Java 8 引入了 CompletableFuture 类,它带来了一种新的编程模式,让我们能够以函数式编程的方式处理并发任务,显著提升了代码的可读性和简洁性。
在这篇博客中,我们将深入探讨 CompletableFuture 的设计原理,详细介绍其 API 的使用方式,并通过具体的示例来展示其在并发任务处理中的应用。我们也将探讨其与 Future,CompletableFuture 以及 Java 并发包中其他工具的对比,理解何时以及为什么需要使用 CompletableFuture。让我们一起踏上这个富有挑战性的学习之旅吧!


在开始之前,我们先来回顾一下Java语言发展历史

Java 并发编程的演进

自从诞生以来,Java 就一直致力于提供强大的并发和异步编程工具。在最初的 JDK 1.4 时期,Java 开发者需要使用低级的并发控制工具,如 synchronized 和 wait/notify,这些工具虽然功能强大,但使用起来非常复杂。
为了简化并发编程,Java 在 JDK 1.5 中引入了JUC包,提供了一系列高级的并发控制工具,如 ExecutorService、Semaphore 和 Future。

我们先来看下,Future到底是怎么进行异步编程的

Future的异步编程之旅

在开始我们的旅程之前,我们先看看一下这个需求。

一个复杂的需求

假设你正在为一家在线旅行社工作,用户可以在网站上搜索并预订飞机票和酒店。以下是你需要处理的一系列操作:

  1. 根据用户的搜索条件,查询所有可用的飞机票
  2. 对每一个飞机票,查询与之匹配的可用酒店
  3. 对每一个飞机票和酒店的组合,计算总价格
  4. 将所有的飞机票和酒店的组合按照价格排序
  5. 将结果返回给用户

实现

为了实现这个需求,首先,我们需要创建一个 ExecutorService,:

ExecutorService executor = Executors.newFixedThreadPool(10);
// 1. 查询飞机票
Future<List<Flight>> futureFlights = executor.submit(() -> searchFlights(searchCondition));

List<Flight> flights;
try {
    flights = futureFlights.get();
} catch (InterruptedException | ExecutionException e) {
    // 处理异常
}

// 2. 对每个飞机票查询酒店
List<Future<List<Hotel>>> futureHotelsList = new ArrayList<>();
for (Flight flight : flights) {
    Future<List<Hotel>> futureHotels = executor.submit(() -> searchHotels(flight));
    futureHotelsList.add(futureHotels);
}

List<Future<List<TravelPackage>>> futureTravelPackagesList = new ArrayList<>();
for (Future<List<Hotel>> futureHotels : futureHotelsList) {
    List<Hotel> hotels;
    try {
        hotels = futureHotels.get();
    } catch (InterruptedException | ExecutionException e) {
        // 处理异常
    }
    
    // 3. 对每个飞机票和酒店的组合计算总价格
    for (Hotel hotel : hotels) {
        Future<List<TravelPackage>> futureTravelPackages = executor.submit(() -> calculatePrices(flight, hotel));
        futureTravelPackagesList.add(futureTravelPackages);
    }
}

List<TravelPackage> travelPackages = new ArrayList<>();
for (Future<List<TravelPackage>> futureTravelPackages : futureTravelPackagesList) {
    try {
        travelPackages.addAll(futureTravelPackages.get());
    } catch (InterruptedException | ExecutionException e) {
        // 处理异常
    }
}

// 4. 将所有的旅行套餐按照价格排序
travelPackages.sort(Comparator.comparing(TravelPackage::getPrice));

// 5. 返回结果
return travelPackages;

需求终于做完了(叹气声)。此时此刻,生在JDK8+的你,会不会感同身受呢。这还是在没有处理异常,没有很多业务代码的前提下。好,现在缓一下我们继续。我们可以从上面代码最直观的看到什么?


再完美的表达,也敌不过一个让你直观感受的例子。接下来,我们来分析一下Future的缺点。

分析这趟Future异步编程之旅

从上面的 Future 的例子中,我们可以明显看到以下几点缺点:

回调地狱

Future 的实现使得我们必须在每一个 Future 完成后启动另一个 Future,这使得代码看起来像是在不断嵌套回调。这种方式会使得代码难以阅读和理解,特别是在涉及复杂的异步任务链时。

阻塞操作

虽然 Future.get() 可以得到任务的结果,但这是一个阻塞操作,它会阻止当前线程的执行,直到异步操作完成。这种设计对于要实现非阻塞的异步编程来说,是非常不理想的。

复杂的错误处理

在使用 Future 链式处理异步任务时,如果中间某个环节出现错误,错误处理的复杂性就会大大增加。你需要在每个 Future 的处理过程中都增加异常处理代码,这使得代码变得更加复杂和难以维护。

无法表示任务间复杂关系

使用 Future 很难直观地表示出任务之间的依赖关系。例如,你无法使用 Future 来表示某个任务需要在另外两个任务都完成后才能开始,或者表示多个任务可以并行执行但是必须在一个共同的任务之前完成。这种限制使得 Future 在处理复杂的异步任务链时变得非常困难。

因此,为了解决这些问题,CompletableFuture 被引入了 Java 8,提供了更强大和灵活的异步编程工具。


CompletableFuture的异步编程之旅

同样还是上面的例子,我们来看下它的实现代码:

CompletableFuture.supplyAsync(() -> searchFlights())  // 1. 查询飞机票
                .thenCompose(flights -> {  // 2. 对每个飞机票查询酒店
                    List<CompletableFuture<List<TravelPackage>>> travelPackageFutures = flights.stream()
                            .map(flight -> CompletableFuture.supplyAsync(() -> searchHotels(flight))  // 查询酒店
                                    .thenCompose(hotels -> {  // 3. 对每个飞机票和酒店的组合计算总价格
                                        List<CompletableFuture<TravelPackage>> packageFutures = hotels.stream()
                                                .map(hotel -> CompletableFuture.supplyAsync(() -> new TravelPackage(flight, hotel)))
                                                .collect(Collectors.toList());

                                        return CompletableFuture.allOf(packageFutures.toArray(new CompletableFuture[0]))
                                                .thenApply(v -> packageFutures.stream()
                                                        .map(CompletableFuture::join)
                                                        .collect(Collectors.toList()));
                                    }))
                            .collect(Collectors.toList());

                    return CompletableFuture.allOf(travelPackageFutures.toArray(new CompletableFuture[0]))
                            .thenApply(v -> travelPackageFutures.stream()
                                    .flatMap(future -> future.join().stream())
                                    .collect(Collectors.toList()));
                })
                .thenApply(travelPackages -> {  // 4. 将所有的旅行套餐按照价格排序
                    return travelPackages.stream()
                            .sorted(Comparator.comparing(TravelPackage::getPrice))
                            .collect(Collectors.toList());
                })
                .exceptionally(e -> {  // 处理所有的异常
                    // 处理异常
                    return null;
                });

你可能乍一看,感觉怎么比Future还要复杂。但是实际在业务中,它反而更加容易读懂。每一步,每一个操作都可以顺着thenCompose下去。


分析这趟CompletableFuture异步编程之旅

CompletableFuture 是 Java 8 中引入的,用于解决在使用 Future 时遇到的一些问题。它实现了 FutureCompletionStage 接口,并且提供了大量的方法来帮助你更好地控制和管理异步操作。我们来结合上面的例子来分析它的优点:

链式编程

我们使用 CompletableFuture 中的 supplyAsync 方法来异步地开始查询航班的操作:

    CompletableFuture<List<Flight>> flightsFuture = CompletableFuture.supplyAsync(() -> 
        searchFlights(source, destination));

然后,我们使用 thenCompose 方法将查询航班和查询酒店的操作连在一起:

    CompletableFuture<List<TravelPackage>> travelPackagesFuture = flightsFuture.thenCompose(flights ->
        CompletableFuture.supplyAsync(() -> flights.stream()
            .map(flight -> searchHotels(flight))
            .collect(Collectors.toList())
        ));
非阻塞操作

上述的 thenCompose 方法是非阻塞的,即查询酒店的操作会立即开始,而不需要等待查询航班的操作完成。

异常处理

我们使用 exceptionally 方法处理查询航班和查询酒店过程中可能出现的异常:

    CompletableFuture<List<TravelPackage>> travelPackagesFuture = flightsFuture.thenCompose(flights ->
        CompletableFuture.supplyAsync(() -> flights.stream()
            .map(flight -> searchHotels(flight))
            .collect(Collectors.toList())
        )).exceptionally(ex -> {
            System.out.println("失败了: " + ex);
            return new ArrayList<>();
        });
表示任务间复杂关系

我们使用 CompletableFuture.allOf 方法来表示所有的旅行套餐计算任务都必须在开始排序之前完成:

    CompletableFuture<List<TravelPackage>> sortedTravelPackagesFuture = travelPackagesFuture.thenApply(travelPackages ->
        travelPackages.stream()
            .flatMap(List::stream)
            .sorted(Comparator.comparing(TravelPackage::getPrice))
            .collect(Collectors.toList())
        );

暂停一分钟,再细细体会上面的例子。我们接着来集中比较这两者

CompletableFuture与Future的比较

异步执行与结果获取
  • Future 提供了一种在未来某个时间点获取结果的方式,但它的主要问题是在获取结果时,如果结果尚未准备好,会导致阻塞。另外,使用 isDone() 方法进行轮询也不是一个好的选择,因为它将消耗CPU资源。
  • CompletableFuture 提供了非阻塞的结果获取方法,thenApply, thenAccept, thenRun 等方法可以在结果准备好后被自动执行,这样我们不需要手动检查和等待结果。
链式操作
  • Future 不支持链式操作,我们无法在 Future 完成后自动触发另一个任务。
  • CompletableFuture 提供了 thenApply, thenAccept, thenRun, thenCompose, thenCombine 等一系列方法,用于在当前任务完成后自动执行另一个任务,形成任务链。
异常处理
  • Future 中,只能通过 get() 方法获取异常,但是这种方式会阻塞线程,直到任务执行完毕。
  • CompletableFuture 提供了 exceptionally, handle 等方法,我们可以用这些方法在发生异常时提供备用的结果,或者对异常进行处理。
任务组合
  • Future 并未提供任何任务组合的方式。
  • CompletableFuture 提供了 allOf, anyOf, thenCombine 等方法,我们可以通过这些方法来表示任务间的并行关系,或者汇聚关系。
灵活的任务执行控制
  • Future 在任务执行上相对较为死板,我们无法中途取消任务,也无法在任务结束后执行特定操作。
  • CompletableFuture 提供了 cancel, complete 等方法,用于中途取消任务,或者提前完成任务。此外,whenCompletewhenCompleteAsync 方法允许我们在任务结束时,无论成功或失败,都可以执行特定的操作。

假如有一个面试官现在问题它们两者的区别,你会回答了吗? 接下来,我们来解析一下


进阶 | 理解CompletableFuture原理

为了让你理解的不那么晦涩,我为你讲生活中的例子:

我们可以把 CompletableFuture 想象成一家装配线生产车间。每一件零件(任务)的加工完成(Future 完成)都可能会触发下一步工作(下一步的操作),而每一步工作的完成都会通知车间(Future),以便开始下一个阶段的生产。这个过程就像一条流水线,每完成一个步骤就自动进行下一个。

带着这个场景,我们接着往下看。

任务链

CompletableFuture 的源码中,有一个内部类 Completion,代表了任务链中的一项任务。每当一个任务完成时,它都会尝试去完成依赖于它的任务,就像流水线上的工人完成了一部分工作后,就会把半成品传递给下一个工人。

    abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
        // ...
    }
结果容器

CompletableFuture 本身就是一个结果容器,它持有了执行的结果,包括正常的计算结果或者执行过程中出现的异常。

    volatile Object result; // The outcome of the computation
工作线程

所有的异步任务都会提交到 ForkJoinPool.commonPool() 中进行执行,当然也可以指定自定义的 Executor 来执行任务。

    static final ForkJoinPool ASYNC_POOL = ForkJoinPool.commonPool();
任务触发

当一个任务完成后,CompletableFuture 会通过 tryFire 方法触发与之关联的下一个任务。这就好比工人完成了一部分工作后,通知流水线的下一位工人继续完成接下来的工作。

 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
        // ...
        if (a != null && a.stack != null) {
            if (mode < 0)
                a.cleanStack();
            else
                a.postComplete();
        }
        if (b != null && b.stack != null) {
            if (mode < 0)
                b.cleanStack();
            else
                b.postComplete();
        }
        return null;
    }

是不是有点理解了呢?我可以肯定的说,你已经超过80%的人了!


CompletableFuture的主要方法

细心的你肯定发现了,CompletableFuture大多数方法都实现于一个CompletionStage接口。当然,我在这里可以为你把所有方法都试过一遍,但是你肯定会看的特别累。这样!我把上面需求中所用到的方法都为你讲解,剩下的请你结合网上的案例学习。

supplyAsync()方法

这个方法用于异步执行一个供应函数,并返回一个CompletableFuture对象。在我们的示例中,这个方法用于启动一个异步任务来查找航班。

    CompletableFuture<List<Flight>> flightsFuture = CompletableFuture.supplyAsync(() -> searchFlights(destination));
thenCompose()方法

这个方法用于链接多个CompletableFuture对象,形成一个操作链。当一个操作完成后,thenCompose()方法会将操作的结果传递给下一个操作。在我们的示例中,这个方法用于在找到航班之后查找酒店。

    CompletableFuture<List<Hotel>> hotelsFuture = flightsFuture.thenCompose(flights -> CompletableFuture.supplyAsync(() -> searchHotels(destination)));
thenCombine()方法

这个方法用于将两个独立的CompletableFuture对象的结果合并为一个结果。在我们的示例中,这个方法用于将查找航班和酒店的结果合并为一个旅行套餐。

    CompletableFuture<List<TravelPackage>> travelPackagesFuture = flightsFuture.thenCombine(hotelsFuture, (flights, hotels) -> createTravelPackages(flights, hotels));
thenAccept()方法

这个方法在CompletableFuture对象完成计算后执行一个消费函数,接收计算结果作为参数,不返回新的计算值。在我们的示例中,这个方法用于打印出所有的旅行套餐。

    travelPackagesFuture.thenAccept(travelPackages -> printTravelPackages(travelPackages));
allOf()方法

这个方法用于将一个CompletableFuture对象的数组组合成一个新的CompletableFuture对象,这个新的CompletableFuture对象在数组中所有的CompletableFuture对象都完成时完成。在我们的示例中,这个方法用于将每个航班与每个酒店的组合结果(也就是旅行套餐)组合在一起。

    CompletableFuture.allOf(packageFutures.toArray(new CompletableFuture[0]))
                    .thenApply(v -> packageFutures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
thenApply()方法

这个方法用于对CompletableFuture的结果进行变换,并返回一个新的CompletableFuture对象。在我们的示例中,这个方法用于将查询到的旅行套餐按照价格进行排序。

    .thenApply(travelPackages -> {  // 4. 将所有的旅行套餐按照价格排序
                    return travelPackages.stream()
                            .sorted(Comparator.comparing(TravelPackage::getPrice))
                            .collect(Collectors.toList());
                })
exceptionally()方法

这个方法用于处理CompletableFuture的异常情况。如果CompletableFuture的计算过程中抛出异常,那么这个方法会被调用。在我们的示例中,这个方法用于处理查询旅行套餐过程中可能出现的任何异常。

    .exceptionally(e -> {  // 处理所有的异常
                    // 处理异常
                    return null;
                });

当然,这些方法已经够你用了。除非这个需求比我想得还复杂,那算你厉害。哦,不对,算需求变态。现在,你可以挥起历史的毛笔续写了吗?


Java 并发编程的续章

JDK 1.5 的 Future 解决了许多并发编程的复杂性,但是它仍有一些局限性。Future 只能描述一个异步操作,并不能描述一个由多个步骤组成的异步操作。例如,当需要处理一个由多个异步操作序列组成的业务流程时,你可能会发现你的代码被复杂的回调逻辑淹没,这就是人们常说的回调地狱。此外,Future 没有提供一种有效的方式来处理异步操作的结果,你只能通过阻塞调用 get() 方法来获取结果。
为了解决这些问题,Java 在 JDK 1.8 中引入了 CompletableFuture。CompletableFuture 是 Future 的增强版,它不仅能表示一个异步操作,还可以通过 thenCompose(), thenCombine(), allOf() 等方法来描述一个由多个步骤组成的异步操作。通过这些方法,CompletableFuture 能以流畅的链式调用的方式来描述复杂的异步业务流程,这大大简化了异步编程的复杂性。


常见面试题

请解释一下 Future 接口在 Java 中的用途?
解释一下 Future 的局限性是什么?
请解释一下 CompletableFuture 的用途以及它如何克服 Future 的局限性?
如何用 CompletableFuture 来表示一组并行的异步操作?
请解释一下 CompletableFuture 的 thenApply(),thenCompose(),和 thenCombine() 方法的作用及区别?
如果你有一个耗时的异步操作需要执行,但是你又不希望调用 get() 方法时阻塞,你可以使用 CompletableFuture 的哪个方法来达到这个目的?
如何处理 CompletableFuture 的异常?
请解释一下 CompletableFuture 的工作原理?

阅读完文章的你,是否可以回答这些问题呢?我在留言等你。

总结

好了,到这里就结束了,我们来回顾一下。首先,我带你回顾了一下Java并发世界的编年史。紧接着,我带你体验了一下古人经常使用的Future。感到它的不妙之后,我带你回到CompletableFuture 。紧接着有深入了解了它的全貌以及使用方法。最后,希望阅读到这里的你,不要忘记回答问题哦。文章来源地址https://www.toymoban.com/news/detail-635853.html

到了这里,关于并发编程 | 从Future到CompletableFuture - 简化 Java 中的异步编程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 异步编程 - 04 基于JDK中的Future实现异步编程(上)_Future & FutureTask 源码解析

    这里我们主要探讨如何使用JDK中的Future实现异步编程,这包含 如何使用FutureTask实现异步编程及其内部实现原理; 如何使用CompletableFuture实现异步编程及其内部实现原理, 以及CompletableFuture与JDK Stream如何完美结合的。 在Java并发包(JUC包)中Future代表着异步计算结果,Future中

    2024年02月09日
    浏览(35)
  • Java组合式异步编程CompletableFuture

    CompletableFuture是Java 8中引入的一个功能强大的Future实现类,它的字面翻译是“可完成的Future”。 CompletableFuture对并发编程进行了增强,可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发。 CompletableFuture实现了两个接口,一个

    2024年04月09日
    浏览(37)
  • 【并发编程】Java的Future机制详解(Future接口和FutureTask类)

    目录 一、彻底理解Java的Future模式 二、为什么出现Future机制 2.1 Future 类有什么用? 三、Future的相关类图 2.1 Future 接口 2.2 FutureTask 类 五、FutureTask源码分析 5.1 state字段 5.2 其他变量 5.3 CAS工具初始化 5.4 构造函数 5.5 jdk1.8和之前版本的区别 六、Callable 和 Future 有什么关系? 七、

    2024年02月03日
    浏览(38)
  • Java并发编程:Callable、Future和FutureTask

    在前面的文章中我们讲述了创建线程的 2 种方式,一种是直接继承 Thread ,另外一种就是实现 Runnable 接口。 这 2 种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起

    2024年02月13日
    浏览(38)
  • Java学习笔记-day06-响应式编程Reactor与Callback、CompletableFuture三种形式异步编码对比

    Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux (表示一个包含零到多个元素的异步序列)和 Mono 表示一个包含零或一个元素的异步序列)。 Reactor 通过提供响应式的操作符,如

    2024年02月03日
    浏览(42)
  • 并发编程-CompletableFuture解析

    CompletableFuture对象是JDK1.8版本新引入的类,这个类实现了两个接口,一个是Future接口,一个是CompletionStage接口。 CompletionStage接口是JDK1.8版本提供的接口,用于异步执行中的阶段处理,CompletionStage定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对

    2024年02月15日
    浏览(34)
  • C++ 并发编程 | future与async

    async 函数接受两种不同的启动策略,这些策略在 std::launch 枚举中定义,如下: std::launch::defered :这种策略意味着任务将在调用 future::get() 或 future::wait 函数时延迟执行,也就是任务将在需要结果时 同步 执行 std::launch::async :任务在单独一个线程上 异步 执行 默认情况下 asy

    2024年01月24日
    浏览(44)
  • CompletableFuture异步编程事务及多数据源配置详解(含gitee源码)

    仓库地址: buxingzhe: 一个多数据源和多线程事务练习项目 小伙伴们在日常编码中经常为了提高程序运行效率采用多线程编程,在不涉及事务的情况下,使用dou.lea大神提供的CompletableFuture异步编程利器,它提供了许多优雅的api,我们可以很方便的进行异步多线程编程,速度杠杠

    2024年01月22日
    浏览(43)
  • CompletableFuture异步编程事务及多数据源配置问题(含gitee源码)

    仓库地址: buxingzhe: 一个多数据源和多线程事务练习项目 小伙伴们在日常编码中经常为了提高程序运行效率采用多线程编程,在不涉及事务的情况下,使用dou.lea大神提供的CompletableFuture异步编程利器,它提供了许多优雅的api,我们可以很方便的进行异步多线程编程,速度杠杠

    2024年02月05日
    浏览(40)
  • 【Java8新特性--->异步处理】CompletableFuture

    一、引入 假设一个商品详情页需要以下操作: 查询展示商品的基本信息耗时:0.5s 查询展示商品的销售信息耗时:0.7s 查询展示商品的图片信息耗时:1s 查询展示商品销售属性耗时:0.3s 查询展示商品规格属性耗时:1.5s 查询展示商品详情信息耗时:1s 即使每个查询时间耗时不

    2024年02月06日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包