CyclicBarrier线程同步

这篇具有很好参考价值的文章主要介绍了CyclicBarrier线程同步。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

关于作者: CSDN内容合伙人、技术专家, 从零开始做日活千万级APP,带领团队单日营收超千万。
专注于分享各领域原创系列文章 ,擅长java后端、移动开发、商业化变现、人工智能等,希望大家多多支持。

CyclicBarrier线程同步,java基础,java,spring,android,线程,同步

一、导读

我们继续总结学习Java基础知识,温故知新。

本文涉及知识点:
AQS - AbstractQueuedSynchronizer
CAS(Compare And Swap)
锁概念 volatile
ReentrantLock

二、概览

一、是什么
CyclicBarrier是JDK提供的一个同步工具,它的作用是让一组线程全部达到一个状态之后再全部同时执行。
特点:
所有线程执行完毕之后是可以重用的。

二、实现原理
CyclicBarrier就是利用了 AQS - AbstractQueuedSynchronizer 的共享锁来实现。
调用await的线程会被阻塞,直到计数器为0,在继续执行。
内部维护parties记录总线程数,count用于计数,最开始count=parties,调用await()之后count原子递减,当count为0之后,再次将parties赋值给count,这就是复用的原理

三、使用场景
一组线程全部达到一个状态之后再全部同时执行。
eg:
多线程计算结果,最后在合并结果。

四、优劣

  • 优点 :代码优雅,不需要对线程池进行操作,将线程池作为 Bean 的情况下有很好的使用场景。
  • 缺点 :需要提前知道线程数量;性能确实,呃呃呃呃呃,差了点。哦对了,还需要在线程代码块内加上异常判断,否则在 countDown 之前发生异常而没有处理,就会导致主线程永远阻塞在 await。
    需要提前知道数量。

需要注意异常的处理,await() 跟 num 必须成对出现,否则线程会一直阻塞。

CyclicBarrier 和 countdownlatch 的区别

CyclicBarrier和CountDownLatch都是多线程同步工具,但是它们的工作机制和用途略有不同:
CyclicBarrier更适合于需要循环等待多个线程的场景,而CountDownLatch则更适合于需要等待特定数量线程完成的场景。

  • 构造和计数机制
    CountDownLatch在构造时需要一个初始计数值,每调用一次countDown()方法,计数器减1,当计数器达到0时,await()方法会释放等待线程。CyclicBarrier则不需要初始计数值,它通过在每个线程中设置一个屏障(barrier)来阻塞线程,直到所有线程都到达屏障后才会释放。

  • 线程状态
    CountDownLatch不会改变线程状态,只是让线程等待,而CyclicBarrier在屏障未达到时阻塞线程,达到屏障后才解除阻塞。

  • 重用性
    CyclicBarrier是可重用的,在所有线程都通过屏障后,可以再次设置新的屏障,而CountDownLatch是一次性的。

  • 等待线程数
    CountDownLatch等待的线程数是固定的,而CyclicBarrier可以等待任意数量的线程

countdownlatch 是一个线程等待其他线程执行完毕后再执行,
CyclicBarrier 是每一个线程等待所有线程执行完毕后,再执行

三、使用

从字面上的意思可以知道,这个类的中文意思是“循环栅栏”。大概的意思就是一个可循环利用的屏障。
它的作用就是会让所有线程都等待完成后才会继续下一步行动。
现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。

CyclicBarrier叫做回环屏障,它的作用是让一组线程全部达到一个状态之后再全部同时执行,而且他有一个特点就是所有线程执行完毕之后是可以重用的。

public class CyclicBarrierTest {
    private static int num = 3;
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
        System.out.println("---------所有人都好了, await 次数到 ----------");
    });
    private static ExecutorService executorService = Executors.newFixedThreadPool(num);

    public static void main(String[] args) throws Exception{

        test2Cyclic();
//        test2Cyclic1();
        executorService.shutdown();
    }

    public static void test2Cyclic1() {
        executorService.submit(() -> {
            System.out.println("A在上厕所");
            try {
                Thread.sleep(4000);
                System.out.println("A上完了");
                cyclicBarrier.await();
                System.out.println("会议结束,A退出");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {

            }
        });
        executorService.submit(()->{
            System.out.println("B在上厕所");
            try {
                Thread.sleep(2000);
                System.out.println("B上完了");
                cyclicBarrier.await();
                System.out.println("会议结束,B退出");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {

            }
        });
        executorService.submit(()->{
            System.out.println("C在上厕所");
            try {
                Thread.sleep(3000);
                System.out.println("C上完了");
                cyclicBarrier.await();
                System.out.println("会议结束,C退出");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {

            }
        });
    }

    public static void test2Cyclic() {
        executorService.submit(() -> {
            System.out.println("A在上厕所");
            try {
                Thread.sleep(4000);
                System.out.println("A上完了");
                cyclicBarrier.await();
                System.out.println("会议结束,A退出,开始撸代码");
                cyclicBarrier.await();
                System.out.println("C工作结束,下班回家");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

            }
        });
        executorService.submit(() -> {
            System.out.println("B在上厕所");
            try {
                Thread.sleep(2000);
                System.out.println("B上完了");
                cyclicBarrier.await();
                System.out.println("会议结束,B退出,开始摸鱼");
                cyclicBarrier.await();
                System.out.println("B摸鱼结束,下班回家");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

            }
        });
        executorService.submit(() -> {
            System.out.println("C在上厕所");
            try {
                Thread.sleep(3000);
                System.out.println("C上完了");
                cyclicBarrier.await();
                System.out.println("会议结束,C退出,开始摸鱼");
                cyclicBarrier.await();
                System.out.println("C摸鱼结束,下班回家");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {

            }
        });

        executorService.shutdown();

    }
}

四、原理

子类的任务有:

  1. 通过CAS操作维护共享变量state。
  2. 重写资源的获取方式。
  3. 重写资源释放的方式。

CyclicBarrier线程同步,java基础,java,spring,android,线程,同步

一起看下代码

// 1、 创建CountDownLatch并设置计数器值,count代表计数器个数(内部是共享锁,本质就是上了几次锁)
第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
public CyclicBarrier(int parties, Runnable barrierAction) 

// 2、子线程调用await()方法时,获取独占锁,同时对count递减,进入阻塞队列,然后释放锁
// 当第一个线程被阻塞同时释放锁之后,其他子线程竞争获取锁,操作同1
public void await()

// 等待一定时间
public void await(long timeout, TimeUnit unit)

//同步锁
    private final ReentrantLock lock = new ReentrantLock();

    //条件队列
    private final Condition trip = lock.newCondition();

CyclicBarrier借助ReentranLock与Condition来对线程进行阻塞的

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //当前代
            final Generation g = generation;
            //判断当前代的状态,如果当前代后的屏障被打破,则g.broken返回true,否则返回false。
            if (g.broken)
                throw new BrokenBarrierException();
            //判断当前线程是否被中断
            if (Thread.interrupted()) {
                //如果当前线程已经被中断,则调用breakBarrier()
                //该方法代码为generation.broken = true;count = parties;trip.signalAll();
                //可见,只做了3件事:先将当前代的屏障变为打破状态,接着重置计数器的值,最后唤醒所有被阻塞的线程
                breakBarrier();
                //最后抛出中断异常
                throw new InterruptedException();
            }
            //将计数器的值减1
            int index = --count;
            if (index == 0) {
                //如果当前计数器的值为0
                boolean ranAction = false;
                try {
                    //则先执行换代任务,可以看得出来,是由最后一个到达屏障的线程执行的
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //开启下一代,这个方法的代码为trip.signalAll();count = parties;generation = new Generation();
                    //该代码唤醒所有被阻塞的线程,重置计数器的值,并且实例化下一代
                    nextGeneration();
                    return 0;
                } finally {
                    //如果换代任务未执行成功,则先将当前代的屏障变为打破状态,接着重置计数器的值,最后唤醒所有被阻塞的线程
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生
            //死循环
            for (;;) {
                try {
                    if (!timed)
                        //如果不是定时等待,则调用条件队列的await()进行阻塞
                        trip.await();
                    else if (nanos > 0L)
                        //如果是定时等待,则调用条件队列的awaitNanos进行等待
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //如果在等待过程中,当前线程被打断
                    if (g == generation && ! g.broken) {
                        //被打断后,还处于当前代,且当前代的屏障也未被打破
                        //现在的情况是,最后一个线程还未到屏障,当前线程早早到了,并且在进行等待,但是在等待的过程中,被打断了。
                        //则打破当前代的屏障,唤醒所有被阻塞的线程
                        breakBarrier();
                        throw ie;
                    } else {
                        //如果已经换代,则手动进行打断
                        Thread.currentThread().interrupt();
                    }
                }
                //此时线程被唤醒,需要判断自己为什么被唤醒了                
                
                //如果是其他某个线程被打断或者是由于超时导致当前代的屏障被打破,则抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();

                //如果是正常换代,则返回index值
                if (g != generation)
                    return index;

                //如果是定时等待,且时间已经到了,则打破屏障,唤醒所有阻塞的线程,最后抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
  1. 当子线程调用await()方法时,获取独占锁,同时对count递减,进入阻塞队列,然后释放锁
  2. 当第一个线程被阻塞同时释放锁之后,其他子线程竞争获取锁,操作同1
  3. 直到最后count为0,执行CyclicBarrier构造函数中的任务,执行完毕之后子线程继续向下执行

五、 推荐阅读

Java 专栏

SQL 专栏

数据结构与算法

Android学习专栏

CyclicBarrier线程同步,java基础,java,spring,android,线程,同步文章来源地址https://www.toymoban.com/news/detail-740106.html

到了这里,关于CyclicBarrier线程同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【JAVA】CyclicBarrier源码解析以及示例

    前言 在多线程编程中,同步工具是确保线程之间协同工作的重要组成部分。 CyclicBarrier (循环屏障)是Java中的一个强大的同步工具,它允许一组线程在达到某个共同点之前互相等待。 在本文中,我们将深入探讨 CyclicBarrier 的源码实现以及提供一些示例,以帮助您更好地理解

    2024年02月04日
    浏览(37)
  • Java多线程编程中的线程同步

    基本概念: ​ 线程同步是多线程编程中的一个重要概念,用于控制多个线程对共享资源的访问,以防止数据的不一致性和并发问题。 在多线程环境下,多个线程同时访问共享资源可能导致数据的竞争和不正确的结果。 是确保多个线程按照特定的顺序和规则访问共享资源,以

    2024年02月13日
    浏览(47)
  • java 线程安全问题 三种线程同步方案 线程通信(了解)

    线程安全问题指的是,多个线程同时操作同一个共享资源的时候,可能会出现业务安全问题。 下面代码演示上述问题,先定义一个共享的账户类: 在定义一个取钱的线程类 最后,再写一个测试类,在测试类中创建两个线程对象 某个执行结果: 为了解决前面的线程安全问题,

    2024年02月09日
    浏览(45)
  • java多线程怎么同步返回结果

    在 Java 多线程中,如果需要等待线程执行完成并返回结果,可以使用 Java 的线程同步机制来实现。以下是一些常用的方式: 使用 join() 方法:可以使用线程的 join() 方法来等待线程执行完成。在主线程中调用 join() 方法,会阻塞主线程,直到该线程执行完成。在被等待的线程执

    2024年02月14日
    浏览(33)
  • Java多线程系列——同步关键字

    目录 一、线程安全和数据不一致性 二、synchronized的作用 三、synchronized工作原理 四、锁的级别 五、synchronized的优点与缺点 六、实战应用 七、总结 在Java中, synchronized 是并发编程中的一个基本构建块,用于控制多个线程对共享资源的访问,以确保数据的一致性

    2024年02月21日
    浏览(42)
  • Java线程同步和协作的5种方式

    有3个线程为A,B,C,同时启动,C必须等待A和B完成才能继续执行,如何实现? 要求:仅使用 java 语言和它原生API 使用 Java 中提供的 CountDownLatch 类实现线程之间的同步和协作 使用 Java 中提供的 CyclicBarrier 类实现线程之间的同步和协作 使用join()方法:Thread类提供了join()方法,

    2024年02月02日
    浏览(73)
  • 5.5. Java并发工具类(如CountDownLatch、CyclicBarrier等)

    5.5.1 CountDownLatch CountDownLatch 是一个同步辅助类,它允许一个或多个线程等待,直到其他线程完成一组操作。 CountDownLatch 有一个计数器,当计数器减为0时,等待的线程将被唤醒。计数器只能减少,不能增加。 示例:使用CountDownLatch等待所有线程完成任务 假设我们有一个任务需

    2024年02月07日
    浏览(49)
  • Java 并发专题 : CyclicBarrier 打造一个安全的门禁系统(1)

    } 学生1 已刷卡,等待开门回家~ 学生3 已刷卡,等待开门回家~ 学生5 已刷卡,等待开门回家~ 学生9 已刷卡,等待开门回家~ 学生7 已刷卡,等待开门回家~ 学生0 已刷卡,等待开门回家~ 学生2 已刷卡,等待开门回家~ 学生6 已刷卡,等待开门回家~ 学生8 已刷卡,等待开门回家

    2024年04月26日
    浏览(30)
  • 学Java线程,你不知道什么是AQS?一文带你了解Java多线程同步的灵魂

    关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 ,擅长java后端、移动开发、人工智能等,希望大家多多支持。 我们继续总结学习 Java基础知识 ,温故知新。 CLH(Craig, Landin, and Hagersten locks)是一种自旋锁,能确保无饥饿性,提

    2024年02月16日
    浏览(47)
  • Java并发编程(三)线程同步 上[synchronized/volatile]

    当使用多个线程来访问同一个数据时,将会导致数据不准确,相互之间产生冲突,非常容易出现线程安全问题,比如多个线程都在操作同一数据,都打算修改商品库存,这样就会导致数据不一致的问题。 所以我们通过线程同步机制来保证线程安全,加入同步锁以避免在该线程没有完成

    2024年02月13日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包