无限分解流----Fork/Join框架

这篇具有很好参考价值的文章主要介绍了无限分解流----Fork/Join框架。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Fork译为拆分,Join译为合并
Fork/Join框架的思路是把一个非常巨大的任务,拆分成若然的小任务,再由小任务继续拆解。直至达到一个相对合理的任务粒度。然后执行获得结果,然后将这些小任务的结果汇总,生成大任务的结果,
直至汇总成最初巨大任务的结果。如下图:

无限分解流----Fork/Join框架

红色箭头代表拆分子任务。
绿色箭头代表返回子任务结果
这个框架的思路听起来,其实用传统的线程池、多线程完全就可以解决。但是内部却有很多小的细节(后边会说到),再加上清晰的使用思路,让这个框架还是在多线程并发中,占有了一席之地。
Fork/Join框架下,我们常用到三个类:(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )
RecursiveAction,子任务类,支持子任务有返回结果任务
RecursiveTask,子任务类,用于有返回结果的任务
ForkJoinPool,执行子任务的线程池。
话不多说,我们直接看代码:

 1 public class SumDemo extends RecursiveTask<Long> {
 2 
 3     int maxLen = 800_0000;
 4 
 5     int[] arr;
 6     int start;
 7     int end;
 8 
 9 
10     public SumDemo(int[] arr, int start, int end) {
11         this.arr = arr;
12         this.start = start;
13         this.end = end;
14     }
15 
16     @Override
17     protected Long compute() {
18         if (end - start < maxLen) {
19             long a = sum();
20             try {
21                 //Thread.sleep(1);
22             } catch (Exception e) {
23             }
24             return a;
25         }
26         int middle = (start + end) / 2;
27         SumDemo left = new SumDemo(arr, start, middle);
28         SumDemo right = new SumDemo(arr, middle + 1, end);
29         left.fork();
30         right.fork();
31         //invokeAll(left,right);
32         long leftRtn = left.join();
33         long rightRtn = right.join();
34         return leftRtn + rightRtn;
35     }
36 
37     private Long sum() {
38         System.out.println("now" + Thread.currentThread().getName() + "-start:" + start + "-end:" + end);
39         long sum = 0;
40         for (int i = start; i <= end; i++) {
41             sum += arr[i];
42         }
43         return sum;
44     }
45 
46     public static void main(String[] args) throws ExecutionException, InterruptedException {
47         int size = 30000_0000;
48         int[] arr = new int[size];
49         Random random = new Random(0);
50         for (int i = 0; i < size; i++) {
51             arr[i] = random.nextInt(10_0000_0000);
52         }
53         long cal = 0;
54         long start = System.currentTimeMillis();
55         for (int i = 0; i < size; i++) {
56             if (i % 800_0000 == 0) {
57                 Thread.sleep(1);
58             }
59             cal += arr[i];
60         }
61         long finish = System.currentTimeMillis();
62         long timeCost = finish - start;
63         System.out.println("cal" + cal);
64         long start1 = System.currentTimeMillis();
65         ForkJoinPool forkJoinPool = new ForkJoinPool();
66         ForkJoinTask<Long> result = forkJoinPool.submit(new
67                 SumDemo(arr, 0, size - 1));
68         long rtn = result.get();
69         long finish1 = System.currentTimeMillis();
70         long forkJoinCost = finish1 - start1;
71         System.out.println("one thread  cost" + (timeCost));
72         System.out.println("fork join cost" + forkJoinCost);
73     }
74 }

执行的结果大概是这样的

 1 cal150000314007254036
 2 nowForkJoinPool-1-worker-1-start:0-end:4687499
 3 nowForkJoinPool-1-worker-3-start:187500000-end:192187499
 4 nowForkJoinPool-1-worker-5-start:37500000-end:42187499
 5 nowForkJoinPool-1-worker-6-start:225000000-end:229687499
 6 .....
 7 nowForkJoinPool-1-worker-3-start:220312500-end:224999999
 8 nowForkJoinPool-1-worker-7-start:267187500-end:271874999
 9 nowForkJoinPool-1-worker-2-start:107812500-end:112499999
10 nowForkJoinPool-1-worker-4-start:281250000-end:285937499
11 nowForkJoinPool-1-worker-7-start:271875000-end:276562499
12 nowForkJoinPool-1-worker-5-start:135937500-end:140624999
13 nowForkJoinPool-1-worker-11-start:140625000-end:145312499
14 nowForkJoinPool-1-worker-6-start:276562500-end:281249999
15 nowForkJoinPool-1-worker-4-start:285937500-end:290624999
16 nowForkJoinPool-1-worker-11-start:145312500-end:149999999
17 nowForkJoinPool-1-worker-7-start:290625000-end:295312499
18 nowForkJoinPool-1-worker-4-start:295312500-end:299999999
19 one thread cost136
20 fork join cost67

线程池默认大小是根据cpu当前的可用核数来作为大小的,我们这里是12核,但是12核居然只比单一线程用时少50%,这是挺奇怪的,这主要是由于我们Demo中的任务是连续的计算密集型任务,这种情况下单一线程的表现也很优秀,forkJoin反而由于要不断协调线程

任务而导致会损耗性能,所以差距并不明显。倘若放开注释中的睡眠时间,则两者的差距会拉开的非常大,如下:

1 one thread  cost675
2 fork join cost194

代码的思路大概是这样的:

我们先定义一个子任务类,子任务类设置一个阈值,子任务开始任务时会判断:
如果计算量未超过阈值呢,说明任务足够小,我们当前子任务直接就执行计算了。
如果计算量超过阈值,说明任务比较大我们需要进行拆分,此时创建好拆分子任务,并使用fork()方法即可。拆分后的子任务,则后续使用join等待结果即可。
这样通过Fork/Join框架实现大任务的计算就算是搞定了。(防盗连接:本文首发自http://www.cnblogs.com/jilodream/ )

那既然是线程池,是如何协调线程来计算子任务的呢?

(1)与传统线程池共享一个任务队列不同的是,Fork/Join框架中,每个子任务都有一个属于自己线程的任务队列(但是两者其实并不是一对一的关系,源码很复杂),如下图:

无限分解流----Fork/Join框架

这样肯定会由于任务规模、计算难度的不同,导致有些线程很快执行完了,其它线程还有很长的任务队列,那怎么办呢?
Fork/Join框架会让任务已经完成的线程,从其它任务的队列的尾端去取任务,这样一方面加速了任务的完成,一方面又减少了线程由于并发操作队列可能存在的并发问题。
这种方式,我们也将它称为“工作窃取”如下图:

无限分解流----Fork/Join框架

(2)Fork出来的子任务被谁执行了:
通过阅读源码我们可以发现,如果当前线程是线程池线程,则直接把fork出的子任务丢到当前线程的队列中,否则会通过计算随机的提交到其他的线程所拥有的的队列中。由其他线程来完成。

1     public final ForkJoinTask<V> fork() {
2         Thread t;
3         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
4             ((ForkJoinWorkerThread)t).workQueue.push(this);
5         else
6             ForkJoinPool.common.externalPush(this);
7         return this;
8     }

 文章来源地址https://www.toymoban.com/news/detail-482129.html

到了这里,关于无限分解流----Fork/Join框架的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【JUC基础】16. Fork Join

    “分而治之”一直是一个非常有效的处理大量数据的方法。著名的MapReduce也是采取了分而治之的思想。。简单地说,就是如果你要处理 1000 个数据,但是你并不具备处理 1000个数据的能力,那么你可以只处理其中的 10 个,然后分阶段处理 100 次,将 100 次的结进行合成,就是最

    2024年02月08日
    浏览(50)
  • Three.js 实现模型材质分解,拆分,拆解效果

    原理:通过修改模型材质的 x,y,z 轴坐标 positon.set( x,y,z) 来实现拆解,分解的效果。 注意:支持模型材质position 修改的材质类型为 type=“Mesh” ,其他类型的材质修改了 position 可能没有实际效果 在上一篇 Three.js加载外部glb,fbx,gltf,obj 模型文件 的文章基础上新增一个 setModelMeshD

    2024年02月11日
    浏览(83)
  • 【pandas基础】--数据拆分与合并

    数据集拆分是将一个大型的数据集拆分为多个较小的数据集,可以让数据更加清晰易懂,也方便对单个数据集进行分析和处理。 同时,分开的数据集也可以分别应用不同的数据分析方法进行处理,更加高效和专业。 数据集合并则是将多个数据集合并成一个大的数据集,可以

    2024年02月05日
    浏览(45)
  • 获取图像的属性、图像通道拆分合并实现

    图像的属性包括图像的高度、宽度、通道数、位深度等信息,这些属性可以帮助我们了解图像的基本特征。而图像的通道拆分和合并则是对图像的颜色通道进行操作,可以用于提取、修改或合成图像的不同颜色信息。 首先,让我们来介绍图像的属性。图像的高度和宽度表示了

    2024年02月09日
    浏览(45)
  • 头歌 HBase 性能优化:优化拆分和合并

    1. source /etc/profile 2. 3. start - all . sh zkServer . sh start start - hbase.sh    进入终端界面 hbase shell create \\\'stu\\\' ,{ NAME = \\\'info\\\' },{ NAME = \\\'desc\\\' }, SPLITS =   [ \\\'1000\\\' , \\\'2000\\\' , \\\'3000\\\' , \\\'4000\\\' , \\\'5000\\\'] put \\\'stu\\\' , \\\'15653216541\\\' , \\\'info:num\\\' , \\\'14561235651\\\' put \\\'stu\\\' , \\\'15653216541\\\' , \\\'info:s_name\\\' , \\\'cg\\\' put \\\'stu\\\' , \\\'15653216541\\\' ,

    2024年04月14日
    浏览(62)
  • pandas(十七)批量拆分与合并Excel文件

    一、Pandas 进行索引和切片的iloc、loc方法 iloc是基于整数位置进行索引和切片的方法 它允许您使用整数来访问 DataFrame 或 Series 中的特定行和列 loc是基于标签进行索引和切片的方法。 它允许您使用标签来访问 DataFrame 或 Series 中特定的行和列 二、Pandas批量拆分与合并Excel文件

    2024年02月12日
    浏览(42)
  • 一款IP合并和分解工具

             近期在工作中有个需求,需要将七千多个ip地址(有的带掩码,有的不带掩码)进行合并尝试,看能不能通过合并减少ip的条目数。这就涉及到ip和掩码的计算,举例如下: 192.168.1.0/25 192.168.1.128/29 192.168.1.136/30 192.168.1.140/32 192.168.1.141/32 192.168.1.142/31 192.168.1.144/28 1

    2024年02月05日
    浏览(70)
  • Python实现Excel的批量合并和拆分操作

    引言 Excel是一种常用的电子表格软件,广泛应用于数据处理、数据分析和报表生成等工作中。在处理大量的Excel文件时,人工逐个操作无疑是费时费力的。然而,通过使用Python编程语言和相关扩展库,我们可以实现对Excel文件的批量合并和拆分操作,以提高工作效率。本文将详

    2024年02月12日
    浏览(97)
  • 一款IP合并和分解工具(一)

             近期在工作中有个需求,需要将七千多个ip地址(有的带掩码,有的不带掩码)进行合并尝试,看能不能通过合并减少ip的条目数。这就涉及到ip和掩码的计算,举例如下: 192.168.1.0/25 192.168.1.128/29 192.168.1.136/30 192.168.1.140/32 192.168.1.141/32 192.168.1.142/31 192.168.1.144/28 1

    2024年02月13日
    浏览(35)
  • 3D模型拆分与合并展示,IVX真的可以简单实现

    随着IT行业的发展,低代码和无代码平台已成为未来的发展趋势,因为它们能够大大提高软件开发的效率。iVX作为其中的一员,具有非常显著的优势,如逻辑完备性、操作流畅性、面向对象设计、可独立作为编程语言等方面的特点。它简单易用、功能丰富、高效稳定,不仅可

    2024年02月02日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包