多线程事务怎么回滚?

这篇具有很好参考价值的文章主要介绍了多线程事务怎么回滚?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

项目中用到了多线程去批量处理一些数据,当时想当然认为只要方法上加上@Transactional注解就好了,实际并未达到想要的处理效果。特此去学习了下关于多线程事务回滚相关方案,参考了网上其他资料,这里整理并记录下学习历程。
站在巨人的肩膀上,我们可以看的更远!

一、准备相关基础方法

这里以多线程、分批次插入数据库employee表为例子进行演示。

1.线程池配置

/**
 * 线程池配置
 */
@Component
public class ExecutorConfig {
    private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
    private volatile static ExecutorService executorService;

    public static ExecutorService getThreadPool() {
        if (executorService == null){
            synchronized (ExecutorConfig.class){
                if (executorService == null){
                    executorService =  newThreadPool();
                }
            }
        }
        return executorService;
    }

    private static ExecutorService newThreadPool(){
        int queueSize = 1000;
        int corePool = Math.min(10, maxPoolSize);
        return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
    }
    private ExecutorConfig(){}
}

2.list切分工具类

/**
 * list切分工具类
 */
public class ListUtil {
    /**
     * 平均拆分list
     *
     * @param source
     * @param n
     * @param <T>
     * @return
     */
    public static <T> List<List<T>> AverageList(List<T> source, int n) {
        List<List<T>> result = new ArrayList<>();
        int remaider = source.size() % n;
        int number = source.size() / n;
        //偏移量
        int offset = 0;
        for (int i = 0; i < n; i++) {
            List<T> value;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }
}

3.SqlSession工具类

/**
 * SqlSession工具类
 */
@Component
public class SqlContext {
    @Resource
    private SqlSessionTemplate sqlSessionTemplate;

    public SqlSession getSqlSession(){
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        return sqlSessionFactory.openSession();
    }
}

4.员工实体类

/**
 * 员工
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "employee")
public class Employee {
    @TableField(value = "employee_id")
    private Integer employeeId;

    @TableField(value = "employee_name")
    private String employeeName;

    @TableField(value = "age")
    private Integer age;
}

5.员工EmployeeMapper

@Repository
public interface EmployeeMapper extends BaseMapper<Employee> {
   int saveBatchRollBack(List Employee);
}

6.员工对应EmployeeMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.it.mapper.EmployeeMapper">
    <resultMap id="BaseResultMap" type="com.it.entity.Employee">
        <!--@Table `Employee`-->
        <result column="employee_id" jdbcType="INTEGER" property="employee_id" />
        <result column="employee_name" jdbcType="VARCHAR" property="employee_name" />
        <result column="age" jdbcType="INTEGER" property="age" />
    </resultMap>

    <sql id="Base_Column_List">
        employee_id, employee_name, age
    </sql>
    <insert id="saveBatchRollBack">
        insert into
        employee (employee_id,age,employee_name)
        values
        <foreach collection="list" item="item" index="index" separator=",">
            (
            #{item.employeeId},
            #{item.age},
            #{item.employeeName}
            )
        </foreach>
    </insert>
</mapper>

二、业务处理

1.EmployeeService接口

public interface EmployeeService extends IService<Employee> {
    /**
     * 使用@Transactional测试多线程回滚失败
     */
    void saveThreadByTransactional(List<Employee> employeeList);

    /**
     * 使用手动操作事务测试多线程回滚成功
     */
    void saveThreadRollBack(List<Employee> employeeList) throws SQLException;
}

2.测试多线程事务实现类

/**
 * 测试多线程事务
 */
@Service
@Slf4j
public class EmployeeServiceImpl extends ServiceImpl<EmployeeMapper, Employee> implements EmployeeService {

    @Resource
    SqlContext sqlContext;

    /**
     * 多线程环境下Transactional失效场景
     *
     * @param employeeList
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveThreadByTransactional(List<Employee> employeeList) {
        try {
            // 先做删除操作,如果子线程出现异常,此操作不会回滚
            this.getBaseMapper().delete(null);
            // 获取线程池
            ExecutorService executorService = ExecutorConfig.getThreadPool();
            // 拆分数据,拆分6份
            List<List<Employee>> lists = ListUtil.AverageList(employeeList, 6);
            // 执行的线程
            Thread[] threadArray = new Thread[lists.size()];
            // 监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
            CountDownLatch countDownLatch = new CountDownLatch(lists.size());
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            for (int i = 0; i < lists.size(); i++) {
                if (i == lists.size() - 1) {
                    // 最后一个atomicBoolean设置为false
                    atomicBoolean.set(false);
                }
                List<Employee> list = lists.get(i);
                threadArray[i] = new Thread(() -> {
                    try {
                        // 最后一个线程抛出异常
                        if (!atomicBoolean.get()) {
                            throw new RuntimeException("最后一个线程添加时抛出异常");
                        }
                        //批量添加,mybatisPlus中自带的batch方法
                        this.saveBatch(list);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            for (int i = 0; i < lists.size(); i++) {
                executorService.execute(threadArray[i]);
            }
            // 当子线程执行完毕时,主线程再往下执行
            countDownLatch.await();
            System.out.println("employee列表添加完成");
        } catch (Exception e) {
            log.info("error", e);
            throw new RuntimeException("employee列表添加过程出现异常");
        }
    }

    /**
     * 使用sqlSession控制手动提交事务
     *
     * @param employeeList
     */
    @Override
    public void saveThreadRollBack(List<Employee> employeeList) throws SQLException {
        {
        // 获取数据库连接,获取会话(内部自有事务)
        SqlSession sqlSession = sqlContext.getSqlSession();
        Connection connection = sqlSession.getConnection();
        try {
            // 设置手动提交
            connection.setAutoCommit(false);
            //获取mapper
            EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
            //先做删除操作
            employeeMapper.delete(null);
            //获取执行器
            ExecutorService service = ExecutorConfig.getThreadPool();
            List<Callable<Integer>> callableList = new ArrayList<>();
            //拆分list
            List<List<Employee>> lists = ListUtil.AverageList(employeeList, 6);
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            for (int i = 0; i < lists.size(); i++) {
                if (i == lists.size() - 1) {
                    atomicBoolean.set(false);
                }
                List<Employee> list = lists.get(i);
                //使用返回结果的callable去执行,
                Callable<Integer> callable = () -> {
                    //让最后一个线程抛出异常
                    if (!atomicBoolean.get()) {
                        throw new Exception("出现异常");
                    }
                    return employeeMapper.saveBatchRollBack(list);
                };
                callableList.add(callable);
            }
            //执行子线程
            List<Future<Integer>> futures = service.invokeAll(callableList);
            for (Future<Integer> future : futures) {
                //如果有一个执行不成功,则全部回滚
                if (future.get() <= 0) {
                    connection.rollback();
                    return;
                }
            }
            connection.commit();
            System.out.println("添加完毕");
        } catch (Exception e) {
            connection.rollback();
            log.info("error", e);
        } finally {
            connection.close();
        }
    }
}

3.员工Controller

@RestController
@RequestMapping(value = "/employee")
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;

    @PostMapping("/saveThreadByTransactional")
    public ResponseEntity saveThreadByTransactional() {
        // 模拟需要插入12名员工到数据库
        List<Employee> list = IntStream.range(0, 12)
                .mapToObj(i -> {
                    Employee employee = new Employee();
                    employee.setEmployeeId(i);
                    employee.setEmployeeName("三丰" + i);
                    employee.setAge(i + 100);
                    return employee;
                })
                .collect(Collectors.toList());
        employeeService.saveThreadByTransactional(list);
        return new ResponseEntity<>(HttpStatus.OK);
    }

    @PostMapping("/saveThreadRollBack")
    public ResponseEntity saveThreadRollBack() throws SQLException {
        // 模拟需要插入12名员工到数据库
        List<Employee> list = IntStream.range(0, 12)
                .mapToObj(i -> {
                    Employee employee = new Employee();
                    employee.setEmployeeId(i);
                    employee.setEmployeeName("三丰" + i);
                    employee.setAge(i + 100);
                    return employee;
                })
                .collect(Collectors.toList());
        employeeService.saveThreadRollBack(list);
        return new ResponseEntity<>(HttpStatus.OK);
    }
}

三、方案验证

1.数据库表Employee存储1条原始数据,用于验证数据删除后是否被回滚。

多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot

2.EmployeeServiceImpl的saveThreadByTransactional方法

该方法通过使用@Transactional注解尝试处理多线程事务回滚。
利用postman测试saveThreadByTransactional接口
多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot
发现控制台显示我们自定义的线程报错
多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot
多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot
查询数据库Employee表,发现代码中this.getBaseMapper().delete(null);
可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚(数据库中表数据也已经被删除完成),则证明@Transactional注解并不能在多线程下进行事务回滚!
多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot
多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot

3.EmployeeServiceImpl的saveThreadRollBack方法

该方法通过使用sqlSession控制,手动提交事务,在多线程下进行事务回滚。
利用postman测试saveThreadRollBack接口。
多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot
发现控制台显示我们自定义的线程报错。
多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot
多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot
查询数据库Employee表,发现数据并未被删除,证明多线程执行过程中失败了,事务被回滚了。
多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot

四、方案总结

1.方案总结

在Spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效。
如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。
通过使用sqlSession控制手动提交事务,可以达到主线程和子线程数据事务回滚。

五.项目结构及下载

多线程事务怎么回滚?,SpringBoot,Mybatis-Plus,java,springboot
源码地址springboot-cacheable,创作不易,欢迎star哦~

参考资料
支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!
多线程事务怎么回滚?
多线程如何实现事务回滚?一招帮你搞定!文章来源地址https://www.toymoban.com/news/detail-647871.html

到了这里,关于多线程事务怎么回滚?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Mybatis-Plus+SpringBoot框架详解

    1、SpringBoot 简介 SpringBoot 提供了一种快速使用 Spring 的方式,基于约定优于配置的思想,可以让开发人员不必在配置与逻辑业务之间进行思维的切换,全身心的投入到逻辑业务的代码编写中,从而大大提高了开发的效率,一定程度上缩短了项目周期。 2014 年 4 月,Spring Boot 1

    2023年04月08日
    浏览(46)
  • SpringBoot整合Mybatis-Plus(SpringBoot3)

    依赖pom.xml: pom.xml resource包下的Application.yml: Aollication.yml pojo包下的实体类User: User mapper包下的接口UserMapper: UserMapper 主启动类DemoPlusApplication DemoPlusApplication 测试类DemoApplicationTest: DemoApplicationTest 实现结果 检测数据库连接: C(Create): D(Delete): U(Update) R(Read)

    2024年03月20日
    浏览(52)
  • 支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!

    1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。 2,在spring中可以使用 @Transactional 注解去控制事务,

    2024年02月08日
    浏览(78)
  • SpringBoot(整合MyBatis + MyBatis-Plus + MyBatisX插件使用)

    1.需求分析 2.数据库表设计 3.数据库环境配置 1.新建maven项目 2.pom.xml 引入依赖 3.application.yml 配置数据源 数据库名 用户名 密码 驱动是mysql8的(因为上面使用了版本仲裁) 4.Application.java 编写启动类 5.测试 6.配置类切换druid数据源 7.测试数据源是否成功切换 4.Mybatis基础配置 1

    2024年03月20日
    浏览(54)
  • SpringBoot整合JUnit--MyBatis--MyBatis-Plus--Druid

    文章转自黑马程序员SpringBoot学习笔记,学习网址:黑马程序员SpringBoot2教程 1.整合JUnit ​ SpringBoot技术的定位用于简化开发,再具体点是简化Spring程序的开发。所以在整合任意技术的时候,如果你想直观感触到简化的效果,你必须先知道使用非SpringBoot技术时对应的整合是如何做

    2023年04月23日
    浏览(45)
  • springboot+mybatis-plus实现自动建表

    好长时间没输出了,最近工作上也是太多事,领导动不动就拍脑门,那叫一个酸爽~ 工作能力的提现不但是技术或解决问题的能力上,还体现在要能立刻满足领导的各种需求,不管是哪方面的需求,这样才能够拍上马屁,步步高升。 言归正传,作为技术从业者,还是要多深耕

    2024年02月16日
    浏览(37)
  • Springboot 整合Mytbatis与Mybatis-Plus

    目录 1. springboot整合mybatis    1.1 添加pom.xml依赖  1.2 新建jdbc.properties 文件添加以下内容  1.3 新建generatorConfig.xml 文件添加以下内容 (自动生成代码类)   1.4 修改application.properties 文件 添加以下内容  1.5 修改主类MapperScan  1.6 编写接口实现类进行测试  2. springboot整合mybatis-p

    2024年02月06日
    浏览(45)
  • springboot3.2 整合 mybatis-plus

    springboot3.2 正式发布了 迫不及待地的感受了一下 结果在整个mybatis-plus 的时候遇到了如下报错 主要是由于 mybatis-plus 中 mybatis 的整合包版本不够导致的 排除 mybatis-plus 中自带的 mybatis 整合包,单独引入即可 修改依赖后正常

    2024年02月04日
    浏览(49)
  • SpringBoot整合Mybatis-plus实现商品推荐

    在开始编写代码之前,我们需要准备一下环境: Java 8+ IntelliJ IDEA Node.js 和 npm Vue CLI 如果你还没有安装Vue CLI,则可以使用以下命令在终端中安装: 首先,我们需要使用Spring Boot创建一个新项目。在IntelliJ IDEA中,选择“New Project”,然后选择“Spring Initializr”。 在“New Project”

    2024年02月01日
    浏览(44)
  • springboot使用Mybatis-plus分页插件

    在  pom.xml   文件中添加 MyBatis Plus 和分页插件的依赖: 注意替换  {mybatis-plus-version}  为对应的版本号。 在 Spring Boot 的配置文件  application.yml   中添加分页插件的配置参数: 注意代码中的注释,其中   PaginationInterceptor  表示使用 MyBatis Plus 提供的分页插件。 在接口层使用

    2024年02月07日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包