前言
CompletableFuture是Java 8中引入的一个功能强大的Future实现类,它的字面翻译是“可完成的Future”。 CompletableFuture对并发编程进行了增强,可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发。
CompletableFuture实现了两个接口,一个是Future,另一个是CompletionStage,Future表示异步任务的结果,而CompletionStage字面意思是完成阶段,多个CompletionStage可以以流水线的方式组合起来,对于其中一个CompletionStage,它有一个计算任务,但可能需要等待其他一个或多个阶段完成才能开始,它完成后,可能会触发其他阶段开始运行。
CompletableFuture的设计主要是为了解决Future的阻塞问题,并提供了丰富的API来支持函数式编程和流式编程,可以更方便地组合多个异步任务,并处理它们的依赖关系和异常。这使得它在处理并发编程和异步编程时非常有用。
在使用CompletableFuture时,可以创建它的实例,并通过其提供的各种方法(如thenApply、thenCompose、thenAccept等)来定义任务之间的依赖关系和执行顺序。同时,CompletableFuture还提供了异常处理机制,可以更方便地处理任务执行过程中可能出现的异常。
一、CompletableFuture基本用法
- 静态方法supplyAsync
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
方法接受两个参数supplier和executor,使用executor执行supplier表示的任务,返回一个CompletableFuture,调用后,任务被异步执行,这个方法立即返回。
supplyAsync还有一个不带executor参数的方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
没有executor,任务被谁执行呢?与系统环境和配置有关,一般来说,如果可用的CPU核数大于2,会使用Java 7引入的Fork/Join任务执行服务,即ForkJoinPool.commonPool(),该任务执行服务背后的工作线程数一般为CPU核数减1,即Runtime.getRuntime().availableProcessors()-1,否则,会使用ThreadPerTaskExecutor,它会为每个任务创建一个线程。
对于CPU密集型的运算任务,使用Fork/Join任务执行服务是合适的,但对于一般的调用外部服务的异步任务,Fork/Join可能是不合适的,因为它的并行度比较低,可能会让本可以并发的多任务串行运行,这时,应该提供Executor参数。文章来源:https://www.toymoban.com/news/detail-845619.html
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureDemo {
private static ExecutorService executor = Executors.newFixedThreadPool(10);
private static Random rnd = new Random();
static int delayRandom(int min, int max) {
int milli = max > min ? rnd.nextInt(max - min) : 0;
try {
Thread.sleep(min + milli);
} catch (InterruptedException e) {
}
return milli;
}
static Callable<Integer> externalTask = () -> {
int time = delayRandom(20, 2000);
return time;
};
public static void master() {
Future<Integer> asyncRet = callExternalService();
try {
Integer ret = asyncRet.get();
System.out.println(ret);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static CompletableFuture<Integer> callExternalService(){
Supplier<Integer> supplierTask = () -> {
int time = delayRandom(20, 2000);
return time;
};
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(supplierTask);
return future;
}
public static void main(String[] args) throws Exception{
// master();
CompletableFuture<Integer> integerFuture = callExternalService();
boolean done = integerFuture.isDone();
Integer now = integerFuture.getNow(0);
System.out.println(now);
Integer result = integerFuture.get();
System.out.println(result);
now = integerFuture.getNow(0);
System.out.println(now);
// executor.shutdown();
}
}
二、使用CompletableFuture来调度执行由JSON串定义的DAG
在这个例子中,我们创建了四个任务:A、B、C 和 D。任务B依赖于任务A的结果,而任务D依赖于任务B和任务C的结果。我们使用thenApplyAsync来创建依赖链,并使用CompletableFuture.allOf来等待多个任务的完成。最后,我们使用get方法来获取结果。文章来源地址https://www.toymoban.com/news/detail-845619.html
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
public class DagScheduler {
public static void main(String[] args) throws ExecutionException, InterruptedException {
String dagJson = "{\"nodes\": [{\"id\": \"A\", \"task\": \"printA\"}, {\"id\": \"B\", \"task\": \"printB\", \"dependencies\": [\"A\"]}, {\"id\": \"C\", \"task\": \"printC\", \"dependencies\": [\"A\"]}, {\"id\": \"D\", \"task\": \"printD\", \"dependencies\": [\"B\", \"C\"]}]}";
Map<String, CompletableFuture<Void>> futures = new HashMap<>();
// 解析JSON串
JSONObject dagObject = JSONObject.parseObject(dagJson);
JSONArray nodesArray = dagObject.getJSONArray("nodes");
// 创建一个映射,用于存储每个节点的CompletableFuture
Map<String, Node> nodeMap = new HashMap<>();
for (int i = 0; i < nodesArray.size(); i++) {
JSONObject nodeObj = nodesArray.getJSONObject(i);
String id = nodeObj.getString("id");
List<String> dependencies = new ArrayList<>();
if (nodeObj.containsKey("dependencies")) {
dependencies = nodeObj.getJSONArray("dependencies").toJavaList(String.class);
}
Node node = new Node(id, () -> executeTask(id), dependencies);
nodeMap.put(id, node);
}
// 构建依赖关系并启动任务
for (Node node : nodeMap.values()) {
node.start(futures, nodeMap);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).get();
System.out.println("All tasks completed.");
}
private static Void executeTask(String taskId) {
// 执行任务的具体逻辑
System.out.println("Executing task: " + taskId);
// 模拟任务执行时间
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
return null;
}
static class Node {
private final String id;
private final Supplier<Void> task;
private final List<String> dependencies;
private CompletableFuture<Void> future;
public Node(String id, Supplier<Void> task, List<String> dependencies) {
this.id = id;
this.task = task;
this.dependencies = dependencies;
}
public void start(Map<String, CompletableFuture<Void>> futures, Map<String, Node> nodeMap) {
List<CompletableFuture<Void>> depFutures = new ArrayList<>();
for (String depId : dependencies) {
CompletableFuture<Void> depFuture = futures.get(depId);
if (depFuture == null) {
throw new IllegalStateException("Unknown dependency: " + depId);
}
depFutures.add(depFuture);
}
if (depFutures.isEmpty()) {
// 没有依赖,直接执行任务
future = CompletableFuture.supplyAsync( task);
} else {
// 等待所有依赖完成后执行任务
future = CompletableFuture.allOf(depFutures.toArray(new CompletableFuture[0])).thenRunAsync(()->executeTask(id));
}
futures.put(id, future);
}
}
}
到了这里,关于Java组合式异步编程CompletableFuture的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!