Java线程间通信方式(3)

这篇具有很好参考价值的文章主要介绍了Java线程间通信方式(3)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前文了解了线程通信方式中的CountDownLatch, Condition,ReentrantLock以及CyclicBarrier,接下来我们继续了解其他的线程间通信方式。

Phaser

Phaser是JDK1.7中引入的一种功能上和CycliBarrier和CountDownLatch相似的同步工具,相对这两者而言其用法更加灵活,同时Phaser也支持重用。

在Phaser中将需要协作完成的任务分成多个阶段,每个阶段的参与者可指定,参与者可以随时注册并参与到某个阶段或者取消参与本阶段。以选修课考试为例,说明Phaser的工作逻辑,假设现有选修课3门,政治,历史,地理,各选修人数分别为20,10,10.按Phaser实现考试逻辑如下:

  • 第一阶段考政治,总共应有9名同学参加考试,在考试开始时,8位同学开始答题,另外一位同学未到,考试中途,最后一位同学进入,开始考试,所有同学答题完成后,政治考试结束
  • 第二阶段考历史,总共9名同学参考考试,在考试结束前,3名同学弃考,则实际参与考试有6名同学,所有同学答题完成后,历史考试结束
  • 第三阶段考地理,总共9名同学参与考试,中途无意外,所有同学答题完成后,地理考试结束

至此选修课考试的三个阶段均完成,所以选修课考试这个任务结束,其中第一阶段中晚到参考考试的同学说的就是参与者可以随时注册并参与到某个阶段,第二阶段中弃考的同学说的就是参与者可以随时取消参与本阶段,当所有参与本阶段的参与者均取消,则意味着该阶段完成。

在Phaser中,针对一个阶段而言,每一个参与者都被称为一个party,可以通过构造函数指定参与者数量,也可以通过register使parties(party的总和)自增,当当前阶段的所有参与者等于parties的数量时,此时phase自增1,进入下一个阶段,回调onAdvance方法

Phaser提供的核心函数如下所示:

函数名称 描述 备注
register() 注册一个party,使得parties+1 /
bulkRegister(int parties) 批量注册party,使得parties变为已有个数与传入参数之和 /
arriveAndDeregister() 当前任务已完成,使parties计数减1,不会形成阻塞 /
arriveAndAwaitAdvance() 已达到执行点,线程阻塞,等待下一阶段唤醒继续执行 /
awaitAdvance(int phase) 参数是一个已完成的阶段编号,通常以已完成任务的arrive或者arriveAndDeregister函数的返回值作为取值,如果传入参数的阶段编号和当前阶段编号相同,则在此处等待,如果不同或者Phaser已经是terminated状态,则立即返回 /
arrive() 达到当前阶段,不等待其他参与者到达 /
arriveAndAwaitAdvance

以上述政治考试为例,学习Phaser基本使用

public static void main(String[] args) {
    // 创建Phaser
    Phaser phaser = new Phaser(){
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println("政治考试完成");
                    break;
                case 1:
                    System.out.println("历史考试完成");
                    break;
                case 2:
                    System.out.println("地理考试完成");
                    break;
            }
            // 如果到达某一阶段,Phaser中参与者为0,则会销毁该Phaser
            return super.onAdvance(phase, registeredParties);
        }
    };
    
    IntStream.range(1,10).forEach(number->{
        phaser.register();
        Thread student= new Thread(()->{
            System.out.println("学生"+number+"arrive advance");
            // 等待其他线程,此时block
            phaser.arriveAndAwaitAdvance();
            System.out.println("学生"+number+"政治开始答题");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("学生"+number+"政治交卷");
            // 考试完成,取消计数,参与者减1
            phaser.arriveAndDeregister();
            System.out.println("Phaser is terminated :" +phaser.isTerminated());
        });
        student.start();
    });
    System.out.println("Phaser is terminated :" +phaser.isTerminated());
}

输出如下:

Java线程间通信方式(3)

从上面可以看出,Phaser中通过arriveAndAwaitAdvance阻塞当前线程,当所有线程到达阻塞栅栏时,唤醒等待线程继续执行,进而达到线程间同步协作。

awaitAdvance

有时候,当Phaser 在当前阶段结束时,我们需要兜底做一些策略,比如说资源的释放,状态的检查上报等,此时就需要用到awaitAdvance,awaitAdvance接受一个阶段编号,如果当前阶段编号和传入的相等,则会进入等待状态,等到所有参与者都到达该阶段栅栏时,被唤醒。实例代码如下:

public static class ThreadA implements Runnable {
    private Phaser phaser;

    public ThreadA(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " start ");


        phaser.arriveAndAwaitAdvance();

        System.out.println(Thread.currentThread().getName() + " end " );
    }
}

public static class ThreadB implements Runnable {
    private Phaser phaser;

    public ThreadB(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " start " );

        phaser.arriveAndAwaitAdvance();

        System.out.println(Thread.currentThread().getName() + " end ");
    }
}

public static class ThreadC implements Runnable {
    private Phaser phaser;

    public ThreadC(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
            System.out.println(Thread.currentThread().getName() + " start ");
            System.out.println(Thread.currentThread().getName() + " phaser.getPhase()=" + phaser.getPhase());
            phaser.awaitAdvance(0);
            System.out.println(Thread.currentThread().getName() + " end ");
    }
}

public static class ThreadD implements Runnable {
    private Phaser phaser;

    public ThreadD(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " begin sleep");

            Thread.sleep(5000);

            System.out.println(Thread.currentThread().getName() + " sleep completed ");
            phaser.arriveAndAwaitAdvance();

            System.out.println(Thread.currentThread().getName() + " end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) {
    // 声明Phaser
    Phaser phaser = new Phaser(3) {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            System.out.println("Phaser arrived at :"+phase);
            return super.onAdvance(phase, registeredParties);
        }
    };

    Thread t1 = new Thread(new ThreadA(phaser));
    Thread t2 = new Thread(new ThreadB(phaser));
    Thread t3 = new Thread(new ThreadC(phaser));
    Thread t4 = new Thread(new ThreadD(phaser));

    t1.setName("ThreadA");
    t2.setName("ThreadB");
    t3.setName("ThreadC");
    t4.setName("ThreadD");

    t1.start();
    t2.start();
    t3.start();
    t4.start();
}

如上代码所示,声明Phaser有三个参与者ThreadA,ThreadB,ThreadD,在三个参与者都执行到arriveAndAwaitAdvance之前,ThreadC 阻塞等待,当三个参与者都执行到arriveAndAwaitAdvance后,回调onAdvance方法,此时被阻塞的参与者被唤醒执行,之后ThreadC被唤醒继续执行,运行结果如下:

Java线程间通信方式(3)

Exchanger

Exchanger用于两个线程之间的通信,无论哪个线程先调用Exchanger,都会等待另外一个线程调用时进行数据交换,示例代码如下:

private static Exchanger<String> exchanger = new Exchanger<>();

public static void main(String[] args) {
    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName()+" sleep start");
            Thread.sleep(10000);
            System.out.println(Thread.currentThread().getName()+" sleep end");
            System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
            String aa = exchanger.exchange("data from Thread1");
            System.out.println(Thread.currentThread().getName() + "   "+aa);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "Thread1").start();

    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName()+" send data to Exchanger");
            String bb = exchanger.exchange("data from Thread2");
            System.out.println(Thread.currentThread().getName() + "   "+bb);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "Thread2").start();
}

运行输出如下:

Java线程间通信方式(3)

总结

结合前文,我们一共学习了种线程间通信方式,主要有:

  1. Object.wait/Object.notify/Object.notifyAll + synchronized
  2. Semaphore(信号量)
  3. CountDownLatch
  4. CyclicBarrier
  5. Condition+ReentrantLock
  6. Phaser
  7. Exchanger

大家日常开发中可灵活使用,针对各通信方式比较见下表:文章来源地址https://www.toymoban.com/news/detail-426974.html

通信方式 应用场景 是否可重用 子任务异常处理 备注
Object.wait/Object.notify/Object.notifyAll + synchronized 大多数线程通信场景 依赖开发者维护,在finally块中完成释放,避免死锁 /
Semaphore(信号量) 通知唤醒类线程间通信场景 依赖开发者维护,在finally块中释放信号量,避免死锁 /
CountDownLatch 串行多线程运行场景 不加处理的话,子任务发生异常导致退出,则所有等待的线程都会一致等待,直到超时时间来临 /
CyclicBarrier 聚合类线程通信场景 不加处理的话,如果在所有线程都到达屏障陷入阻塞前,如果有线程发生异常导致未到达栅栏提前退出,则所有等待在栅栏都会以BrokenBarrierException或InterruptedException异常退出 /
Condition+ReentrantLock 大多数线程通信场景 依赖开发者维护,在finally块中完成释放,避免死锁 /
Phaser 适用CountDownLatch与CyclicBarrier组合场景 依赖开发者维护,在finally块中取消参与者,避免死锁 /
Exchanger 线程间数据交换场景 依赖开发者维护,确保两个线程状态正常,并行运行 /

到了这里,关于Java线程间通信方式(3)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • [Java]线程生命周期与线程通信

    【版权声明】未经博主同意,谢绝转载!(请尊重原创,博主保留追究权) https://www.cnblogs.com/cnb-yuchen/p/18162522 出自【进步*于辰的博客】 线程生命周期与进程有诸多相似,所以我们很容易将两者关联理解并混淆,一些细节之处确有许多不同,因为线程调度与进程调度虽都由

    2024年04月27日
    浏览(24)
  • java 线程安全问题 三种线程同步方案 线程通信(了解)

    线程安全问题指的是,多个线程同时操作同一个共享资源的时候,可能会出现业务安全问题。 下面代码演示上述问题,先定义一个共享的账户类: 在定义一个取钱的线程类 最后,再写一个测试类,在测试类中创建两个线程对象 某个执行结果: 为了解决前面的线程安全问题,

    2024年02月09日
    浏览(33)
  • Java 进阶(12) 线程通信

    多个线程在处理同⼀个资源,但是处理的动作(线程的任务)却不相同。 为什么要处理线程间通信 多个线程并发执⾏时, 在默认情况下CPU是随机切换线程的,当我们需要多个线程来共同完成⼀件任务,并且我们希望他们有规律的执⾏, 那么多线程之间需要⼀些协调通信,以此

    2023年04月16日
    浏览(28)
  • 【Java】详解多线程通信

    🌺 个人主页: Dawn黎明开始 🎀 系列专栏: Java ⭐ 每日一句:什么都不做,才会来不及 📢 欢迎大家:关注 🔍+ 点赞 👍+评论 📝+收藏⭐️ 文章目录 🔐多线程通信 (1).🔓由来 (2).🔓成员方法  (3).🔓案例引入 (4).🔓代码实现       现代社会崇尚合作精神,分工合作在日常

    2024年02月05日
    浏览(29)
  • 深入理解Java线程间通信

    合理的使用Java多线程可以更好地利用服务器资源。一般来讲,线程内部有自己私有的线程上下文,互不干扰。但是当我们需要多个线程之间相互协作的时候,就需要我们掌握Java线程的通信方式。本文将介绍Java线程之间的几种通信原理。 在Java中,锁的概念都是基于对象的,

    2024年02月09日
    浏览(34)
  • Java多线程 - Java创建线程的4种方式

    1. Java创建线程有哪几种方式? 一个线程在Java中使用一个Thread实例来描述。Thread类是Java语言的一个重要的基础类,位于java.lang包中。Thread类有不少非常重要的属性和方法,用于存储和操作线程的描述信息。 Thread类的构造方法: 1.1 线程创建方法一:继承Thread类创建线程类 (

    2023年04月08日
    浏览(27)
  • Java——》线程间是如何通信的

    推荐链接:     总结——》【Java】     总结——》【Mysql】     总结——》【Redis】     总结——》【Kafka】     总结——》【Spring】     总结——》【SpringBoot】     总结——》【MyBatis、MyBatis-Plus】     总结——》【Linux】     总结——》【MongoDB】    

    2024年02月09日
    浏览(29)
  • Java多线程(1)---多线程认识、四种创建方式以及线程状态

    目录 前言 一.Java的多线程 1.1多线程的认识  1.2Java多线程的创建方式 1.3Java多线程的生命周期 1.4Java多线程的执行机制 二.创建多线程的四种方式 2.1继承Thread类 ⭐创建线程  ⭐Thread的构造方法和常见属性  2.2.实现Runnable接口 ⭐创建线程 ⭐使用lambda表达式创建 2.3实现Callable接口

    2024年02月14日
    浏览(29)
  • 线程方法接收参数示例,Java的两种线程实现方式区别

    总所周知,Java实现多线程有两种方式,分别是继承Thread类和实现Runable接口,那么它们的区别是什么? 继承 Thread 类: 通过继承 Thread 类,你可以创建一个直接表示线程的类。你可以覆盖 Thread 类中的 run 方法来定义线程的逻辑。当调用 start 方法启动线程时,会执行该类中的

    2024年02月11日
    浏览(30)
  • 【面试精讲】Java线程6种状态和工作原理详解,Java创建线程的4种方式

    Java线程6种状态和工作原理详解,Java创建线程的4种方式 一、Java线程的六种状态 二、Java线程是如何工作的? 三、BLOCKED 和 WAITING 的区别 四、start() 和 run() 源码分析 五、Java创建线程的所有方式和代码详解 1. 继承Thread类 2. 实现Runnable接口 3. 实现Callable接口与FutureTask 4. 使用线

    2024年03月13日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包