201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue)

这篇具有很好参考价值的文章主要介绍了201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

★ 工作队列介绍

工作队列: 就是让多个消费者竞争消费同一个消息队列的消息,相当于多个消费者共享消息队列。

▲ RabbitMQ可以让多个消费者竞争消费同一个消息队列

▲ 消息队列默认会将消息“均分”给每个消费者,但这样做往往并不合适:
因为有的消费者需要更多时间处理一条消息,有的消费者只要更少时间即可处理一条消息,
如果让它们“均分”这些消息,就会造成资源浪费。

▲ 比较理想的做法是“能者多劳”,让队列将消息多分给需要更少时间的消费者(快),
将消息少分给需要更多时间的消费者(慢)。

▲ 调用Channel的basicQos(int prefetchCount)方法可控制消费者在同一时间点最多能得到的消息数量
——此时应该采用手动确认。

201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

这个就是上一篇写的采用自动确认策略
注意: channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

代码演示

在上一篇的代码基础上修改
200、使用默认 Exchange 实现 P2P 消息 之 消息生产者(发送消息) 和 消息消费者(消费消息)

思路:
1、创建一个消息生产者和两个消息消费者。
2、生产者发送20条消息
3、消费者01 和 消费者 02 都用 channel.basicQos(3); 设置同一时间点只能获取3条消息来处理,只有这3条消息处理完才能再次获取3条消息
4、每个消费者都在消息处理完之后添加 channel.basicAck() 这个方法来手动确认消息成功消费并返回确认成功消费的消息给消息队列。
5、消费者01 每次消费完后,先睡眠个1秒,再手动确认消息已经消费,消费者02不需要,当消息消费完成后就马上手动确认。用于看两个消费者的消费情况

代码如图:
生产者 Producer
生产者代码不变,只是设置发送20条消息
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

消费者01 Consumer01
经过测试:同一时间点每次只能消费3条消息,只有这3条消息消费完成,并手动确认消费完成后,才能再获取3条消息进行消费。如果把手动确认消费的代码注释掉,那么这个消费者只能消费到3条消息。最后面有演示:

201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

消费者02 Consumer02
多个了睡眠1秒再手动确认消息
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

测试

生产者发送20条消息,消费者01 和 消费者02 每次获取3条消息,消息消费并手动确认后才能再获取3条消息进行消费。
然后消费者02 因为每次消费完都睡眠一秒,而消费者01没有。
这个睡眠 用来演示消费者01的消息处理速度比消费者02 快的情况。
所以那个消费者消费的快,哪个消费者处理的消息就越多
这个就是工作队列:
工作队列 就是让多个消费者竞争消费同一个消息队列的消息,相当于多个消费者共享消息队列。
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

注意点1:

如图:这个 multiple 参数,设置为false,表示 不对之前未确认的的消息进行批量确认。
可以经过测试,无论改成true还是false,只要消息队列里面有已消费未确认的消息,再次启动这个消费者,它还是会对之前已消费未确认的消息进行批量确认。
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue
测试流程:
1、首先,关闭消费者,然后生产者发送20条消息。
现在就是消息队列有20条消息未被消费
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

2、这时候把确认消费的代码注释掉,然后如图,成功消费3条消息,但是未确认,还有17条消息待消费。
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue
3、重新启动消费者01,这个时候正确应该是消费剩下的17条消息,但是那3条消费未确认的消息应该还在。

但是结果却如图:
重启消费者01,把自动确认的代码放开,multiple 为 false,但是最终还是把所有消息消费了,包括3条已消费未确认的消息。

所以感觉这个 multiple 为 false 没起作用。
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

注意点2:

注释掉手动确认代码的演示: 经过测试:同一时间点每次只能消费3条消息,只有这3条消息消费完成,并手动确认消费完成后,才能再获取3条消息进行消费。如果把手动确认消费的代码注释掉,那么这个消费者只能消费到3条消息
201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue),RabbitMQ,Spring Boot,rabbitmq,分布式,WorkQueue

完整代码:

ConnectionUtil

package cn.ljh.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//连接工具
public class ConnectionUtil
{
    //获取连接的方法
    public static Connection getConnection() throws IOException, TimeoutException
    {
        //创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来
        ConnectionFactory connectionFactory =  new ConnectionFactory();
        //设置连接信息
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ljh");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/"); //连接虚拟主机
        //从连接工厂获取连接
        Connection connection = connectionFactory.newConnection();
        //返回连接
        return connection;
    }
}

Producer

package cn.ljh.rabbitmq.producer;

import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

//消息生产者--使用默认的exchange
public class Producer
{
    //(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。
    //(2)通过Connection获取Channel。
    //(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。
    //    如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。
    //(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,
    //    第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。
    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接
        Connection conn = ConnectionUtil.getConnection();
        //2、通过Connection获取Channel。
        Channel channel = conn.createChannel();
        //3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定
        //此处打算直接使用默认的Exchange来分发消息,因此无需声明 Exchange,只需声明队列
        channel.queueDeclare(Consumer01.QUEUE_NAME, true, false, false, null);

        //生产者发送20条消息
        for (int i = 1; i <= 20; i++)
        {
            String message = "生产者发送的第【 "+i+" 】条消息的内容";

            //4、调用Channel的basicPublish()方法发送消息
            channel.basicPublish(""/*默认的 Exchange 没有名字,所以用空的字符串*/,
                    Consumer01.QUEUE_NAME/*使用队列名作为路由key,表明该消息将会被路由到该队列*/,
                    null /*指定额外的消息的属性*/,
                    message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/
            );
        }
        //5、关闭资源
        //关闭通道
        channel.close();
        //关闭连接
        conn.close();
    }
}

Consumer01

package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//消息消费者1
public class Consumer01
{
    // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
    //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
    //(2)通过Connection获取Channel。
    //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
    //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
    //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。

    //常量
    public final static String QUEUE_NAME = "myQueue01";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
        Connection conn = ConnectionUtil.getConnection();

        //2、通过Connection获取Channel 消息通道
        Channel channel = conn.createChannel();

        //工作队列的关键,设置该消费者在同一时间点,最多只会获取3条消息来处理
        //不要纠结同一时间点是多久,重点是这个消费者每次只能获取3条消息,只有这条消息处理完后,才能再获取3条信息来处理
        channel.basicQos(3);


        //3、调用 Channel 的 queueDeclare() 方法声明队列
        //如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
        //参数1:声明的队列名; 参数2:消息队列是否持久化
        //参数3:是否只允许该消息消费者消费该队列的消息,为true,则其他消费者在这个myQueue01队列消息积堆过多的情况下,也无法帮忙消费。
        //参数4:是否自动删除(如果为true,在该队列没消息的情况下,会自动删除该队列) 参数5:填写额外的参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);


        //4、调用Channel 的 basicConsume()方法开始处理消费消息
        channel.basicConsume(QUEUE_NAME/*消费这个名字的消费队列里面的消息*/,
                false/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                new DefaultConsumer(channel)
                {
                    //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                               AMQP.BasicProperties properties /*消息的那些属性*/,
                                               byte[] body /*body:消息的消息体*/) throws IOException
                    {
                        //把消息体中的消息拿出来
                        String message = new String(body, "UTF-8");
                        //printf:格式化输出函数   %s:输出字符串  %n:换行
                        System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                envelope.getExchange(),envelope.getRoutingKey(),message);

                        //到这里消息已经处理完了=======
                        //因为 basicConsume 方法的 参数2 的 autoAck 为 false , 所以这里需要进行手动确认消息
                        //等消息处理完成后,确认本条消息
                        //getDeliveryTag() 获取这条消息的标志,作为参数,让basicAck()方法确认这条消息已经消费完成并返回给消息队列
                        channel.basicAck(envelope.getDeliveryTag(),
                                false /* 指定是否对之前未确认的消息进行批量确认 */);

                    }
                }
        );
    }
}

Consumer02

package cn.ljh.rabbitmq.consumer;

import cn.ljh.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//消息消费者2
public class Consumer02
{
    // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
    //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
    //(2)通过Connection获取Channel。
    //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
    //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
    //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。
    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
        Connection conn = ConnectionUtil.getConnection();

        //2、通过Connection获取Channel 消息通道
        Channel channel = conn.createChannel();

        //工作队列的关键,设置该消费者在同一时间点,最多只会获取3条消息来处理
        //不要纠结同一时间点是多久,重点是这个消费者每次只能获取3条消息,只有这条消息处理完后,才能再获取3条信息来处理
        channel.basicQos(3);

        //3、调用 Channel 的 queueDeclare() 方法声明队列
        //如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
        //参数1:声明的队列名; 参数2:消息队列是否持久化
        //参数3:是否只允许该消息消费者消费该队列的消息,为true,则其他消费者在这个myQueue01队列消息积堆过多的情况下,也无法帮忙消费。
        //参数4:是否自动删除(如果为true,在该队列没消息的情况下,会自动删除该队列) 参数5:填写额外的参数
        channel.queueDeclare(Consumer01.QUEUE_NAME, true, false, false, null);

        //4、调用Channel 的 basicConsume()方法开始处理消费消息
        channel.basicConsume(Consumer01.QUEUE_NAME/*消费这个名字的消费队列里面的消息*/,
                false/*消消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                new DefaultConsumer(channel)
                {
                    //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                               AMQP.BasicProperties properties /*消息的那些属性*/,
                                               byte[] body /*body:消息的消息体*/) throws IOException
                    {
                        //把消息体中的消息拿出来
                        String message = new String(body, "UTF-8");
                        //printf:格式化输出函数   %s:输出字符串  %n:换行
                        System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                envelope.getExchange(),envelope.getRoutingKey(),message);

                        try
                        {
                            //模拟耗时的操作 1秒
                            Thread.sleep(1000);
                        } catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }

                        //等消息处理完成后,确认本条消息=======到这里消息已经处理完了
                        //因为 basicConsume 方法的 参数2 的 autoAck 为 false , 所以这里需要进行手动确认消息
                        //getDeliveryTag() 获取这条消息的标志,作为参数,让basicAck()方法确认这条消息已经消费完成并返回给消息队列
                        channel.basicAck(envelope.getDeliveryTag(),
                                false /* 指定是否对之前未确认的消息进行批量确认 */);
                    }
                }
        );
    }


}

pom.xml

依赖不变文章来源地址https://www.toymoban.com/news/detail-724970.html

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ljh</groupId>
    <artifactId>workqueue</artifactId>
    <version>1.0.0</version>
    <name>workqueue</name>

    <!--  属性  -->
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
    </properties>

    <!--  依赖  -->
    <dependencies>
        <!-- RabbitMQ 的依赖库 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.0</version>
        </dependency>

    </dependencies>

</project>

到了这里,关于201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ Exchange类型和工作模式介绍

    RabbitMQ常用的交换器类型有: fanout、 direct、 topic、 headers四种。 会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中 如图: direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的 队列中 ,如下图: topic类型的交换器在direc

    2024年02月11日
    浏览(45)
  • Docker中为RabbitMQ安装rabbitmq_delayed_message_exchange延迟队列插件

    1、前言 rabbitmq_delayed_message_exchange是一款向RabbitMQ添加延迟消息传递(或计划消息传递)的插件。 插件下载地址:https://www.rabbitmq.com/community-plugins.html 1、下载插件 首先需要确定我们当前使用的RabbitMQ的版本,我们可以直接登录Web端的管理界面查看版本   也可以在RabbitMQ容器中

    2024年02月12日
    浏览(49)
  • RabbitMQ入门 消息队列快速入门 SpringAMQP WorkQueue 队列和交换机 Fanout Direct exchange RAbbitMQ单体部署

    微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。 1.

    2024年04月08日
    浏览(71)
  • 消息队列-RabbitMQ:Exchanges、绑定 bindings以及3大常用交换机(Fanout exchange、Direct exchange、Topics exchange)

    RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上, 通常生产者甚至都不知道这些消息传递传递到了哪些队列中 。 相反, 生产者只能将消息发送到交换机 (exchange) , 交换机工作 的内容非常简单, 一方面它接收来自生产者的消息 , 另一

    2024年04月08日
    浏览(53)
  • Rabbitmq入门与应用(二)-RabbitMQ工作模型

    RabbitMQ Tutorials — RabbitMQ Broker RabbitMQ服务。 Connection 生产者或是服务者都需要与Broker建立的TCP连接。 Channel 保持的TCP长连接里面去创建和释放Channel,从而减少资源的消耗。其中Channel是相互隔离的,不能共享。 Queue Queue是生产者与消费者的中间交互队列,生产者发送的消息到达

    2024年02月21日
    浏览(41)
  • 198、RabbitMQ 的核心概念 及 工作机制概述; Exchange 类型 及 该类型对应的路由规则;了解什么是JMS。

    看RabbitMQ 之前,可以先了解下什么是 JMS。 先简单看下 JMS 是什么,用来对比。 JMS 也是一种消息机制 JMS 规范,属于 Java EE 规范 AMQP ( Advanced Message Queuing Protocol ) 高级消息队列协议 Connection: 代表客户端(包括消息生产者和消费者)与RabbitMQ之间的连接。 Channel: 连接内部的

    2024年02月07日
    浏览(43)
  • RabbitMQ工作模式-工作队列

    官网关于工作模式的解释地址:https://www.rabbitmq.com/getstarted.html Work Queue(工作队列) 生产者发消息,启动多个消费者来消费消息,每个消费者仅消费部分消息,可达到负载均衡的效果。 创建生产者 创建消费者 首先运行消息费,为了测试工作队列模式,消费都需要启动多个,

    2024年02月10日
    浏览(40)
  • RabbitMQ:工作队列模式

    📃个人主页:不断前进的皮卡丘 🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记 🔥个人专栏:消息中间件 工作队列(又名: 任务队列 )背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反

    2024年01月23日
    浏览(41)
  • 消息队列之RabbitMQ工作模式

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 消息队列之RabbitMQ工作模式 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 在这篇博客中,我将深入探讨 RabbitMQ 的工作模式,带你

    2024年01月18日
    浏览(56)
  • RabbitMQ消息队列的工作模式

    官方文档地址:https://www.rabbitmq.com/getstarted.html 工作模式其实就是消息队列分发消息的路由方式。 RabbitMQ常用的几种工作模式: 简单模式 WorkQueues工作队列模式 PubSub生产者/PubSub消费者模式 Routing路由模式 Topics通配符模式 发布/订阅模式(Publish/Subscribe):该模式用于一对多的

    2024年02月15日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包