《Spring 高手系列》(异步)笔记

这篇具有很好参考价值的文章主要介绍了《Spring 高手系列》(异步)笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

参考链接1

EnableAsync

源码

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	Class<? extends Annotation> annotation() default Annotation.class;	
	
	boolean proxyTargetClass() default false;
	
	AdviceMode mode() default AdviceMode.PROXY;
	
	int order() default Ordered.LOWEST_PRECEDENCE;
}

修饰范围:类型

AdviceMode

public enum AdviceMode {

	/**
	 * JDK proxy-based advice.
	 */
	PROXY,

	/**
	 * AspectJ weaving-based advice.
	 */
	ASPECTJ

}

AsyncConfigurationSelector

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

Async

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
	String value() default "";
}

修饰范围:类型、方法。

案例1 无返回值

@Component
public class LogService {
    @Async
    public void log(String msg) throws InterruptedException {
        System.out.println(Thread.currentThread() + "开始记录日志," + System.currentTimeMillis());
        //模拟耗时2秒
        TimeUnit.SECONDS.sleep(2);
        System.out.println(Thread.currentThread() + "日志记录完毕," + System.currentTimeMillis());
    }
}
@ComponentScan
@EnableAsync
public class Client {
    public static void main(String[] args) throws InterruptedException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        System.out.println(Thread.currentThread() + " logService.log start," + System.currentTimeMillis());
        logService.log("异步执行方法!");
        System.out.println(Thread.currentThread() + " logService.log end," + System.currentTimeMillis());
        TimeUnit.SECONDS.sleep(3);
    }
}
Thread[main,5,main] logService.log start,1667887578200
14:06:18.209 [main] DEBUG org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - Could not find default TaskExecutor bean
org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.core.task.TaskExecutor' available
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:351)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:342)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.getDefaultExecutor(AsyncExecutionAspectSupport.java:233)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.getDefaultExecutor(AsyncExecutionInterceptor.java:157)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$configure$2(AsyncExecutionAspectSupport.java:119)
	at org.springframework.util.function.SingletonSupplier.get(SingletonSupplier.java:100)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.determineAsyncExecutor(AsyncExecutionAspectSupport.java:172)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:107)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
	at com.example.lurenjia.spring.c37.d1.LogService$$EnhancerBySpringCGLIB$$ecabcff7.log(<generated>)
	at com.example.lurenjia.spring.c37.d1.Client.main(Client.java:23)
14:06:18.210 [main] INFO org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor - No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
Thread[main,5,main] logService.log end,1667887578212
Thread[SimpleAsyncTaskExecutor-1,5,main]开始记录日志,1667887578220
Thread[SimpleAsyncTaskExecutor-1,5,main]日志记录完毕,1667887580234

进程已结束,退出代码为 0

为什么不报错的原因在这里

AsyncExecutionInterceptor 这个类有一个getDefaultExecutor

	@Override
	@Nullable
	protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
		return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
	}

首先在容器中找TaskExecutor这个类型的bean如果有就用,没有就会创建一个SimpleAsyncTaskExecutor

想要修复这个错就得自定义一个线程池

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("my-thread-");
        return executor;
    }
}

修复后输出

14:27:03.298 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
Thread[main,5,main] logService.log start,1667888823308
Thread[main,5,main] logService.log end,1667888823310
Thread[my-thread-1,5,main]开始记录日志,1667888823316
Thread[my-thread-1,5,main]日志记录完毕,1667888825316

案例2 有返回值

@Async
@Component
public class GoodsService {
    public Future<String> getGoodsInfo(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return AsyncResult.forValue(String.format("商品%s基本信息!", goodsId));
    }

    public Future<String> getGoodsDesc(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        return AsyncResult.forValue(String.format("商品%s描述信息!", goodsId));
    }

    public Future<List<String>> getGoodsComments(long goodsId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(500);
        List<String> comments = Arrays.asList("评论1", "评论2");
        return AsyncResult.forValue(comments);
    }
}
@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("d2-thread-");
        return executor;
    }
}
@ComponentScan
@EnableAsync
public class Client {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        GoodsService goodsService = context.getBean(GoodsService.class);

        long starTime = System.currentTimeMillis();
        System.out.println("开始获取商品的各种信息");

        long goodsId = 1L;
        Future<String> goodsInfoFuture = goodsService.getGoodsInfo(goodsId);
        Future<String> goodsDescFuture = goodsService.getGoodsDesc(goodsId);
        Future<List<String>> goodsCommentsFuture = goodsService.getGoodsComments(goodsId);

        System.out.println(goodsInfoFuture.get());
        System.out.println(goodsDescFuture.get());
        System.out.println(goodsCommentsFuture.get());

        System.out.println("商品信息获取完毕,总耗时(ms):" + (System.currentTimeMillis() - starTime));

        //休眠一下,防止@Test退出
        TimeUnit.SECONDS.sleep(3);
    }
}
15:00:41.668 [main] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'taskExecutor'
开始获取商品的各种信息
商品1基本信息!
商品1描述信息!
[评论1, 评论2]
商品信息获取完毕,总耗时(ms)521

自定义线程池

方式一

@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("d2-thread-");
        return executor;
    }
}

文章中说名称必须为taskExecutor,这个是不对的,将名字修改为任意值测试即可。

异常处理

在之前 juc 的篇目中我们知道Thread的runable方法只能通过UncaughtExceptionHandler来处理,future可以通过try-catch包裹get方法来捕获异常,在这里也是相同的道理。

有返回值

@Service
public class LogService {
    
    @Async
    public Future<String> mockException() {
        //模拟抛出一个异常
        throw new IllegalArgumentException("参数有误!");
    }

    @Async
    public void mockNoReturnException() {
        //模拟抛出一个异常
        throw new IllegalArgumentException("无返回值的异常!");
    }
}
@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        try {
            Future<String> future = logService.mockException();
            System.out.println(future.get());
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
}

无返回值

@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, Client.class, LogService.class})
@EnableAsync
public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        LogService logService = context.getBean(LogService.class);
        logService.mockNoReturnException();
    }
}
ERROR org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler - Unexpected exception occurred invoking async method: public void com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException()
java.lang.IllegalArgumentException: 无返回值的异常!
	at com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException(LogService.java:25)
	at com.example.lurenjia.spring.c37.service.LogService$$FastClassBySpringCGLIB$$e4f93305.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

我们还没有添加处理器直接打印的结果如下,观察打印有一个SimpleAsyncUncaughtExceptionHandler 这么一个类。

public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

	private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);


	@Override
	public void handleUncaughtException(Throwable ex, Method method, Object... params) {
		if (logger.isErrorEnabled()) {
			logger.error("Unexpected exception occurred invoking async method: " + method, ex);
		}
	}

}

添加自定义处理器

    @Bean
    public AsyncConfigurer asyncConfigurer() {
        return new AsyncConfigurer() {

            @Nullable
            @Override
            public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
                return (ex, method, params) -> {
                    String msg = String.format("方法[%s],参数[%s],发送异常了,异常详细信息:", method, Arrays.asList(params));
                    System.out.println(msg);
                    ex.printStackTrace();
                };
            }
        };
    }

输出

方法[public void com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException()],参数[[]],发送异常了,异常详细信息:
java.lang.IllegalArgumentException: 无返回值的异常!
	at com.example.lurenjia.spring.c37.service.LogService.mockNoReturnException(LogService.java:25)
	at com.example.lurenjia.spring.c37.service.LogService$$FastClassBySpringCGLIB$$e4f93305.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

原理:AbstractAsyncConfiguration这个类有一个bean

	@Autowired(required = false)
	void setConfigurers(Collection<AsyncConfigurer> configurers) {
		if (CollectionUtils.isEmpty(configurers)) {
			return;
		}
		if (configurers.size() > 1) {
			throw new IllegalStateException("Only one AsyncConfigurer may exist");
		}
		AsyncConfigurer configurer = configurers.iterator().next();
		this.executor = configurer::getAsyncExecutor;
		this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
	}

线程池隔离

通过 @Async(“executor”) 来指定线程池 bean的名称。

@Service
public class InsulateService {

    @Async("executor")
    public void a() {
        System.out.println(Thread.currentThread().getName());
    }

    @Async("executor2")
    public void b() {
        System.out.println(Thread.currentThread().getName());
    }

    @Async("executor3")
    public void c() {
        System.out.println(Thread.currentThread().getName());
    }

}
@Configuration
public class TaskExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("executor-thread-");
        return executor;
    }

    @Bean("executor2")
    public ThreadPoolTaskExecutor executor2() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("executor2-thread-");
        return executor;
    }

    @Bean("executor3")
    public ThreadPoolTaskExecutor executor3() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setThreadNamePrefix("executor3-thread-");
        return executor;
    }


}
@ComponentScan(basePackageClasses = {TaskExecutorConfig.class, com.example.lurenjia.spring.c37.d4.Client.class, LogService.class})
@EnableAsync
public class Client {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(Client.class);
        context.refresh();
        InsulateService service = context.getBean(InsulateService.class);
        service.a();
        service.b();
        service.c();
    }
}
executor2-thread-1
executor3-thread-1
executor-thread-1

原理

内部使用aop实现的,@EnableAsync会引入一个bean后置处理器:AsyncAnnotationBeanPostProcessor,将其注册到spring容器,这个bean后置处理器在所有bean创建过程中,判断bean的类上是否有@Async注解或者类中是否有@Async标注的方法,如果有,会通过aop给这个bean生成代理对象,会在代理对象中添加一个切面:org.springframework.scheduling.annotation.AsyncAnnotationAdvisor,这个切面中会引入一个拦截器:AnnotationAsyncExecutionInterceptor,方法异步调用的关键代码就是在这个拦截器的invoke方法中实现的,可以去看一下。

AsyncAnnotationAdvisor文章来源地址https://www.toymoban.com/news/detail-692646.html

到了这里,关于《Spring 高手系列》(异步)笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • yarn报错 error An unexpected error occurred httpsregistry.yarnpkg.com...

    1. 报错原因 使用yarn命令安装依赖包时报错,提示信息: error An unexpected error occurred: \\\"https://registry.yarnpkg.com/killable/-/killable-1.0.0.tgz: connect ETIMEDOUT 104.16.19.35:443\\\" 这是由于资源地址请求超时造成的,需要更换一下请求地址。 2. 解决方案 将资源地址设置为淘宝镜像: 查看当前镜像

    2024年02月12日
    浏览(48)
  • rabbitmq连接特别慢 一直连接超时 An unexpected connection driver error occured

    连接linux上的rabbitmq时,特别慢一直出现An unexpected connection driver error occured 或者 连接超时等异常。如下: 造成这个问题的原因大概有三种: 一、配置文件rabbitmq端口号错误,端口后应该写成5672   二、权限问题,登录rabbitmq,查看用户权限。如下: 像这个shop一样就可以了,如

    2024年02月15日
    浏览(37)
  • 解决方案:yarn 出现error An unexpected error occurred错误的解决办法

    yarn 相比于 npm 、 cnpm 、 npx 来说,效率很高,是前端工程师首选的 包管理 工具,但今日在项目中遇到一个问题,让人很是头疼,看一下报错: 网上搜索解决方案,查阅了一下yarn官方文档,查找到一种解决方法 相信出现类似的情况,会帮助到大家!!!

    2024年02月05日
    浏览(70)
  • @Scheduled定时器 定时任务调度:Unexpected error occurred in scheduled task错误

    目录 一、基本使用 二、参数详解 1. @Scheduled(fixedDelay = 5000) 2. @Scheduled(fixedRate = 5000) 3. @Scheduled(cron = “0 0 2 * * ?”) 4.cron表达式 案例 配置文件 写配置的时候,没有提示,解决方案 三、@Scheduled注意事项 四、 @Scheduled 的执行原理  1、加载使用 @Scheduled 注解的类及方法  2、解析

    2024年02月16日
    浏览(42)
  • An unexpected error has occurred. Conda has prepared the above report

    今日在服务器上创建anaconda虚拟环境的时候,出现了如下报错  直接上解决方案 在终端中输入如下指令  如果出现以下提示,说明多了一个文件  输入以下指令删掉这个文件  随后可以正常完成虚拟环境的创建

    2024年02月13日
    浏览(37)
  • During handling of the above exception, another exception occurred 处理

    字面意思为:在处理上述异常的过程中,发生了另一个异常。简单理解就是,程序执行——异常——异常处理——又引发异常——又异常处理…此时,在报错信息之间就会出现上面一行英文。 由于 req = self.RS.get(url=u, headers=self.HEADERS) 缺少参数 verify=False ,在执行上述代码,抛

    2024年02月13日
    浏览(36)
  • error An unexpected error occurred: “https://registry.npmmirror.com/moment: tunneling socket could n

    yarn install 的时候报了错误 解决方法 我换了原来的镜像 yarn 不行就换npm,确保都换了,然后这个报错就可以解决了 如果还不行的,检查你的代理地址是否正确,是否防火墙影响

    2024年02月04日
    浏览(35)
  • A JavaScript error occurred in the main processUncaught Exception

    A JavaScript error occurred in the main processUncaught Exception: Error: getaddrinfo ENOTFOUND rfw.jnsii.com at GetAddrInfoReqWrap.onlookup [as oncomplete] (dns.js:60:26)                                                                          💧 记录一下今天遇到的 b u g col

    2024年02月07日
    浏览(34)
  • 【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程

    🎉🎉 欢迎光临,终于等到你啦 🎉🎉 🏅我是 苏泽 ,一位对技术充满热情的探索者和分享者。🚀🚀 🌟持续更新的专栏 《Spring 狂野之旅:从入门到入魔》 🚀 本专栏带你从Spring入门到入魔   这是苏泽的个人主页可以看到我其他的内容哦👇👇 努力的苏泽 http://suzee.blog.

    2024年03月15日
    浏览(42)
  • 已解决:Unexpected exception during bean creation; nested exception is java.lang.IllegalStateException:

    这个异常通常是由于在使用 Spring Cloud Feign 客户端进行负载均衡时缺少相关的依赖引起的。具体来说,它提示你忘记在项目的依赖中包含  spring-cloud-starter-loadbalancer 。 spring-cloud-starter-loadbalancer  是用于支持负载均衡功能的 Spring Cloud Starter 组件之一。它提供了负责将请求分发

    2024年02月08日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包