Java使用线程池批量处理数据操作

这篇具有很好参考价值的文章主要介绍了Java使用线程池批量处理数据操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Java使用线程池批量处理数据操作

疑问&思路:

1.如何保证数据按顺序批量处理
2.如何保证数据全部处理完统一返回
3.如何保证是多任务异步操作
4.如何提高运行效率,减少运行时间

1.使用ArrayList 插入数据有序且可重复
2.CountDownLatch / Future / CompletableFuture
3.多线程
4.线程池创建多线程

具体流程:

Java使用线程池批量处理数据操作

  • 获取需要进行批量更新的大集合oldList,对大集合进行拆分操作,分成N个小集合nweList-1 ~ nweList-N 。
  • 开启线程池,针对集合的大小进行调参,对小集合进行批量更新操作。
  • 对流程进行控制,控制线程执行顺序。

创建List分割工具类:

public class ListSplitUtils {

    //这里使用泛型T 接收  做到通用工具类
    //resList总数据List  subListLength:需要切割的长度
    public static <T> List<List<T>> split(List<T> resList, int subListLength) {  
   
        if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
            return Lists.newArrayList();
        }
        List<List<T>> ret = Lists.newArrayList();
        int size = resList.size();
        if (size <= subListLength) {  //指定数据过小直接处理
            ret.add(resList);
        } else {
            int n = size / subListLength;
            int last = size % subListLength;
            // 分成n个集合,每个大小都是 subListLength 个元素
            for (int i = 0; i < n; i++) {
                List<T> itemList = Lists.newArrayList();
                for (int j = 0; j < subListLength; j++) {
                    itemList.add(resList.get(i * subListLength + j));
                }
                ret.add(itemList);
            }
            // last的进行处理
            if (last > 0) {
                List<T> itemList = Lists.newArrayList();
                for (int i = 0; i < last; i++) {
                    itemList.add(resList.get(n* subListLength + i));
                }
                ret.add(itemList);
            }
        }
        return ret;
    }

创建线程池:

// 初始化线程池
/**
* corePoolSize: 一直保持的线程的数量,即使线程空闲也不会释放。除非设置了 allowCoreThreadTimeout 为 true;
* maxPoolSize:允许最大的线程数,队列满时开启新线程直到等于该值;
* keepAliveTime:表示空闲线程的存活时间。当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用,直到线程中的线程数不大于corepoolSIze;
* TimeUnitunit:表示keepAliveTime的单位;
* workQueue:缓存任务的队列;
* handler:表示当 workQueue 已满,且池中的线程数达到 maxPoolSize 时,线程池拒绝添加新任务时采取的策略。
*/
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 50, 4, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());
        //大集合拆分成N个小集合,保证多线程异步执行, 过大容易回到单线程
        List<List<CheckRecordDetailsDanger>> splitNList = ListSplitUtils.split(CheckRecordDetailsDangerPage, 100); //先设置100  100以内不考虑性能
        // 记录单个任务的执行次数
        CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());

        for (List<CheckRecordDetailsDanger> singleList : splitNList) {
            // 线程池执行
            threadPool.execute(new Thread(() -> {
                for (CheckRecordDetailsDanger checkRecordDetailsDanger : singleList) {
                    //统一赋值方法
                    //unifySetData(checkRecordDetailsDanger);  这是我的方法,需要替换成自己的处理逻辑
                    countDownLatch.countDown();
                }
            }));
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

后话

学习过程中可以了解一下 CountDownLatch 和 Future 以及 ThreadPoolExecutor 。文章来源地址https://www.toymoban.com/news/detail-486048.html

到了这里,关于Java使用线程池批量处理数据操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • HBase的数据批量操作与事务处理

    HBase是一个分布式、可扩展、高性能的列式存储系统,基于Google的Bigtable设计。它是Hadoop生态系统的一部分,可以与HDFS、MapReduce、ZooKeeper等组件集成。HBase具有高可用性、高可扩展性和高性能等特点,适用于大规模数据存储和实时数据处理。 在大数据时代,数据的批量操作和

    2024年02月22日
    浏览(40)
  • 解决在使用 Elasticsearch(ES)多线程批量操作时导致并发一致性的问题!!

    先说一下什么是数据库数据库中 并发一致性 问题! 1、在并发环境下,事务的隔离性很难保证,因此会出现很多并发一致性问题。 数据丢失 T1 和 T2 两个事务都对一个数据进行修改,T1 先修改,T2 随后修改,T2 的修改覆盖了 T1 的修改。 读脏数据 T1 修改一个数据,T2 随后读取

    2024年02月04日
    浏览(52)
  • Python地理数据处理 22:基于arcpy批量操作(四)

    代码描述:遍历a文件夹下的所有tif影像,并使用每个a文件夹中的tif影像对b文件夹下的所有tif影像进行裁剪。裁剪后的栅格将以两个tif文件进行组合命名,并保存到另一个文件夹中。 获取栅格数据的平均值,并输出程序运行进度: 程序运行进度: 某文件夹中包含多个子文件

    2024年02月04日
    浏览(46)
  • python实现视频抽帧,文件批量操作,文件批量处理(数据集制作的工具箱)

    环境准备 数据集制作 文件批量重命名 文件批量移动 将文件批量按照一定格式进行重命名 修改xml文件内容的方法 Pathlib库的常用接口 在计算机视觉项目中,文件批量操作和文件批量预处理是必不可少的步骤。它们涉及处理大量的图像文件,包括读取、处理、保存和预处理。

    2024年02月09日
    浏览(69)
  • Python地理数据处理 二十一:基于arcpy批量操作(三)

    实现将给定的 .shp 文件中的所有省份作为裁剪范围,对给定的 .tif 文件进行裁剪,输出所有省份的单独 .tif 文件: 实现对文件名前14个字符一样的tif图像进行栅格运算求和: 如:XXXX_XXX_2003.M01_Mean、XXXX_XXX_2003.M02_Mean、XXXX_XXX_2003.M03_Mean;XXXX_XXX_2004.M01_Mean、XXXX_XXX_2004.M02_Mean、

    2024年02月01日
    浏览(50)
  • 【SpringBoot】springboot数据使用多线程批量入数据库

    springboot、mybatisPlus、mysql8 mysql8(部署在1核2G的服务器上,很卡,所以下面的数据条数用5000,太大怕不是要等到花儿都谢了 0.0) 共耗时:180121 ms 耗时时间:87217ms 耗时时间: 28235 可见时间从180秒,缩短到了28秒,但是@Transactional对于多线程是控制不了所有的事务的。 Spring实现

    2024年02月02日
    浏览(42)
  • HBase Java API 开发:批量操作 第2关:批量删除数据

    删除单行数据 删除一行数据很简单,我们来看个示例: 这段代码就可以删除行键为 row1 的行。 删除多行数据 如何删除多行数据呢? 相信你已经猜到了,既然 get() 方法有重载方法,那应该 delete() 方法也有,确实: 这样就可以删除多行数据啦。 编程要求 还等啥,亲自试一试

    2024年02月05日
    浏览(53)
  • java 线程池实现多线程处理list数据

    需要注意的问题点,多线程处理List数据可能发生线程不安全, 引入CopyOnWriteArrayList,Semaphore解决,或者加锁解决问题;所有线程执行完毕后再进行后续业务的处理,引入awaitTermination()方法。 发现上述逻辑有问题,被其他资料误导,awaitTermination并不是上述描述的作用。为了保

    2024年02月11日
    浏览(33)
  • 使用 multiprocessing 多进程处理批量数据

    示例代码 multiprocessing.Pool 创建进程池, 传入的参数是要要使用的 CPU 内核数量, 直接用 cpu_count() 可以拿到当前硬件配置所有的 CPU 内核数. pool.map 可以直接将处理后的结果拼接成一个 list 对象 应用在实际数据处理代码的效果对比: 普通处理方式, 用时 221 秒 多进程处理方式, 用时

    2024年02月09日
    浏览(40)
  • java 多线程处理大量并发数据

    Java中多线程是一种处理数据的常见方式,它可以同时执行多个线程以提高程序的性能和效率。下面是一个使用多线程处理数据的示例代码: 在上面的代码中,我们创建了一个数组 data 来存储待处理的数据。然后,我们创建了一个线程数组 threads ,用于存储要执行的线程。 通

    2024年02月09日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包