CompletableFuture异步任务编排使用

这篇具有很好参考价值的文章主要介绍了CompletableFuture异步任务编排使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

runAsync 和 supplyAsync

  • runAsync(runnable):无返回值
  • runAsync(runnable, executor):无返回值,可自定义线程池
  • supplyAsync(runnable):有返回值
  • supplyAsync(runnable, executor):有回值,可自定义线程池

相关代码演示:

    public static void testOne(){
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
        });
        CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return "this is twoFuture";
        });
        CompletableFuture.allOf(oneFuture, twoFuture).join();
        System.out.println(oneFuture.join());
        System.out.println(twoFuture.join());
    }
start1
start2
end1
end2
null
this is twoFuture

解析:oneFuture.join()获取的执行结果为null,因为runAsync是没有返回结果的。

allOf 和 anyOf

  • allOf(future1,future2,future3…):等待所有future任务都完成,才可以做接下来的事。无返回值
  • anyOf(future1,future2,future3…):任意一个任务完成,就可以做接下来的事。返回object

allOf用法示例:

    public static void testTwo(){
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
        });
        CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return "this is twoFuture";
        });
        CompletableFuture<Integer> threeFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start3");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end3");
            return 100;
        });
        CompletableFuture.allOf(oneFuture, twoFuture, threeFuture).join();
        System.out.println(twoFuture.join() + threeFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
start2
start3
end1
end3
end2
this is twoFuture100
cost:2067ms

解析:allOf后的join起阻塞主线程作用。从结果可以看出,所有future执行完成后,再执行的主线程逻辑。

anyOf用法示例:

    public static void testThree(){
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
        });
        CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return "this is twoFuture";
        });
        CompletableFuture<Integer> threeFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start3");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end3");
            return 100;
        });
        Object result = CompletableFuture.anyOf(oneFuture, twoFuture, threeFuture).join();
        System.out.println("result:" + result);
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
start2
start3
end1
result:null
cost:1058ms

解析:oneFuture 最先完成,因为没有返回值,所以获得的结果是null

join 和 get

都是用于获取Completable的返回值的

  • join方法可能会抛出未检验的异常
  • get方法强制用户手动处理异常

whenComplete 和 whenCompleteAsync 和 exceptionally

  • whenComplete:执行当前线程的任务继续执行whenComplete的任务
  • whenCompleteAsync:whenCompleteAsync的任务是由线程池来执行
  • CompleableFuture即使发生异常也会执行whenComplete、whenCompleteAsync
  • exceptionally是用来处理异常的

以whenComplete举例
正常逻辑:

    public static void testFour() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
            return 100;
        }).whenComplete((res, e) ->{
            System.out.println("res:" + res);
            System.out.println("e:" + e);
        }).exceptionally((e) ->{
            System.out.println("error:" + e);
            return -1;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
res:100
e:null
result:100
cost:1084ms

捕获和处理异常:

    public static void testFive() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
            return 100/0;
        }).whenComplete((res, e) ->{
            System.out.println("res:" + res);
            System.out.println("e:" + e);
        }).exceptionally((e) ->{
            System.out.println("error:" + e);
            return -1;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
res:null
e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
error:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
result:-1
cost:1073ms

handle 和 handleAsync

  • handle和handleAsync的区别是后者用线程池管理
  • handle相当于whenComplete和exceptionally的组合,能够对异常捕获和处理

handle捕获和处理异常:

    public static void testSix() {
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end1");
            return 100/0;
        }).handle((res, e) ->{
            System.out.println("res:" + res);
            System.out.println("e:" + e);
            return -1;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
res:null
e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
result:-1
cost:1081ms

串行编排

  • 该模块用到的api都有普通版和async版本,这里不做赘述。async版本可以传入线程池,用线程池管理逻辑。

runAsync().thenRunAsync()

  • runAsync没有返回值,thenRunAsync也没有返回值
    public static void testSeven(){
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
        }).thenRunAsync(() ->{
            System.out.println("do something");
        });
        oneFuture.join();
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
do something
cost:72ms

supplyAsync().thenAcceptAsync((res) ->{})

  • thenAcceptAsync取supplyAsync的返回值,自身没有返回值
    public static void testEight(){
        long startTime = System.currentTimeMillis();
        CompletableFuture<Void> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        }).thenAcceptAsync((res) ->{
            System.out.println("res:"+ res);
        });
        oneFuture.join();
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
res:100
cost:83ms

supplyAsync().thenApplyAsync((res) ->{return}

  • thenApplyAsync取supplyAsync的返回值,自身也有返回值
    public static void testNine(){
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        }).thenApplyAsync((res) ->{
            return 100* 10;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
    }
start1
end1
result:1000
cost:75ms

两个任务都完成,再做其他事

runAfterBothAsync

  • 无入参、无出参
    public static void testTen(){
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start2");
            System.out.println("end2");
        });
        oneFuture.runAfterBothAsync(twoFuture, ()->{
            System.out.println("do something");
        });
        System.out.println("result:" + oneFuture.join());
    }
start1
end1
start2
end2
do something
result:100

thenAcceptBothAsync

  • 有入参、无出参
    public static void testEleven(){
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start2");
            System.out.println("end2");
        });
        oneFuture.thenAcceptBothAsync(twoFuture, (res1, res2)->{
            System.out.println("res1:" + res1);
            System.out.println("res2:" + res2);
        });
        System.out.println("result:" + oneFuture.join());
    }
start1
end1
start2
end2
result:100
res1:100
res2:null

thenCombine

  • 有入参、有出参
    public static void testTwelve(){
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start2");
            System.out.println("end2");
        });
        CompletableFuture<Integer> combineFuture = oneFuture.thenCombine(twoFuture, (res1, res2) -> {
            System.out.println("res1:" + res1);
            System.out.println("res2:" + res2);
            return res1 == 100 ? res1 : -1;
        });
        System.out.println("result1:" + oneFuture.join());
        System.out.println("combine:" + combineFuture.join());
    }
start1
end1
start2
end2
res1:100
res2:null
result1:100
combine:100

任意一个任务完成,再做其他事

runAfterEitherAsync

  • 无入参、无出参
    public static void testThirteen(){
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
        });
        oneFuture.runAfterEitherAsync(twoFuture, ()->{
            System.out.println("do something");
        });
        System.out.println("result:" + oneFuture.join());
    }
start1
end1
start2
result:100
do something

acceptEitherAsync

  • 有入参、无出参
    public static void testFourteen(){
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return 10;
        });
        oneFuture.acceptEitherAsync(twoFuture, (res)->{
            System.out.println("res:"+ res);
        });
        System.out.println("result:" + oneFuture.join());
    }
start1
end1
start2
result:100
res:100

applyToEitherAsync

  • 有入参、有出参
    public static void testFifteen(){
        CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start1");
            System.out.println("end1");
            return 100;
        });
        CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("start2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end2");
            return 10;
        });
        CompletableFuture<Integer> applyFuture = oneFuture.applyToEitherAsync(twoFuture, (res) -> {
            System.out.println("res:" + res);
            return res * 10;
        });
        System.out.println("result:" + oneFuture.join());
        System.out.println("applyFuture:" + applyFuture.join());
    }
start1
end1
start2
result:100
res:100
applyFuture:1000

总结

根据以上api,在多任务的情况下可以实现任意组合,实现异步执行逻辑,并提高了代码的执行效率。文章来源地址https://www.toymoban.com/news/detail-444849.html

到了这里,关于CompletableFuture异步任务编排使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • CompletableFuture异步回调

    CompletableFuture简介 CompletableFuture被用于异步编程,异步通常意味着非阻塞,可以使得任务单独允许在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常信息。 CompletableFuture实现了Future,CompletionStage接口,实现了Future接

    2024年02月05日
    浏览(36)
  • CompletableFuture异步关于异常的坑

    写一个存在异常的程序,让其异步执行 结果:接口返回成功,控制台没有打印错误信息。 结果:接口返回失败,控制台打印异常日志。 异步方法中get()是阻塞的,在使用时要设置超时时间。 结果:接口返回成功,控制台打印异常信息。 结果:接口返回成功,控制台打印异步

    2024年02月08日
    浏览(38)
  • Java组合式异步编程CompletableFuture

    CompletableFuture是Java 8中引入的一个功能强大的Future实现类,它的字面翻译是“可完成的Future”。 CompletableFuture对并发编程进行了增强,可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发。 CompletableFuture实现了两个接口,一个

    2024年04月09日
    浏览(37)
  • CompletableFuture:Java中的异步编程利器

    前言: 在秋招的面试中,面试官问了很多关于异步编程相关的知识点,朋友最近也和我聊到了这个话题,因此今天咱们来讨论讨论这个知识点! 随着现代软件系统的日益复杂,对于非阻塞性和响应性的需求也在不断增加。Java为我们提供了多种工具和技术来满足这些需求,其

    2024年02月04日
    浏览(37)
  • 【Java8新特性--->异步处理】CompletableFuture

    一、引入 假设一个商品详情页需要以下操作: 查询展示商品的基本信息耗时:0.5s 查询展示商品的销售信息耗时:0.7s 查询展示商品的图片信息耗时:1s 查询展示商品销售属性耗时:0.3s 查询展示商品规格属性耗时:1.5s 查询展示商品详情信息耗时:1s 即使每个查询时间耗时不

    2024年02月06日
    浏览(43)
  • 异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析

    CompletableFuture实现了CompletionStage接口 。 1)一个CompletionStage代表着一个异步计算节点,当另外一个CompletionStage计算节点完成后,当前CompletionStage会执行或者计算一个值;一个节点在计算终止时完成,可能反过来触发其他依赖其结果的节点开始计算。 2)一个节点(CompletionStag

    2024年02月09日
    浏览(35)
  • 从 Future 到 CompletableFuture:简化 Java 中的异步编程

    在并发编程中,我们经常需要处理多线程的任务,这些任务往往具有依赖性,异步性,且需要在所有任务完成后获取结果。Java 8 引入了 CompletableFuture 类,它带来了一种新的编程模式,让我们能够以函数式编程的方式处理并发任务,显著提升了代码的可读性和简洁性。 在这篇

    2024年02月11日
    浏览(34)
  • CompletableFuture与线程池:Java 8中的高效异步编程搭配

    摘要:在Java 8中,CompletableFuture和线程池的结合使用为程序员提供了一种高效、灵活的异步编程解决方案。本文将深入探讨CompletableFuture和线程池结合使用的优势、原理及实际应用案例,帮助读者更好地理解并掌握这一技术。 随着多核处理器的普及,应用程序的性能和响应能

    2024年02月07日
    浏览(62)
  • 并发编程 | 从Future到CompletableFuture - 简化 Java 中的异步编程

    在并发编程中,我们经常需要处理多线程的任务,这些任务往往具有依赖性,异步性,且需要在所有任务完成后获取结果。Java 8 引入了 CompletableFuture 类,它带来了一种新的编程模式,让我们能够以函数式编程的方式处理并发任务,显著提升了代码的可读性和简洁性。 在这篇

    2024年02月13日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包