【JUC基础】06. 生产者和消费者问题

这篇具有很好参考价值的文章主要介绍了【JUC基础】06. 生产者和消费者问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、前言

学习JUC,就不得不提生产者消费者。生产者消费者模型是一种经典的多线程模型,用于解决生产者和消费者之间的数据交换问题。在生产者消费者模型中,生产者生产数据放入共享的缓冲区中,消费者从缓冲区中取出数据进行消费。在这个过程中,生产者和消费者之间需要保持同步,以避免数据出现错误或重复。今天我们就来说说生产者消费者模型,以及JUC中如何解决该模型的同步问题。

2、什么是生产者消费者问题

生产者消费者问题是一种经典的多线程问题,用于描述生产者和消费者之间的数据交换问题。其实本质上就是线程间通信问题,即线程等待唤醒和通知唤醒。

生产者消费者问题通常包含以下三个元素:

  1. 生产者:负责生产数据,并将其放入共享的缓冲区中。
  2. 消费者:负责从缓冲区中取出数据,并进行消费。
  3. 缓冲区:用于存放生产者生产的数据,消费者从中取出数据进行消费。

在实际应用中,生产者和消费者可能存在速度差异,导致缓冲区的数据量不断变化。如果缓冲区满了,生产者需要等待,直到消费者取走了一部分数据。同样,如果缓冲区为空,消费者需要等待,直到生产者生产了一些数据放入缓冲区中。

3、Synchronized解决方案

synchronized解决方案,一般采用wait()(等待唤醒)和notifyAll()(通知唤醒)进行线程的同步通信。

  • wait()方法用于使当前线程等待,直到另一个线程调用相同对象上的notify()方法或notifyAll()方法来唤醒它。wait()方法必须在synchronized块或方法中调用,以确保线程获得对象的监视器锁。
  • notify()方法用于通知等待在相同对象上的某个线程,告诉它们可以继续运行。
  • notifyAll()方法则通知等待在相同对象上的所有线程。

调用wait()方法会释放锁,使当前线程进入等待状态,直到其他线程调用相同对象上的notify()方法或notifyAll()方法唤醒它。而notify()方法则会随机选择一个等待的线程唤醒,而notifyAll()则会唤醒所有等待的线程,让它们竞争锁。

public class ProducerConsumerExample {
    public static void main(String[] args) {
        NumberOper object = new NumberOper();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                object.add();
            }
        }, "thread-add-1").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                object.sub();
            }
        }, "thread-sub-1").start();
    }
}

class NumberOper {
    private int number = 0;

    public synchronized void add() {
        if(number != 0) {
            try {
                // 等待唤醒
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number++;
        System.out.println("线程" + Thread.currentThread().getName() + "执行了add(),number====>" + number);
        
        // 通知其他唤醒
        this.notifyAll();
    }

    public synchronized void sub() {
        if(number == 0) {
            try {
                // 等待唤醒
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        number--;
        System.out.println("线程" + Thread.currentThread().getName() + "执行了sub(),number====>" + number);
        
        // 通知其他唤醒
        this.notifyAll();
    }
}

执行结果:

【JUC基础】06. 生产者和消费者问题

不过需要注意的是,上面的代码没有考虑到多线程并发的情况,如果多个生产者和多个消费者同时访问缓冲区,就需要使用线程安全的数据结构或加锁来保证线程安全。也就是虚假唤醒问题。

【JUC基础】06. 生产者和消费者问题

虚假唤醒问题,请参考《wait(),notify()虚假唤醒》篇幅。

4、Lock解决方案

Synchronized解决方案,主要是依赖于wait()和notify()方法解决。相应的JUC中的Lock也是类似的解决手段。

  • Synchronized:(注意wait()和notify()方法是Object的方法)
    • wait():线程等待,直到其他线程将他唤醒
    • notify():唤醒其他等待的线程
    • notifyAll():换新所有等待的线程
  • Lock:
    • await():线程等待,直到其他线程将他唤醒
    • signal():唤醒正在等待的线程
    • signalAll():唤醒正在等待的线程

使用JUC Lock来解决生产者消费者问题,可以使用Condition(条件变量)来实现。

Condition是基于Lock来创建的,每个Condition对象都和一个Lock对象绑定。Condition对象提供了类似wait()和notify()的方法来控制线程的等待和唤醒。Condition对象可以通过Lock对象的newCondition()方法创建。

生产者消费者问题中,我们可以使用两个Condition对象来控制生产者和消费者的等待和唤醒。当缓冲区为空时,消费者线程等待,当缓冲区满时,生产者线程等待。

【JUC基础】06. 生产者和消费者问题

package com.github.fastdev.waitnotify;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** * @author Shamee loop * @date 2023/4/9 */
public class ProducerConsumerExample {

    public static void main(String[] args) {
        NumberOper object = new NumberOper();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                object.add();
            }
        }, "thread-add-1").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                object.sub();
            }
        }, "thread-sub-1").start();
    }
}


class NumberOper {
    private int number = 0;
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    public void add() {
        lock.lock();
        try {
            if (number != 0) {
                condition.await();
            }
            number++;
            System.out.println("线程" + Thread.currentThread().getName() + "执行了add(),number====>" + number);
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void sub() {
        lock.lock();
        try {
            if (number == 0) {
                condition.await();
            }
            number--;
            System.out.println("线程" + Thread.currentThread().getName() + "执行了sub(),number====>" + number);
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

执行结果:

【JUC基础】06. 生产者和消费者问题

5、Condition

那么既然synchronized就能解决生产者消费者问题,为什么还需要JUC的Lock这种方式呢? 从代码量上看,Lock的方式明显比较繁琐。

当然,存在即合理。JUC实现了Lock的方式,且引入了Condition。肯定是具备了synchronized所没有的特性。

试想一个场景:

synchronized的notify()虽然唤醒了等待的线程。但是如果存在多个等待的线程呢?唤醒后获得执行权的需要取决于分配策略。那么有没有一种可能,我需要指定唤醒某个等待的线程?Condition就来了,他可以指定唤醒某个线程,也就是精准唤醒。

Condition 是 Java 中 Lock 的一个重要组件,可以用于实现更加灵活、高效的线程同步。它提供了类似于 Object.wait() 和 Object.notify() 的等待/通知机制,但相较于传统的 synchronized,它更加灵活,可以实现更多高级特性。

Condition 的主要作用是允许线程在等待某些条件的情况下暂停执行(即阻塞线程),并且当条件满足时,可以重新唤醒这些线程。Condition 与 Lock 一起使用,通常需要创建一个 Lock 对象,然后调用 Lock 的 newCondition() 方法来创建一个 Condition 对象。

Condition 接口中最常用的方法包括:

  • await():当前线程等待,直到被通知或中断;
  • awaitUninterruptibly():当前线程等待,直到被通知,但不会响应中断;
  • signal():唤醒一个等待中的线程;
  • signalAll():唤醒所有等待中的线程。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerExample {

    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public void produce() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await();
            }
            int num = (int) (Math.random() * 100);
            queue.add(num);
            System.out.println("Produced " + num);
            
             // 指定唤醒线程
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public void consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            int num = queue.remove();
            System.out.println("Consumed " + num);
            
            // 指定唤醒线程
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProducerConsumerExample example = new ProducerConsumerExample();
        Thread producerThread1 = new Thread(() -> {
            while (true) {
                try {
                    example.produce();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread producerThread2 = new Thread(() -> {
            while (true) {
                try {
                    example.produce();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread consumerThread1 = new Thread(() -> {
            while (true) {
                try {
                    example.consume();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread consumerThread2 = new Thread(() -> {
            while (true) {
                try {
                    example.consume();
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        producerThread1.start();
        producerThread2.start();
        consumerThread1.start();
        consumerThread2.start();
    }
}

6、小结

到此,我们学习了生产者和消费者模型,以及他的一些问题,以及如何解决。还接触了Locks中的另一个类Condition的使用。一天进步一点点,一起加油~文章来源地址https://www.toymoban.com/news/detail-446016.html

到了这里,关于【JUC基础】06. 生产者和消费者问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python多线程Thread——生产者消费者模型 python队列与多线程——生产者消费者模型

    下面面向对象的角度看线程 那么你可以试试看能不能用面向对象的方法实现生产者消费者模型吧。

    2024年02月09日
    浏览(56)
  • 生产者与消费者问题

            本篇文章我们使用C++探讨一下生产者与消费者问题.          我们学习了操作系统, 知道了进程和线程的概念, 但是如果不进行代码实战的话, 会很难理解它们. 特别是编程的初学者(比如我), 在了解了进程和线程后通常会感到疑惑: 多线程怎么用? 为啥我平时写代码

    2024年02月12日
    浏览(45)
  • LabVIEW编程基础之生产者消费者结构 -- Simon小游戏(以羊了个羊为背景)

           LabVIEW在工业控制及仪器测量领域中用的会比较多,那么就经常会涉及到数据采集、分析和处理的问题,为了提高效率,往往会在数据采集的同时就进行数据分析和处理,这时就需要用多线程来处理,多线程的概念如果有不了解的,可以参考这位博主的讲解多线程(

    2024年02月09日
    浏览(44)
  • Kafka 之生产者与消费者基础知识:基本配置、拦截器、序列化、分区器

    kafaf集群地址列表:理论上写一个节点地址,就相当于绑定了整个kafka集群了,但是建议多写几个,如果只写一个,万一宕机就麻烦了 kafka消息的key和value要指定序列化方法 kafka对应的生产者id 使用java代码表示则为以下代码:  可使用 retries 参数 进行设置,同时要注意记住两

    2024年02月05日
    浏览(55)
  • 线程同步--生产者消费者模型

    条件变量是 线程间共享的全局变量 ,线程间可以通过条件变量进行同步控制 条件变量的使用必须依赖于互斥锁以确保线程安全,线程申请了互斥锁后,可以调用特定函数 进入条件变量等待队列(同时释放互斥锁) ,其他线程则可以通过条件变量在特定的条件下唤醒该线程( 唤醒后线

    2024年01月19日
    浏览(43)
  • linux:生产者消费者模型

    个人主页 : 个人主页 个人专栏 : 《数据结构》 《C语言》《C++》《Linux》 本文是对于生产者消费者模型的知识总结 生产者消费者模型就是通过一个容器来解决生产者消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过之间的容器来进行通讯,所以生产者

    2024年04月15日
    浏览(43)
  • Linux——生产者消费者模型

    目录 一.为何要使用生产者消费者模型  二.生产者消费者模型优点  三.基于BlockingQueue的生产者消费者模型 1.BlockingQueue——阻塞队列 2.实现代码  四.POSIX信号量 五.基于环形队列的生产消费模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者

    2024年02月08日
    浏览(46)
  • rabbitmq消费者与生产者

    在第一次学习rabbitmq的时候,遇到了许多不懂得 第一步导包 第二步新增生产者 在这里中: connectionFactory.setVirtualHost(\\\"my_vhost\\\");//填写自己的队列名称,如果你的为”/“则填写\\\'\\\'/\\\'\\\' 第三步新增消费者 消息获取成功 注意如果你用的云服务器需要打开这两个端口 5672 15672 如果你使

    2024年02月11日
    浏览(46)
  • 多线程之生产者消费者

    目的是回顾多线程的几个api 多生产者+多消费者+共享池

    2024年02月07日
    浏览(52)
  • kafka生产者消费者练习

    需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中 生产的数据格式: 造数据 {“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1 {“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0 {“guid”:2,“eventId”:“collect”,“timestamp”

    2024年02月08日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包