Reference link 1
EnableAsync
Source code
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
Class<? extends Annotation> annotation() default Annotation.class;
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
Applies to: types.
AdviceMode
public enum AdviceMode {
/**
* JDK proxy-based advice.
*/
PROXY,
/**
* AspectJ weaving-based advice.
*/
ASPECTJ
}
AsyncConfigurationSelector
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
Async
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
String value() default "";
}
Applies to: types and methods.
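
Because @Async may sit on a type or on a method, a caller has to decide which annotation applies to a given method. The sketch below illustrates one plausible lookup order (method first, then the declaring class) using plain JDK reflection. MyAsync, Service and Plain are hypothetical stand-ins so that no Spring jars are needed; this is not Spring's actual code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AsyncTargetSketch {

    // Hypothetical stand-in for @Async with the same targets and a single value attribute.
    @Target({ElementType.TYPE, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @interface MyAsync {
        String value() default "";
    }

    @MyAsync("classLevel")
    static class Service {
        @MyAsync("methodLevel")
        public void a() { }

        public void b() { }
    }

    static class Plain {
        public void c() { }
    }

    /** Returns the annotation value that applies to the method, or null if none applies. */
    static String qualifierFor(Class<?> type, String methodName) {
        Method method;
        try {
            method = type.getMethod(methodName);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
        // A method-level annotation wins; otherwise fall back to the declaring class.
        MyAsync async = method.getAnnotation(MyAsync.class);
        if (async == null) {
            async = method.getDeclaringClass().getAnnotation(MyAsync.class);
        }
        return async != null ? async.value() : null;
    }

    public static void main(String[] args) {
        System.out.println(qualifierFor(Service.class, "a"));
        System.out.println(qualifierFor(Service.class, "b"));
        System.out.println(qualifierFor(Plain.class, "c"));
    }
}
```

In this sketch a class-level annotation makes every public method of the class eligible, which is the pattern Case 2 relies on when it puts @Async on GoodsService itself.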
Case 1: no return value
@Component
public class LogService {
    @Async
    public void log(String msg) throws InterruptedException {
        System.out.println(Thread.currentThread() + "开始记录日志," + System.currentTimeMillis());
        // Simulate 2 seconds of work
        TimeUnit.SECONDS.sleep(2);
        System.out.println(Thread.currentThread() + "日志记录完毕," + System.currentTimeMillis());
    }
}
@ComponentScan
@EnableAsync
public class Client {
public static void main(String[] args) throws InterruptedException {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(Client.class);
context.refresh();
LogService logService = context.getBean(LogService.class);
System.out.println(Thread.currentThread() + " logService.log start," + System.currentTimeMillis());
logService.log("异步执行方法!");
System.out.println(Thread.currentThread() + " logService.log end," + System.currentTimeMillis());
TimeUnit.SECONDS.sleep(3);
}
}
Thread[main,5,main] logService.log start,1667887578200
14:06:18.209 [main] DEBUG org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - Could not find default TaskExecutor bean
org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.core.task.TaskExecutor' available
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:351)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:342)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.getDefaultExecutor(AsyncExecutionAspectSupport.java:233)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.getDefaultExecutor(AsyncExecutionInterceptor.java:157)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$configure$2(AsyncExecutionAspectSupport.java:119)
at org.springframework.util.function.SingletonSupplier.get(SingletonSupplier.java:100)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.determineAsyncExecutor(AsyncExecutionAspectSupport.java:172)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:107)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.example.lurenjia.spring.c37.d1.LogService$$EnhancerBySpringCGLIB$$ecabcff7.log(<generated>)
at com.example.lurenjia.spring.c37.d1.Client.main(Client.java:23)
14:06:18.210 [main] INFO org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
Thread[main,5,main] logService.log end,1667887578212
Thread[SimpleAsyncTaskExecutor-1,5,main]开始记录日志,1667887578220
Thread[SimpleAsyncTaskExecutor-1,5,main]日志记录完毕,1667887580234
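Process finished with exit code 0
Why the program still runs to completion: the stack trace above is only logged at DEBUG level; it is never thrown to the caller.
AsyncExecutionInterceptor overrides getDefaultExecutor: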
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
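super.getDefaultExecutor first looks in the container for a bean of type TaskExecutor and, as the INFO log above indicates, also for a bean named taskExecutor. If one is found it is used; otherwise a new SimpleAsyncTaskExecutor is created.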
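
The lookup-then-fallback behaviour described here can be sketched with the JDK alone. In the sketch below a Map stands in for the bean container, TaskExecutorLike stands in for Spring's TaskExecutor, and ThreadPerTaskExecutor stands in for SimpleAsyncTaskExecutor; all three are assumptions made for illustration, not Spring classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

class DefaultExecutorSketch {

    // Hypothetical marker type playing the role of TaskExecutor.
    interface TaskExecutorLike extends Executor { }

    /** Fallback that starts one new thread per task and never reuses threads. */
    static final class ThreadPerTaskExecutor implements TaskExecutorLike {
        @Override
        public void execute(Runnable task) {
            new Thread(task, "fallback-thread").start();
        }
    }

    /** Resolves the executor: unique match by type, then by name, then the fallback. */
    static Executor resolve(Map<String, Object> beans) {
        List<TaskExecutorLike> byType = new ArrayList<>();
        for (Object bean : beans.values()) {
            if (bean instanceof TaskExecutorLike) {
                byType.add((TaskExecutorLike) bean);
            }
        }
        if (byType.size() == 1) {
            return byType.get(0); // exactly one candidate: its bean name is irrelevant
        }
        Object named = beans.get("taskExecutor");
        if (named instanceof Executor) {
            return (Executor) named; // none or several by type: the name decides
        }
        return new ThreadPerTaskExecutor();
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        System.out.println(resolve(beans).getClass().getSimpleName());

        TaskExecutorLike pool = Runnable::run; // runs tasks on the calling thread, demo only
        beans.put("anyName", pool);
        System.out.println(resolve(beans) == pool);
    }
}
```

The second lookup in main shows why a single executor bean is picked up whatever it is called: the match by type succeeds before the name is ever consulted.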
To get rid of the logged exception, define a thread pool bean yourself:
@Configuration
public class TaskExecutorConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setThreadNamePrefix("my-thread-");
return executor;
}
}
Output after the fix
14:27:03.298 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
Thread[main,5,main] logService.log start,1667888823308
Thread[main,5,main] logService.log end,1667888823310
Thread[my-thread-1,5,main]开始记录日志,1667888823316
Thread[my-thread-1,5,main]日志记录完毕,1667888825316
Case 2: with a return value
@Async
@Component
public class GoodsService {
public Future<String> getGoodsInfo(long goodsId) throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(500);
return AsyncResult.forValue(String.format("商品%s基本信息!", goodsId));
}
public Future<String> getGoodsDesc(long goodsId) throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(500);
return AsyncResult.forValue(String.format("商品%s描述信息!", goodsId));
}
public Future<List<String>> getGoodsComments(long goodsId) throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(500);
List<String> comments = Arrays.asList("评论1", "评论2");
return AsyncResult.forValue(comments);
}
}
@Configuration
public class TaskExecutorConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setThreadNamePrefix("d2-thread-");
return executor;
}
}
@ComponentScan
@EnableAsync
public class Client {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        GoodsService goodsService = context.getBean(GoodsService.class);
        long startTime = System.currentTimeMillis();
        System.out.println("开始获取商品的各种信息");
        long goodsId = 1L;
        // All three calls return immediately; the work runs on the executor threads
        Future<String> goodsInfoFuture = goodsService.getGoodsInfo(goodsId);
        Future<String> goodsDescFuture = goodsService.getGoodsDesc(goodsId);
        Future<List<String>> goodsCommentsFuture = goodsService.getGoodsComments(goodsId);
        System.out.println(goodsInfoFuture.get());
        System.out.println(goodsDescFuture.get());
        System.out.println(goodsCommentsFuture.get());
        System.out.println("商品信息获取完毕,总耗时(ms):" + (System.currentTimeMillis() - startTime));
        // Sleep briefly before main returns (the original note used this to keep a @Test from exiting early)
        TimeUnit.SECONDS.sleep(3);
    }
}
15:00:41.668 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
开始获取商品的各种信息
商品1基本信息!
商品1描述信息!
[评论1, 评论2]
商品信息获取完毕,总耗时(ms):521
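
The total of roughly 521 ms for three calls that each sleep 500 ms is what one would expect when the calls run concurrently rather than one after another. The JDK-only sketch below reproduces that effect with CompletableFuture and a small pool. ParallelFetchSketch, slowCall and the 300 ms delay are illustrative assumptions, and CompletableFuture is used here only as a stand-in for the proxied @Async calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelFetchSketch {

    /** Simulates a remote call that takes about 300 ms. */
    static String slowCall(String label, long goodsId) {
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return label + "-" + goodsId;
    }

    /** Starts three slow calls at once and waits for all of them. */
    static List<String> fetchAll(long goodsId) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<String> info =
                    CompletableFuture.supplyAsync(() -> slowCall("info", goodsId), pool);
            CompletableFuture<String> desc =
                    CompletableFuture.supplyAsync(() -> slowCall("desc", goodsId), pool);
            CompletableFuture<String> comments =
                    CompletableFuture.supplyAsync(() -> slowCall("comments", goodsId), pool);
            // join() blocks, but all three tasks are already running by now.
            return Arrays.asList(info.join(), desc.join(), comments.join());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = fetchAll(1L);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(results + " in about " + elapsedMs + " ms");
    }
}
```

The elapsed time should be close to the duration of one call, not the sum of three, provided the pool has at least as many threads as there are concurrent calls.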
Custom thread pool
Approach 1
@Configuration
public class TaskExecutorConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setThreadNamePrefix("d2-thread-");
return executor;
}
}
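The referenced article says the bean must be named taskExecutor. That is not accurate: when the container holds exactly one TaskExecutor bean it is found by type, so any bean name works. Rename the bean to anything and rerun the test to confirm. The name taskExecutor only matters as a fallback, as the INFO log in Case 1 suggests ("no bean of type TaskExecutor and no bean named 'taskExecutor' either").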
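Exception handling
As covered in the earlier JUC notes, an exception thrown from a Thread's Runnable can only be handled through an UncaughtExceptionHandler, whereas an exception thrown behind a Future can be caught by wrapping the get() call in try-catch. The same holds here.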
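
Both paths can be seen with plain JDK classes before bringing Spring into it. The sketch below is a minimal illustration; ExceptionPathsSketch and its two method names are made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

class ExceptionPathsSketch {

    /** Runs a failing Runnable on a raw Thread; returns the message seen by the handler. */
    static String viaUncaughtHandler() {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            throw new IllegalArgumentException("void failure");
        });
        // The caller cannot catch this exception; only the handler sees it.
        worker.setUncaughtExceptionHandler((thread, ex) -> seen.set(ex.getMessage()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    /** Submits a failing Callable; returns the message of the cause unwrapped from get(). */
    static String viaFutureGet() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> failing = () -> {
            throw new IllegalArgumentException("future failure");
        };
        try {
            Future<String> future = pool.submit(failing);
            future.get(); // the task's exception resurfaces here, wrapped
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaUncaughtHandler());
        System.out.println(viaFutureGet());
    }
}
```

Note that Future.get() wraps the original exception in an ExecutionException, so the real cause is available through getCause().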
With a return value
@Service
public class LogService {
    @Async
    public Future<String> mockException() {
        // Simulate a failure in a method that returns a Future
        throw new IllegalArgumentException("参数有误!");
    }

    @Async
    public void mockNoReturnException() {
        // Simulate a failure in a method with no return value
        throw new IllegalArgumentException("无返回值的异常!");
    }
}
@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {
private static final Logger logger = LoggerFactory.getLogger(Client.class);
public static void main(String[] args) throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(Client.class);
context.refresh();
LogService logService = context.getBean(LogService.class);
try {
Future<String> future = logService.mockException();
System.out.println(future.get());
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
No return value
@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {
private static final Logger logger = LoggerFactory.getLogger(Client.class);
public static void main(String[] args) throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(Client.class);
context.refresh();
LogService logService = context.getBean(LogService.class);
logService.mockNoReturnException();
}
}
ERROR org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler - Unexpected exception occurred invoking async method: public void com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException()
java.lang.IllegalArgumentException: 无返回值的异常!
at com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException(LogService.java:25)
at com.example.lurenjia.spring.c37.service.LogService$$FastClassBySpringCGLIB$$e4f93305.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
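The log above is what gets printed before any custom handler is registered. Note the class that writes it: SimpleAsyncUncaughtExceptionHandler, which simply logs the exception at ERROR level.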
public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
if (logger.isErrorEnabled()) {
logger.error("Unexpected exception occurred invoking async method: " + method, ex);
}
}
}
Adding a custom handler
@Bean
public AsyncConfigurer asyncConfigurer() {
return new AsyncConfigurer() {
@Nullable
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
String msg = String.format("方法[%s],参数[%s],发送异常了,异常详细信息:", method, Arrays.asList(params));
System.out.println(msg);
ex.printStackTrace();
};
}
};
}
Output
方法[public void com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException()],参数[[]],发送异常了,异常详细信息:
java.lang.IllegalArgumentException: 无返回值的异常!
at com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException(LogService.java:25)
at com.example.lurenjia.spring.c37.service.LogService$$FastClassBySpringCGLIB$$e4f93305.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
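How it works: AbstractAsyncConfiguration has an @Autowired setter that receives the AsyncConfigurer beans from the container (at most one is allowed) and takes the executor and the exception handler from it.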
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}
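
What happens with the stored handler can be sketched as follows: a void method runs on a pool thread, the caller has no Future to inspect, so any failure is passed to the handler together with the method and its arguments. This is a JDK-only illustration under stated assumptions. UncaughtHandler is a hypothetical interface that mirrors the (ex, method, params) callback shape, and invokeAsync is not a Spring method.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class VoidExceptionRoutingSketch {

    // Hypothetical callback with the same parameter shape as the handler shown above.
    interface UncaughtHandler {
        void handle(Throwable ex, Method method, Object... params);
    }

    static class LogService {
        public void mockNoReturnException() {
            throw new IllegalArgumentException("no-return failure");
        }
    }

    /** Invokes a void method on the pool and forwards any failure to the handler. */
    static void invokeAsync(ExecutorService pool, Object target, Method method,
                            UncaughtHandler handler, Object... args) {
        pool.execute(() -> {
            try {
                method.invoke(target, args);
            } catch (InvocationTargetException e) {
                // Unwrap the reflection wrapper so the handler sees the real exception.
                handler.handle(e.getCause(), method, args);
            } catch (ReflectiveOperationException e) {
                handler.handle(e, method, args);
            }
        });
    }

    /** Runs the demo and returns what the handler captured. */
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> captured = new AtomicReference<>();
        try {
            Method m = LogService.class.getMethod("mockNoReturnException");
            invokeAsync(pool, new LogService(), m,
                    (ex, method, params) -> captured.set(method.getName() + ": " + ex.getMessage()));
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return captured.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A method that returns a Future does not need this path, because its exception travels back to the caller through get().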
Thread pool isolation
Pass the executor bean name to @Async, for example @Async("executor"), to choose which thread pool runs the method.
@Service
public class InsulateService {
@Async("executor")
public void a() {
System.out.println(Thread.currentThread().getName());
}
@Async("executor2")
public void b() {
System.out.println(Thread.currentThread().getName());
}
@Async("executor3")
public void c() {
System.out.println(Thread.currentThread().getName());
}
}
@Configuration
public class TaskExecutorConfig {
@Bean
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setThreadNamePrefix("executor-thread-");
return executor;
}
@Bean("executor2")
public ThreadPoolTaskExecutor executor2() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setThreadNamePrefix("executor2-thread-");
return executor;
}
@Bean("executor3")
public ThreadPoolTaskExecutor executor3() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setThreadNamePrefix("executor3-thread-");
return executor;
}
}
@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, com.example.lurenjia.spring.c37.d4.Client.class, LogService.class})
@EnableAsync
public class Client {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(Client.class);
context.refresh();
InsulateService service = context.getBean(InsulateService.class);
service.a();
service.b();
service.c();
}
}
executor2-thread-1
executor3-thread-1
executor-thread-1
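
The three lines above may appear in any order, since the three pools run independently. The idea of isolation by name can be sketched without Spring: keep one pool per qualifier and dispatch each task to the pool registered under that name. PoolIsolationSketch, namedPool and runOn below are hypothetical helpers written for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class PoolIsolationSketch {

    /** Creates a small pool whose threads are named prefix1, prefix2, ... */
    static ExecutorService namedPool(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, prefix + counter.incrementAndGet());
        return Executors.newFixedThreadPool(2, factory);
    }

    /** Runs one task on the pool registered under the qualifier; returns the worker's name. */
    static String runOn(Map<String, ExecutorService> pools, String qualifier) {
        ExecutorService pool = pools.get(qualifier);
        if (pool == null) {
            throw new IllegalArgumentException("No executor named " + qualifier);
        }
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Builds three isolated pools, runs one task on each, and returns the thread names. */
    static List<String> demo() {
        Map<String, ExecutorService> pools = new LinkedHashMap<>();
        pools.put("executor", namedPool("executor-thread-"));
        pools.put("executor2", namedPool("executor2-thread-"));
        pools.put("executor3", namedPool("executor3-thread-"));
        try {
            List<String> names = new ArrayList<>();
            for (String qualifier : pools.keySet()) {
                names.add(runOn(pools, qualifier));
            }
            return names;
        } finally {
            pools.values().forEach(ExecutorService::shutdown);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping separate pools means a slow or saturated workload on one qualifier cannot starve tasks submitted under another.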
How it works
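Internally this is implemented with AOP. @EnableAsync registers a bean post-processor, AsyncAnnotationBeanPostProcessor, in the Spring container. While each bean is being created, the post-processor checks whether the bean's class carries @Async or declares any method annotated with @Async. If so, it creates an AOP proxy for the bean and adds an advisor to it: org.springframework.scheduling.annotation.AsyncAnnotationAdvisor. The advisor supplies an interceptor, AnnotationAsyncExecutionInterceptor, and the key code for the asynchronous call lives in that interceptor's invoke method, which is worth reading.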
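
The proxy-plus-interceptor idea can be imitated with a JDK dynamic proxy. The sketch below is a simplified illustration, not Spring's implementation: Greeter, DirectGreeter and asyncProxy are hypothetical names, and the handler only covers methods that return a Future or void.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncProxySketch {

    interface Greeter {
        Future<String> greet(String name);
    }

    static class DirectGreeter implements Greeter {
        @Override
        public Future<String> greet(String name) {
            // Runs on whichever thread calls it and wraps the value in a completed Future.
            return CompletableFuture.completedFuture(
                    "hello " + name + " from " + Thread.currentThread().getName());
        }
    }

    /** Wraps the target so that every interface call is executed on the pool. */
    static Greeter asyncProxy(Greeter target, ExecutorService pool) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(target, args); // toString, hashCode, equals stay synchronous
            }
            Callable<Object> task = () -> {
                Object result = method.invoke(target, args);
                // Unwrap the target's Future so the caller receives the plain value later.
                return result instanceof Future ? ((Future<?>) result).get() : null;
            };
            Future<Object> future = pool.submit(task);
            return Future.class.isAssignableFrom(method.getReturnType()) ? future : null;
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    /** Calls the proxied method and returns the greeting produced on the pool thread. */
    static String demo() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-async-1"));
        try {
            Greeter greeter = asyncProxy(new DirectGreeter(), pool);
            return greeter.greet("spring").get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The sketch also hints at why a call made from inside the same object bypasses the asynchronous behaviour: only calls that go through the proxy reach the handler.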