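runAsync and supplyAsync
- runAsync(runnable): no return value
- runAsync(runnable, executor): no return value; runs on a custom thread pool
- supplyAsync(supplier): returns a value
- supplyAsync(supplier, executor): returns a value; runs on a custom thread pool
Code demonstration: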
public static void testOne(){
CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
System.out.println("start1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end1");
});
CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end2");
return "this is twoFuture";
});
CompletableFuture.allOf(oneFuture, twoFuture).join();
System.out.println(oneFuture.join());
System.out.println(twoFuture.join());
}
start1
start2
end1
end2
null
this is twoFuture
Analysis: oneFuture.join() returns null because runAsync produces no result.
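The executor overloads listed above are not exercised by the demo, so here is a minimal sketch of supplyAsync with a custom thread pool. The class name, the pool size, and the thread name "demo-pool" are assumptions made for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorDemo {
    // Runs a supplier on the given pool and returns the name of the thread that executed it.
    static String workerThreadName(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        // Hypothetical pool: two threads that share a recognizable name.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "demo-pool"));
        try {
            System.out.println("ran on: " + workerThreadName(pool));
        } finally {
            // Unlike the common pool, a custom pool uses non-daemon threads and must be shut down.
            pool.shutdown();
        }
    }
}
```

Without the executor argument, the task runs on the shared ForkJoinPool.commonPool() in typical configurations, which is why a dedicated pool is often preferred for blocking work.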
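allOf and anyOf
- allOf(future1, future2, future3, ...): waits until every future has completed before continuing. Returns CompletableFuture<Void>, so it carries no result.
- anyOf(future1, future2, future3, ...): continues as soon as any one future completes. Returns CompletableFuture<Object>.
allOf usage example: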
public static void testTwo(){
long startTime = System.currentTimeMillis();
CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
System.out.println("start1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end1");
});
CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end2");
return "this is twoFuture";
});
CompletableFuture<Integer> threeFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start3");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end3");
return 100;
});
CompletableFuture.allOf(oneFuture, twoFuture, threeFuture).join();
System.out.println(twoFuture.join() + threeFuture.join());
System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
}
start1
start2
start3
end1
end3
end2
this is twoFuture100
cost:2067ms
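Analysis: the join after allOf blocks the main thread. The output shows that the main thread's logic runs only after all futures have finished, so the total cost is roughly that of the slowest task (2000 ms).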
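Because allOf itself carries no result, each future has to be joined individually afterwards, as the example does. One possible way to package that pattern is sketched below; allAsList is a hypothetical helper written for this illustration, not a JDK method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResultsDemo {
    // Hypothetical helper: waits for all futures, then gathers their results in input order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Every future is complete at this point, so join() returns without blocking.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        System.out.println(allAsList(futures).join()); // prints [1, 2, 3]
    }
}
```

This sketch only works when all futures share one result type; mixed types, as in the example above, still need separate join calls.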
anyOf usage example:
public static void testThree(){
long startTime = System.currentTimeMillis();
CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
System.out.println("start1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end1");
});
CompletableFuture<String> twoFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end2");
return "this is twoFuture";
});
CompletableFuture<Integer> threeFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start3");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end3");
return 100;
});
Object result = CompletableFuture.anyOf(oneFuture, twoFuture, threeFuture).join();
System.out.println("result:" + result);
System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
}
start1
start2
start3
end1
result:null
cost:1058ms
Analysis: oneFuture finishes first, and because it has no return value, the result is null.
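join and get
Both retrieve the result of a CompletableFuture.
- join throws only unchecked exceptions (typically a CompletionException wrapping the cause), so no try/catch is required
- get declares the checked exceptions InterruptedException and ExecutionException, so the caller must handle them explicitly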
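The difference between the two is easiest to see with a task that fails. The following is a small sketch; the class and method names are assumptions chosen for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinVsGetDemo {
    // A future that always fails with ArithmeticException.
    static CompletableFuture<Integer> failing() {
        int zero = 0;
        return CompletableFuture.supplyAsync(() -> 100 / zero);
    }

    // join(): the catch block is optional because CompletionException is unchecked.
    static String viaJoin() {
        try {
            failing().join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getClass().getSimpleName() + " caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // get(): the compiler forces both checked exceptions to be handled or declared.
    static String viaGet() {
        try {
            failing().get();
            return "no exception";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        } catch (ExecutionException e) {
            return e.getClass().getSimpleName() + " caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin()); // CompletionException caused by ArithmeticException
        System.out.println(viaGet());  // ExecutionException caused by ArithmeticException
    }
}
```

In both cases the original exception is available through getCause(); join is usually more convenient inside lambdas and streams, where checked exceptions are awkward.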
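whenComplete, whenCompleteAsync and exceptionally
- whenComplete: the callback runs on the thread that completed the preceding task (or on the calling thread if that task has already finished)
- whenCompleteAsync: the callback is executed by a thread pool
- whenComplete and whenCompleteAsync run even if the CompletableFuture completed with an exception
- exceptionally handles the exception and supplies a fallback value
Using whenComplete as an example
Normal flow: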
public static void testFour() {
long startTime = System.currentTimeMillis();
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end1");
return 100;
}).whenComplete((res, e) ->{
System.out.println("res:" + res);
System.out.println("e:" + e);
}).exceptionally((e) ->{
System.out.println("error:" + e);
return -1;
});
System.out.println("result:" + oneFuture.join());
System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
}
start1
end1
res:100
e:null
result:100
cost:1084ms
Catching and handling an exception:
public static void testFive() {
long startTime = System.currentTimeMillis();
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end1");
return 100/0;
}).whenComplete((res, e) ->{
System.out.println("res:" + res);
System.out.println("e:" + e);
}).exceptionally((e) ->{
System.out.println("error:" + e);
return -1;
});
System.out.println("result:" + oneFuture.join());
System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
}
start1
end1
res:null
e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
error:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
result:-1
cost:1073ms
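handle and handleAsync
- The difference between handle and handleAsync is that the latter runs the callback on a thread pool
- handle is roughly whenComplete and exceptionally combined: it receives both the result and the exception, and returns a replacement value
Catching and handling an exception with handle: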
public static void testSix() {
long startTime = System.currentTimeMillis();
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end1");
return 100/0;
}).handle((res, e) ->{
System.out.println("res:" + res);
System.out.println("e:" + e);
return -1;
});
System.out.println("result:" + oneFuture.join());
System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
}
start1
end1
res:null
e:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
result:-1
cost:1081ms
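Sequential orchestration
- Every API in this section has a plain version and an Async version, so the distinction is not repeated for each one. The Async versions can take a thread pool, which then runs the callback.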
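As a rough illustration of passing a thread pool to an Async stage, the sketch below reports which thread ran the thenApplyAsync callback. The class name and the thread name "stage-pool" are assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncStageExecutorDemo {
    // The first stage runs on the default pool; the second stage is submitted to the given executor.
    static String stageThread(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> 100)
                .thenApplyAsync(res -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        // Hypothetical single-thread pool with a recognizable thread name.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "stage-pool"));
        try {
            System.out.println("callback ran on: " + stageThread(pool)); // callback ran on: stage-pool
        } finally {
            pool.shutdown();
        }
    }
}
```

With the plain thenApply, the callback thread depends on timing: it may be the thread that completed the previous stage or the caller's thread, so the Async overload with an explicit executor is the predictable choice.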
runAsync().thenRunAsync()
- runAsync has no return value, and neither does thenRunAsync
public static void testSeven(){
long startTime = System.currentTimeMillis();
CompletableFuture<Void> oneFuture = CompletableFuture.runAsync(() -> {
System.out.println("start1");
System.out.println("end1");
}).thenRunAsync(() ->{
System.out.println("do something");
});
oneFuture.join();
System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
}
start1
end1
do something
cost:72ms
supplyAsync().thenAcceptAsync((res) -> {})
- thenAcceptAsync receives the return value of supplyAsync and returns nothing itself
public static void testEight(){
long startTime = System.currentTimeMillis();
CompletableFuture<Void> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
System.out.println("end1");
return 100;
}).thenAcceptAsync((res) ->{
System.out.println("res:"+ res);
});
oneFuture.join();
System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
}
start1
end1
res:100
cost:83ms
supplyAsync().thenApplyAsync((res) -> { return ...; })
- thenApplyAsync receives the return value of supplyAsync and returns a value of its own
public static void testNine(){
long startTime = System.currentTimeMillis();
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
System.out.println("end1");
return 100;
}).thenApplyAsync((res) ->{
return res * 10;
});
System.out.println("result:" + oneFuture.join());
System.out.println("cost:" + (System.currentTimeMillis() - startTime) + "ms");
}
start1
end1
result:1000
cost:75ms
Run something after both tasks complete
runAfterBothAsync
- No input, no output
public static void testTen(){
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
System.out.println("end1");
return 100;
});
CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
System.out.println("start2");
System.out.println("end2");
});
oneFuture.runAfterBothAsync(twoFuture, ()->{
System.out.println("do something");
});
System.out.println("result:" + oneFuture.join());
}
start1
end1
start2
end2
do something
result:100
thenAcceptBothAsync
- Takes input, no output
public static void testEleven(){
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
System.out.println("end1");
return 100;
});
CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
System.out.println("start2");
System.out.println("end2");
});
oneFuture.thenAcceptBothAsync(twoFuture, (res1, res2)->{
System.out.println("res1:" + res1);
System.out.println("res2:" + res2);
});
System.out.println("result:" + oneFuture.join());
}
start1
end1
start2
end2
result:100
res1:100
res2:null
thenCombine
- Takes input, produces output
public static void testTwelve(){
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
System.out.println("end1");
return 100;
});
CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
System.out.println("start2");
System.out.println("end2");
});
CompletableFuture<Integer> combineFuture = oneFuture.thenCombine(twoFuture, (res1, res2) -> {
System.out.println("res1:" + res1);
System.out.println("res2:" + res2);
return res1 == 100 ? res1 : -1;
});
System.out.println("result1:" + oneFuture.join());
System.out.println("combine:" + combineFuture.join());
}
start1
end1
start2
end2
res1:100
res2:null
result1:100
combine:100
Run something after either task completes
runAfterEitherAsync
- No input, no output
public static void testThirteen(){
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
System.out.println("end1");
return 100;
});
CompletableFuture<Void> twoFuture = CompletableFuture.runAsync(() -> {
System.out.println("start2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end2");
});
oneFuture.runAfterEitherAsync(twoFuture, ()->{
System.out.println("do something");
});
System.out.println("result:" + oneFuture.join());
}
start1
end1
start2
result:100
do something
acceptEitherAsync
- Takes input, no output
public static void testFourteen(){
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
System.out.println("end1");
return 100;
});
CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end2");
return 10;
});
oneFuture.acceptEitherAsync(twoFuture, (res)->{
System.out.println("res:"+ res);
});
System.out.println("result:" + oneFuture.join());
}
start1
end1
start2
result:100
res:100
applyToEitherAsync
- Takes input, produces output
public static void testFifteen(){
CompletableFuture<Integer> oneFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start1");
System.out.println("end1");
return 100;
});
CompletableFuture<Integer> twoFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("start2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end2");
return 10;
});
CompletableFuture<Integer> applyFuture = oneFuture.applyToEitherAsync(twoFuture, (res) -> {
System.out.println("res:" + res);
return res * 10;
});
System.out.println("result:" + oneFuture.join());
System.out.println("applyFuture:" + applyFuture.join());
}
start1
end1
start2
result:100
res:100
applyFuture:1000
Summary
With the APIs above, multiple tasks can be combined in arbitrary ways to run logic asynchronously, which can improve execution efficiency when the tasks are independent of each other.
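To show how these pieces might fit together, here is a small sketch that runs two lookups in parallel, falls back to a default when one fails, and combines the results. The scenario (a price and a discount) and all names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class OrderTotalDemo {
    // Hypothetical lookup standing in for a remote call.
    static CompletableFuture<Integer> priceAsync() {
        return CompletableFuture.supplyAsync(() -> 100);
    }

    // Hypothetical lookup that can be told to fail, to exercise the fallback path.
    static CompletableFuture<Integer> discountAsync(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("discount service down");
            }
            return 20;
        });
    }

    static int total(boolean discountFails) {
        // exceptionally: a failed discount lookup degrades to "no discount".
        CompletableFuture<Integer> discount = discountAsync(discountFails).exceptionally(e -> 0);
        // thenCombine: waits for both futures, then merges their results.
        return priceAsync().thenCombine(discount, (price, off) -> price - off).join();
    }

    public static void main(String[] args) {
        System.out.println(total(false)); // prints 80
        System.out.println(total(true));  // prints 100
    }
}
```

The two lookups start independently, so the overall latency is roughly that of the slower one rather than the sum of both.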