Java的CompletableFuture,Java的多线程开发

这篇具有很好参考价值的文章主要介绍了Java的CompletableFuture,Java的多线程开发。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

三、Java8的CompletableFuture,Java的多线程开发

0、Java线程工作内存介绍

Java的CompletableFuture,Java的多线程开发

  • 如下图:

Java的CompletableFuture,Java的多线程开发

1、CompletableFuture的常用方法

  • 以后用到再加
runAsync() :开启异步(创建线程执行任务),无返回值
supplyAsync() :开启异步(创建线程执行任务),有返回值
thenApply() :然后应用,适用于有返回值的结果,拿着返回值再去处理。
exceptionally():用于处理异步任务执行过程中出现异常的情况的一个方法:返回默认值或者一个替代的 CompletableFuture 对象,从而避免系统的崩溃或异常处理的问题。
handle():类似exceptionally()


get()  :阻塞线程:主要可以: ①获取线程中的异常然后处理异常、②设置等待时间
join() :阻塞线程:推荐使用  join()  方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。他自己会抛出异常。


CompletableFuture.allOf()
CompletableFuture.anyOf()
  • get() 和 join() 方法区别?
    • 都可以阻塞线程 —— 等所有任务都执行完了再执行后续代码。
CompletableFuture 中的  get()  和  join()  方法都用于获取异步任务的执行结果,但是在使用时需要注意以下几点区别: 
 
1. 抛出异常的方式不同:如果异步任务执行过程中出现异常, get()  方法会抛出 ExecutionException 异常,而  join()  方法会抛出 CompletionException 异常,这两个异常都是继承自 RuntimeException 的。 
 
2. 方法调用限制不同: join()  方法是不可以被中断的,一旦调用就必须等待任务执行完成才能返回结果;而  get()  方法可以在调用时设置等待的超时时间,如果超时还没有获取到结果,就会抛出 TimeoutException 异常。 
 
3. 返回结果类型不同: get()  方法返回的是异步任务的执行结果,该结果是泛型类型 T 的,需要强制转换才能获取真正的结果;而  join()  方法返回的是异步任务的执行结果,该结果是泛型类型 T,不需要强制转换。 
 
4. 推荐使用方式不同:推荐在 CompletableFuture 中使用  join()  方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。 
 
综上所述, get()  方法和  join()  方法都是获取异步任务的执行结果,但是在使用时需要根据具体场景选择使用哪个方法。如果需要获取执行结果并且不希望被中断,推荐使用  join()  方法;如果需要控制等待时间或者需要捕获异常,则可以使用  get()  方法。
  • anyOf() 和 allOf() 的区别?
CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它支持链式调用、组合和转换异步操作等功能。其中,anyOf 和 allOf 都是 CompletableFuture 的两个常用方法,它们的区别如下: 
 
1. anyOf:任意一个 CompletableFuture 完成,它就会跟随这个 CompletableFuture 的结果完成,返回第一个完成的 CompletableFuture 的结果。 
 
2. allOf:所有的 CompletableFuture 都完成时,它才会跟随它们的结果完成,返回一个空的 CompletableFuture。 
 
简而言之,anyOf 和 allOf 的最大区别是:anyOf 任意一个 CompletableFuture 完成就跟着它的结果完成,而 allOf 所有的 CompletableFuture 完成才可以完成,并返回一个空的 CompletableFuture。 
 
举例来说,如果有三个 CompletableFuture:f1、f2、f3,其中 f1 和 f2 可能会返回一个字符串,而 f3 可能会返回一个整数,那么: 
 
- anyOf(f1, f2, f3) 的结果是 f1、f2、f3 中任意一个 CompletableFuture 的结果; 
- allOf(f1, f2, f3) 的结果是一个空的 CompletableFuture,它的完成状态表示 f1、f2、f3 是否全部完成。 
 
总之,anyOf 和 allOf 在实际使用中可以根据不同的需求来选择,它们都是 CompletableFuture 中非常强大的组合操作。

2、使用CompletableFuture

2.1、实体类准备

package com.cc.md.entity;

import lombok.Data;

/**
 * @author CC
 * @since 2023/5/24 0024
 */
@Data
public class UserCs {

    private String name;

    private Integer age;

}

2.2、常用方式

  • 无返回值推荐:开启多线程——无返回值的——阻塞:test06
    @Resource(name = "myIoThreadPool")
    private ThreadPoolTaskExecutor myIoThreadPool;
    
    //CompletableFuture开启多线程——无返回值的
    @Test
    public void test06() throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //循环,模仿很多任务
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                //第一批创建的线程数
                log.info("打印:{}", finalI);
                //模仿io流耗时
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, myIoThreadPool);
            futures.add(future);
        }
        //阻塞:多线程的任务执行。相当于多线程执行完了,再执行后面的代码
        //如果不阻塞,上面的相当于异步执行了。
        //阻塞方式1:可以获取返回的异常、设置等待时间
//        futures.forEach(future -> {
//            try {
//                future.get();
//            } catch (Exception e) {
//                throw new RuntimeException(e);
//            }
//        });
        //阻塞方式2(推荐)
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        log.info("打印:都执行完了。。。");
    }
  • 有返回值推荐:开启多线程——有返回值的,返回一个新的List——阻塞——使用stream流的map:test09
    • test07、test08 可以转化为 test09 (现在这个)
    • 可以返回任务类型的值,不一定要返回下面的user对象。
    @Resource(name = "myIoThreadPool")
    private ThreadPoolTaskExecutor myIoThreadPool;
    
    //CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map
    //像这种,需要构建另一个数组的,相当于一个线程执行完了,会有返回值
    //使用stream流的map + CompletableFuture.supplyAsync()
    @Test
    public void test09() throws Exception {
        //先获取数据,需要处理的任务。
        List<UserCs> users = this.getUserCs();
        //莫法处理任务
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                    // 处理数据
                    user.setName(user.getName() + "-改");
                    log.info("打印-改:{}", user.getName());
                    // 其他的业务逻辑。。。

                    return user;
                }, myIoThreadPool)).collect(Collectors.toList());

        //获取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有线程
                .map(CompletableFuture::join)
                //取age大于10的用户
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("打印:都执行完了。。。{}", endList);
    }

2.3、异常处理

  • exceptionally
  • handle
	//CompletableFuture 异常处理
    @Test
    public void test10() throws Exception {
        //先获取数据,需要处理的任务。
        List<UserCs> users = this.getUserCs();
        //莫法处理任务
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                        if (user.getAge() > 5){
                            int a = 1/0;
                        }
                        // 处理数据
                        user.setName(user.getName() + "-改");
                        log.info("打印-改:{}", user.getName());
                        // 其他的业务逻辑。。。

                        return user;
                    }, myIoThreadPool)
                    //处理异常方式1:返回默认值或者一个替代的 Future 对象,从而避免系统的崩溃或异常处理的问题。
                    .exceptionally(throwable -> {
                        //可以直接获取user
                        System.out.println("异常了:" + user);
                        //处理异常的方法……
                        //1还可以进行业务处理……比如将异常数据存起来,然后导出……
                        //2返回默认值,如:user、null
                        //return user;
                        //3抛出异常
                        throw new RuntimeException(throwable.getMessage());
                    })
                    //处理异常方式2:类似exceptionally(不推荐)
//                    .handle((userCs, throwable) -> {
//                        System.out.println("handle:" + user);
//                        if (throwable != null) {
//                            // 处理异常
//                            log.error("处理用户信息出现异常,用户名为:" + user.getName(), throwable);
//                            // 返回原始数据
//                            return userCs;
//                        } else {
//                            // 返回正常数据
//                            return userCs;
//                        }
//                    })
                )
                .collect(Collectors.toList());

        //获取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有线程
                .map(CompletableFuture::join)
                //取age大于10的用户
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("打印:都执行完了。。。{}", endList);
    }

2.4、CompletableFuture的使用测试

1、推荐使用:test03、test05、test09、test10、test11

2、test07、test08就是test09的前身。


  • test01:获取当前电脑(服务器)的cpu核数

  • test02:线程池原始的使用(不推荐直接这样用)

  • test03:开启异步1 —— @Async

  • test04:开启异步2 —— CompletableFuture.runAsync()

  • test05:开启异步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync() —— 阻塞所有异步方法,一起提交

    • 相当于开了3个线程去执行三个不同的方法,然后执行完后一起提交。
      
  • test052:开启异步2的改造 —— 第一个任务执行完了,获取到返回值,给后面的执行,可以连写,也可以单写。 —— 阻塞线程:get、join

  • test06:CompletableFuture开启多线程——无返回值的

  • test07:CompletableFuture开启多线程——无返回值的——构建一个新List

    • 1、相当于多线程执行任务,然后把结果插入到List中
      2、接收多线程的List必须是线程安全的,ArrayList线程不安全
         线程安全的List —— CopyOnWriteArrayList 替代 ArrayList
      
  • test08:CompletableFuture开启多线程——无返回值的——构建一个新List——先有数据的情况(基本和test07是一个方法)

  • test09:CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map

  • test10:CompletableFuture 异常处理。相当于是 test09的增强,处理异常

  • test11:CompletableFuture 异常处理:如果出现异常就舍弃任务

    • 1、想了一下,出现异常后的任务确实没有执行下去了,任务不往下执行,怎么会发现异常呢?
      2、发现了异常任务也就完了。而且打印了异常,相当于返回了异常。
      3、未发生异常的任务会执行完成。如果发生异常都返回空,最后舍弃空的,就得到任务执行成功的 CompletableFuture
      

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓所有方式↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓文章来源地址https://www.toymoban.com/news/detail-458654.html


package com.cc.md;

import com.cc.md.entity.UserCs;
import com.cc.md.service.IAsyncService;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@SpringBootTest
class Test01 {

    private static final Logger log = LoggerFactory.getLogger(Test01.class);

    @Resource(name = "myIoThreadPool")
    private ThreadPoolTaskExecutor myIoThreadPool;
    /**
     * 异步类
     */
    @Resource
    private IAsyncService asyncService;

    @Test
    void test01() {
        //获取当前jdk能调用的CPU个数(当前服务器的处理器个数)
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println(i);
    }

    //线程池原始的使用
    @Test
    void test02() {
        try {
            for (int i = 0; i < 1000; i++) {
                int finalI = i;
                myIoThreadPool.submit(() -> {
                    //第一批创建的线程数
                    log.info("打印:{}", finalI);
                    //模仿io流耗时
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }catch(Exception e){
            throw new RuntimeException(e);
        }finally {
            myIoThreadPool.shutdown();
        }
    }

    //开启异步1 —— @Async
    @Test
    public void test03() throws Exception {
        log.info("打印:{}", "异步测试的-主方法1");
        asyncService.async1();
        asyncService.async2();
        //不会等待异步方法执行,直接返回前端数据
        log.info("打印:{}", "异步测试的-主方法2");
    }

    //开启异步2 —— CompletableFuture.runAsync()
    @Test
    public void test04() throws Exception {
        log.info("打印:{}", "异步测试的-主方法1");
        CompletableFuture.runAsync(() -> {
            log.info("打印:{}", "异步方法1!");
            //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
            this.async2("异步方法1!-end");
        }, myIoThreadPool);
        //不会等待异步方法执行,直接返回前端数据
        log.info("打印:{}", "异步测试的-主方法2");
    }

    //异步需要执行的方法,可以写在同一个类中。
    private void async2(String msg) {
        //模仿io流耗时
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("打印:{}", msg);
    }

    //开启异步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync()  —— 阻塞所有异步方法,一起提交
    //相当于开了3个线程去执行三个不同的方法,然后执行完后一起提交。
    @Test
    public void test05() throws Exception {
        log.info("打印:{}", "异步测试的-主方法1");
        //异步执行1
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            log.info("打印:{}", "异步方法1!");
            //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
            this.async2("异步方法1-end");
            return "异步方法1";
        }, myIoThreadPool);

        //异步执行2
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            log.info("打印:{}", "异步方法2!");
            //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
            this.async2("异步方法2-end");
            return "异步方法2";
        }, myIoThreadPool);

        //异步执行3,不用我们自己的线程池 —— 用的就是系统自带的 ForkJoinPool 线程池
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            log.info("打印:{}", "异步方法3!");
            //异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
            this.async2("异步方法3-end");
        });

        //阻塞所有异步方法,一起提交后才走下面的代码
        CompletableFuture.allOf(future1, future2, future3).join();

        log.info("打印:{}", "异步-阻塞-测试的-主方法2-end");
    }

    //开启异步2的改造 —— 第一个任务执行完了,获取到返回值,给后面的执行,可以连写,也可以单写。 —— 阻塞线程:get、join
    // CompletableFuture 的 get 和 join 方法区别:
    // get:①可以获取线程中的异常、②设置等待时间
    // join:推荐在 CompletableFuture 中使用  join()  方法,因为它没有受到 interrupt 的干扰,不需要捕获异常,也不需要强制类型转换。
    @Test
    public void test052() throws Exception {
        log.info("打印:{}", "异步测试的-主方法1");
        //异步执行1
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            log.info("打印:{}", "异步方法1!");
            // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
            String str = "异步方法1-end";
            this.async2(str);
            return str;
        }, myIoThreadPool);

        // 异步执行2 - 无返回值 —— 分开写的方式
        CompletableFuture<Void> future2 = future1.thenAccept(str1 -> {
            log.info("打印:{}", "异步方法2!");
            // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
            this.async2(String.format("%s-加-异步方法2! - 无返回值 - ",str1));
        });

        // 异步执行3 - 有返回值 —— 分开写future1,连写future3方式
        CompletableFuture<String> future3 = future1.thenApply(str2 -> {
            log.info("打印:{}", "异步方法3!");
            // 异步执行的代码,也可以是方法,该方法不用单独写到其他类中。
            this.async2(String.format("%s-加-异步方法3! - 有返回值 - ", str2));
            return "异步执行3 - 有返回值 ";

            //连写的方式。
        }).thenApply(str3 -> {
            String format = String.format("%s- end", str3);
            log.error("异步3然后应用 - {}", format);
            //返回后面的应用
            return format;
        });
        // 获取future3的返回值:
        //如果需要捕获异常、设置等待超时时间,则用get
        log.info("future3的返回值(不阻塞):{}", future3.get());
//        log.info("future3的返回值(不阻塞-设置等待时间,超时报错:TimeoutException):{}",
//                future3.get(2, TimeUnit.SECONDS));
        //推荐使用 join方法
//        log.info("future3的返回值(阻塞):{}", future3.join());

        //阻塞所有异步方法,一起提交后才走下面的代码
        CompletableFuture.allOf(future1, future2).join();

        log.info("打印:{}", "异步-阻塞-测试的-主方法2-end");
    }

    //CompletableFuture开启多线程——无返回值的
    @Test
    public void test06() throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //循环,模仿很多任务
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                //第一批创建的线程数
                log.info("打印:{}", finalI);
                //模仿io流耗时
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, myIoThreadPool);
            futures.add(future);
        }
        //阻塞:多线程的任务执行。相当于多线程执行完了,再执行后面的代码
        //如果不阻塞,上面的相当于异步执行了。
        //阻塞方式1:可以获取返回的异常、设置等待时间
//        futures.forEach(future -> {
//            try {
//                future.get();
//            } catch (Exception e) {
//                throw new RuntimeException(e);
//            }
//        });
        //阻塞方式2(推荐)
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        log.info("打印:都执行完了。。。");
    }

    //CompletableFuture开启多线程——无返回值的——构建一个新List
    //相当于多线程执行任务,然后把结果插入到List中
    //接收多线程的List必须是线程安全的,ArrayList线程不安全
    //线程安全的List —— CopyOnWriteArrayList 替代 ArrayList
    @Test
    public void test07() throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //存数据的List
        List<UserCs> addList = new CopyOnWriteArrayList<>();
        //循环,模仿很多任务
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                log.info("打印:{}", finalI);
                UserCs userCs = new UserCs();
                userCs.setName(String.format("姓名-%s", finalI));
                userCs.setAge(finalI);
                addList.add(userCs);
            }, myIoThreadPool);
            futures.add(future);
        }
        //阻塞
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

        //返回新的List:endList,取age大于10的用户
        List<UserCs> endList = addList.stream()
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("打印:都执行完了。。。{}", endList);
    }

    //CompletableFuture开启多线程——无返回值的——构建一个新List——先有数据的情况
    //用CopyOnWriteArrayList 替代 ArrayList接收
    @Test
    public void test08() throws Exception {
        //先获取数据,需要处理的任务。
        List<UserCs> users = this.getUserCs();
        //开启多线程
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //存数据的List
        List<UserCs> addList = new CopyOnWriteArrayList<>();
        //莫法处理任务
        users.forEach(user -> {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                //添加数据
                user.setName(user.getName() + "-改");
                addList.add(user);

                log.info("打印-改:{}", user.getName());
                //其他的业务逻辑。。。

            }, myIoThreadPool);
            futures.add(future);
        });

        //阻塞
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

        //返回新的List:endList
        List<UserCs> endList = addList.stream()
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("打印:都执行完了。。。{}", endList);
    }

    //CompletableFuture开启多线程——有返回值的,返回一个新的List——先有数据的情况——使用stream流的map
    //像这种,需要构建另一个数组的,相当于一个线程执行完了,会有返回值
    //使用stream流的map + CompletableFuture.supplyAsync()
    @Test
    public void test09() throws Exception {
        //先获取数据,需要处理的任务。
        List<UserCs> users = this.getUserCs();
        //莫法处理任务
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                    // 处理数据
                    user.setName(user.getName() + "-改");
                    log.info("打印-改:{}", user.getName());
                    // 其他的业务逻辑。。。

                    return user;
                }, myIoThreadPool)).collect(Collectors.toList());

        //获取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有线程
                .map(CompletableFuture::join)
                //取age大于10的用户
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("打印:都执行完了。。。{}", endList);
    }

    //基础数据
    private List<UserCs> getUserCs() {
        List<UserCs> users = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            UserCs userCs = new UserCs();
            userCs.setName(String.format("姓名-%s", i));
            userCs.setAge(i);
            users.add(userCs);
        }
        return users;
    }

    //CompletableFuture 异常处理
    @Test
    public void test10() throws Exception {
        //先获取数据,需要处理的任务。
        List<UserCs> users = this.getUserCs();
        //莫法处理任务
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                        if (user.getAge() > 5){
                            int a = 1/0;
                        }
                        // 处理数据
                        user.setName(user.getName() + "-改");
                        log.info("打印-改:{}", user.getName());
                        // 其他的业务逻辑。。。

                        return user;
                    }, myIoThreadPool)
                    //处理异常方式1:返回默认值或者一个替代的 Future 对象,从而避免系统的崩溃或异常处理的问题。
                    .exceptionally(throwable -> {
                        //可以直接获取user
                        System.out.println("异常了:" + user);
                        //处理异常的方法……
                        //1还可以进行业务处理……比如将异常数据存起来,然后导出……
                        //2返回默认值,如:user、null
                        //return user;
                        //3抛出异常
                        throw new RuntimeException(throwable.getMessage());
                    })
                    //处理异常方式2:类似exceptionally(不推荐)
//                    .handle((userCs, throwable) -> {
//                        System.out.println("handle:" + user);
//                        if (throwable != null) {
//                            // 处理异常
//                            log.error("处理用户信息出现异常,用户名为:" + user.getName(), throwable);
//                            // 返回原始数据
//                            return userCs;
//                        } else {
//                            // 返回正常数据
//                            return userCs;
//                        }
//                    })
                )
                .collect(Collectors.toList());

        //获取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有线程
                .map(CompletableFuture::join)
                //取age大于10的用户
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("打印:都执行完了。。。{}", endList);
    }

    //CompletableFuture 异常处理:如果出现异常就舍弃任务。
    // 想了一下,出现异常后的任务确实没有执行下去了,任务不往下执行,怎么会发现异常呢?
    // 发现了异常任务也就完了。而且打印了异常,相当于返回了异常。
    // 未发生异常的任务会执行完成。如果发生异常都返回空,最后舍弃空的,就得到任务执行成功的 CompletableFuture
    @Test
    public void test11() {
        List<UserCs> users = getUserCs();
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                            if (user.getAge() > 15) {
                                int a = 1 / 0;
                            }
                            user.setName(user.getName() + "-改");
                            log.info("打印-改:{}", user.getName());
                            return user;
                        }, myIoThreadPool)
                        //处理异常
                        .exceptionally(throwable -> {
                            //其他处理异常的逻辑

                            return null;
                        })
                )
                //舍弃返回的对象是null的 CompletableFuture
                .filter(e -> Objects.nonNull(e.join())).collect(Collectors.toList());

        //获取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有线程
                .map(CompletableFuture::join)
                //取age大于10的用户
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("打印:都执行完了。。。{}", endList);

    }

}

到了这里,关于Java的CompletableFuture,Java的多线程开发的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java8中DateTimeFormatter真的是线程安全的吗?

    Java8中DateTimeFormatter真的是线程安全的吗? 答案是否定的   由于之前写了一个旷世的ocr的服务,接入了旷世的FaceID的人脸比对的接口,然后就在写代码的过程中就遇到了这个奇怪的bug,旷世的FaceId的人脸识别的接口文档如下:   说实话,旷世的产品真的是太难用了,光说

    2024年02月03日
    浏览(32)
  • JAVA-- 在Java8 Parallel Stream中如何自定义线程池?

    使用Parallel Stream时,在适当的环境中,通过适当地使用并行度级别,可以在某些情况下获得性能提升。 如果程序创建一个自定义ThreadPool,必须记住调用它的shutdown()方法来避免内存泄漏。 如下代码示例,Parallel Stream并行处理使用的线程池是ForkJoinPool.commonPool(),这个线程池是

    2024年02月09日
    浏览(43)
  • JAVA的多线程及并发

    继承 Thread 类; 实现 Runnable 接口; 实现 Callable 接口通过 FutureTask 包装器来创建 Thread 线程; 使 用 ExecutorService 、 Callable 、 Future 实 现 有 返 回 结 果 的多 线 程 ( 也 就 是 使 用 了 ExecutorService 来管理前面的三种方式)。 1、使用退出标志,使线程正常退出,也就是当 run

    2024年03月13日
    浏览(50)
  • 什么是Java的多线程?

    Java的多线程是指在同一时间内,一个程序中同时运行多个线程。每个线程都是一个独立的执行路径,可以独立地执行代码。Java中的多线程机制使得程序可以更高效地利用计算机的多核处理器和CPU时间,从而提高程序的性能和响应能力。 创建和使用Java多线程通常需要以下几个

    2024年02月02日
    浏览(32)
  • “深入理解Java的多线程编程“

    多线程编程是指在一个程序中同时运行多个线程,以提高程序的并发性和性能。Java是一门支持多线程编程的强大编程语言,提供了丰富的多线程相关类和接口。 在Java中,可以通过以下方式实现多线程编程: 继承Thread类:创建一个继承自Thread类的子类,并重写run()方法,在

    2024年02月13日
    浏览(67)
  • 【JavaEE】Java中的多线程 (Thread类)

    作者主页: paper jie_博客 本文作者:大家好,我是paper jie,感谢你阅读本文,欢迎一建三连哦。 本文录入于《JavaEE》专栏,本专栏是针对于大学生,编程小白精心打造的。笔者用重金(时间和精力)打造,将基础知识一网打尽,希望可以帮到读者们哦。 其他专栏:《MySQL》《

    2024年02月05日
    浏览(54)
  • 深入浅出Java的多线程编程——第二篇

    目录 前情回顾 1. 中断一个线程 1.1 中断的API 1.2 小结 2. 等待一个线程  2.1 等待的API 3. 线程的状态 3.1 贯彻线程的所有状态 3.2 线程状态和状态转移的意义 4. 多线程带来的的风险-线程安全 (重点) 4.1 观察线程不安全 4.2 线程安全的概念 4.3 线程不安全的原因 4.3.1 修改共享数据

    2024年02月07日
    浏览(82)
  • Java 8并发集合:安全高效的多线程集合

    在多线程环境中,使用线程安全的数据结构非常重要,以避免竞态条件和数据不一致的问题。Java 8引入了一些并发集合类,提供了安全高效的多线程集合操作。本教程将介绍Java 8中的并发集合类,包括ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentSkipListSet和CopyOnWriteArrayList。 Conc

    2024年02月04日
    浏览(55)
  • 针对java中list.parallelStream()的多线程数据安全问题我们采用什么方法最好呢?

    当使用List.parallelStream()方法进行多线程处理时,可能会涉及到数据安全问题。下面是一些常见的方法来处理parallelStream()的多线程数据安全问题: 1. 使用线程安全的集合:Java中提供了线程安全的集合类,如CopyOnWriteArrayList和synchronizedList等。可以将原始的List转换为线程安全的集

    2024年02月10日
    浏览(41)
  • CompletableFuture结合线程池初步使用

    CompletableFuture 是 Java 8 引入的一个类,用于支持异步编程和函数式编程。CompletableFuture 的优点包括: 异步编程:CompletableFuture 支持异步编程,可以在异步任务完成之前继续执行其他任务,从而提高程序的效率和吞吐量。 链式调用:CompletableFuture 提供了丰富的方法来支持链式调

    2024年02月09日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包