【业务功能篇92】微服务-springcloud-多线程-异步处理-异步编排-CompletableFutrue

这篇具有很好参考价值的文章主要介绍了【业务功能篇92】微服务-springcloud-多线程-异步处理-异步编排-CompletableFutrue。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

三、CompletableFutrue

一个商品详情页

  • 展示SKU的基本信息 0.5s
  • 展示SKU的图片信息 0.6s
  • 展示SKU的销售信息 1s
  • spu的销售属性 1s
  • 展示规格参数 1.5s
  • spu详情信息 1s

1.ComplatableFuture介绍

  Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用 isDone方法检查计算是否完成,或者使用 get阻塞住调用线程,直到计算完成返回结果,你也可以使用 cancel方法停止任务的执行。

  虽然 Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

  很多语言,比如Node.js,采用回调的方式实现异步编程。Java的一些框架,比如Netty,自己扩展了Java的 Future接口,提供了 addListener等多个扩展方法;Google guava也提供了通用的扩展Future;Scala也提供了简单易用且功能强大的Future/Promise异步编程模式。

  作为正统的Java类库,是不是应该做点什么,加强一下自身库的功能呢?

  在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

  CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过 get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

  CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

【业务功能篇92】微服务-springcloud-多线程-异步处理-异步编排-CompletableFutrue,Spring boot,Java,Spring cloud,spring cloud,微服务,多线程,异步处理,异步编排

2.创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

方法分为两类:

  • runAsync 没有返回结果
  • supplyAsync 有返回结果
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5
            ,50
            ,10
            , TimeUnit.SECONDS
            ,new LinkedBlockingQueue<>(100)
            , Executors.defaultThreadFactory()
            ,new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        System.out.println("main -- 线程开始了...");
        // 获取CompletableFuture对象
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("线程开始了...");
            int i = 100/50;
            System.out.println("线程结束了...");
        },executor);
        System.out.println("main -- 线程结束了...");

        System.out.println("------------");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了...");
            int i = 100 / 50;
            System.out.println("线程结束了...");
            return i;
        }, executor);
        System.out.println("获取的线程的返回结果是:" + future.get() );
    }

3.whenXXX和handle方法

  当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) ;
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) ;
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) ;

相关方法的说明:

  • whenComplete 可以获取异步任务的返回值和抛出的异常信息,但是不能修改返回结果
  • execptionlly 当异步任务跑出了异常后会触发的方法,如果没有抛出异常该方法不会执行
  • handle 可以获取异步任务的返回值和抛出的异常信息,而且可以显示的修改返回的结果
/**
 * CompletableFuture的介绍
 */
public class CompletableFutureDemo2 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5
            ,50
            ,10
            , TimeUnit.SECONDS
            ,new LinkedBlockingQueue<>(100)
            , Executors.defaultThreadFactory()
            ,new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了...");
            int i = 100 / 5;
            System.out.println("线程结束了...");
            return i;
        }, executor).handle((res,exec)->{
            System.out.println("res = " + res + ":exec="+exec);
            return res * 10;
        });
        // 可以处理异步任务之后的操作
        System.out.println("获取的线程的返回结果是:" + future.get() );
    }

 /*   public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了...");
            int i = 100 / 5;
            System.out.println("线程结束了...");
            return i;
        }, executor).whenCompleteAsync((res,exec)->{
            System.out.println("res = " + res);
            System.out.println("exec = " + exec);
        }).exceptionally((res)->{ // 在异步任务显示的抛出了异常后才会触发的方法
            System.out.println("res = " + res);
            return 10;
        });
        // 可以处理异步任务之后的操作
        System.out.println("获取的线程的返回结果是:" + future.get() );
    }*/

/*    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了...");
            int i = 100 / 0;
            System.out.println("线程结束了...");
            return i;
        }, executor).whenCompleteAsync((res,exec)->{
            System.out.println("res = " + res);
            System.out.println("exec = " + exec);
        });
        // 可以处理异步任务之后的操作
        System.out.println("获取的线程的返回结果是:" + future.get() );
    }*/
}

4.线程串行方法

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
/**
 * CompletableFuture的介绍
 */
public class CompletableFutureDemo3 {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5
            ,50
            ,10
            , TimeUnit.SECONDS
            ,new LinkedBlockingQueue<>(100)
            , Executors.defaultThreadFactory()
            ,new ThreadPoolExecutor.AbortPolicy()
    );

    /**
     * 线程串行的方法
     * thenRun:在前一个线程执行完成后,开始执行,不会获取前一个线程的返回结果,也不会返回信息
     * thenAccept:在前一个线程执行完成后,开始执行,获取前一个线程的返回结果,不会返回信息
     * thenApply: 在前一个线程执行完成后。开始执行,获取前一个线程的返回结果,同时也会返回信息
     * @param args
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了..." + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("线程结束了..." + Thread.currentThread().getName());
            return i;
        }, executor).thenApply(res -> {
            System.out.println("res = " + res);
            return res * 100;
        });
        // 可以处理异步任务之后的操作
        System.out.println("获取的线程的返回结果是:" + future.get() );
    }
    /*public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了..." + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("线程结束了..." + Thread.currentThread().getName());
            return i;
        }, executor).thenAcceptAsync(res -> {
            System.out.println(res + ":" + Thread.currentThread().getName());
        }, executor);
        // 可以处理异步任务之后的操作
        //System.out.println("获取的线程的返回结果是:" + future.get() );
    }*/
    /*public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程开始了..."+Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("线程结束了..."+Thread.currentThread().getName());
            return i;
        }, executor).thenRunAsync(() -> {
            System.out.println("线程开始了..."+Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("线程结束了..."+Thread.currentThread().getName());
        }, executor);
        // 可以处理异步任务之后的操作
        //System.out.println("获取的线程的返回结果是:" + future.get() );
    }*/

}

5.两个都完成

  上面介绍的相关方法都是串行的执行,接下来看看需要等待两个任务执行完成后才会触发的几个方法

  • thenCombine :可以获取前面两线程的返回结果,本身也有返回结果
  • thenAcceptBoth:可以获取前面两线程的返回结果,本身没有返回结果
  • runAfterBoth:不可以获取前面两线程的返回结果,本身也没有返回结果
/**
     * @param args
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
            return i;
        }, executor);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
            int i = 100 /10;
            System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
            return i;
        }, executor);

        // runAfterBothAsync 不能获取前面两个线程的返回结果,本身也没有返回结果
        CompletableFuture<Void> voidCompletableFuture = future1.runAfterBothAsync(future2, () -> {
            System.out.println("任务3执行了");
        },executor);

        // thenAcceptBothAsync 可以获取前面两个线程的返回结果,本身没有返回结果
        CompletableFuture<Void> voidCompletableFuture1 = future1.thenAcceptBothAsync(future2, (f1, f2) -> {
            System.out.println("f1 = " + f1);
            System.out.println("f2 = " + f2);
        }, executor);

        // thenCombineAsync: 既可以获取前面两个线程的返回结果,同时也会返回结果给阻塞的线程
        CompletableFuture<String> stringCompletableFuture = future1.thenCombineAsync(future2, (f1, f2) -> {
            return f1 + ":" + f2;
        }, executor);

        // 可以处理异步任务之后的操作
        System.out.println("获取的线程的返回结果是:" + stringCompletableFuture.get() );
    }

6.两个任务完成一个

  在上面5个基础上我们来看看两个任务只要有一个完成就会触发任务3的情况

  • runAfterEither:不能获取完成的线程的返回结果,自身也没有返回结果
  • acceptEither:可以获取线程的返回结果,自身没有返回结果
  • applyToEither:既可以获取线程的返回结果,自身也有返回结果
/**
     * @param args
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
            return i;
        }, executor);
        CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
            int i = 100 /10;
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
            return i+"";
        }, executor);
        // runAfterEitherAsync 不能获取前面完成的线程的返回结果,自身也没有返回结果
        future1.runAfterEitherAsync(future2,()->{
            System.out.println("任务3执行了....");
        },executor);

        // acceptEitherAsync 可以获取前面完成的线程的返回结果  自身没有返回结果
        future1.acceptEitherAsync(future2,(res)->{
            System.out.println("res = " + res);
        },executor);

        // applyToEitherAsync 既可以获取完成任务的线程的返回结果  自身也有返回结果
        CompletableFuture<String> stringCompletableFuture = future1.applyToEitherAsync(future2, (res) -> {
            System.out.println("res = " + res);
            return res + "-->OK";
        }, executor);
        // 可以处理异步任务之后的操作
        System.out.println("获取的线程的返回结果是:" + stringCompletableFuture.get() );
    }

7.多任务组合

allOf:等待所有任务完成

anyOf:只要有一个任务完成文章来源地址https://www.toymoban.com/news/detail-687118.html

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
/**
     * @param args
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1 线程开始了..." + Thread.currentThread().getName());
            int i = 100 / 5;
            System.out.println("任务1 线程结束了..." + Thread.currentThread().getName());
            return i;
        }, executor);
        CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务2 线程开始了..." + Thread.currentThread().getName());
            int i = 100 /10;
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务2 线程结束了..." + Thread.currentThread().getName());
            return i+"";
        }, executor);

        CompletableFuture<Object> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务3 线程开始了..." + Thread.currentThread().getName());
            int i = 100 /10;
            System.out.println("任务3 线程结束了..." + Thread.currentThread().getName());
            return i+"";
        }, executor);

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
        anyOf.get();
        System.out.println("主任务执行完成..." + anyOf.get());

        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
        allOf.get();// 阻塞在这个位置,等待所有的任务执行完成
        System.out.println("主任务执行完成..." + future1.get() + " :" + future2.get() + " :" + future3.get());
    }

到了这里,关于【业务功能篇92】微服务-springcloud-多线程-异步处理-异步编排-CompletableFutrue的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【业务功能100】补充代码【业务功能88】微服务-springcloud-分布式锁-redis-redisson-springcache

    采用redisson做分布式锁,完成数据的查询接口功能getCatelog2JSONRedis 原先从mysql数据库查询的效率较低,现在将部分固定数据展示比如页面的树形栏目信息等,存储到 redis缓存 ,然后基于分布式集群,需要结合本地锁(synchronized )与分布式锁(redissonClient.getLock(“catelog2JSON-lock”

    2024年02月09日
    浏览(46)
  • 【业务功能篇85】微服务-springcloud-Nginx-反向代理-网关

    Nginx域名 在c:/window/system32/drivers/etc/hosts文件,我们在这个文件中添加 注意如果是没有操作权限,那么点击该文件右击属性,去掉只读属性即可 通过这个域名访问到Nginx服务 nginx.cof是全局配置文件 /mydata/nginx/conf/nginx.cof 文件中最后配置了一个信息 include /etc/nginx/conf.d/*.conf 表示

    2024年02月10日
    浏览(64)
  • 【业务功能篇90】微服务-springcloud-检索服务-ElasticSearch实战运用-DSL语句

      商品检索页面我们放在search服务中处理,首页我们需要在mall-search服务中支持Thymeleaf。添加对应的依赖 然后我们拷贝模板文件到template目录下,然后不要忘记添加Thymeleaf的名称空间 需要把相关的静态资源文件拷贝到Nginx服务中。目录结构是:/mydata/nginx/html/static/search/ 我们

    2024年02月10日
    浏览(56)
  • 【业务功能109】微服务-springcloud-springboot-Skywalking-链路追踪-监控

    skywalking 是一个apm系统,包含监控,追踪,并拥有故障诊断能力的 分布式 系统   Skywalking是由国内开源爱好者吴晟开源并提交到Apache孵化器的产品,它同时吸收了Zipkin /Pinpoint /CAT 的设计思路。特点是:支持多种插件,UI功能较强,支持非侵入式埋点。目前使用厂商最多,版

    2024年02月08日
    浏览(59)
  • 【业务功能篇94】微服务-springcloud-springboot-认证服务-注册功能-第三方短信验证API

      结合我们前面介绍的商城的架构我们需要单独的搭建一个认证服务。   首先创建一个SpringBoot项目,然后添加对应的依赖   我们需要把认证服务注册到Nacos中,添加对应的依赖,然后完成对应的配置 放开Nacos注册中心 然后启动测试   然后我们整理登录和注册的相关

    2024年02月09日
    浏览(56)
  • 【业务功能篇99】微服务-springcloud-springboot-电商订单模块-生成订单服务-锁定库存

    一个是需要生成订单信息一个是需要生成订单项信息。具体的核心代码为 锁定库存的操作,需要操作ware仓储服务。 没有库存或者锁定库存失败我们通过自定义的异常抛出 如果下订单操作成功(订单数据和订单项数据)我们就会操作锁库存的行为 锁定库存失败通过抛异常来

    2024年02月09日
    浏览(45)
  • 【业务功能篇84】微服务SpringCloud-ElasticSearch-Kibanan-电商实例应用

    ElasticSearch实现商城系统中全文检索的流程。 商品的映射关系 如上的例子,netsted类型,避免了我们properties属性里的三个字段值同类型,他们的内容不会错乱匹配 参考官网地址:https://www.elastic.co/guide/en/elasticsearch/reference/7.4/nested.html 3.1 创建ESModel 点击上架功能传递spuId到后台

    2024年02月10日
    浏览(60)
  • 【业务功能篇82】微服务SpringCloud-ElasticSearch-Kibanan-docke安装-进阶实战

    https://www.elastic.co/guide/en/elasticsearch/reference/7.4/getting-started-search.html 在ElasticSearch中支持两种检索方式 通过使用REST request URL 发送检索参数(uri+检索参数) 通过使用 REST request body 来发送检索参数 (uri+请求体) 第一种方式 GET bank/_search # 检索bank下的所有信息,包括 type 和 docs GET ba

    2024年02月11日
    浏览(42)
  • 【业务功能篇81】微服务SpringCloud-ElasticSearch-Kibanan-docker安装-入门实战

      ES 是一个开源的 高扩展的分布式全文搜索引擎 ,是整个Elastic Stack技术栈的核心。它可以近乎实时的存储,检索数据;本身扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。   ElasticSearch的底层是开源库Lucene,但是你没办法直接用Lucene,必须自己写代码去调用

    2024年02月02日
    浏览(48)
  • 【业务功能篇81】微服务SpringCloud-ElasticSearch-Kibanan-docke安装-入门实战

      ES 是一个开源的 高扩展的分布式全文搜索引擎 ,是整个Elastic Stack技术栈的核心。它可以近乎实时的存储,检索数据;本身扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。   ElasticSearch的底层是开源库Lucene,但是你没办法直接用Lucene,必须自己写代码去调用

    2024年02月11日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包