SpringBoot异步任务及并行事务实现

这篇具有很好参考价值的文章主要介绍了SpringBoot异步任务及并行事务实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        上一篇介绍了原生Java如何实现串行/并行任务,主要使用了线程池 + Future + CountDownLatch,让主线程等待子线程返回后再向下进行。而在SpringBoot中,利用@Async和AOP对异步任务提供了更加便捷的支持,下面就针对SpringBoot使用异步任务需要注意的细节做一些分析。

1 SpringBoot异步任务基础实现

        使用起来很简单,在启动类或配置类上加上@EnableAsync启动异步任务,并在需要异步调用的方法上加@Async,在注册Bean时就会生成该类的Proxy子类,也就是动态代理类,AOP会在代理类中重写并增强该异步方法。

1.1 配置异步任务线程池

        SpringBoot自然也选择了线程复用,想要实现就需要使用线程池,可以先来看看默认线程池的配置。

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(8);
//配置最大线程数
executor.setMaxPoolSize(Integer.MAX_VALUE);
//配置空闲线程保留时间
executor.setKeepAliveSeconds(60);
//配置队列大小
executor.setQueueCapacity(Integer.MAX_VALUE);
//设置饱和策略:当pool已经达到max size的时候,如何处理新任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        配置过线程池参数的小伙伴,一眼就能看到几个很不合理的点:

  • 最大线程数为Integer.MAX_VALUE,创建线程过多会导致“oom:unable to create new native thread”。
  • 最长队列数为Integer.MAX_VALUE,队列堆积任务过多也会导致oom。
  • 饱和策略为AbortPolicy,队列满了直接抛异常,如果不catch程序直接爆炸。

        综上,我们应该给SpringBoot指定一个线程池,并让异步任务执行时使用他,配置就不赘述直接放在下面。

//自定义Spring默认线程池
//ThreadPoolTaskExecutor vs ThreadPoolExecutor :
//ThreadPoolTaskExecutor是对ThreadPoolExecutor的进一步封装
//ThreadPoolTaskExecutor来源于Spring,ThreadPoolExecutor属于JUC
//ThreadPoolTaskExecutor需要声明initialize,ThreadPoolExecutor不需要
@Bean("common")
public Executor commonExecutorBuild() {
    log.info("Common Executor Building Start!");

    //ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME,
    //                                                     UNIT, LINKED_BLOCKING_QUEUE, new ThreadPoolExecutor.CallerRunsPolicy());

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    //配置核心线程数
    executor.setCorePoolSize(2);
    //配置最大线程数
    executor.setMaxPoolSize(5);
    //配置队列大小
    executor.setQueueCapacity(10240);
    //配置空闲线程保留时间
    executor.setKeepAliveSeconds(60);
    //配置线程池中的线程的名称前缀
    executor.setThreadNamePrefix("AsyncCommonThread-");
    //设置饱和策略:当pool已经达到max size的时候,如何处理新任务
    //CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //执行初始化
    executor.initialize();

    return executor;
}

1.2 异步方法逻辑编写

        应用场景还是如上篇所说 Java串行/并行任务实现,这次就使程序逻辑更加完善且贴近现实。与Java原生实现大体类似,只是将子任务编写在单独的类与方法中,并标注@Async让其异步调用。

SpringBoot异步任务及并行事务实现

         入参为带有所有用户信息的VO实体类,在Service中将所有属性赋值到对应实体类,然后在主线程中办理银行卡,银行卡办理成功后调用两个子线程分别办理会员/申请信用卡,全部完成后根据SQL语句执行结果判断是否注册成功。先来编写一下子线程的逻辑。

    @Override
    @Async("common")
    public CompletableFuture<Integer> registerUser(BankUserInfo bankUserInfo) {
        Integer insert = bankUserMapper.registerUser(bankUserInfo);
        try {
            log.info(Thread.currentThread().getName() + "running!");
            //模拟阻塞3秒
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return CompletableFuture.completedFuture(insert);

        //return new AsyncResult<Integer>(insert);
    }

        只需要一步数据库操作,将用户信息插入到库中,为了模拟阻塞操作延时了3秒,信用卡申请的逻辑与其相同。有几点值得注意的地方:

  • 在@Async中指定了使用“common”线程池,也就是我们自行定义的线程池。
  • 异步方法的返回值只能为void或Future的子类,一般会指定返回值为“new AsyncResult<T>(result)”,AsyncResult实现了ListenableFuture,ListenableFuture是Future的子类。
  • 这里将异步方法的返回值设置为了CompletableFuture,可以将其理解为增强版的Future。在上篇文章中,我们采用了两种方式来等待两个子线程完成,其中用Future时使用了自旋锁循环判断线程是否有返回值。而CompletableFuture提供了操作Future执行的各种情况的API,就比如CompletableFuture.allOf(Thread1, Thread2, ...),该方法可以在传入的子线程执行完前,阻塞当前线程,下面我们就就会用到。还有其他很强大的API,以后用到了再介绍。

        下面是主方法逻辑,分别实现3个逻辑,CompletableFuture.allOf(future, future1)在子线程执行完毕前阻塞主线程。最后其实应该加上根据子线程执行结果,给用户展示是否办理成功的,偷了个懒没写,懂意思就行。

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Integer applyBankCard(BankRegisterVo bankRegisterVo) throws InterruptedException {
        String UUID = IdWorker.get32UUID().substring(16);
        bankRegisterVo.setBankCardNum(UUID);

        BankCardInfo bankCardInfo = new BankCardInfo();
        BankUserInfo bankUserInfo = new BankUserInfo();
        CreditCardInfo creditCardInfo = new CreditCardInfo();
        BeanUtils.copyProperties(bankRegisterVo, bankCardInfo);
        BeanUtils.copyProperties(bankRegisterVo, bankUserInfo);
        BeanUtils.copyProperties(bankRegisterVo, creditCardInfo);
        creditCardInfo.setMoneyLimit(10000.00);

        //主线程执行,办理银行卡
        Thread.sleep(5000);
        int insert = bankCardMapper.applyBankCard(bankCardInfo);

        //会导致死锁!!!
        //Thread.currentThread().join();

        //子线程执行,申请会员 + 信用卡办理
        CompletableFuture<Integer> future = asyncBankService.registerUser(bankUserInfo);
        CompletableFuture<Integer> future1 = asyncBankService.applyCreditCard(creditCardInfo);
        CompletableFuture.allOf(future, future1);
        //Integer insert1 = future.join();
        //Integer insert2 = future1.join();
        //HashMap<String, Object> objectObjectHashMap = Maps.newHashMapWithExpectedSize(6);

        return insert;
    }

1.3 @Async与@Transactional失效

       这两个问题的出现其实是由于一个原因,Spring中的注解基本都是靠AOP来增强,实现原理就是在调用@Async的方法时,实际是在调用该方法的代理类,代理类中将该方法的执行逻辑提交给了线程池。出错的情景一般都如下面这段伪代码。

{

    Method1() {
        AsyncMethod1();
        AsyncMethod2();
    }    

    @Async
    @Transactional
    AsyncMethod1() {
        //Todo...
    }  

    @Async
    @Transactional
    AsyncMethod2() {
        //Todo...
    }  

}

        在同一个类中调用异步方法,等于调用this本类的方法,没有走Spring生成的代理类,也就不会让他异步执行,@Transactional的原理也类似。

2 异步事务管理

         尝试思考这样一个问题,现在有1个主线程事务 + 2个子线程事务,我们现在要保证他们仨的强原子性——即3个事务有任何一个报错,都会回滚所有事务。最简单的想法可能就是给1主2子都加上@Transactional注解,但这样实际是行不通的,子线程的异常只会回滚他自身事务。

        举个例子,子线程办理会员报错回滚,并不会影响没有报错的主线程银行卡办理和另一个子线程申请信用卡。事实也确实如此,在实际测试中,用户信息表插入失败回滚,银行卡信息表与信用卡信息表仍然会正常插入记录。

        如果有这样的强原子性场景存在,我们可以将代码逻辑改为串行,放在一个方法体中,这样具有天然原子性了。但这明显与预期不符,有些舍本逐末了。

        终极诉求就是灵活管理多个线程的事务,这时就要用到编程式事务。

2.1 编程式事务的基本使用

        需要注入两个Bean:

  1. TransactionDefinition,其中规定了一些事务的相关属性,例如事务的传播行为和隔离等级等。
  2. DataSourceTransactionManager,JDBC对应的事务管理器。

        将TransactionDefinition传入DataSourceTransactionManager中,就可以手动进行事务管理了,主要用到commit()和rollback()来对应提交和回滚。与声明式事务不同,在catch到异常后我们要手动进行回滚,如果全部正常执行,也需要自行提交事务。

2.2 多线程手动事务管理

        回到我们的需求,实际上是一个多线程手动事务管理的问题,经过分析后我们得到一个程序运行流程图,主要的难点在于如何让3个线程彼此等待,并根据一个统一的标志位判断是否回滚。

SpringBoot异步任务及并行事务实现

        既然是多线程,那就要在JUC里好好挖掘一下。

        首先是标志位,可以用AtomicInteger原子类,保证多线程下的数据一致。我们在主线程中初始化一个值为0的AtomicInteger并传给子线程,任何线程捕获到异常时就给AtomicInteger自增,全部线程执行完成后统一判断标志位是否大于0,如果大于0则全部回滚。

        之后是线程同步判断结果,由于主线程和子线程数量是已知的,可以用计数器CountDownLatch来实现,主线程计数器设为1,子线程计数器设为2。主计数器用于控制整个程序的运行,在所有线程执行完毕前,将程序阻塞在统一判断执行结果的前一步;子计数器用于告知主线程,各子线程是否执行完毕,未执行完毕就阻塞主线程。

        打个比方,现在有一个老大和两个小弟,老大坐在办公室等着小弟汇报工作结果,等两个小弟都告诉他“我干完了哈”之后,老大根据大家的工作成果判断“OK了,大伙可以去吃饭了”或者是“干得是啥啊,全部重做”;这时小弟再根据老大的回应,决定吃饭还是重做。

        首先将主计数器和子计数器都传入子线程中,主线程调用子计数器的await(),在子线程SQL执行结束、并调用countDown()以前会一直阻塞主线程。在子线程中调用主计数器的await(),在所有子线程SQL执行完毕后,主线程向下执行,并对主计数器调用countDown()。这样就实现了所有子线程SQL执行完以前,子线程会阻塞(因为主线程还在阻塞,主计数器未清零);所有线程子线程SQL执行完毕后,主线程、子线程都向下执行,统一判断事务执行标志位。下面用伪代码实现一下。

ThreadMain() {
    //主计数器和子计数器
    latchMain = countDownLatch(1);
    latchSlave = countDownLatch(2);
    
    //Todo:主线程SQL执行...
    //启动子线程
    ThreadSlave1(latchMain, latchSlave);
    ThreadSlave2(latchMain, latchSlave);

    //等待所有子线程SQL执行完毕
    latchSlave.await();
    //所有子线程SQL执行完毕后,主线程执行计数器-1,此时计数器清零,所有线程同步向下进行
    latchMain.countDown();
    latchMain.await();

    //Todo:AtomicInteger判断逻辑,决定所有事务提交/回滚
}

@Async
ThreadSlave1(latchMain, latchSlave) {
    //Todo:子线程SQL执行...
    //子线程SQL逻辑执行完后,子计数器-1
    latchSlave.countDown();
    //等待其他线程执行结果
    latchMain.await();
    
    //Todo:AtomicInteger判断逻辑,决定所有事务提交/回滚
}

@Asycn
ThreadSlave2(latchMain, latchSlave) {
    //Todo:子线程SQL执行...
    //子线程SQL逻辑执行完后,子计数器-1
    latchSlave.countDown();
    //等待其他线程执行结果
    latchMain.await();
    
    //Todo:AtomicInteger判断逻辑,决定所有事务提交/回滚
}

        如上面伪代码所示,子计数器清零后,主计数器也会清零,此时所有线程会同步进行事务的判断环节。当然还需要完善一下,当任意线程catch到SQL执行异常后,也需要处理对应的计数器,否则会导致线程永久阻塞。

        下面是主Service代码,有完整的手动事务管理、标志位使用、统一判断逻辑。

@Override
//手动管理事务
//@Transactional(rollbackFor = Exception.class)
public Integer applyBankCard(BankRegisterVo bankRegisterVo) {
    TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
    AtomicInteger atomicInteger = new AtomicInteger(0);
    CountDownLatch latchMain = new CountDownLatch(1);
    CountDownLatch latch = new CountDownLatch(2);
    Integer result = null;

    try {
        String UUID = IdWorker.get32UUID().substring(16);
        bankRegisterVo.setBankCardNum(UUID);
        BankCardInfo bankCardInfo = new BankCardInfo();
        BankUserInfo bankUserInfo = new BankUserInfo();
        CreditCardInfo creditCardInfo = new CreditCardInfo();
        BeanUtils.copyProperties(bankRegisterVo, bankCardInfo);
        BeanUtils.copyProperties(bankRegisterVo, bankUserInfo);
        BeanUtils.copyProperties(bankRegisterVo, creditCardInfo);
        creditCardInfo.setMoneyLimit(10000.00);

        //主线程执行,办理银行卡
        int insert = bankCardMapper.applyBankCard(bankCardInfo);
        //子线程执行,申请会员 + 信用卡办理
        CompletableFuture<Integer> future = asyncBankService.registerUser(bankUserInfo, latchMain, latch, atomicInteger);
        CompletableFuture<Integer> future1 = asyncBankService.applyCreditCard(creditCardInfo, latchMain, latch, atomicInteger);

        latch.await();
        latchMain.countDown();
        latchMain.await();
        if (atomicInteger.get() > 0) {
            log.info("子线程事务报错,开始回滚");
            transactionManager.rollback(transaction);
            result = AppHttpCodeEnum.BANK_REGISTER_FAILED.getCode();
        } else {
            //手动提交
            transactionManager.commit(transaction);
            result = AppHttpCodeEnum.BANK_REGISTER_SUCCESS.getCode();
        }

    } catch (Exception e) {
        log.info("主线程事务报错,开始回滚");
        //手动回滚
        transactionManager.rollback(transaction);
        atomicInteger.getAndIncrement();
        latchMain.countDown();
        result = AppHttpCodeEnum.BANK_REGISTER_FAILED.getCode();
    }

    return result;
}

         异步方法逻辑如下。

@Override
@Async("common")
public CompletableFuture<Integer> registerUser(BankUserInfo bankUserInfo, CountDownLatch latchMain, CountDownLatch latch, AtomicInteger atomicInteger) {
    TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
    Integer insert = null;
    try {
        log.info(Thread.currentThread().getName() + "running!");
        insert = bankUserMapper.registerUser(bankUserInfo);
        //模拟阻塞3秒
        Thread.sleep(3000);

        latch.countDown();
        latchMain.await();
        if (atomicInteger.get() > 0) {
            transactionManager.rollback(transaction);
            log.info("子线程事务报错,开始回滚");
        } else {
            //手动提交
            transactionManager.commit(transaction);
        }

    } catch (Exception e) {
        log.info("子线程事务报错,开始回滚");
        atomicInteger.getAndIncrement();
        //手动回滚
        transactionManager.rollback(transaction);
        latch.countDown();
    }

    return CompletableFuture.completedFuture(insert);

    //return new AsyncResult<Integer>(insert);
}

        主线程SQL如果报错,子线程方法也就不会开启,直接回滚事务并返回结果。子线程报错,标志位 + 1并回滚事务,其他线程发现标志位不为0,也会主动回滚事务。

2.3 程序执行

        下面来分别测试一下主线程、子线程异常的执行情况。

2.3.1 正常执行

        给接口传入正常的入参,日志正常打印了异步方法中调用的线程名。接口耗时3.29s,异步方法中手动阻塞了3秒,全部正常。

2023-02-09 15:43:59.469  INFO 16660 --- [cCommonThread-1] c.b.service.impl.AsyncBankServiceImpl    : AsyncCommonThread-1running!
2023-02-09 15:43:59.481  INFO 16660 --- [cCommonThread-2] c.b.service.impl.AsyncBankServiceImpl    : AsyncCommonThread-2running!

2.3.2 主线程异常

        先将实体类的Validator注释掉,入参的password不传,数据库的password字段约束不为null,这样就会执行失败。下面看一下日志。

==>  Preparing: INSERT INTO bank_card_info (bank_card_num, password, bank_name, create_by, create_time) VALUES (?, ?, ?, ?, ?)
==> Parameters: 950499ea8b2dc968(String), null, XianBank(String), -1(Long), 2023-02-09 15:51:12.478(Timestamp)
Releasing transactional SqlSession 

[org.apache.ibatis.session.defaults.DefaultSqlSession@222673da]
2023-02-09 15:51:12.577  INFO 428 --- [nio-6666-exec-6] c.b.s.impl.BankRegisterServiceImpl       : 主线程事务报错,开始回滚

        与预期相符,异步方法还没有调用,主线程直接异常回滚了事务。 

2.3.3 子线程异常

         数据库的username字段约束为不能重复,因此我们传入一个重复的username,让异步线程异常,再来看看结果。

2023-02-09 15:53:01.477  INFO 428 --- [cCommonThread-1] c.b.service.impl.AsyncBankServiceImpl    : AsyncCommonThread-1running!
2023-02-09 15:53:01.482  INFO 428 --- [cCommonThread-1] c.b.service.impl.AsyncBankServiceImpl    : 子线程事务报错,开始回滚
2023-02-09 15:53:01.484  INFO 428 --- [cCommonThread-2] c.b.service.impl.AsyncBankServiceImpl    : AsyncCommonThread-2running!
2023-02-09 15:53:04.490  INFO 428 --- [nio-6666-exec-8] c.b.s.impl.BankRegisterServiceImpl       : 子线程事务报错,开始回滚
2023-02-09 15:53:04.493  INFO 428 --- [cCommonThread-2] c.b.service.impl.AsyncBankServiceImpl    : 子线程事务报错,开始回滚

        通过线程名可以看出,子线程1异常后直接回滚,子线程2和主线程得知有线程异常后,也开始回滚。

        SpringBoot的异步基础实现,以及多线程事务控制到这里就介绍完了,下一篇再见哈。文章来源地址https://www.toymoban.com/news/detail-432338.html

到了这里,关于SpringBoot异步任务及并行事务实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot整合、SpringBoot与异步任务

    java 的代码是同步顺序执行,当我们需要执行异步操作时我们通常会去创建一个新线程去执行。比如new Thread()。start(),或者使用线程池线程池 new ThreadPoolExecutor().execute 。 在 Springboot 中对其进行了简化处理,使用@EnableAsync + @Async 可以快速开启一个异步线程执行任务 启动类上使

    2024年02月13日
    浏览(29)
  • SpringBoot(十三)异步任务

    目录 异步任务 1.1 什么叫异步 1、Java线程处理 2、SpringBoot异步任务 2.1 使用注解@EnableAsync开启异步任务支持 2.2、使用@Async注解标记要进行异步执行的方法 2.3、controller测试 3、异步任务相关限制 4、自定义 Executor(自定义线程池) 4.1、应用层级: 4.2、方法层级: 有时候,前端

    2024年02月04日
    浏览(27)
  • SpringBoot(19)异步任务

    有时候,前端可能提交了一个耗时任务,如果后端接收到请求后,直接执行该耗时任务,那么前端需要等待很久一段时间才能接受到响应。如果该耗时任务是通过浏览器直接进行请求,那么浏览器页面会一直处于转圈等待状态。 事实上,当后端要处理一个耗时任务时,通常都

    2024年02月16日
    浏览(27)
  • SpringBoot 异步、邮件任务

    创建一个Hello项目 创建一个类AsyncService 异步处理还是非常常用的,比如我们在网站上发送邮件,后台会去发送邮件,此时前台会造成响应不动,直到邮件发送完毕,响应才会成功,所以我们一般会采用多线程的方式去处理这些任务。 编写方法,假装正在处理数据,使用线程

    2024年02月13日
    浏览(39)
  • SpringBoot异步任务获取HttpServletRequest

    在使用框架日常开发中需要在controller中进行一些异步操作减少请求时间,但是发现在使用@Anysc注解后会出现Request对象无法获取的情况,本文就此情况给出完整的解决方案 @Anysc注解会开启一个新的线程,主线程的Request和子线程是不共享的,所以获取为null 在使用springboot的自定

    2024年02月21日
    浏览(39)
  • SpringBoot原理分析 | 任务:异步、邮件、定时

    💗wei_shuo的个人主页 💫wei_shuo的学习社区 🌐Hello World ! 异步任务 Java异步指的是在程序执行过程中,某些任务可以在后台进行,而不会阻塞程序的执行。通常情况下,Java异步使用线程池来实现,将任务放入线程池中,等待线程池中的线程执行这些任务。Java异步可以提高程

    2024年02月16日
    浏览(34)
  • SpringBoot多线程异步任务:ThreadPoolTaskExecutor + CompletableFuture

    在 SpringBoot 项目中,一个任务比较复杂,执行时间比较长,需要采用 多线程异步 的方式执行,从而缩短任务执行时间。 将任务拆分成多个独立的子任务,每个子任务在独立子线程中执行; 当所有子任务的子线程全部执行完成后,将几个子任务汇总,得到总任务的执行结果。

    2024年02月10日
    浏览(43)
  • 原来你是这样的SpringBoot--Async异步任务

    本节我们一起学习一下SpringBoot中的异步调用,主要用于优化耗时较长的操作,提高系统性能和吞吐量。 首先给启动类增加注解@EnableAsync,支持异步调用 然后定义要执行的Task,分类增加一个同步方法和异步方法,其中异步方法需要增加注解@Async 其实接下来就可以在controller中

    2024年02月11日
    浏览(23)
  • 利用谷歌云Pub/Sub 实现多任务并行分发处理方案

    目前老梁团队负责的 Global Data Integration Platform 每天有大量文件需要从来自不同地区的上游下载文件并进行处理后再发送到不同下游。老梁的数据集成平台集群有 6 个服务器节点,老梁希望所有机器的资源都能利用上,提升大量文件并行处理能力,并且不同机器节点的任务必须

    2024年02月06日
    浏览(27)
  • 任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK

    比如: 1.每天早上6点定时执行 2.每月最后一个工作日,考勤统计 3.每个月25号信用卡还款 4.会员生日祝福 5.每隔3秒,自动提醒 10分钟的超时订单的自动取消,每隔30秒或1分钟查询一次订单,拿当前的时间上前推10分钟 定时任务,资源会有误差的存在,如果使用定时任务 定时

    2024年02月08日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包