【JavaEE】生产者消费者模式

这篇具有很好参考价值的文章主要介绍了【JavaEE】生产者消费者模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

作者主页:paper jie_博客

本文作者:大家好,我是paper jie,感谢你阅读本文,欢迎一建三连哦。

本文于《JavaEE》专栏,本专栏是针对于大学生,编程小白精心打造的。笔者用重金(时间和精力)打造,将基础知识一网打尽,希望可以帮到读者们哦。

其他专栏:《MySQL》《C语言》《javaSE》《数据结构》等

内容分享:本期将会分享设计模式中的生产者消费者模式

目录

什么是阻塞队列

什么是生产者消费者模式

生产者消费者模式的特点

 Java标准库中的阻塞队列

自定义实现一个阻塞队列

普通队列

 实现线程安全

正解

实现阻塞

正解

 基于自己实现的阻塞队列实现一个简单的生产者消费者模型


什么是阻塞队列

阻塞队列它也是队列,遵守着先进先出的原则.且它是一种线程安全的数据结构.它有两个特性:

1. 当队列满的时候,继续入队就会进行堵塞,直到有其他线程从队列中拿走元素才会解除堵塞.

2. 当队列为空的时候,继续出队就会进行堵塞,直到有其他线程从队列中插入元素后才会解除堵塞.

我们的阻塞队列最经典的使用场景就是生产者消费者模式.

举个栗子:

在我们家中,捏饺子一般有两个步骤: 1. 捏饺子皮, 2.包饺子. 假设有三个人.一个滑稽捏饺子皮,其他两个滑稽包饺子. 滑稽捏好饺子皮厚后就会放到板子上,而其他的滑稽就不用直接去捏饺子皮的滑稽手上拿饺子皮,而是去板子上拿即可.这样他们直接就只需要关注这个板子即可. 而这个板子就充当了我们的阻塞队列.

【JavaEE】生产者消费者模式,# JavaEE,JAVA,java-ee,java

什么是生产者消费者模式

生产者消费者模式就是基于一个中间容器来解决它们之间的强耦合性问题.而这个中间容器就是阻塞队列.加入阻塞队列后,生产者与消费者之间就不会直接联系,而是通过通过阻塞队列来进行数据传输.这样生产者生产数据就不用知道是谁来处理他的数据,不用等待,直接交给阻塞队列即可.而消费者也不会去找生产者要数据,而是去阻塞队列里拿.

生产者消费者模式的特点

1. 通过阻塞队列可以降低生产者与消费者之间的耦合性

假设有三个服务器ABC,BC是将数据处理到,A是接受它们的数据.如果在没有加入阻塞队列的情况下.可能就会发生: 当B或者C挂了,这可能就会导致A也会挂,它们之间是强耦合的关系.因为B或者C的操作中需要涉及到一些关于A的操作.而A的操作也会涉及到一些关于B或C的操作.

但是当加入阻塞队列后就会发生不一样的结果. C,B处理好的结果只需要放到阻塞对列中,而A也只需要去阻塞队列中拿即可.这样B,C的操作对于A的影响就会很小,从而降低了他们之间的耦合性.

 【JavaEE】生产者消费者模式,# JavaEE,JAVA,java-ee,java

2. 削峰填谷 

这里就是加入阻塞队列起到一个平衡生产者与消费者之间的处理能力, 加入阻塞队列就可以防止当生产者一下生产出大量数据,而消费者一时间消费不了而导致挂了的问题.

举个栗子:

假设一个场景: 客户端发出请求,服务A接受请求,然后将请求交给BC处理器进行逻辑处理. 在正常情况下处理器是可以及时处理的.但是在一些特殊的时候会有一些突发峰值.外界客户端的请求非常的多.A接受这些请求一下子全部交给B,C服务器来处理,它们一下子可能就会支撑不住. 因为B,C需要就行逻辑处理业务,需要的资源开销就会比较大.如果一下给它大量的请求进行处理,处理器的资源可能就会超过它的上限而导致机器挂了.

但是在加入阻塞队列后就不用担心这种情况了. 就算客户端有大量的请求,A接受后也是传送给阻塞队列,再由B,C去阻塞队列中拿数据处理来慢慢消化.这就算数据再多,只要A服务器,阻塞队列不挂(阻塞队列和A服务器抗压能力很强,它们只需要进行存储数据和传送数据),BC也可以按正常速度进行处理.

【JavaEE】生产者消费者模式,# JavaEE,JAVA,java-ee,java

 Java标准库中的阻塞队列

在Java标准库中也提供了阻塞队列,如果我们需要使用阻塞队列,只需要使用标准库中的即可. 库中的阻塞队列叫BlockingQueue,它是一个interface接口,它实现的的类有:

ArrayBlockingQueue

LinkedBlockingQueue

priorityBlockingQueue

里面的put和offer方法就是入队方法,但是put是带有阻塞的功能. take也是出队方法,但它也带有阻塞的功能.

public class ThreadDemo1 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        queue.put("aaa");

        System.out.println(queue.take());

        System.out.println(queue.take());
    }
}

自定义实现一个阻塞队列

这里我们准备实现一个基于数组的阻塞队列,也就是环形队列.这里队列里面我们需要一个数组,计数器和两个头尾指针,put和take方法. 这里我们分三步来实现这个阻塞队列.

1) 普通队列

2) 实现线程安全

3) 实现阻塞功能

【JavaEE】生产者消费者模式,# JavaEE,JAVA,java-ee,java

普通队列

class MyarrayBlockingQueue {
    private int elems[] = null;
    private int size = 0;
    private int head = 0;
    private int tail = 0;
    public MyarrayBlockingQueue(int capactiy) {
        elems = new int[capactiy];
    }
    public void put(int value) {
        //判断队列满没满
        if(size == elems.length) {
            //阻塞
            return;
        }
        //添加元素
        elems[tail] = value;
        tail++;
        //判断尾指针是不是需要循环到0位置
        if(tail == elems.length) {
            tail = 0;
        }
        size++;
    }
    public int take() {
        int elem = 0;
        //判断队列为不为空
        if(size == 0) {
            //阻塞
            return elem;
        }
        //出队
        elem = elems[head];
        head++;
        if(head == elems.length) {
            head = 0;
        }
        size--;
        return elem;
    }
}

 实现线程安全

实现线程安全,我们就需要加锁.但是当我们下面这个代码这样加锁时,就会出现问题.

public void put(int value) {
        //判断队列满没满
        if(size == elems.length) {
            //阻塞
            return;
        }
        synchronized (this) {
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
        }
    }
    public int take() {
        int elem = 0;
        //判断队列为不为空
        if(size == 0) {
            //阻塞
            return elem;
        }
        synchronized(this) {
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            return elem;
        }
    }

我们发现如果当有两个线程t1,t2同时使用put或者take方法.假设当前有99个元素,容量为100.当t1执行到if(size == elems.length)后被调度走了,再轮到t2执行.当t2执行完后,队列的元素语已经满了.但是当轮到t1执行时因为上一次的if判断它还会再入队一个元素,这就会出现size为101的问题.

【JavaEE】生产者消费者模式,# JavaEE,JAVA,java-ee,java

正解

我们将if判断条件也放到锁中就可以了.

public void put(int value) {
        synchronized (this) {
            //判断队列满没满
            if(size == elems.length) {
                //阻塞
                return;
            }
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
        }
    }
    public int take() {
        int elem = 0;
        synchronized(this) {
            //判断队列为不为空
            if(size == 0) {
                //阻塞
                return elem;
            }
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            return elem;
        }
    }

实现阻塞

这里需要实现阻塞就要使用我们的wait和notify方法.

当队列满时,使用put就会执行wait进入阻塞,只有当使用take调用notify队列才会解除堵塞.

当队列为空时,使用take就会执行wait进入堵塞,只有当使用put调用notify队列才会解除堵塞.

public void put(int value) throws InterruptedException {
        synchronized (this) {
            //判断队列满没满
            if(size == elems.length) {
                this.wait();
                return;
            }
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
            this.notify();
        }
    }
    public int take() throws InterruptedException {
        int elem = 0;
        synchronized(this) {
            //判断队列为不为空
            if(size == 0) {
                this.wait();
                return elem;
            }
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            this.notify();
            return elem;
        }
    }

但是这样又会出现一个问题. 假设有三个线程t1,t2,t3 t1和t2调用put. t3调用take. 且这个队列已经满了. 这里就会有一种情况: t1和t2调用put发现满了就都会在wait那里堵塞,且解锁. 这时t3就执行take方法出队了一个且调用了notify方法唤醒了t1. 则t1也就向下执行入队了一个,此时队列是满了.但是!!!t1的notify方法就可能会唤醒t2.而t2就会直接向下执行又入队一个,但队列是满的,这就出现问题了.

【JavaEE】生产者消费者模式,# JavaEE,JAVA,java-ee,java

正解

我们可以在if判断那里将if改成while,这样就算被notify唤醒了,也会再次判断队列是不是满/空.才会选择是不是执行还是继续堵塞.文章来源地址https://www.toymoban.com/news/detail-755436.html

public void put(int value) throws InterruptedException {
        synchronized (this) {
            //判断队列满没满
            while(size == elems.length) {
                this.wait();
                return;
            }
            //添加元素
            elems[tail] = value;
            tail++;
            //判断尾指针是不是需要循环到0位置
            if(tail == elems.length) {
                tail = 0;
            }
            size++;
            this.notify();
        }
    }
    public int take() throws InterruptedException {
        int elem = 0;
        synchronized(this) {
            //判断队列为不为空
            while(size == 0) {
                this.wait();
                return elem;
            }
            //出队
            elem = elems[head];
            head++;
            if(head == elems.length) {
                head = 0;
            }
            size--;
            this.notify();
            return elem;
        }
    }

 基于自己实现的阻塞队列实现一个简单的生产者消费者模型

public class ThreadDemo8 {
    public static void main(String[] args) {
        block queue = new block(1000);
        Thread t1 = new Thread(() -> {
            int count = 0;
            while(true) {
                try {
                    queue.put(count);
                    System.out.println("生产者: " + count);
                    Thread.sleep(1000);
                    count++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

        });
        Thread t2 = new Thread(() -> {
            while(true) {
                try {
                    System.out.println("消费者: " +  queue.take());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t1.start();
        t2.start();
    }
}

到了这里,关于【JavaEE】生产者消费者模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • C# 快速写入日志 不卡线程 生产者 消费者模式

    有这样一种场景需求,就是某个方法,对耗时要求很高,但是又要记录日志到数据库便于分析,由于访问数据库基本都要几十毫秒,可在方法里写入BlockingCollection,由另外的线程写入数据库。 可以看到,在我的机子上面,1ms写入了43条日志。

    2024年02月15日
    浏览(37)
  • go语言中实现生产者-消费者模式有哪些方法呢

    本文将介绍在 Go 语言中实现生产者消费者模式的多种方法,并重点探讨了通道、条件变量的适用场景和优缺点。我们将深入讨论这些方法的特点,以帮助开发者根据应用程序需求选择最适合的方式。通过灵活运用 Go 语言提供的并发原语,我们能够实现高效、可靠的生产者消

    2024年02月05日
    浏览(22)
  • 【Rust 基础篇】Rust 通道实现单个消费者多个生产者模式

    在 Rust 中,我们可以使用通道(Channel)来实现单个消费者多个生产者模式,简称为 MPMC。MPMC 是一种常见的并发模式,适用于多个线程同时向一个通道发送数据,而另一个线程从通道中消费数据的场景。本篇博客将详细介绍 Rust 中单个消费者多个生产者模式的实现方法,包含

    2024年02月16日
    浏览(27)
  • 高效协作处理缓存清理需求:生产者-消费者模式助力多模块缓存管理

    在现代应用系统中,缓存是提高性能和减少数据库负载的重要手段之一。然而,缓存的数据在某些情况下可能会过期或者变得无效,因此需要及时进行清理。在复杂的应用系统中,可能有多个系统、多个模块产生缓存清理需求,而这些系统、模块之间的清理任务需要高效的协

    2024年02月15日
    浏览(37)
  • 线程安全版本的单例设计模式 与 生产者消费者模型简介

    目录 单例设计模式 单例设计模式——饿汉式 单例设计模式——懒汉式 单例设计模式——懒汉式(优化步骤) 生产者消费者模型 介绍 优点 补充:关于阻塞队列 单例设计模式能够保证 某个类的实例在程序运行过程中始终都只会存在一份 。这一点在很多场景上都有需要,比

    2023年04月24日
    浏览(45)
  • 【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池

    死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。 当多线程并发执行并都需要访问临界资源时,因为每个线程都是不同的执行流,这就有可能 导致数据不一致问题 ,为了避免此问题的发生

    2024年01月24日
    浏览(30)
  • Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

    Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种 ACK 机制的配置选项,分别是: acks=0:生产者在成功将消息发送到网络缓冲区后即视

    2024年02月04日
    浏览(37)
  • 【设计模式】C语言使用共享内存和信号量,完美实现生产者与消费者模式

    生产者和消费者模式适用于生产者和消费者之间存在数据交换的场景。在这种模式中,生产者负责生产数据并将其放入缓冲区,而消费者负责从缓冲区中取出数据并进行处理。这种模式的优点是可以实现生产者和消费者之间的解耦,使得它们可以独立地进行操作,从而提高了

    2024年02月03日
    浏览(28)
  • 7.5.tensorRT高级(2)-RAII接口模式下的生产者消费者多batch实现

    杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。 本次课程学习 tensorRT 高级-RAII 接口模式下的生产者消费者多 batch 实现 课程大纲可看下面的思维导图 这节课我们利用上节课学到的 RAII

    2024年02月12日
    浏览(28)
  • 生产者-消费者模型

    目录 1、生产者-消费者模型是什么 2、Java中的实现 3、应用于消息队列 3.1 引入依赖 3.2 rabbitmq网站新建队列queue 3.3 模块中配置application.yml 3.4 生产者实现类 3.5 单元测试,发送msg到rabbitmq的队列(my_simple_queue) 3.6 消费者实现类 3.7 从rabbitmq队列(my_simple_queue)消费数据 3.8 队列的配

    2024年02月06日
    浏览(27)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包