Java中的生产者/消费者模型

这篇具有很好参考价值的文章主要介绍了Java中的生产者/消费者模型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、什么是生产者/消费者模型

生产者-消费者模型(Producer-Consumer problem)是一个非常经典的多线程并发协作的模型。

比如某个模块负责生产数据,而另一个模块负责处理数据。产生数据的模块就形象地被称为生产者;而处理数据的模块,则被称为消费者。

生产者和消费者在同一段时间内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

如图所示:

java 生产者消费者,多线程,java,多线程,wait,notify,notifyAll

 二、生产者-消费者模式的优点

1、解耦

由于有缓冲区的存在,生产者和消费者之间不直接依赖,耦合度降低。

2、支持并发

由于生产者与消费者是两个独立的并发体,它们之间是通过缓冲区作为桥梁连接,生产者只需要往缓冲区里丢数据,就可以继续生产下一个数据,而消费者只需要从缓冲区中拿数据接口,这样就不会因为彼此的处理速度而发生阻塞。(通过使用多个生产者和消费者线程,可以实现并发处理,提高系统的吞吐量和响应性)

3、支持忙闲不均

缓冲区还有另一个好处:当数据生产过快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等消费者处理掉其他数据时,再从缓存区中取数据来处理。(通过使用缓冲区可以平衡生产者与消费者之间的速度差异,以及处理能力的不匹配)

三、生产者-消费者模式所遵循的规则

  • 生产者仅仅在缓冲区未满时生产,缓冲区满则停止生产。
  • 消费者仅仅在缓冲区有产品时才能消费,缓冲区为空则停止消费。
  • 当消费者发现缓冲区没有可消费的产品时会通知生产者。
  • 当生产者生产出可消费的产品时,应该通知等待的消费者去消费。

四、生产者-消费者模型的实现

1、通过阻塞队列方式实现

public class ProducerConsumerDemo1 {

    /**
     * 缓冲队列
     */
    private final ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);

    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.put(i);
            }
        }
    }

    /**
     * 消费者
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
            }
        }
    }

    public Producer getProducer() {
        return new Producer();
    }

    public Consumer getConsumer() {
        return new Consumer();
    }


    public static void main(String[] args) {
        ProducerConsumerDemo1 producerConsumerDemo1 = new ProducerConsumerDemo1();
        new Thread(producerConsumerDemo1.getProducer()).start();
        new Thread(producerConsumerDemo1.getConsumer()).start();
    }
}

2、通过wait和notifyAll来实现

    /**
     * 缓冲区(仓库)
     */
    private final List<Object> list = new ArrayList<>();

    private final int bufferCount = 10;

    public final Object lock = new Object();

    /**
     * 生产者
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (lock) {
                    while (list.size() >= bufferCount) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //仓库未满,继续生产产品
                    list.add(new Object());
                    //唤醒消费者去消费产品
                    lock.notifyAll();
                }
            }
        }
    }

    /**
     * 消费者
     */
    class Consumer implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (lock) {
                    while (list.size() == 0) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    //仓库不是空,继续消费
                    list.remove(0);
                    //唤醒生产者去生产产品
                    lock.notifyAll();
                }
            }
        }
    }

    public Producer getProducer(){
        return new Producer();
    }

    public Consumer getConsumer(){
        return new Consumer();
    }

    public static void main(String[] args) {
        ProducerConsumerDemo2 producerConsumerDemo2=new ProducerConsumerDemo2();
        new Thread(producerConsumerDemo2.getProducer()).start();
        new Thread(producerConsumerDemo2.getConsumer()).start();
    }
}

3、通过ReentrantLock和Condition来实现

public class ProducerConsumerDemo3 {

    /**
     * 缓冲区(仓库)
     */
    private final List<Object> list = new ArrayList<>();
    /**
     * 缓冲区大小
     */
    private final int bufferCount = 10;
    public ReentrantLock lock = new ReentrantLock();
    //创建两个条件变量
    private final Condition condition1 = lock.newCondition();
    private final Condition condition2 = lock.newCondition();


    /**
     * 生产者
     */
    class Producer implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);//模拟生产操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                try {
                    lock.lock();
                    while (list.size() >= bufferCount) {
                        condition1.await();//当仓库数据数量超过缓冲区设定的最大数量,则让生产线程进入等待状态
                    }

                    list.add(new Object());
                    System.out.println(Thread.currentThread().getName() + "-生产者生产,数量为:" + list.size());
                    condition2.signal();//唤醒消费线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }

            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                try {
                    lock.lock();
                    while (list.size() == 0) {
                        condition2.await();//当仓库中数据为空时,则让消费线程进入等待状态
                    }
                    list.remove(0);
                    System.out.println(Thread.currentThread().getName() + "-消费者消费,数量为:" + list.size());
                    condition1.signal();//唤醒生产线程
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public Producer getProducer() {
        return new Producer();
    }

    public Consumer getConsumer() {
        return new Consumer();
    }

    public static void main(String[] args) {
        ProducerConsumerDemo3 producerConsumerDemo3 = new ProducerConsumerDemo3();
        new Thread(producerConsumerDemo3.getProducer()).start();
        new Thread(producerConsumerDemo3.getConsumer()).start();
    }
}

五、使用两个线程轮流打印字符串A和字符串B(A和B交替打印,各打印10次。)

1、通过wait()和notifyAll()实现:

public class PrintDemo2 {

    private int num = 10;
    private boolean flag;

    private final Object obj = new Object();

    public static void main(String[] args) {
        PrintDemo2 printDemo2 = new PrintDemo2();
        printDemo2.printMethod();
    }

    public void printMethod() {
        Thread thread1 = new Thread("Thread One") {
            @Override
            public void run() {
                super.run();
                for (int i = 0; i < num; i++) {
                    synchronized (obj) {
                        while (flag) {
                            try {
                                obj.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        System.out.println(Thread.currentThread().getName() + "=========A");
                        flag = true;
                        obj.notifyAll();
                    }
                }

            }
        };

        Thread thread2 = new Thread("Thread Two") {
            @Override
            public void run() {
                super.run();
                for (int i = 0; i < num; i++) {
                    synchronized (obj) {
                        while (!flag) {
                            try {
                                obj.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }

                        System.out.println(Thread.currentThread().getName() + "=========B");
                        flag = false;
                        obj.notifyAll();
                    }
                }
            }
        };

        thread1.start();
        thread2.start();
    }

}

打印结果如下:

java 生产者消费者,多线程,java,多线程,wait,notify,notifyAll

小结:关键字synchronized与wait()、notify()/notifyAll()方法相结合可以实现wait/notify模式,ReentrantLock同样可以实现同样的功能,但需要借助于Condition对象。  下面我们来看一下使用ReentrantLock+Condition来实现的方式。

2、方式二:使用ReentrantLock和Condition实现:

public class PrintDemo {

    private int num = 10;
    private boolean flag;

    public static void main(String[] args) {
        PrintDemo printDemo = new PrintDemo();
        printDemo.printMethod();
    }

    public void printMethod() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        Thread thread1 = new Thread("Thread One") {
            @Override
            public void run() {
                super.run();
                for (int i = 0; i < num; i++) {
                    try {
                        lock.lock();
                        while (flag) {
                            condition.await();
                        }

                        System.out.println(Thread.currentThread().getName() + "=========A");
                        flag = true;
                        condition.signalAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        lock.unlock();
                    }
                }
            }
        };


        Thread thread2 = new Thread("Thread Two") {
            @Override
            public void run() {
                super.run();
                for (int i = 0; i < num; i++) {
                    try {
                        lock.lock();
                        while (!flag) {
                            condition.await();
                        }

                        System.out.println(Thread.currentThread().getName() + "=========B");
                        flag = false;
                        condition.signalAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        };


        thread1.start();
        thread2.start();
    }

}

打印结果如下:

java 生产者消费者,多线程,java,多线程,wait,notify,notifyAll
       

六、等待通知机制

是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。

  • wait():表示让当前线程进入等待状态,并且释放对象上的锁及CPU资源。然后无限期等待,直到被唤醒为止(注意:调用wait()方法后会释放对象的锁)
  • nofity():通知一个正在等待的线程(是从等待中的线程中随机通知一个),使其从wait方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入WAITING状态。
  • notifyAll():与notify方法相同,唯一不同的是,notifyAll是通知所有等待的线程,而notify是随机地通知等待中的线程中的一个。
  • wait(long):超时等待一段时间,这里的参数时间是毫秒,也就是等待n毫秒。线程将等待指定的时间长度后自动苏醒,或者直到其他线程通过调用相同对象上的notify()或者notifyAll()来唤醒它。如果超过了指定时间没有被唤醒,则线程会自动苏醒。
  • wait(long ,int):同上述wait(long),只是对于超时的时间更细粒度的控制,可以达到纳秒。

注意:在调用wait()、notify()、notifyAll()方法之前,线程必须要获得对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法、notify()、notifyAll()方法。

1、等待和通知的标准范式

等待方遵循如下原则:

(1)获取对象的锁

(2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件

(3)条件满足则执行对应的逻辑

java 生产者消费者,多线程,java,多线程,wait,notify,notifyAll

 通知方遵循如下原则:

(1)获得对象的锁

(2)改变条件

(3)通知所有等待在对象上的线程。

java 生产者消费者,多线程,java,多线程,wait,notify,notifyAll

2、notify和notifyAll应该用哪一个

尽可能用notifyAll(),谨慎用notify(),因为notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程。

3、为什么wait方法必须在synchronized保护的同步代码块中使用?

先来看看wait方法的源码注释:

java 生产者消费者,多线程,java,多线程,wait,notify,notifyAll

“wait method should always be used in a loop:

 synchronized (obj) {
     while (condition does not hold)
         obj.wait();
     ... // Perform action appropriate to condition
}

 This method should only be called by a thread that is the owner of this object's monitors"

意思就是说,在使用wait方法时,必须把wait方法写在synchronized保护的while代码块中,并始终判断执行条件是否满足,如果满足就往下继续执行,如果不满足就执行wait方法,而在执行wait方法之前,必须先持有锁对象的monitor对象。

4、wait/notify 和 sleep方法的异同?

相同点:

(1)它们都可以让线程阻塞

(2)它们都可以响应interrupt中断:在执行的过程中,如果收到中断信号都可以响应,并抛出InterruptedException异常

不同点:

(1)wait方法必须在synchronized保护的代码中使用,而sleep方法并没有这个要求

(2)在同步代码中执行sleep方法时,并不会释放monitor锁,但执行wait方法时会主动释放monitor锁

(3)sleep方法中要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的wait方法而言,意味着永久等待,直到被中断或唤醒才能恢复,它并不会主动恢复。

(4)wait/notify 是Object 类的方法,而sleep是Thread类的方法。

5、Java多线程中wait()为什么要放在while循环中

将wait()方法放在while循环中是为了避免虚假唤醒(spurious wakeups)。

虚假唤醒是指当一个线程被唤醒时,尽管没有收到对应的通知信号,但它还是会继续执行。这种情况可能发生在某些特定的条件下,例如操作系统级别的中断、调度器的行为等。

为了防止虚假唤醒导致逻辑或数据不一致性问题,在使用wait()方法进行线程等待时,我们通常会将其放在一个while循环内,并且与需要满足的条件进行检查。只有当满足特定的条件才会继续执行后面的代码。如果不满足条件,则线程会再次进入等待状态。以下是一个示例:

synchronized (lock) {
    while (!condition) {
        lock.wait();
    }
    
    // 执行其他操作
}

通过使用while(condition)来检查条件是否满足,即使发生虚假唤醒也能够安全地重新检查和等待。因此,在多线程编程中建议将wait()方法置于while循环内部以保证正确性。

6、为什么wait/notify/notifyAll 被定义在Object类中,而sleep被定义在Thread类中?

wait(),notify()和notifyAll()方法被定义在Object类中而不是Thread类中的原因是因为它们操作的是对象的锁(monitor)。

这些方法用于实现线程之间的协作与通信机制,即等待其他线程满足特定条件后再继续执行。由于每个对象都有一个关联的锁,因此将这些方法定义在所有对象共享的父类Object中更加合适。通过调用这些方法来控制同步代码块或同步方法内部对锁资源进行管理。

另一方面,sleep()方法并定义在Thread类中,它使当前正在执行的线程暂停指定时间段,并且不会释放持有的任何锁。该方法属于线程级别的操作,在调用时直接影响当前运行状态下的线程自身。

总结起来:

wait(),notify()和notifyAll()是基于对象级别的操作,用于多个线程之间进行协作和通信。

由于每个Java对象都具备监视器(monitor)功能(即互斥锁),所以这些方法需要定义在所有Java对象共享的父类Object中。

而sleep()方法则是一个与当前正在运行状态下的Thread相关联,独立存在并影响其自身行为状态。

 7、ReentrantLock和Condition

Condition类是JDK5的技术,具有更好的灵活性,例如,可以实现多路通知功能,也就是在一个Lock对象中可以创建多个Condition实例,线程对象注册在指定的Condition中,从而可以有选择性地进行线程通知,在调度线程上更加灵活。

        在使用notify()/notifyAll()方法进行通知时,被通知的线程由JVM进行选择,而方法notifyAll()会通知所有的waiting线程,没有选择权,会出现相当大的效率问题,但使用ReentrantLock结合Condition类可以实现”选择性通知“,这个功能是Condition类默认提供的。

       Condition对象的作用是控制并处理线程的状态,它可以使线程呈现wait状态,也可以使线程继续运行。

        Condition的await()方法的作用是使当前线程在接到通知或被中断之前一直处于等待wait状态。它和wait()方法的作用一样。

condition.await()方法调用之前必须先调用lock.lock()方法获得锁,否则会抛出java.lang.IIIleagalMonitorStateException:current thread is not owner。如下所示:

public class PrintDemo3 {

    public static void main(String[] args) {
        PrintDemo3 printDemo3=new PrintDemo3();
        printDemo3.printMethod();
    }

    public void printMethod() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread thread1 = new Thread() {
            @Override
            public void run() {
                super.run();
                try {
                    System.out.println("MM");
                    condition.wait();
                    System.out.println("NN");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        thread1.start();
    }
}

控制台的输出:

java 生产者消费者,多线程,java,多线程,wait,notify,notifyAll文章来源地址https://www.toymoban.com/news/detail-729483.html

  • Object类中的wait()方法相当于Condition类中的await()方法。
  • Object类中的wait(long timeout)方法相当于Condition类中的await(long time,TimeUnit unit)方法。
  • Object类中的notify()方法相当于Condition类中的signal()方法。
  • Object类中的notifyAll()方法相当于Condition类中的signalAll()方法。

到了这里,关于Java中的生产者/消费者模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(44)
  • 生产者-消费者模型

    目录 1、生产者-消费者模型是什么 2、Java中的实现 3、应用于消息队列 3.1 引入依赖 3.2 rabbitmq网站新建队列queue 3.3 模块中配置application.yml 3.4 生产者实现类 3.5 单元测试,发送msg到rabbitmq的队列(my_simple_queue) 3.6 消费者实现类 3.7 从rabbitmq队列(my_simple_queue)消费数据 3.8 队列的配

    2024年02月06日
    浏览(41)
  • Python多线程Thread——生产者消费者模型 python队列与多线程——生产者消费者模型

    下面面向对象的角度看线程 那么你可以试试看能不能用面向对象的方法实现生产者消费者模型吧。

    2024年02月09日
    浏览(52)
  • 生产者与消费者问题

            本篇文章我们使用C++探讨一下生产者与消费者问题.          我们学习了操作系统, 知道了进程和线程的概念, 但是如果不进行代码实战的话, 会很难理解它们. 特别是编程的初学者(比如我), 在了解了进程和线程后通常会感到疑惑: 多线程怎么用? 为啥我平时写代码

    2024年02月12日
    浏览(43)
  • 【JavaEE】生产者消费者模式

    作者主页: paper jie_博客 本文作者:大家好,我是paper jie,感谢你阅读本文,欢迎一建三连哦。 本文于《JavaEE》专栏,本专栏是针对于大学生,编程小白精心打造的。笔者用重金(时间和精力)打造,将基础知识一网打尽,希望可以帮到读者们哦。 其他专栏:《MySQL》《C语言》

    2024年02月05日
    浏览(42)
  • LabVIEW建立生产者消费者

    LabVIEW建立生产者消费者 生产者/消费者设计模式由并行循环组成,这些循环分为两类:生产者循环和消费者循环。生产者循环和消费者循环间的通信可以使用队列或通道连线来实现。 队列 LabVIEW内置的队列操作VI可在函数选板数据通信队列操作( Functions Data Communication  Que

    2024年02月07日
    浏览(37)
  • 线程同步--生产者消费者模型

    条件变量是 线程间共享的全局变量 ,线程间可以通过条件变量进行同步控制 条件变量的使用必须依赖于互斥锁以确保线程安全,线程申请了互斥锁后,可以调用特定函数 进入条件变量等待队列(同时释放互斥锁) ,其他线程则可以通过条件变量在特定的条件下唤醒该线程( 唤醒后线

    2024年01月19日
    浏览(39)
  • Linux——生产者消费者模型

    目录 一.为何要使用生产者消费者模型  二.生产者消费者模型优点  三.基于BlockingQueue的生产者消费者模型 1.BlockingQueue——阻塞队列 2.实现代码  四.POSIX信号量 五.基于环形队列的生产消费模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者

    2024年02月08日
    浏览(43)
  • rabbitmq消费者与生产者

    在第一次学习rabbitmq的时候,遇到了许多不懂得 第一步导包 第二步新增生产者 在这里中: connectionFactory.setVirtualHost(\\\"my_vhost\\\");//填写自己的队列名称,如果你的为”/“则填写\\\'\\\'/\\\'\\\' 第三步新增消费者 消息获取成功 注意如果你用的云服务器需要打开这两个端口 5672 15672 如果你使

    2024年02月11日
    浏览(43)
  • kafka生产者消费者练习

    需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中 生产的数据格式: 造数据 {“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1 {“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0 {“guid”:2,“eventId”:“collect”,“timestamp”

    2024年02月08日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包