【SpringBoot】springboot数据使用多线程批量入数据库

这篇具有很好参考价值的文章主要介绍了【SpringBoot】springboot数据使用多线程批量入数据库。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

环境

springboot、mybatisPlus、mysql8

mysql8(部署在1核2G的服务器上,很卡,所以下面的数据条数用5000,太大怕不是要等到花儿都谢了 0.0)

原始的for循环入库

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();

        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
						
            //在循环中入库
            baseMapper.insert(entity);
        }

        long end = System.currentTimeMillis();

        System.err.println(end - start);

        return end - start;
    }
}

共耗时:180121 ms

批量保存操作

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();

        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
				
      	//mybatisPlus提供的批量保存方法,数字代表每几条数据提交一次事务,默认1000
        saveBatch(entityList, 1000);

        long end = System.currentTimeMillis();

        System.err.println(end - start);

        return end - start;
    }
}

耗时时间:87217ms

在批量插入的基础上使用多线程

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() throws InterruptedException {
        long start = System.currentTimeMillis();

        //手动创建线程池,注意你 数据库连接池的 允许连接数量,别超过了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 设置线程是否是守护线程,true的话,主线程结束,new的线程就不会继续工作
                new NamedThreadFactory("执行线程", false),
                (r, executor) -> System.out.println("拒绝" + r));


        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }

        //拆分list,将其拆分成5份,然后上面线程池创建也是5个核心线程,刚好执行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 1000);
        //使用CountDownLatch保证所有线程都执行完成
        CountDownLatch latch = new CountDownLatch(5);
        partition.forEach(item -> {
            poolExecutor.execute(() -> {
                saveBatch(item, 1000);
                latch.countDown();
            });
        });
        latch.await();
        // 也可以这么写,设定超时时间
        //latch.await(100,TimeUnit.SECONDS);
        long end = System.currentTimeMillis();

        System.err.println(end - start);
        //关闭线程池
        poolExecutor.shutdown();
        return end - start;
    }
}

耗时时间: 28235

可见时间从180秒,缩短到了28秒,但是@Transactional对于多线程是控制不了所有的事务的。

Spring实现事务的原理是通过ThreadLocal把数据库连接绑定到当前线程中,同一个事务中数据库操作使用同一个jdbc connection,新开启的线程获取不到当前jdbc connection。

如下代码:

partition.forEach(item -> {
            poolExecutor.execute(() -> {
                saveBatch(item, 1000);
                latch.countDown();
                //让每个都报错
                int i = 1/0;
            });
        });

控制台打印:

Exception in thread "执行线程5" java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程2" java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程4" java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程1" java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程3" 30179
java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

可见5个线程都报错了,但是去查询数据库,却可以查询到5000条数据,这是不应该出现的情况。

处理多线程入库的事务问题

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {

    @Resource
    private DataSourceTransactionManager dataSourceTransactionManager;
    @Resource
    private TransactionDefinition transactionDefinition;

    @Override
    //此处手动管理事务的提交后,这个注解就可以去掉了
    //    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();

        //手动创建线程池,注意你 数据库连接池的 允许连接数量,别超过了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 设置线程是否是守护线程,true的话,主线程结束,new的线程就不会继续工作
                new NamedThreadFactory("执行线程", false),
                (r, executor) -> System.out.println("拒绝" + r));


        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }

        //拆分list,将其拆分成5份,然后上面线程池创建也是5个核心线程,刚好执行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 10);
        //使用CountDownLatch保证所有线程都执行完成
        CountDownLatch sonLatch = new CountDownLatch(5);
        //主线程的 肯定为1
        CountDownLatch mainLatch = new CountDownLatch(1);
        AtomicBoolean hasError = new AtomicBoolean(false);
        partition.forEach(item -> {
            poolExecutor.execute(() -> {
                doSave(item, sonLatch, hasError, mainLatch);
            });
        });

        try {
            //此处应该是用try catch 包裹着主线程的所有业务代码,以此保证主线程中任何一处报错都可以通知子线程

            //这里加一个是为了调试主线程中的数据入库操作
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) 99999);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            save(entity);

            //主线程报错
            int i = 10 / 0;

            sonLatch.await();
        } catch (InterruptedException e) {
            hasError.set(true);
            e.printStackTrace();
        }

        mainLatch.countDown();

        long end = System.currentTimeMillis();

        System.err.println(end - start);
        //关闭线程池
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
        return end - start;
    }

    /**
     * 包装后的子线程的保存代码
     *
     * @param entityList 要保存的集合
     * @param sonLatch   子线程 CountDownLatch
     * @param hasError   是否发生错误
     * @param mainLatch  主线程 CountDownLatch
     */
    private void doSave(List<MoreTestEntity> entityList,
                        CountDownLatch sonLatch,
                        AtomicBoolean hasError,
                        CountDownLatch mainLatch) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {

            //            //子线程报错
            //            int i = 10 / 0;

            saveBatch(entityList);

        } catch (Throwable throwable) {
            throwable.printStackTrace();
            hasError.set(true);
        } finally {
            //这是必须的,每个子线程走完,要让主线程继续走,然后再回到子线程的每个任务,决定是提交还是回滚
            sonLatch.countDown();
        }
        try {
            //等待主线程的执行结束
            mainLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            hasError.set(true);
        }

        //事务操作
        if (hasError.get()) {
            dataSourceTransactionManager.rollback(transactionStatus);
        } else {
            dataSourceTransactionManager.commit(transactionStatus);
        }

    }

}

分别放开子线程报错和主线程报错,会发现事务都可以正常回滚,达到了预期的效果。

主要思路就是通过子线程CountDownLatch和主线程CountDownLatch,控制线程好代码的执行顺序即可。

最后补充几点:文章来源地址https://www.toymoban.com/news/detail-431324.html

  • 上述代码中的countDown()一旦出现不执行的情况那会导致线程堵塞堆积,所以建议给await()增加超时时间
  • 这样操作可能还会出现问题,比如主线程通知子线程可以进行实务操作了,但是各个子线程之间非透明,所以还是有几率存在某个子线程事务回滚失败的情况。

到了这里,关于【SpringBoot】springboot数据使用多线程批量入数据库的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot 如何使用 EmbeddedDatabaseBuilder 进行数据库集成测试

    在开发 SpringBoot 应用程序时,我们通常需要与数据库进行交互。为了确保我们的应用程序在生产环境中可以正常工作,我们需要进行数据库集成测试,以测试我们的应用程序是否能够正确地与数据库交互。在本文中,我们将介绍如何使用 SpringBoot 中的 EmbeddedDatabaseBuilder 来进行

    2024年02月16日
    浏览(46)
  • SpringBoot使用flywaydb实现数据库版本管理【附源码】

    本文主要是配合SpringBoot使用用户输入的自定义数据源启动一文附带产出。前文主要介绍了SpringBoot无数据源启动,然后通过用户录入自定义数据库配置后,连接数据库的操作。但是当整个项目交给用户使用时,谁使用都不知道情况下,数据源都自己定义的情况下,我们项目升

    2024年02月07日
    浏览(28)
  • SpringBoot项目整合MybatisPlus并使用SQLite作为数据库

    SQLite 是一个进程内库,它实现了 独立的、无服务器的、零配置 的事务性 SQL 数据库引擎。SQLite 没有单独的服务器进程。 SQLite直接读取和写入普通磁盘文件,就是一个完整的 SQL 数据库 , 包含多个表、索引、 触发器和视图包含在单个磁盘文件中 。 数据库文件格式是跨平台

    2024年01月21日
    浏览(37)
  • SpringBoot使用Jasypt对配置文件加密、数据库密码加密

    Dmo源码请点这里! Jasypt是一个Java简易加密库,用于加密配置文件中的敏感信息,如数据库密码。jasypt库与springboot集成,在实际开发中非常方便。 1、Jasypt Spring Boot 为 spring boot 应用程序中的属性源提供加密支持,出于安全考虑,Spring boot 配置文件中的敏感信息通常需要对它进

    2024年04月28日
    浏览(30)
  • Springboot使用ProcessBuilder创建系统进程执行shell命令备份数据库

    Springboot执行shell命令备份数据库。 主要就是使用ProcessBuilder创建系统进程,执行终端命令。

    2024年02月07日
    浏览(32)
  • springboot使用达梦数据库(DM8)整合MybatisPlus

    在idea中开发spring boot项目,用到的数据库是达梦数据库,想要使用 MybatisPlus 自动生成实体类和服务,并且通过 MybatisPlus 完成一些简单的数据库CRUD ps:这里的 MybatisPlus 版本必须要是3.0以上 2.1、pom ps:其中需要将达梦数据库的依赖添加到指定目录下,不然达梦的依赖无法生效

    2024年02月16日
    浏览(42)
  • 【Spring Boot】SpringBoot和数据库交互: 使用Spring Data JPA

    在现代应用程序的开发中,数据是核心部分。为了能够持久化、检索、更新和删除数据,应用程序需要与数据库进行交互。 1.1 为什么需要数据库交互 数据持久化 :当你关闭应用程序或者服务器时,你仍希望数据能够保存。数据库提供了一个持久的存储方案,使得数据在关闭

    2024年02月12日
    浏览(35)
  • 基于SpringBoot 2+Layui实现的管理后台系统源码+数据库+安装使用说明

    一个基于SpringBoot 2 的管理后台系统,包含了用户管理,组织机构管理,角色管理,功能点管理,菜单管理,权限分配,数据权限分配,代码生成等功能 相比其他开源的后台系统,SpringBoot-Plus 具有一定的复杂度 系统基于Spring Boot2.1技术,前端采用了Layui2.4。数据库以MySQL/Oracle

    2024年02月04日
    浏览(40)
  • springboot+redis+mysql+quartz-使用pipeline+lua技术将缓存数据定时更新到数据库

    代码讲解:7.3点赞功能-定时持久化到数据库-Java程序整合pipeline+lua_哔哩哔哩_bilibili https://www.bilibili.com/video/BV1Lg4y1w7U9 代码: blogLike_schedule/like08 · xin麒/XinQiUtilsOrDemo - 码云 - 开源中国 (gitee.com) https://gitee.com/flowers-bloom-is-the-sea/XinQiUtilsOrDemo/tree/master/blogLike_schedule/like08 数据库表:

    2024年02月13日
    浏览(32)
  • springboot+redis+mysql+quartz-通过Java操作jedis使用pipeline获取缓存数据定时更新数据库

    代码讲解:6-点赞功能-定时持久化到数据库-pipeline+lua-优化pipeline_哔哩哔哩_bilibili https://www.bilibili.com/video/BV1yP411C7dr 代码: blogLike_schedule/like06 · xin麒/XinQiUtilsOrDemo - 码云 - 开源中国 (gitee.com) https://gitee.com/flowers-bloom-is-the-sea/XinQiUtilsOrDemo/tree/master/blogLike_schedule/like06 数据库表的

    2024年02月16日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包