SpringBoot高效批量插入百万数据

这篇具有很好参考价值的文章主要介绍了SpringBoot高效批量插入百万数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SpringBoot高效批量插入百万数据

前言:我相信很多小伙伴和我一样在初学的时候,面对几万几十万数据插入,不知道如何下手,面对百万级别数据时更是不知所措,我们大部分初学者,很多人都喜欢for循环插入数据,或者是开启多个线程,然后分批使用for循环插入,当我们需要将大量数据存储到数据库中时,传统的逐条插入方式显然效率低下,并且容易导致性能瓶颈。而批量插入是一种更加高效的方式,可以大幅提高数据的插入速度,特别是在数据量较大的情况下。本文将介绍如何使用 Spring Boot 实现高效批量插入百万数据,以解决传统逐条插入方式存在的性能问题。我们将使用不同的插入方式来比较。

1.抛出问题

传统的单条插入存在什么问题:

  1. 性能问题:如果循环插入的数据量比较大,每次插入都需要与数据库建立连接、执行插入操作,这将导致频繁的网络通信和数据库操作,性能会受到影响。可以考虑批量插入数据来提高性能,例如使用 JDBC 的批处理功能或使用框架提供的批处理方法。
  2. 事务问题:默认情况下,Spring Boot 的事务管理是基于注解的,每次循环插入数据都会开启一个新的事务,这可能导致事务管理的开销过大。可以考虑将整个循环插入数据放在一个事务中,或者使用编程式事务管理来控制事务的粒度。
  3. 数据库连接问题:在循环过程中,如果每次都重新获取数据库连接,可能会导致连接资源的浪费和性能下降。可以考虑使用连接池技术来管理数据库连接,确保连接的复用和高效利用。
  4. 异常处理问题:在循环插入数据时,可能会出现插入失败、异常等情况。需要适当处理异常,例如记录错误日志、回滚事务等,以确保数据的完整性和一致性。

2.前期准备工作

框架:springboot+mybatis plus +mysql

  • 准备工作
    创建一个简单的springboot项目,pom依赖
 <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
              </dependency>
              <dependency>
                  <groupId>com.baomidou</groupId>
                  <artifactId>mybatis-plus-boot-starter</artifactId>
                  <version>3.4.2</version>
              </dependency>
              <dependency>
                  <groupId>mysql</groupId>
                  <artifactId>mysql-connector-java</artifactId>
                  <version>8.0.33</version>
              </dependency>
      
              <dependency>
                  <groupId>org.projectlombok</groupId>
                  <artifactId>lombok</artifactId>
                  <optional>true</optional>
              </dependency>
              <dependency>
                  <groupId>cn.hutool</groupId>
                  <artifactId>hutool-all</artifactId>
                  <version>5.8.15</version>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-test</artifactId>
                  <scope>test</scope>
              </dependency>
          </dependencies>
  • 创建测试需要使用的表
CREATE TABLE `student` (
        `id` int NOT NULL AUTO_INCREMENT,
        `name` varchar(255) DEFAULT NULL,
        `age` int DEFAULT NULL,
        `addr` varchar(255) DEFAULT NULL,
        `addr_Num` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=8497107 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
  • 修改application.yml配置
 server:
        port: 8090
      spring:
        datasource:
          url:  jdbc:mysql://localhost:3306/boot_study?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
          username: root
          password: 123456
          driver-class-name: com.mysql.cj.jdbc.Driver
  • 创建实体类StudentDO
    注意:在实际业务中,我们应该明确去定义controller service 层的数据模型,数据传输XxxDTO、数据表实体映射XxxDO、返回给前台数据实体XxxVO,这些模型数据都需要根据实际情况在Service实现类和Controller层转换
    这里为了演示方便就不按规范定义
@TableName(value = "student")
      @Data
      public class StudentDO {
          /**  主键  type:自增 */
          @TableId(type = IdType.AUTO)
          private Integer id;
      
          /**  名字 */
          private String name;
      
          /**  年龄 */
          private Integer age;
      
          /**  地址 */
          private String addr;
      
          /**  地址号  @TableField:与表字段映射 */
          @TableField(value = "addr_num")
          private String addrNum;
      
          public StudentDO(String name, int age, String addr, String addrNum) {
              this.name = name;
              this.age = age;
              this.addr = addr;
              this.addrNum = addrNum;
          }
      }
  • controller定义
@RestController
      @RequestMapping("/student")
      public class StudentController {
          @Autowired
          private StudentMapper studentMapper;
          @Autowired
          private StudentService studentService;
          @Autowired
          private SqlSessionFactory sqlSessionFactory;
      
          @Autowired
          private ThreadPoolTaskExecutor taskExecutor;
      
          @Autowired
          private PlatformTransactionManager transactionManager;
          
      
      }
  • service和impl定义
   public interface StudentService extends IService<StudentDO> {
      }
      //实现定义
      @Service
      public class StudentServiceImpl extends ServiceImpl<StudentMapper, StudentDO> implements StudentService {
      }
  • Mapper定义
public interface StudentMapper extends BaseMapper<StudentDO> {
       @Insert("<script>" +
                  "insert into student (name, age, addr, addr_num) values " +
                  "<foreach collection='studentDOList' item='item' separator=','> " +
                  "(#{item.name}, #{item.age},#{item.addr}, #{item.addrNum}) " +
                  "</foreach> " +
                  "</script>")
          public int insertSplice(@Param("studentDOList") List<StudentDO> studentDOList);
      }

3.测试示例演示

模拟100万条数据不同方式插入

  1. for循环单条插入(不建议)
    这里100万数据大概要20分钟以上,所以以10万条数据类推
    10万条数据总耗时348.864秒
      @GetMapping("/for")
           public void insertForData () {
               long start = System.currentTimeMillis();
       
               for (int i = 0; i < 1000000 ; i++) {
                   StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
                   studentMapper.insert(StudentDO);
       
               }
               long end = System.currentTimeMillis();
               System.out.println("插入数据耗费时间:"+(end-start));
           }

结果:实际上不知道等了多久很慢很慢,总体时间差不多半个多小时,因为这里的for循环进行单条插入时,每次都是在获取连接(Connection)、释放连接和资源关闭等操作上,(如果数据量大的情况下)极其消耗资源,导致时间长。
2. xml拼接foreach sql插入(大量数据不建议)
10万条数据插入数据耗费时间:3.554秒

 @GetMapping("/sql")
           public void insertSqlData () {
               long start = System.currentTimeMillis();
               ArrayList<StudentDO> arrayList = new ArrayList<>();
               for (int i = 0; i < 100000 ; i++) {
                   StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
                   arrayList.add(StudentDO);
       
               }
               studentMapper.insertSplice(arrayList);
               long end = System.currentTimeMillis();
               System.out.println("插入数据耗费时间:"+(end-start));
           }

结果:我们在Mapper里面是要insert注解拼接,拼接结果就是将所有的数据集成在一条SQL语句的value值上,其由于提交到服务器上的insert语句少了,相就不需要每次获取连接(Connection)、释放连接和资源关闭,网络负载少了,插入的性能有了提高。但是在数据量大的情况下可能会出现内存溢出、解析SQL语句耗时等情况。
3. mybatis-plus 批量插入saveBatch(推荐)
10万条数据插入数据耗费时间:2.481秒,在数据量不大的情况下和上面差不多
50万条数据插入数据耗费时间:12.473秒

   @GetMapping("/batch")
           public void insertSaveBatchData () {
               long start = System.currentTimeMillis();
               ArrayList<StudentDO> arrayList = new ArrayList<>();
               for (int i = 0; i < 100000 ; i++) {
                   StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
                   arrayList.add(StudentDO);
       
               }
               studentService.saveBatch(arrayList);
               long end = System.currentTimeMillis();
               System.out.println("插入数据耗费时间:"+(end-start));
           }

结果:使用MyBatis-Plus实现IService接口中批处理saveBatch()方法,可以很明显的看到性能有了提升,我们可以查看一下源码,它的底层实现原理利用分片处理(batchSize = 1000) + 分批提交事务的操作,来提高插入性能,并没有在连接上消耗性能,MySQLJDBC驱动默认情况下忽略saveBatch()方法中的executeBatch()语句,将需要批量处理的一组SQL语句进行拆散,执行时一条一条给MySQL数据库,造成实际上是分片插入,即与单条插入方式相比,有提高,但是性能未能得到实质性的提高。

  1. 手动开启批处理模式+批量插入手动提交(推荐)
    10万条数据插入数据耗费时间:2.481秒,
    50万条数据插入数据耗费时间:13.436秒
    和上面相比不管是大数据量还是小数据量两者都是差不多
@GetMapping("/forSaveBatch")
           public void insertforSaveBatchData () {
               //创建批量插入SqlSession
               SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
               StudentMapper studentMapper = sqlSession.getMapper(StudentMapper.class);
               long start = System.currentTimeMillis();
               ArrayList<StudentDO> arrayList = new ArrayList<>();
               for (int i = 0; i < 500000 ; i++) {
                   StudentDO StudentDO = new StudentDO("张三"+i, i, "地址"+i, i+"号");
                   studentMapper.insert(StudentDO);
       
               }
               sqlSession.commit();
               sqlSession.close();
               long end = System.currentTimeMillis();
               System.out.println("插入数据耗费时间:"+(end-start));
           }

结果:手动开启批处理,手动处理关闭自动提交事务,共用同一个SqlSession之后,for循环单条插入的性能得到实质性的提高;由于同一个SqlSession省去对资源相关操作的耗能、减少对事务处理的时间等,从而极大程度上提高执行效率。
5. ThreadPoolTaskExecutor分割多线程插入(大数据量强烈推荐)
50万条数据插入数据耗费时间:3。536秒,插入速度直接是前面的4倍,是不是很疑惑这样就快了这么多?
原理:多线程批量插入的过程,首先定义了一个线程池(ThreadPoolTaskExecutor),用于管理线程的生命周期和执行任务。然后,我们将要插入的数据列表按照指定的批次大小分割成多个子列表,并开启多个线程来执行插入操作,通过 TransactionManager 获取事务管理器,并使用 TransactionDefinition 定义事务属性。然后,在每个线程中,我们通过 transactionManager.getTransaction() 方法获取事务状态,并在插入操作中使用该状态来管理事务。
在插入操作完成后,我们再根据操作结果调用transactionManager.commit()或 transactionManager.rollback() 方法来提交或回滚事务。在每个线程执行完毕后,都会调用 CountDownLatch 的 countDown() 方法,以便主线程等待所有线程都执行完毕后再返回。

@GetMapping("/threadPoolInsert")
           public void insertThreadPoolBatchData () {
       
       
               ArrayList<StudentDO> arrayList = new ArrayList<>();
               for (int i = 0; i < 500000 ; i++) {
                   StudentDO StudentDO = new StudentDO("张三"+i, i, "武汉"+i, i+"号");
                   arrayList.add(StudentDO);
       
               }
               int count = arrayList.size();
               int pageSize = 10000;
               int threadNum = count % pageSize == 0 ?  count / pageSize:  count / pageSize + 1;
               CountDownLatch downLatch = new CountDownLatch(threadNum);
               long start = System.currentTimeMillis();
               for (int i = 0; i < threadNum; i++) {
                   //开始序号
                   int startIndex = i * pageSize;
                   //结束序号
                   int endIndex = Math.min(count, (i+1)*pageSize);
                   //分割list
                   List<StudentDO> StudentDOs = arrayList.subList(startIndex, endIndex);
                   taskExecutor.execute(() -> {
                       DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
                       TransactionStatus status = transactionManager.getTransaction(definition);
                       try {
       
                           studentMapper.insertSplice(StudentDOs);
                           transactionManager.commit(status);
       
                       }catch (Exception e){
                           transactionManager.rollback(status);
                           e.printStackTrace();
       
                       }finally {
                           //执行完后 计数
                           downLatch.countDown();
                       }
                   });
       
               }
               try {
                   //等待
                   downLatch.await();
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
               long end = System.currentTimeMillis();
               System.out.println("插入数据耗费时间:"+(end-start));
           }
  1. ThreadPoolTaskExecutor分割异步插入
    除了上面多线程分割插入,我们也可以使用多线程异步插入其实和上面插入的原理是差不多,下面演示异步插入
    • 修改application.yml增加配置
      这个参数根据自己的电脑配置合理设置
  async:
           executor:
             thread:
               core_pool_size: 35
               max_pool_size: 35
               queue_capacity: 99999
               name:
                 prefix:  async-testDB-
  • 自定义ThreadPoolTaskExecutor配置
@EnableAsync
         @Configuration
         public class ExecutorConfig {
             @Value("${async.executor.thread.core_pool_size}")
             private int corePoolSize;
             @Value("${async.executor.thread.max_pool_size}")
             private int maxPoolSize;
             @Value("${async.executor.thread.queue_capacity}")
             private int queueCapacity;
             @Value("${async.executor.thread.name.prefix}")
             private String namePrefix;
         
             @Bean(name = "asyncServiceExecutor")
             public Executor asyncServiceExecutor() {
                 ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
                 //设置核心线程数
                 taskExecutor.setCorePoolSize(corePoolSize);
                 //设置最大线程数
                 taskExecutor.setMaxPoolSize(maxPoolSize);
                 //设置队列容量
                 taskExecutor.setQueueCapacity(queueCapacity);
                 //设置线程名前缀
                 taskExecutor.setThreadNamePrefix(namePrefix);
                 //设置拒绝策略
                 // rejection-policy:当pool已经达到max size的时候,如何处理新任务
                 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
                 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                 return taskExecutor;
             }
         
         }
  • 定义异步service和实现类
 //接口服务
         public interface AsyncService {
         
             void executeAsync(List<StudentDO> studentDOList, CountDownLatch countDownLatch);
         }
         
         //实现类
         @Service
         public class AsyncServiceImpl extends ServiceImpl implements AsyncService {
             @Autowired
             private StudentService studentService;
             @Async("asyncServiceExecutor")
             @Override
             public void executeAsync(List<StudentDO> studentDOList, CountDownLatch countDownLatch) {
                 try{
                     //异步线程要做的事情
                     studentService.saveBatch(studentDOList);
                 }finally {
                     countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
                 }
             }
         }
     
  • control代码
    这里需要注意修改, 因为我们在ExecutorConfig配置类里面重新设置了ThreadPoolTaskExecutor
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    改为
    @Autowired
    private Executor taskExecutor;
@GetMapping("/asyncInsertData")
             public void asyncInsertData() {
                 List<StudentDO> studentDOList = getTestData();
                 //测试每100条数据插入开一个线程
                 long start = System.currentTimeMillis();
                 List<List<StudentDO>> lists = ListUtil.split(studentDOList, 10000);
                 CountDownLatch countDownLatch = new CountDownLatch(lists.size());
                 for (List<StudentDO> listSub:lists) {
                     asyncService.executeAsync(listSub,countDownLatch);
                 }
                 try {
                     countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
                     // 这样就可以在下面拿到所有线程执行完的集合结果
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
                 long end = System.currentTimeMillis();
                 System.out.println("插入数据耗费时间:"+(end-start));
             }
         
         
             public List<StudentDO> getTestData() {
                 ArrayList<StudentDO> arrayList = new ArrayList<>();
                 for (int i = 0; i < 500000  ; i++) {
                     StudentDO studentDO = new StudentDO("张三"+i, i, "武汉"+i, i+"号");
                     arrayList.add(studentDO);
         
                 }
                 return arrayList;
             }
     
 50万条数据插入数据耗费时间:2.604秒,这里插入和上面差不多因为他们使用的都是多线程插入

总结:经过上面的示例演示我们心里已经有谱了,知道什么时候该使用哪一种数据插入方式,针对对不同线程数的测试,发现不是线程数越多越好,具体多少合适,通常的算法:CPU核心数量*2 +2 个线程。

实际要根据每个人的电脑配置情况设置合适的线程数,可以根据下面这个公式获取:文章来源地址https://www.toymoban.com/news/detail-854979.html

 int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量
    int corePoolSize = (int) (processNum / (1 - 0.2));
    int maxPoolSize = (int) (processNum / (1 - 0.5));

到了这里,关于SpringBoot高效批量插入百万数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring Boot接口实现百万并发

    随着互联网的发展,越来越多的应用需要支持高并发。在这种情况下,如何实现高并发成为了一个重要的问题。Spring Boot是一个非常流行的Java框架,它提供了很多方便的功能来支持高并发。本文将介绍如何使用Spring Boot来实现百万并发。 Spring Boot是一个基于Spring框架的快速开

    2024年02月13日
    浏览(47)
  • Spring Boot整合Redis的高效数据缓存实践

    引言 在现代Web应用开发中,数据缓存是提高系统性能和响应速度的关键。Redis作为一种高性能的缓存和数据存储解决方案,被广泛应用于各种场景。本文将研究如何使用Spring Boot整合Redis,通过这个强大的缓存工具提高应用的性能和可伸缩性。 整合redis,需要先安装redis Redis 

    2024年01月22日
    浏览(63)
  • Spring Boot进阶(19):探索ElasticSearch:如何利用Spring Boot轻松实现高效数据搜索与分析

            ElasticSearch是一款基于Lucene的开源搜索引擎,具有高效、可扩展、分布式的特点,可用于全文搜索、日志分析、数据挖掘等场景。Spring Boot作为目前最流行的微服务框架之一,也提供了对ElasticSearch的支持。本篇文章将介绍如何在Spring Boot项目中整合ElasticSearch,并展

    2024年02月11日
    浏览(48)
  • Spring Boot中Excel数据导入导出的高效实现

    🌟 前言 欢迎来到我的技术小宇宙!🌌 这里不仅是我记录技术点滴的后花园,也是我分享学习心得和项目经验的乐园。📚 无论你是技术小白还是资深大牛,这里总有一些内容能触动你的好奇心。🔍 🤖 洛可可白 :个人主页 🔥 个人专栏 :✅前端技术 ✅后端技术 🏠 个人

    2024年03月15日
    浏览(43)
  • Spring Boot进阶(70):如何在Spring Boot中使用FastJson实现高效的JSON数据处理?

      随着互联网的发展,JSON(JavaScript Object Notation)已成为近年来使用最广泛的数据交换格式之一。为了提高JSON数据的处理效率,目前市面上常用的JSON解析库有Jackson、Gson、FastJson等。本文将介绍如何在Spring Boot中使用FastJson实现高效的JSON数据处理。   那么,具体如何实现

    2024年02月09日
    浏览(46)
  • Spring Boot进阶(58):轻松搞定数据存储!Spring Boot与PostgreSQL完美集成,让你的应用更稳定更高效!

            PostgreSQL是一种广泛使用的开源关系型数据库,具有可靠性高、性能优异、拥有丰富的数据类型和扩展等优点,越来越多的企业和开发者开始使用它来存储和管理数据。而Spring Boot是一种快速开发的框架,可以简化开发过程并提高开发效率。本文将介绍如何使用Sp

    2024年02月10日
    浏览(54)
  • Java8 实现批量插入和更新,SpringBoot实现批量插入和更新,Mybatis实现批量插入和更新

    基于mybatis实现的批量插入和更新 由于直接执行批量所有数据可能会出现长度超出报错问题,使用如下方式即可解决 原理还是分配执行,这里的100就是设定每次执行最大数 这里使用插入作为例子,也可以使用批量更新 更新的写法

    2024年02月12日
    浏览(52)
  • Python 大批量写入数据 百万级别

    方案二 方案一

    2024年02月11日
    浏览(77)
  • 【SpringBoot篇】使用Spring Cache高效处理缓存数据

    Spring Cache是一个框架,只要简单加一个注解,就能实现缓存功能。Spring Cache是Spring Framework提供的一个模块,它为应用程序添加了缓存支持。通过使用Spring Cache,你可以在方法级别上定义缓存规则,将方法的返回结果缓存起来,以提高方法调用的性能和响应速度。 是一个框架,只要简

    2024年02月05日
    浏览(56)
  • java使用jdbcTemplate查询并插入百万级数据解决方案

    背景:使用JdbcTemplate查询500万数据,然后插入到数据库。 这么多的数据按照普通的方式直接查询然后插入,服务器肯定会挂掉,我尝试过使用分页查询的方式去进行分批查询插入,虽然也能达到保证服务器不挂掉的效果,但是有一个严重的问题,每次查询的数据很难保证顺序

    2024年02月03日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包