CompletableFuture异步回调

这篇具有很好参考价值的文章主要介绍了CompletableFuture异步回调。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

CompletableFuture异步回调

CompletableFuture简介

CompletableFuture被用于异步编程,异步通常意味着非阻塞,可以使得任务单独允许在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常信息。
CompletableFuture实现了Future,CompletionStage接口,实现了Future接口可以兼容线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。

Futrue和CompletableFuture

Future在Java里面,通过用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我么会得到一个Future,在Future里面有isDone方法来判断任务是否处理结束,该有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

Futrue缺点

1.不支持手动完成。2.不支持进一步的非阻塞调用。3.不支持链式调用。4.不支持多个Future合并。5.不支持异步处理。

CompletableFuture类的使用案例

CompletableFuture01
package com.shaonian.juc.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * 演示CompletableFuture
 * @author 长名06
 * @version 1.0
 */
public class CompletableFuture01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = new CompletableFuture<>();

        new Thread(() -> {
            System.out.println("子线程开始干活");
            try {
                //子线程沉睡3s
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //完成future任务
            future.complete("success");
        },"A").start();

        System.out.println("主线程调用get方法获取结果为:" + future.get());
        System.out.println("主线程完成,阻塞结束");
    }
}
CompletableFuture02
package com.shaonian.juc.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @author 长名06
 * @version 1.0
 */
public class CompletableFuture02 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //异步调用,无返回值
        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "执行runSync()");
        });
        completableFuture1.get();

        //异步调用,有返回值
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "执行supplyAsync()");
//            int i = 1/0;
            return 1024;
        });
        completableFuture2.whenComplete((t, u) -> {
            System.out.println("----t=" + t);//t参数,是执行的返回值
            System.out.println("----u=" + u);//异常信息
        }).get();
//        System.out.println(Runtime.getRuntime().availableProcessors());

    }
}
CompletableFuture03
package com.shaonian.juc.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * 演示线程依赖,执行api thenApply()
 * 一个任务,依赖于另一个任务可以使用thenApply()将两个任务(线程)串行化
 * 对一个数先加10 再平方
 * @author 长名06
 * @version 1.0
 */
public class CompletableFuture03 {

    public static Integer num = 10;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getName() + "主线程开始");


        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("加10任务开启");
            num += 10;
            return num;
        }).thenApply(i -> num * num);
        Integer integer = future.get();
        System.out.println("主线程结束,子线程的结果为" + integer);
    }
}
CompletableFuture04
package com.shaonian.juc.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * 消费处理结果
 * thenAccept()方法,接收任务的处理结果,并消费结果,不返回结果了
 * @author 长名06
 * @version 1.0
 */
public class CompletableFuture04 {
    public static Integer num = 10;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getName() + "主线程开始");


        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("加10任务开启");
            num += 10;
            return num;
        }).thenApply(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) {
                return num * num;
            }
        }).thenAccept(new Consumer<Integer>() {
            @Override
            public void accept(Integer i) {
                System.out.println("子线程全部处理完成,最后调用了accept方法,消费了结果" + i);
            }
        });
    }
}
CompletableFuture05
package com.shaonian.juc.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * 异常处理
 * @author 长名06
 * @version 1.0
 */
public class CompletableFuture05 {
    public static Integer num = 10;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getName() + "主线程开始");


        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int i = 1/0;//模拟异常
            System.out.println("加10任务开启");
            num += 10;
            return num;
        }).exceptionally(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable ex) {
                System.out.println(ex.getMessage());
                return -1;
            }
        });
    }
}
CompletableFuture06
package com.shaonian.juc.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * 消费结果,同时处理异常
 * handle类似与thenAccept/thenRun方法,是最后一步结果的调用,但是同时可以处理异常
 * @author 长名06
 * @version 1.0
 */
public class CompletableFuture06 {
    public static Integer num = 10;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getName() + "主线程开始");


        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                int i = 1/0;
                System.out.println("加10任务开启");
                num += 10;
            return num;
        }).handle(new BiFunction<Integer, Throwable, Integer>() {
            @Override
            public Integer apply(Integer i, Throwable ex) {
                System.out.println("进入了handle方法");
                if(ex != null){
                    System.out.println("发生了异常,内容为" + ex.getMessage());
                    return -1;
                }else{
                    System.out.println("正常执行,结果为" + i);
                    return i;
                }
            }
        });
    }
}
CompletableFuture07
package com.shaonian.juc.completable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * 两个CompletableFuture结果的合并
 * @author 长名06
 * @version 1.0
 */
public class CompletableFuture07 {
    public static Integer num = 10;

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //有依赖关系的合并
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("加10任务开启");
            num += 10;
            return num;
        });
        //合并
        CompletableFuture<Integer> future2 = future.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
            @Override
            public CompletionStage<Integer> apply(Integer i) {
                return CompletableFuture.supplyAsync(() -> {
                    return i + 1;
                });
            }
        });
        System.out.println(future.get());
        System.out.println(future2.get());

        //无依赖的任务合并
        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("加10任务开启");
            num += 10;
            return num;
        });

        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("乘10任务开启");
            num *= 10;
            return num;
        });

        //合并两个结果
        CompletableFuture<Object> future3 = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() {
            @Override
            public List<Integer> apply(Integer result1, Integer result2) {
                ArrayList<Integer> list = new ArrayList<>();
                list.add(result1);
                list.add(result2);
                return list;
            }
        });
        System.out.println("合并结果为" + future3.get());
    }
}
CompletableFuture08
package com.shaonian.juc.completable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/**
 * 多个独立任务的合并 allOf
 * @author 长名06
 * @version 1.0
 */
public class CompletableFuture08 {

    public static Integer num = 10;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<CompletableFuture<Integer>> list = new ArrayList<>();

        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("加10任务开启");
            num += 10;
            return num;
        });
        list.add(job1);

        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("乘10任务开启");
            num *= 10;
            return num;
        });
        list.add(job2);

        CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("减10任务开启");
            num -= 10;
            return num;
        });
        list.add(job3);

        CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("除10任务开启");
            num /= 10;
            return num;
        });
        list.add(job4);
        //使用allOf需注意,输入也会执行任务,但是无法获取到结果
        //allOf需要等所有的任务执行完毕
        /**
         * 返回值是CompletableFuture<Void>类型
         *  public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
         *         return andTree(cfs, 0, cfs.length - 1);
         *  }
         */
//        CompletableFuture<Void> allJob = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));
//        System.out.println(allJob.get());
        //也可以使用 join的形式,执行,可以获取结果
        List<Integer> allResult = list.stream().map(CompletableFuture::join)
                .collect(Collectors.toList());
        System.out.println(allResult);
    }
}
CompletableFuture09
package com.shaonian.juc.completable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * anyOf
 * @author 长名06
 * @version 1.0
 */
public class CompletableFuture09 {
    public static Integer num = 10;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<CompletableFuture<Integer>> list = new ArrayList<>();

        CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("加10任务开启");
            num += 10;
            return num;
        });
        list.add(job1);

        CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("乘10任务开启");
            num *= 10;
            return num;
        });
        list.add(job2);

        CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("减10任务开启");
            num -= 10;
            return num;
        });
        list.add(job3);

        CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("除10任务开启");
            num /= 10;
            return num;
        });
        list.add(job4);
        //anyOf,这里只要有一个job执行完毕,就结束所有的任务执行,不需要等待所有的job执行完毕
        //但是这个很鸡肋,因为如果不要执行所有的任务,就没必要开启一个CompletableFuture了
        //也可以适用于竞争的场景,先执行成功的获取结果,其他的不再竞争了
        CompletableFuture<Object> allJob = CompletableFuture.anyOf(list.toArray(new CompletableFuture[0]));
        System.out.println(allJob.get());
    }
}

只是为了记录自己的学习历程,且本人水平有限,不对之处,请指正。文章来源地址https://www.toymoban.com/news/detail-747851.html

到了这里,关于CompletableFuture异步回调的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • CompletableFuture异步关于异常的坑

    写一个存在异常的程序,让其异步执行 结果:接口返回成功,控制台没有打印错误信息。 结果:接口返回失败,控制台打印异常日志。 异步方法中get()是阻塞的,在使用时要设置超时时间。 结果:接口返回成功,控制台打印异常信息。 结果:接口返回成功,控制台打印异步

    2024年02月08日
    浏览(28)
  • CompletableFuture:Java中的异步编程利器

    前言: 在秋招的面试中,面试官问了很多关于异步编程相关的知识点,朋友最近也和我聊到了这个话题,因此今天咱们来讨论讨论这个知识点! 随着现代软件系统的日益复杂,对于非阻塞性和响应性的需求也在不断增加。Java为我们提供了多种工具和技术来满足这些需求,其

    2024年02月04日
    浏览(29)
  • Java组合式异步编程CompletableFuture

    CompletableFuture是Java 8中引入的一个功能强大的Future实现类,它的字面翻译是“可完成的Future”。 CompletableFuture对并发编程进行了增强,可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发。 CompletableFuture实现了两个接口,一个

    2024年04月09日
    浏览(26)
  • 【Java8新特性--->异步处理】CompletableFuture

    一、引入 假设一个商品详情页需要以下操作: 查询展示商品的基本信息耗时:0.5s 查询展示商品的销售信息耗时:0.7s 查询展示商品的图片信息耗时:1s 查询展示商品销售属性耗时:0.3s 查询展示商品规格属性耗时:1.5s 查询展示商品详情信息耗时:1s 即使每个查询时间耗时不

    2024年02月06日
    浏览(29)
  • SpringBoot多线程异步任务:ThreadPoolTaskExecutor + CompletableFuture

    在 SpringBoot 项目中,一个任务比较复杂,执行时间比较长,需要采用 多线程异步 的方式执行,从而缩短任务执行时间。 将任务拆分成多个独立的子任务,每个子任务在独立子线程中执行; 当所有子任务的子线程全部执行完成后,将几个子任务汇总,得到总任务的执行结果。

    2024年02月10日
    浏览(43)
  • 异步编程 - 06 基于JDK中的Future实现异步编程(中)_CompletableFuture源码解析

    CompletableFuture实现了CompletionStage接口 。 1)一个CompletionStage代表着一个异步计算节点,当另外一个CompletionStage计算节点完成后,当前CompletionStage会执行或者计算一个值;一个节点在计算终止时完成,可能反过来触发其他依赖其结果的节点开始计算。 2)一个节点(CompletionStag

    2024年02月09日
    浏览(26)
  • 从 Future 到 CompletableFuture:简化 Java 中的异步编程

    在并发编程中,我们经常需要处理多线程的任务,这些任务往往具有依赖性,异步性,且需要在所有任务完成后获取结果。Java 8 引入了 CompletableFuture 类,它带来了一种新的编程模式,让我们能够以函数式编程的方式处理并发任务,显著提升了代码的可读性和简洁性。 在这篇

    2024年02月11日
    浏览(27)
  • CompletableFuture与线程池:Java 8中的高效异步编程搭配

    摘要:在Java 8中,CompletableFuture和线程池的结合使用为程序员提供了一种高效、灵活的异步编程解决方案。本文将深入探讨CompletableFuture和线程池结合使用的优势、原理及实际应用案例,帮助读者更好地理解并掌握这一技术。 随着多核处理器的普及,应用程序的性能和响应能

    2024年02月07日
    浏览(48)
  • 并发编程 | 从Future到CompletableFuture - 简化 Java 中的异步编程

    在并发编程中,我们经常需要处理多线程的任务,这些任务往往具有依赖性,异步性,且需要在所有任务完成后获取结果。Java 8 引入了 CompletableFuture 类,它带来了一种新的编程模式,让我们能够以函数式编程的方式处理并发任务,显著提升了代码的可读性和简洁性。 在这篇

    2024年02月13日
    浏览(35)
  • CompletableFuture异步编程事务及多数据源配置详解(含gitee源码)

    仓库地址: buxingzhe: 一个多数据源和多线程事务练习项目 小伙伴们在日常编码中经常为了提高程序运行效率采用多线程编程,在不涉及事务的情况下,使用dou.lea大神提供的CompletableFuture异步编程利器,它提供了许多优雅的api,我们可以很方便的进行异步多线程编程,速度杠杠

    2024年01月22日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包