【并发编程】自研数据同步工具的优化:创建线程池多线程异步去分页调用其他服务接口获取海量数据

这篇具有很好参考价值的文章主要介绍了【并发编程】自研数据同步工具的优化:创建线程池多线程异步去分页调用其他服务接口获取海量数据。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

场景:

前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。

刚开始的设计思路是,,我创建多个服务同时去请求A服务的接口,每个服务都请求到全量数据,由于这些服务都注册在xxl-job上,而且采用的是分片广播的路由策略,那么每个服务就可以只处理请求到的所有数据中id%服务总数==分片索引的部分数据,然后交给kafka,由kafka决定这条数据应该放到哪个分区上。

解决方案

最近学了线程池后,回过头来思考,认为之前的方案还有很大的优化空间。

  • 1.当数据量很大时,一次性查询所有数据会导致数据库的负载过大,而使用分页查询,每次只查询部分数据,可以减轻数据库的负担,从而提高数据库的性能和响应速度,所以请求数据方每次分页查询少量数据,这样可以整体降低请求数据的时间。
  • 第一次优化.之前是每个服务都要把全量数据请求过来,假设全量数据1000w条,一个服务请求数据需要100s,我开5个服务,那请求数据的总时长就是500s。现在把1000w条数据均分给5个服务,那1个服务就只需要请求200w条数据,耗时20s,那所有服务的请求总时长就是100s。总体耗时缩小了5倍。上面说的分页查询就可以实现:页面大小假设10w(也就是将1000w/10w,逻辑上分成了100页),每个服务自己的分片索引作为页号,每次请求完,都给索引加上分片总数(例如:当前注册了五个服务,那分片总数=5,对于分片索引为1的服务来说,请求的页号为1,6,11,16,21。。。,对于分片索引为2的服务来说,请求的页号为2,7,12,17。。。,对于分片索引为3的服务来说,请求的页号为3,8,13,18,。。。。,对于分片索引为4的服务来说,请求的页号为4,9,14,19。。。。,对于分片索引为5的服务来说,请求的页号为5,10,15,20.。。)这样1000w条数据就均分到每个服务上了。对于每个服务都是单线程去请求数据,就可以将请求操作以及(页号+总服务数)的操作写在一个while循环里,一直请求数据,直到请求的数据为空时(也就是页号超过100了),退出while。
        //单线程情况下
        while(true){

            String body = HttpUtil.get(remoteURL+"?pageSize=100000&pageNum="+shardIndex);
//        logger.info("body:{}",body);
            //2.获取返回结果的message
            JSONObject jsonObject = new JSONObject();
//        if (StrUtil.isNotBlank(body)) {
            jsonObject = JSONUtil.parseObj(body);
//            logger.info("name:{}",Thread.currentThread().getName());
//        }
//        logger.info("jsonObject:{}",jsonObject);
            //3.从body中获取data
            List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
            if(CollectionUtil.isEmpty(tests)){
                break;
            }
            shardIndex+=shardTotal;
        }
  • 第二次优化: 了解了线程池后,还可以再优化。之前是一个服务单线程循环请求需要20s(假设),每次请求10w条,需要请求200w/10w,也就是20次,那一次请求就需要1s。如果使用线程池的话,那么耗时还会更小,因为当你将任务都交给线程池去执行时,多个线程会同时(并行)去请求各自页的数据,假如你只设置了4个线程,那这4个线程会同时发起请求获取数据,1s会完成4次请求,那分给服务的200w,5s就请求完了。那5个服务从总耗时500s,降到了总耗时5s*5=25s。
    这次优化,第一版代码(只展示了请求数据的代码,其他业务代码没有展示)
    一直向线程池里扔请求数据的任务,当某个任务请求到的数据是空的时候,意味着要请求的数据已经没了,那就结束循环,不再扔请求数据的任务。
    //线程共享变量
    static volatile boolean flag = true;
    @XxlJob(value = "fenpian")
    public void fenpian() {
        int shardIndex = XxlJobHelper.getShardIndex();
//        int shardTotal = XxlJobHelper.getShardTotal();
        //分片总数
        int shardTotal = 4;
        AtomicInteger pageNum = new AtomicInteger(shardIndex);
        //多线程情况下
//        List<CompletableFuture>completableFutureList=new ArrayList<>();
        while (flag){
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                String body = HttpUtil.get(remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal));
	             JSONObject jsonObject = new JSONObject();
	             jsonObject = JSONUtil.parseObj(body);
	             List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
	             logger.info("tests的size:{}",tests.size());
	             if(CollectionUtil.isEmpty(tests)){
	                 flag=false;
	             }
            },executorService);


        completableFutureList.add(future);
        }
        CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
        CompletableFuture.allOf(completableFutures).join();
       logger.info("任务结束");
        executorService.shutdown();

上面代码会有一个问题,就是while循环往线程池里扔任务,所有线程在执行时,会在请求数据那里”停留“一段时间,“停留期间”还会一直循环向线程池扔任务,当线程执行完某次请求得到空数据结束循环时,等待队列中还排着大堆任务等着去请求数据。

为了解决这个问题,我改用了for循环提交任务,提前根据请求数据总量、每次读取的条数,以及服务总数得到每个服务需要执行的任务数。
第二版代码

@XxlJob(value = "fenpian")
    public void fenpian() {
        int shardIndex = XxlJobHelper.getShardIndex()+1;
        int shardTotal = XxlJobHelper.getShardTotal();
        //分片总数
//        int shardTotal = 4;
        AtomicInteger pageNum = new AtomicInteger(shardIndex);
        //多线程情况下
        List<CompletableFuture>completableFutureList=new ArrayList<>();
        //总条数
        double total = 10000000;
        //读取的条数
        double pageSize=1000;
        double tasks = Math.ceil( total / (double) shardTotal / pageSize);
        logger.info("任务数{}",tasks);
        for(double i=0;i<tasks;i++){
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                String url = remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal);
                logger.info("url:{},threadName:{}",url,Thread.currentThread().getName());
                String body = HttpUtil.get(url);
                JSONObject jsonObject = new JSONObject();
                jsonObject = JSONUtil.parseObj(body);
                List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);
                logger.info("tests的size:{}",tests.size());
            },executorService);
        completableFutureList.add(future);
        }
        CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);
        CompletableFuture.allOf(completableFutures).join();
       logger.info("任务结束");

如有问题,请求指正(^^ゞ文章来源地址https://www.toymoban.com/news/detail-651554.html

到了这里,关于【并发编程】自研数据同步工具的优化:创建线程池多线程异步去分页调用其他服务接口获取海量数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java并发编程学习笔记(一)线程的入门与创建

    认识 程序由指令和数据组成,简单来说,进程可以视为程序的一个实例 大部分程序可以同时运行多个实例进程,例如记事本、画图、浏览器等 少部分程序只能同时运行一个实例进程,例如QQ音乐、网易云音乐等 一个进程可以分为多个线程,线程为最小调度单位,进程则是作

    2024年02月16日
    浏览(51)
  • 【并发编程】SpringBoot创建线程池的六种方式

    1. 自定义线程池 1.1 示例代码   控制台打印: 2. 固定长度线程池 2.1 示例代码   控制台打印:   前3个任务被同时执行,因为刚好有3个核心线程。后2个任务会被存放到阻塞队列,当执行前3个任务的某个线程空闲时会从队列中获取任务并执行。 2.2 源码剖析   该类型

    2024年02月16日
    浏览(42)
  • JUC并发编程-集合不安全情况以及Callable线程创建方式

    1)List 不安全 ArrayList 在并发情况下是不安全的 解决方案 : 1.Vector 2.Collections.synchonizedList() 3. CopyOnWriteArrayList 核心思想 是,如果有 多个调用者(Callers)同时要求相同的资源 (如内存或者是磁盘上的数据存储),他们 会共同获取相同的指针指向相同的资源 , 直到某个调用者

    2024年01月23日
    浏览(45)
  • 大家都说Java有三种创建线程的方式!并发编程中的惊天骗局!

    在Java中,创建线程是一项非常重要的任务。线程是一种轻量级的子进程,可以并行执行,使得程序的执行效率得到提高。Java提供了多种方式来创建线程,但许多人都认为Java有三种创建线程的方式,它们分别是 继承Thread类、实现Runnable接口和使用线程池。 但是,你们知道吗?

    2024年02月08日
    浏览(69)
  • 《C++并发编程实战》读书笔记(2):线程间共享数据

    在C++中,我们通过构造 std::mutex 的实例来创建互斥量,调用成员函数 lock() 对其加锁,调用 unlock() 解锁。但通常更推荐的做法是使用标准库提供的类模板 std::lock_guard ,它针对互斥量实现了RAII手法:在构造时给互斥量加锁,析构时解锁。两个类都在头文件 mutex 里声明。 假设

    2024年02月10日
    浏览(42)
  • c++并发编程实战-第3章 在线程间共享数据

    多线程之间共享数据,最大的问题便是数据竞争导致的异常问题。多个线程操作同一块资源,如果不做任何限制,那么一定会发生错误。例如: 输出: 显然,上面的输出结果存在问题。出现错误的原因可能是: 某一时刻, th1线程获得CPU时间片,将g_nResource从100增加至200后时

    2024年02月08日
    浏览(37)
  • 【Linux系统编程:线程】 线程控制 -- 创建、终止、等待、分离 | 线程互斥与同步 | 互斥量与条件变量 | 生产者消费者模型 | 线程池 | STL/智能指针与线程安全 | 读者写者模型

    写在前面 本文重点: 了解线程概念,理解线程与进程区别与联系。 学会线程控制,线程创建,线程终止,线程等待。 了解线程分离与线程安全。 学会线程同步。 学会使用互斥量,条件变量,posix 信号量,以及读写锁。 理解基于读写锁的读者写者问题。 一、线程概念 💦

    2024年02月04日
    浏览(65)
  • 【并发编程】线程池多线程异步去分页调用其他服务接口获取海量数据

    前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。 刚开始的设计思路是,,我创建多个服务同时去请求A服务的接口,每个服务都请求到全量数据,由于这些

    2024年02月13日
    浏览(36)
  • 基于多线程并发-线程同步-系统实现

    系统实现:相对于STL来说非标准的实现,Linux和Windows平台各自的实现。 线程同步:通过限制多个线程同时执行某段代码来保护资源的。 1、线程互斥量 pthread_mutex_t 的初始化 a、定义再初始化: pthread_mutex_init函数的第二个参数attr是定义互斥锁的属性,一般为NULL。成功初始化返

    2024年02月08日
    浏览(33)
  • Python异步编程之web框架 异步vs同步 数据库IO任务并发支持对比

    主题: 比较异步框架和同步框架在数据库IO操作的性能差异 python版本 :python 3.8 数据库 :mysql 8.0.27 (docker部署) 压测工具 :locust web框架 :同步:flask 异步:starlette 请求并发量 : 模拟10个用户 服务器配置 : Intel(R) i7-12700F 客户端配置 :Intel(R) i7-8700 3.20GHz python中操作数据库通常

    2024年02月08日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包