并发工具类Phaser

这篇具有很好参考价值的文章主要介绍了并发工具类Phaser。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

在面试这一篇我们介绍过CountDownLatch和CyclicBarrier,它们都是jdk1.5提供的多线程并发控制类,内部都是用AQS这个同步框架实现。
在我们的实际项目中,有很多场景是需要从数据库查询一批数据,多线池执行某些操作,并且要统计结果,我们对这个过程做了一些封装,由于要统计结果,所以需要等所有任务都处理完成,我们用到了CountDownLatch实现同步。伪代码如下:

        ExecuteInstance ei = ExecuteInstance.build(myExecutor); //线程池
		
        //循环
        LoopShutdown.build("myTask").loop(() -> {

            //不断从数据获取数据
            List<Task> list = getFromDb();
            
            //设置countdownlatch
  	    ei.setCountDownSize(list.size());

	    list.forEach(item -> ei.execute(() -> {
		//提交到线程池执行,并且统计
	    }));
            
            //等待这一批做完
	    ei.await();
		
	});

        //内部使用了CountDownLatch await()
	return ei.awaitResult();

代码很简单,容易理解。不过后来有同学提到每次都要setCountDownSize() + await() 这套组合太麻烦,能不能省略这两步呢。另外也不够灵活,有些场景不能提前知道要处理的数据总数,例如从迭代器遍历数据,Iterator接口并没有size方法可以获取到总数。

那怎么实现这个功能呢?就是本篇要介绍的Phaser。

Phaser原理

Phaser类是jdk7提供的,可重用的,同步的,在功能上和CountDownLatch,CyclicBarrier类似,但更加灵活的类。
"phaser" google翻译一下是:"移相器"的意思,完全不知道是什么~,不过"phase"是阶段的意思,还是能从名字了解到一些信息。

Phaser运行机制:
并发工具类Phaser

  • Registration(注册)
    跟其他barrier不同,在phaser上注册的parties会随着时间的变化而变化。任务可以随时注册(使用方法register,bulkRegister注册,或者由构造器确定初始parties),并且在任何抵达点可以随意地撤销注册(方法arriveAndDeregister)。就像大多数基本的同步结构一样,注册和撤销只影响内部计数;不会创建更深的内部记录,所以任务不能查询他们是否已经注册。(不过,可以通过继承来实现类似的记录)
    可以动态的注册是它的特点之一,我们知道CountDownLatch之类的在开始就需要指定一个计数,并且不能更改,而Phaser可以开始指定,也可以运行时更改。

  • Synchronization(同步机制)
    和CyclicBarrier一样,Phaser也可以重复await。方法arriveAndAwaitAdvance的效果类似CyclicBarrier.await。phaser的每一代都有一个相关的phase number,初始值为0,当所有注册的任务都到达phaser时phase+1,到达最大值(Integer.MAX_VALUE)之后清零。使用phase number可以独立控制到达phaser和等待其他线程的动作,通过下面两种类型的方法:

    Arrival(到达机制) arrive和arriveAndDeregister方法记录到达状态。这些方法不会阻塞,但是会返回一个相关的arrival phase number;也就是说,phase number用来确定到达状态。当所有任务都到达给定phase时,可以执行一个可选的函数,这个函数通过重写onAdvance方法实现,通常可以用来控制终止状态。重写此方法类似于为CyclicBarrier提供一个barrierAction,但比它更灵活。

    Waiting(等待机制) awaitAdvance方法需要一个表示arrival phase number的参数,并且在phaser前进到与给定phase不同的phase时返回。和CyclicBarrier不同,即使等待线程已经被中断,awaitAdvance方法也会一直等待。中断状态和超时时间同样可用,但是当任务等待中断或超时后未改变phaser的状态时会遭遇异常。如果有必要,在方法forceTermination之后可以执行这些异常的相关的handler进行恢复操作,Phaser也可能被ForkJoinPool中的任务使用,这样在其他任务阻塞等待一个phase时可以保证足够的并行度来执行任务。

  • Termination(终止机制)
    可以用isTerminated方法检查phaser的终止状态。在终止时,所有同步方法立刻返回一个负值。在终止时尝试注册也没有效果。当调用onAdvance返回true时Termination被触发。当deregistration操作使已注册的parties变为0时,onAdvance的默认实现就会返回true。也可以重写onAdvance方法来定义终止动作。forceTermination方法也可以释放等待线程并且允许它们终止。

  • Tiering(分层结构)
    Phaser支持分层结构(树状构造)来减少竞争。注册了大量parties的Phaser可能会因为同步竞争消耗很高的成本, 因此可以设置一些子Phaser来共享一个通用的parent。这样的话即使每个操作消耗了更多的开销,但是会提高整体吞吐量。在一个分层结构的phaser里,子节点phaser的注册和取消注册都通过父节点管理。子节点phaser通过构造或方法register、bulkRegister进行首次注册时,在其父节点上注册。子节点phaser通过调用arriveAndDeregister进行最后一次取消注册时,也在其父节点上取消注册。
    这也是它的主要亮点之一,这一点很像ConcurrentHashMap(对HashTable)和LongAdder(对AtomicLong),通过分散热点来降低资源竞争,提升并发效率。
    并发工具类Phaser

  • Monitoring(状态监控)
    由于同步方法可能只被已注册的parties调用,所以phaser的当前状态也可能被任何调用者监控。在任何时候,可以通过getRegisteredParties获取parties数,其中getArrivedParties方法返回已经到达当前phase的parties数。当剩余的parties(通过方法getUnarrivedParties获取)到达时,phase进入下一代。这些方法返回的值可能只表示短暂的状态,所以一般来说在同步结构里并没有啥卵用。

CountDownLatch和CyclicBarrier都非常简单,从Phaser提供的api数量就可以看出为什么说它更加灵活,show me the code,接下来我们通过几个例子感受一下。
并发工具类Phaser

Phaser例子

例子1:子线程会等全部子线程达到后才开始执行,实现类似CyclicBarrier的效果。

	@Test
	public void test1() throws InterruptedException {
		List<Runnable> list = Lists.newArrayList();
		for (int i = 0; i < 10; i++) {
			final int j = i;
			list.add(() -> System.out.println(j));
		}

		final Phaser phaser = new Phaser(); // "1" to register self
		// create and start threads
		int i = 0;
		for (final Runnable task : list) {
			i++;
			final int j = i;
			phaser.register();
			new Thread(() -> {
				try {
					Thread.sleep(j * 1000);
				} catch (InterruptedException e) {
				}
				//全部子线程到达后才开始执行
				phaser.arriveAndAwaitAdvance(); // await all creation
				task.run();
			}).start();
		}
		Thread.sleep(15000);
	}

例子2:task会循环做3次,通过重写onAdvance可以控制phaser结束的条件。

    	@Test
	public void test2() throws InterruptedException {
		//重复做3次
		int iterations = 3;
		List<Runnable> list = Lists.newArrayList();
		for (int i = 0; i < 2; i++) {
			final int j = i;
			list.add(() -> System.out.println(j));
		}

		final Phaser phaser = new Phaser() {			
			//每做一次,phase+1,该方法返回true,就会结束
			protected boolean onAdvance(int phase, int registeredParties) {
				return phase > iterations || registeredParties == 0;
			}
		};
		phaser.register();
		for (final Runnable task : list) {
			phaser.register();
			new Thread(() -> {
				do {
					task.run();
					phaser.arriveAndAwaitAdvance();
				} while (!phaser.isTerminated());
			}).start();
		}
		phaser.arriveAndDeregister(); // deregister self, don't wait  
		Thread.sleep(5000);
	}

例子3:创建多个phaser,并关联到父phaser上,就是上面提到的分层结构。

    	@Test
	public void test3() {
		Phaser parent = new Phaser(1);
		Phaser phaser1 = new Phaser(parent);
		Phaser phaser2 = new Phaser(parent);

		for (int i = 0; i < 20; i++) {
			final int j = i;
			if (i < 10) {
				phaser1.register();
				new Thread(() -> {
					try {
						Thread.sleep(1000);
						phaser1.arriveAndAwaitAdvance(); // await all creation
						System.out.println(j);
					} catch (InterruptedException e) {
					}
				}).start();
			} else if (i < 20) {
				phaser2.register();
				new Thread(() -> {
					try {
						Thread.sleep(10000);
						phaser2.arriveAndAwaitAdvance(); // await all creation
						System.out.println(j);
					} catch (InterruptedException e) {
					}
				}).start();
			}
		}
		parent.arriveAndAwaitAdvance();
		System.out.println("done");
	}

例子4:使用Phaser改写我们的代码,如下:

    	//维护一个Phaser    
	public static ExecuteInstance buildWithPhaser(Executor executor) {
		ExecuteInstance ei = new ExecuteInstance();
        	ei.executor = executor;
		ei.phaser = new Phaser(1);        
		return ei;
	}

    	//提交线程池前注册一下
    	public void executeRR(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
		phaser.register();
		executor.execute(() -> executeStatistics(task, exceptionHandler, batch));		
	}

    	//执行后deregister一下
    	private void executeStatistics(Callable<ReturnResult> task, Consumer<Exception> exceptionHandler, int batch) {
		ReturnResult result = ReturnResult.NONE;
		try {
        	    	//任务处理
			result = task.call();
		} catch (Exception e) {
			if (statistics) {
				counter.incrException(batch);
			}
			if (exceptionHandler != null) {
				//自定义异常处理
				try {
					exceptionHandler.accept(e);
				} catch (Exception he) {
				}
			}
		} finally {
			phaser.arriveAndDeregister(); //deregister   
			if (statistics) {
				if (ReturnResult.SUCCESS.equals(result)) {
					counter.incrSuccess(batch);
				} else if (ReturnResult.FAIL.equals(result)) {
					counter.incrFail(batch);
				} else if (ReturnResult.FILTER.equals(result)) {
					counter.incrFilter(batch);
				}
			}
		}
	}

    	//等待结果
    	public ExecuteResult awaitResult() {
		phaser.arriveAndAwaitAdvance();
		return getExecuteResult();
    	}

使用就非常简单了

	ExecuteInstance ei = ExecuteInstance.buildWithPhaser(myExecutor); //线程池
		
    	//循环
     	LoopShutdown.build("myTask").loop(() -> {

        	//不断从数据获取数据
        	List<Task> list = getFromDb();            

		list.forEach(item -> ei.execute(() -> {
			//提交到线程池执行,并且统计
		}));        	
	});

	return ei.awaitResult();

总结

Phaser是jkd7后提供的同步工具类,它底层并没有使用AQS同步工具。相比CountDownLatch等它提供了更丰富的功能,但也意味着它更复杂,需要更多的资源,一些简单的场景CountDownLatch等工具类能满足的就使用它们即可,考虑性能,还有灵活性时才考虑使用Phaser,如笔者的场景使用Phaser就更加适合。

更多分享,欢迎关注我的github:https://github.com/jmilktea/jtea文章来源地址https://www.toymoban.com/news/detail-639546.html

到了这里,关于并发工具类Phaser的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java多线程&并发篇----第二十一篇

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 ArrayB

    2024年01月18日
    浏览(61)
  • 车载测试面试,我们该如何准备?

    在进行车载测试方面的简历撰写以及面试时,我们需要注意的几点如下 1.没有相关项目怎么办? 要投递和面试的岗位所要参与的项目和做过的项目不可能是完全一样的。招聘企业更关注工作思路以及解决问题的思路。 不同的公司就算是做一样的项目,使用的技术以及工具会存

    2024年02月02日
    浏览(62)
  • [Java基础系列第5弹]Java多线程:一篇让你轻松掌握并发编程的指南

    多线程是一种编程技术,它可以让一个程序同时执行多个任务,从而提高程序的性能和效率。但是,使用Java多线程也不是一件容易的事情,它涉及到很多复杂的概念和问题,如线程安全、同步、锁、原子类、并发集合、生产者消费者模式、线程池模式、Future模式、线程协作模

    2024年02月14日
    浏览(45)
  • Phaser笔记-scene中的preload、create、update、player、键盘控制

    一般phaser最简单的配置文件如下: 其中scene有3个函数:preload、create、update preload:是在create函数前调用的,一般用于资源的加载; create:preload完成后,就会调用到这函数,这个函数一般用于构造界面,关联玩家键盘,游戏大部分逻辑事件等等等; update:会按周期进行调用,

    2024年02月11日
    浏览(33)
  • 2023面试到底有多难?大厂为何都注重算法?我们该如何应对?

    文章的开头大家先来看一看一道字节的算法题,看是否能做出来: 给定一个单链表的头节点 head,实现一个调整单链表的函数,使得每K个节点之间为一组进行逆序,并且从链表的尾部开始组起,头部剩余节点数量不够一组的不需要逆序。(不能使用队列或者栈作为辅助) 大家

    2023年04月14日
    浏览(89)
  • 【面试】java并发编程面试题

    java并发面试题 https://javaguide.cn/home.html java基础面试题 https://blog.csdn.net/jackfrued/article/details/44921941 java集合面试题 https://javaguide.cn/java/collection/java-collection-questions-01.html javaIO面试题 https://javaguide.cn/java/io/io-basis.html JVM面试题 https://javaguide.cn/java/jvm/jvm-garbage-collection.html 计算机网络

    2024年01月21日
    浏览(48)
  • 前端面试话术集锦第一篇

    这是记录 前端面试的话术集锦第1篇博文——基础篇一 ,我会不断更新前端面试话术的博文。❗❗❗ 合理的 title 、 description 、 keywords ,搜索对着三项的权重逐个减⼩: title :值强调重点即可,重要出现不要超过 2 次,⽽且要靠前,不同⻚⾯ title 要有所不同 descript

    2024年02月11日
    浏览(37)
  • 2023年MySQL核心技术面试第一篇

    目录 前言: MySQL开篇前言补充含有前三点,先认识大概的MySQL,从下一篇开始进入MySQL的核心技术讲解。 一 . MySQL开篇前言补充 存储:一个完整的数据存储过程是怎样的? 1.1 数据存储过程  1.1.1 创建MySQl 数据库 1.1.1.1 为什么我们要先创建一个数据库,而不是直接创建数据表

    2024年02月11日
    浏览(40)
  • 《面试专题-----经典高频面试题收集一》解锁 Java 面试的关键:深度解析常见高频经典面试题(第一篇)

    大家好,我是码农阿豪,一位热爱 Java 编程的程序员。今天我想和大家分享一些常见的 Java 面试题,通过收集解析这些问题,希望能够帮助大家更好地准备面试,突破技术瓶颈, 把面试官按在地上摩擦 。 1. 运算符 运算符和、|和||的区别? 用最有效率的方法计算2乘以2的3次

    2024年02月04日
    浏览(42)
  • 【我们一起60天准备考研算法面试(大全)-第二十九天 29/60】【二进制】

    专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客,如有问题交流,欢迎评论区留言,一定尽快回复!(大家可以去看我的专栏,是所有文章的目录) 文章字体风格: 红色文字表示:重难点★✔ 蓝色文字表示:思路以及想法★✔ 如果大家觉得有帮助的话,感谢大家帮

    2024年02月15日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包