什么是CompletableFuture?
在Java中CompletableFuture用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个单独的线程,与主线程隔离,并且会通知主线程它的进度,成功或者失败。
在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。
使用这种并行方式,可以极大的提高程序的性能。
Future vs CompletableFuture
CompletableFuture 是 Future API的扩展。
Future 被用于作为一个异步计算结果的引用。提供一个 isDone()
方法来检查计算任务是否完成。当任务完成时,get()
方法用来接收计算任务的结果。
从 Callbale和 Future 教程可以学习更多关于 Future 知识.
Future API 是非常好的 Java 异步编程进阶,但是它缺乏一些非常重要和有用的特性。
CompletableFuture 实现了 Future
和 CompletionStage
接口,并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持。
之前项目中需要对一组集合进行处理,集合内的所有元素处理完后更新任务状态。当时经过询问得知可以用到CompletableFuture,于是经过短暂的研究写了一个很粗略的代码,如下:
public void calculateAvgSensorData() {
//0.获取同步锁
Boolean isLock = redisUtils.getLockWithExpire(LOCK_CAL_CABINET_AVG_ENV, LOCK_EXPIRE);
if (!isLock) {
log.warn("获取互斥锁: {}失败", LOCK_CAL_CABINET_AVG_ENV);
return;
}
//1.查询昨日每个机柜一整天的环境数据均值,如果有报警,则用报警数据
//1.1.获取当前日期
Date today = DateUtil.date();
//1.2.获取昨天日期
Date yesterday = DateUtil.offsetDay(today, DAY_OFFSET);
String queryDate = DateUtil.formatDate(yesterday);
//1.3.查询机房列表
List<BaseProps> roomList = roomService.getAllByUsed(1);
//1.4.查询传感器列表
List<Sensor> sensorList = sensorService.list();
if (CollectionUtil.isEmpty(roomList) || CollectionUtil.isEmpty(sensorList)) {
return;
}
//1.5.记录任务执行时间
AutoTask autoTask = autoTaskService.getById(AutoTaskId.CAL_CABINET_AVG_ENV_TASK);
autoTask.setTaskStatus(AutoTaskStatus.EXECUTING);
HistoryAutoTask historyAutoTask = BeanUtil.copyProperties(autoTask, HistoryAutoTask.class);
historyAutoTask.setExecuteTime(new Date());
//记录任务开始时间
historyAutoTaskService.startAutoTask(historyAutoTask);
//1.6.组装待查询的机房队列
roomList.forEach(room -> {
queue.add(room);
});
for(int i = 0; i < MAX_THREAD; i++) {
//创建异步执行任务
CompletableFuture cf = CompletableFuture.runAsync(()->{
do{
try {
queryCabinetEnv(sensorList, queryDate);
} catch (Exception e) {
log.error("插入机房环境数据均值失败,失败原因: {}", e);
historyAutoTask.setTaskStatus(AutoTaskStatus.FAILED);
historyAutoTaskService.updateAutoTask(historyAutoTask, AutoTaskStatus.EXECUTING);
}
}while(queue.size() > 0);
}, taskExecutor).whenComplete((res,excption)-> {
//3.记录定时任务执行状态
historyAutoTask.setTaskStatus(AutoTaskStatus.EXECUTED);
historyAutoTask.setFinishTime(new Date());
historyAutoTaskService.updateAutoTask(historyAutoTask, AutoTaskStatus.EXECUTING);
});
}
}
如上代码,虽然用到了线程池,也用到了CompletableFuture,不过一眼可以看出问题所在,就是在每个遍历结束后都会更新一次任务状态,这明显是不对的。明明在所有任务执行完后再执行一次任务状态更新就可以了,这里却每个线程执行完任务后更新一次任务状态。
对于如上代码,肯定是有更加优雅的写法的,再次经过深入学习后,写出以下例子仅供改造参考:
package com.tct.ii.ucr.task;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* @author wl
* @date 2022/10/12
*/
@Slf4j
public class TestFuture {
public static CompletableFuture<String> printStr(String str, ExecutorService executorService) {
return CompletableFuture.supplyAsync(() -> {
log.info("str:{}", str);
return str;
}, executorService);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<String> list = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
List<CompletableFuture<String>> futureList = list.stream()
.map(str -> printStr(str, executorService)).collect(Collectors.toList());
CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
CompletableFuture<List<String>> resultFuture = allFuture.thenApply(v -> futureList.stream().map(future -> future.join()).collect(Collectors.toList()));
log.info("result:{}", resultFuture.get());
executorService.shutdown();
}
}
在多任务组合中,allOf:等待所有任务完成,anyOf:只要有一个任务完成。
测试结果如图:
文章来源:https://www.toymoban.com/news/detail-574932.html
可以看到这里既用到了线程池,最后调用allOf方法等待所有任务执行完后可以一次性获取结果,非常方便和优雅。 文章来源地址https://www.toymoban.com/news/detail-574932.html
到了这里,关于CompletableFuture解决多线程返回结果问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!