自定义注解实现springboot 多线程事务(基于@Async注解的多线程)

这篇具有很好参考价值的文章主要介绍了自定义注解实现springboot 多线程事务(基于@Async注解的多线程)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

前言

一、springboot多线程(声明式)的使用方法?

二、自定义注解实现多线程事务控制

1.自定义注解

2.AOP内容

3.注解使用Demo


前言

本文是基于springboot的@Async注解开启多线程, 并通过自定义注解和AOP实现的多线程事务, 避免繁琐的手动提交/回滚事务  (CV即用, 参数齐全, 无需配置)


一、springboot多线程(声明式)的使用方法?

1、springboot提供了注解@Async来使用线程池, 具体使用方法如下:

(1) 在启动类(配置类)添加@EnableAsync来开启线程池

(2) 在需要开启子线程的方法上添加注解@Async

注意: 框架默认 ----->   来一个请求开启一个线程,在高并发下容易内存溢出

所以使用时需要配置自定义线程池, 如下:

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

    @Bean("threadPoolTaskExecutor")//自定义线程池名称
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        //线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
        executor.setCorePoolSize(16);

        //如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
        //executor.setAllowCoreThreadTimeOut(true);

        //阻塞队列 当核心线程数达到最大时,新任务会放在队列中排队等待执行
        executor.setQueueCapacity(124);

        //最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
        //任务队列已满时, 且当线程数=maxPoolSize,,线程池会拒绝处理任务而抛出异常
        executor.setMaxPoolSize(64);

        //当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
        //允许线程空闲时间30秒,当maxPoolSize的线程在空闲时间到达的时候销毁
        //如果allowCoreThreadTimeout=true,则会直到线程数量=0
        executor.setKeepAliveSeconds(30);

        //spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
        //jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
        executor.setThreadNamePrefix("自定义线程池-");

        // rejection-policy:拒绝策略:当线程数已经达到maxSize的时候,如何处理新任务
        // CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行, (个人推荐)
        // AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        // DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
        // DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁
		executor.setWaitForTasksToCompleteOnShutdown(true);
		//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
		executor.setAwaitTerminationSeconds(60);

        executor.initialize();
        return executor;
    }
}

 开启子线程方法:  在需要开启线程的方法上添加 注解@Async("threadPoolTaskExecutor")即可, 其中注解中的参数为自定义线程池的名称.

二、自定义注解实现多线程事务控制

1.自定义注解

本文是使用了两个注解共同作用实现的, 主线程当做协调者,各子线程作为参与者

package com.example.anno;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 多线程事务注解: 主事务
 *
 * @author zlj
 * @since 2022/11/3
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MainTransaction {
	int value();//子线程数量
}
package com.example.anno;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 多线程事务注解: 子事务
 *
 * @author zlj
 * @since 2022/11/3
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SonTransaction {
    String value() default "";
}

解释:

两个注解都是用在方法上的.须配合@Transactional(rollbackFor = Exception.class)一起使用

@MainTransaction注解 用在调用方, 其参数为必填, 参数值为本方法中调用的方法开启的线程数, 如: 在这个方法中调用的方法中有2个方法用@Async注解开启了子线程, 则参数为@MainTransaction(2), 另外如果未使用@MainTransaction注解, 则直接已无多线程事务执行(不影响方法的单线程事务)

@SonTransaction注解 用在被调用方(开启线程的方法), 无需传入参数

2.AOP内容

代码如下:

package com.example.aop;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.anno.MainTransaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 多线程事务
 *
 * @author zlj
 * @since 2022/11/3
 */
@Aspect
@Component
public class TransactionAop {

    //用来存储各线程计数器数据(每次执行后会从map中删除)
	private static final Map<String, Object> map = new HashMap<>();

	@Resource
	private PlatformTransactionManager transactionManager;

	@Around("@annotation(mainTransaction)")
	public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
		//当前线程名称
		Thread thread = Thread.currentThread();
		String threadName = thread.getName();
		//初始化计数器
		CountDownLatch mainDownLatch = new CountDownLatch(1);
		CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());//@MainTransaction注解中的参数, 为子线程的数量
		// 用来记录子线程的运行状态,只要有一个失败就变为true
		AtomicBoolean rollBackFlag = new AtomicBoolean(false);
		// 用来存每个子线程的异常,把每个线程的自定义异常向vector的首位置插入,其余异常向末位置插入,避免线程不安全,所以使用vector代替list
		Vector<Throwable> exceptionVector = new Vector<>();

		map.put(threadName + "mainDownLatch", mainDownLatch);
		map.put(threadName + "sonDownLatch", sonDownLatch);
		map.put(threadName + "rollBackFlag", rollBackFlag);
		map.put(threadName + "exceptionVector", exceptionVector);

		try {
			joinPoint.proceed();//执行方法
		} catch (Throwable e) {
			exceptionVector.add(0, e);
			rollBackFlag.set(true);//子线程回滚
			mainDownLatch.countDown();//放行所有子线程
		}

		if (!rollBackFlag.get()) {
			try {
				// sonDownLatch等待,直到所有子线程执行完插入操作,但此时还没有提交事务
				sonDownLatch.await();
				mainDownLatch.countDown();// 根据rollBackFlag状态放行子线程的await处,告知是回滚还是提交
			} catch (Exception e) {
				rollBackFlag.set(true);
				exceptionVector.add(0, e);
			}
		}
		if (CollectionUtils.isNotEmpty(exceptionVector)) {
			map.remove(threadName + "mainDownLatch");
			map.remove(threadName + "sonDownLatch");
			map.remove(threadName + "rollBackFlag");
			map.remove(threadName + "exceptionVector");
			throw exceptionVector.get(0);
		}
	}

	@Around("@annotation(com.huigu.common.anno.SonTransaction)")
	public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
		Object[] args = joinPoint.getArgs();
		Thread thread = (Thread) args[args.length - 1];
		String threadName = thread.getName();
		CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
		if (mainDownLatch == null) {
			//主事务未加注解时, 直接执行子事务
			joinPoint.proceed();//这里最好的方式是:交由上面的thread来调用此方法,但我没有找寻到对应api,只能直接放弃事务, 欢迎大神来优化, 留言分享
			return;
		}
		CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
		AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
		Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");

		//如果这时有一个子线程已经出错,那当前线程不需要执行
		if (rollBackFlag.get()) {
			sonDownLatch.countDown();
			return;
		}

		DefaultTransactionDefinition def = new DefaultTransactionDefinition();// 开启事务
		def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 设置事务隔离级别
		TransactionStatus status = transactionManager.getTransaction(def);

		try {
			joinPoint.proceed();//执行方法

			sonDownLatch.countDown();// 对sonDownLatch-1
			mainDownLatch.await();// 如果mainDownLatch不是0,线程会在此阻塞,直到mainDownLatch变为0
			// 如果能执行到这一步说明所有子线程都已经执行完毕判断如果atomicBoolean是true就回滚false就提交
			if (rollBackFlag.get()) {
				transactionManager.rollback(status);
			} else {
				transactionManager.commit(status);
			}
		} catch (Throwable e) {
			exceptionVector.add(0, e);
			// 回滚
			transactionManager.rollback(status);
			// 并把状态设置为true
			rollBackFlag.set(true);
			mainDownLatch.countDown();
			sonDownLatch.countDown();
		}
	}
}

扩展说明: CountDownLatch是什么?

一个同步辅助类

创建对象时: 用给定的数字初始化 CountDownLatch

countDown() 方法: 使计数减1

await() 方法: 阻塞当前线程, 直至当前计数到达零。

本文中:

用 计数 1 初始化的 mainDownLatch 当作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。

用 子线程数量 初始化的 sonDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。


3、注解使用Demo

任务方法:

package com.example.demo.service;

import com.example.demo.anno.SonTransaction;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author zlj
 * @since 2022/11/14
 */
@Service
public class SonService {

    /**
     * 参数说明:  以下4个方法参数和此相同
     *
     * @param args   业务中需要传递的参数
     * @param thread 调用者的线程, 用于aop获取参数, 不建议以方法重写的方式简略此参数,
     *               在调用者方法中可以以此参数为标识计算子线程的个数作为注解参数,避免线程参数计算错误导致锁表
     *               传参时参数固定为: Thread.currentThread()
     */
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod1(String args, Thread thread) {
        System.out.println(args + "开启了线程");
    }

    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod2(String args1, String args2, Thread thread) {
        System.out.println(args1 + "和" + args2 + "开启了线程");
    }

    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod3(String args, Thread thread) {
        System.out.println(args + "开启了线程");
    }

    //sonMethod4方法没有使用线程池
    @Transactional(rollbackFor = Exception.class)
    public void sonMethod4(String args) {
        System.out.println(args + "没有开启线程");
    }
}

调用方:文章来源地址https://www.toymoban.com/news/detail-408182.html

package com.example.demo.service;

import com.example.demo.anno.MainTransaction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
 * @author zlj
 * @since 2022/11/14
 */
@Service
public class MainService {

    @Resource
    private SonService sonService;

    @MainTransaction(3)//调用的方法中sonMethod1/sonMethod2/sonMethod3使用@Async开启了线程, 所以参数为: 3
    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        sonService.sonMethod1("路飞", Thread.currentThread());
        sonService.sonMethod2("索隆", "山治", Thread.currentThread());
        sonService.sonMethod3("娜美", Thread.currentThread());
        sonService.sonMethod4("罗宾");
    }

    /*
     * 有的业务中存在if的多种可能, 每一种走向调用的方法(开启线程的方法)数量如果不同, 这时可以选择放弃使用@MainTransaction注解避免锁表
     * 这时候如果发生异常会导致多线程不能同时回滚, 可根据业务自己权衡是否使用
     */
    @Transactional(rollbackFor = Exception.class)
    public void test2() {
        sonService.sonMethod1("路飞", Thread.currentThread());
        sonService.sonMethod2("索隆", "山治", Thread.currentThread());
        sonService.sonMethod3("娜美", Thread.currentThread());
        sonService.sonMethod4("罗宾");
    }

}

到了这里,关于自定义注解实现springboot 多线程事务(基于@Async注解的多线程)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot中异步注解@Async介绍

    介绍在SpringBoot项目中,使用@Async不生效的原因介绍和分析; 代码参考gitee仓库:spring-boot-2022-05: 主要是介绍Spring框架注解、常用的功能的使用案例,记录平时遇到的技术知识点,进行实践操作; - Gitee.com 1.启动类中没有添加注解@EnableAsync; 2.同一个类中调用含有@Async的方法;因

    2023年04月17日
    浏览(37)
  • SpringBoot异步方法支持注解@Async应用

    合理使用异步方法可以有效的提高执行效率 同步执行(同在一个线程中): 异步执行(开启额外线程来执行): 在SpringBoot中并不需要我们自己去创建维护线程或者线程池来异步的执行方法, SpringBoot已经提供了异步方法支持注解. service层: controller层: 测试结果: 我们可以感受到接口

    2024年02月11日
    浏览(48)
  • SpringBoot中@EnableAsync和@Async注解的使用

    异步的优点: 提高应用程序的响应能力 提高系统的吞吐量 节约资源 :异步操作可以避免在请求处理期间占用过多的线程资源,减少服务器的负载。 优化用户体验 需要注意的问题: 事务问题 :异步处理时,需要注意在事务没有结束时做异步操作,可能会导致读取不到甚至

    2023年04月25日
    浏览(35)
  • 基于Qt的多线程TCP即时通讯软件的设计与实现

    本文将从涉及到主要技术开始,讲解使用Qt来实现一个支持多客户端链接的 多线程TCP服务器 及其 客户端 的设计与实现的解决方案。 注:本文使用的开发环境为Qt5.15.2, 使用MSVC2019_64编译器, C++11及以上 接下来我将会详细讲解客户端和服务端的设计与实现的关键细节。完整的源

    2024年01月16日
    浏览(49)
  • Springboot项目:解决@Async注解获取不到上下文信息问题

    springboot项目中,需要使用到异步调用某个方法,此时 第一个想到的就是 @Async 注解,但是 发现 方法执行报错了,具体报错如下: 上面日志有点多,其实核心就是这一部分日志: 这块逻辑就是,使用spring底层提供的获取上下文信息的方法。 所以说明 获取不到上下文信息,结

    2024年01月17日
    浏览(45)
  • SpringBoot自定义Jackson注解,实现自定义序列化BigDecimal(增强JsonFormat注解)

    在处理BigDecimal字段的时候,希望自定义序列化格式。虽然有 JsonFormat可以自定义格式,但是还是不够,JsonFormat不能对 BigDecimal 进行个性化处理,比如指定的RoundingMode。 现在就是需要有个注解,可以实现自定序列化BigDecimal类型 首先,自定义一个注解 BigDecimalFormatter 实现一个

    2024年02月09日
    浏览(41)
  • 【Spring全家桶系列】Spring中的事务管理(基于注解完成实现)

    ⭐️ 前面的话 ⭐️ 本文已经收录到《Spring框架全家桶系列》专栏,本文将介绍Spring中的事务管理,事务的概念与作用,以及Spring事务的属性和传播机制。 📒博客主页:未见花闻的博客主页 🎉欢迎关注🔎点赞👍收藏⭐️留言📝 📌本文由 未见花闻 原创, CSDN 首发! 📆首

    2024年02月07日
    浏览(44)
  • SpringBoot通过自定义注解实现多数据源

    ✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: Java从入门到精通 ✨特色专栏: MySQL学习 🥭本文内容:SpringBoot通过自定义注解实现多数据源 📚个人知识库: Leo知识库,欢迎大家访问 大家好

    2024年02月03日
    浏览(60)
  • SpringBoot定义拦截器+自定义注解+Redis实现接口防刷(限流)

    在拦截器Interceptor中拦截请求 通过地址+请求uri作为调用者访问接口的区分在Redis中进行计数达到限流目的 定义参数 访问周期 最大访问次数 禁用时长 代码实现 定义拦截器:实现HandlerInterceptor接口,重写preHandle()方法 注册拦截器:配置类实现WebMvcConfigurer接口,重写addIntercep

    2024年02月05日
    浏览(59)
  • springboot自定义注解+aop+redis实现延时双删

    redis作为用的非常多的缓存数据库,在多线程场景下,可能会出现数据库与redis数据不一致的现象 数据不一致的现象:https://blog.csdn.net/m0_73700925/article/details/133447466 这里采用aop+redis来解决这个方法: 删除缓存 更新数据库 延时一定时间,比如500ms 删除缓存 这里之所以要延时一

    2024年01月17日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包