生产者-消费者模型

这篇具有很好参考价值的文章主要介绍了生产者-消费者模型。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

1、生产者-消费者模型是什么

2、Java中的实现

3、应用于消息队列

3.1 引入依赖

3.2 rabbitmq网站新建队列queue

3.3 模块中配置application.yml

3.4 生产者实现类

3.5 单元测试,发送msg到rabbitmq的队列(my_simple_queue)

3.6 消费者实现类

3.7 从rabbitmq队列(my_simple_queue)消费数据

3.8 队列的配置类

小结


本文是RabbitMQ初入门-CSDN博客的进一步拓展,着重介绍该模型在消息队列(如rabbitmq)中的应用。

1、生产者-消费者模型是什么

首先,生产者-消费者模型是一种常见的并发编程模型,用于解决多线程或多进程环境下的数据共享与同步问题。在这个模型中,生产者负责生成数据,并将数据放入一个共享的缓冲区中,而消费者则从缓冲区中取出数据进行处理。

生产者-消费者模型,java,rabbitmq,rabbitmq,java,学习

图片来源:Java多线程之生产者消费者模式详解_java_脚本之家

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。(引自链接:Java多线程之生产者和消费者模型 - 简书)

生产者-消费者模型通常包含以下几个关键元素:

  1. 生产者:负责生成数据并放入缓冲区。生产者不断地生成数据,直到达到某个条件才停止。一般情况下,生产者在向缓冲区放入数据之前需要先检查缓冲区是否已满,如果已满则等待。

  2. 消费者:负责从缓冲区中取出数据并进行处理。消费者不断地从缓冲区中取出数据,直到达到某个条件才停止。一般情况下,消费者在从缓冲区取出数据之前需要先检查缓冲区是否为空,如果为空则等待。

  3. 缓冲区:作为生产者和消费者之间的共享数据结构,用于存储生产者生成的数据。缓冲区的大小限制了生产者和消费者之间的数据传输量,它可以是一个队列、堆栈、循环缓冲区等。

  4. 同步机制:用于保护缓冲区的访问,避免生产者和消费者同时对缓冲区进行读写操作而导致的数据不一致性。常见的同步机制包括互斥锁(mutex)、条件变量(condition variable)、信号量(semaphore)等。

生产者-消费者模型的核心思想是通过合理地协调生产者和消费者的工作,实现数据的有序生成和处理。通过使用适当的同步机制,可以保证生产者和消费者之间的互斥访问和协调,避免数据竞争和死锁等并发问题。

在Java中,生产者-消费者模型通常是通过多线程来实现的。生产者线程负责生成数据,将数据放入共享的缓冲区中;消费者线程则从缓冲区中取出数据进行处理。为了保证生产者和消费者之间的同步和互斥,可以使用Java提供的同步机制,例如synchronized关键字、ReentrantLock类、Condition接口等

2、Java中的实现

首先,可以把每个生产者和消费者各看成是一个线程,做如下定义:

生产者

public class ProduceThread extends  Thread{
    private IKFC kfc;
    public ProduceThread(String name,IKFC kfc) {
        super(name);
        this.kfc = kfc;
    }

    @Override
    public void run() {
       while(true){
           try {
               kfc.produce(getName());
               sleep(200);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    }
}

消费者

public class ConsumerThread extends  Thread{
    private IKFC kfc;
    public ConsumerThread(String name, IKFC kfc) {
        this.kfc = kfc;
    }

    @Override
    public void run() {
       while(true){
           try {
               kfc.consume(getName());
               sleep(300);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    }
}

然后,可以通过synchronized方法, wait(), notifyAll()实现

这种方法等于使用this自带的锁来进行同步,具体办法是将入队和出队设成syncrhronized。生产者会在入队时(得到锁之后)检查队列是否为满,如果满了,就释放掉锁并进入阻塞(wait())。等到队列有了新的空位,消费者通过notifyAll()唤醒所有线程,此时被唤醒的生产者再次检查队列,发现了新的位置,就可以再继续将产品入队了,入队完后,生产者会用notifyAll()通知消费者去消费。相对的,消费者也会在出队时等待直至队列不为空,出队完通知。(引自链接:java生产消费者模式 java实现生产者消费者模型_mob6454cc6c8549的技术博客_51CTO博客)

实现类代码:

public class KFCImpl implements IKFC {

    private Queue<Food> queue = new LinkedBlockingQueue<>();
    private final int MAX_SIZE = 10;

    @Override
    public synchronized void produce(String name) throws InterruptedException {
        if (queue.size() >= MAX_SIZE) {
            System.out.println("[生产者" + name + "] KFC生成达到上限,停止生成......");
            wait();
        } else {
            Food food = new Food("上校鸡块");
            queue.add(food);
            System.out.println("[生产者" + name + "] 生成一个:" + food.getName() + ",KFC有食物:" + queue.size() + "个");

            //唤醒等待的线程来消费
            notifyAll();
        }
    }

    @Override
    public synchronized void consume(String name) throws InterruptedException {
        if (queue.isEmpty()) {
            System.out.println("[消费者" + name + "] KFC食物已空,消费者停止消费......");
            wait();
        } else {
            Food food = queue.poll();
            System.out.println("[消费者" + name + "] 消费一个:" + food.getName() + ",KFC有食物:" + queue.size() + "个");

            //唤醒等待的线程来消费
            notifyAll();
        }
    }
}

运行测试

public class Main {
    public static void main(String[] args) {
        IKFC kfc = new KFCImpl();

        Thread p1= new ProduceThread("A",kfc);
        Thread p2= new ProduceThread("B",kfc);
        Thread p3= new ProduceThread("C",kfc);

        Thread c1 = new ConsumerThread("X",kfc);
        Thread c2 = new ConsumerThread("Y",kfc);
        Thread c3 = new ConsumerThread("T",kfc);
        Thread c4 = new ConsumerThread("Z",kfc);
        Thread c5 = new ConsumerThread("K",kfc);

        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
        c4.start();
        c5.start();

    }
}

测试结果,生产和消费交替进行

[生产者A] 生成一个:上校鸡块,KFC有食物:1个
[生产者B] 生成一个:上校鸡块,KFC有食物:2个
[生产者C] 生成一个:上校鸡块,KFC有食物:3个
[消费者Thread-2] 消费一个:上校鸡块,KFC有食物:2个
[生产者B] 生成一个:上校鸡块,KFC有食物:3个
[生产者C] 生成一个:上校鸡块,KFC有食物:4个
[生产者A] 生成一个:上校鸡块,KFC有食物:5个
[消费者Thread-3] 消费一个:上校鸡块,KFC有食物:4个
[消费者Thread-4] 消费一个:上校鸡块,KFC有食物:3个
[消费者Thread-1] 消费一个:上校鸡块,KFC有食物:2个
[消费者Thread-0] 消费一个:上校鸡块,KFC有食物:1个
[消费者Thread-2] 消费一个:上校鸡块,KFC有食物:0个
[生产者B] 生成一个:上校鸡块,KFC有食物:1个

3、应用于消息队列

在消息队列中,生产者-消费者模型也被广泛应用。消息队列是一种高效的消息传递机制,它可以实现不同应用程序或服务之间的异步通信。在消息队列中,生产者向队列中发送消息,而消费者则从队列中接收消息并进行处理。消息队列通常具有以下特点:

  1. 可靠性:消息队列通常使用持久化策略,可以保证消息在发送和接收过程中的可靠性和安全性。

  2. 异步性:生产者和消费者可以独立运行,不需要等待对方的响应,从而提高系统的吞吐量和响应速度。

  3. 解耦性:消息队列可以实现不同模块之间的解耦,降低应用程序的复杂度和耦合度。

  4. 扩展性:消息队列可以根据需求动态扩展,支持多个生产者和消费者并发访问。

在消息队列中,生产者-消费者模型可以通过使用不同的消息队列实现。常见的消息队列包括ActiveMQ、RabbitMQ、Kafka等,它们提供了丰富的API和特性,可以满足不同场景下的需求。例如,ActiveMQ支持JMS规范,提供了消息确认、持久化、事务等特性;RabbitMQ支持AMQP协议,具有高可用性、可扩展性等特点;Kafka支持高吞吐量、分布式部署等特性,适合大数据处理和流式计算。

代码实现

3.1 引入依赖

    </dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

3.2 rabbitmq网站新建队列queue

生产者-消费者模型,java,rabbitmq,rabbitmq,java,学习

3.3 模块中配置application.yml

spring:
  rabbitmq:
    host: 192.168.***.***
    port: 5672
    username: admin
    password: 123
logging:
  level:
    com.****.mq: debug

3.4 生产者实现类

@Service
public class ProducerServiceImpl implements IProducerService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMessage(String msg) {
        System.out.println("准备发送数据到mq:" + msg);
        rabbitTemplate.convertAndSend("my_simple_queue", msg);
    }

    @Override
    public void sendUser(User user) {
        System.out.println("准备发送User对象数据到mq:" + user);
        rabbitTemplate.convertAndSend("my_simple_queue",user);
    }
}

3.5 单元测试,发送msg到rabbitmq的队列(my_simple_queue)

生产者-消费者模型,java,rabbitmq,rabbitmq,java,学习

3.6 消费者实现类

@Service
public class ConsumerServiceImpl implements IConsumerService {

   //@RabbitListener(queues = "my_simple_queue")
    @Override
    public void consumerMessage(String msg) {
        System.out.println("[消费者:]消费mq中的信息:" + msg);
    }

    @RabbitListener(queues = "my_simple_queue")
    @Override
    public void consumerUser(User user) {
        System.out.println("[消费者:]消费mq中的user信息:" + user.getUsername());
    }
}

3.7 从rabbitmq队列(my_simple_queue)消费数据

生产者-消费者模型,java,rabbitmq,rabbitmq,java,学习

3.8 队列的配置类

@Configuration
public class RabbitMQConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //将对象转换为json对象形式
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;

    }

}

小结

总之,生产者-消费者模型是一种重要的并发编程模型,在Java中和消息队列中都得到了广泛的应用。通过合理地使用同步机制和消息队列,可以提高系统的性能、可靠性和扩展性,实现高效的数据传输和处理。此模型在很多领域都有广泛应用,例如任务调度、消息队列、事件驱动编程等,它能有效地解耦数据生成与处理的过程,并提高系统的可扩展性和资源利用率。

参考:

java生产消费者模式 java实现生产者消费者模型_mob6454cc6c8549的技术博客_51CTO博客

Java多线程之生产者和消费者模型 - 简书

生产者消费者模型(学习笔记)——java多线程典型案例_java写生产者消费者模型_未跑路的汪汪的博客-CSDN博客

Java多线程之生产者消费者模式详解_java_脚本之家


感谢阅读,码字不易,多谢点赞!如有不当之处,欢迎反馈指出,感谢!

生产者-消费者模型,java,rabbitmq,rabbitmq,java,学习文章来源地址https://www.toymoban.com/news/detail-734699.html

到了这里,关于生产者-消费者模型的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 线程同步--生产者消费者模型

    条件变量是 线程间共享的全局变量 ,线程间可以通过条件变量进行同步控制 条件变量的使用必须依赖于互斥锁以确保线程安全,线程申请了互斥锁后,可以调用特定函数 进入条件变量等待队列(同时释放互斥锁) ,其他线程则可以通过条件变量在特定的条件下唤醒该线程( 唤醒后线

    2024年01月19日
    浏览(27)
  • Linux——生产者消费者模型

    目录 一.为何要使用生产者消费者模型  二.生产者消费者模型优点  三.基于BlockingQueue的生产者消费者模型 1.BlockingQueue——阻塞队列 2.实现代码  四.POSIX信号量 五.基于环形队列的生产消费模型 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者

    2024年02月08日
    浏览(33)
  • 生产者消费者模型 C++ 版

    网上一般教程是使用std::queue,定义消费者 Consumer ,定义Producter类,在main函数里面加锁保证线程安全。 本片文章,实现一个线程安全的队列 threadsafe_queue,只在队列内部加锁。如此可适配,多生产者多消费者的场景 Consumer 头文件 cpp文件 运行结果如下: 优先队列做缓存 头文件

    2024年02月13日
    浏览(26)
  • 【设计模式】生产者消费者模型

    带你轻松理解生产者消费者模型!生产者消费者模型可以说是同步与互斥最典型的应用场景了!文末附有模型简单实现的代码,若有疑问可私信一起讨论。 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过

    2023年04月17日
    浏览(26)
  • 【Linux】深入理解生产者消费者模型

    生产者 - 消费者模型 Producer-consumer problem 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。 在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完了数据才能够继续生产数据,同理如果消费

    2024年02月06日
    浏览(26)
  • 【Linux】线程安全-生产者消费者模型

    1个线程安全的队列:只要保证先进先出特性的数据结构都可以称为队列 这个队列要保证互斥(就是保证当前只有一个线程对队列进行操作,其他线程不可以同时来操作),还要保证同步,当生产者将队列中填充满了之后要通知消费者来进行消费,消费者消费之后通知生产者

    2024年02月10日
    浏览(31)
  • 生产者消费者模型(基于go实现)

    基于 Channel 编写一个简单的单线程生产者消费者模型: 队列: 队列长度 10,队列元素类型为 int 生产者: 每 1 秒往队列中放入一个类型为 int 的元素,消费者: 每一秒从队列中获取一个元素并打印。 基于 Channel 编写一个简单的单线程生产者消费者模型: 队列: 队列长度

    2024年02月11日
    浏览(24)
  • 基于互斥锁的生产者消费者模型

    生产者消费者模型 是一种常用的 并发编程模型 ,用于 解决多线程或多进程环境下的协作问题 。该模型包含两类角色: 生产者和消费者 。 生产者负责生成数据 ,并将数据存放到共享的缓冲区中。 消费者则从缓冲区中获取数据 并进行处理。生产者和消费者之间通过共享的

    2024年02月12日
    浏览(31)
  • Linux——生产者消费者模型和信号量

    目录 ​​​​​​​ 基于BlockingQueue的生产者消费者模型 概念 条件变量的第二个参数的作用  锁的作用 生产者消费者模型的高效性 生产者而言,向blockqueue里面放置任务 消费者而言,从blockqueue里面拿取任务: 总结 完整代码(不含存储数据的线程) 完整代码(含存储线程)  信

    2024年02月07日
    浏览(31)
  • 基于 BlockQueue(阻塞队列) 的 生产者消费者模型

    阻塞队列(Blocking Queue) 是一种特殊类型的队列,它具有阻塞操作的特性。在并发编程中,阻塞队列可以用于实现线程间的安全通信和数据共享。 阻塞队列的 主要特点 是: 当 队列为空时 ,消费者线程尝试从队列中获取(出队)元素时会被阻塞,直到有新的元素被添加到队

    2024年02月12日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包