RabbitMQ:工作队列模式

这篇具有很好参考价值的文章主要介绍了RabbitMQ:工作队列模式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

1.基本介绍

rabbitmq 工作队列模式,消息中间件,微服务,java-rabbitmq,rabbitmq,java,工作模式

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务并等待其完成。相反,我们将任务安排在以后完成。我们将_任务_封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作线程时,任务将在它们之间共享。
这个概念在 Web 应用程序中特别有用,因为在 Web 应用程序中,不可能在较短的 HTTP 请求窗口中处理复杂的任务。

在Work Queues工作队列模式中,我们不需要设置交换机(会使用默认的交换机进行消息转换),但是我们需要指定唯一的消息队列来进行消息传递,可以有多个消费者。
多个消费者通过轮询的方式来依次接收消息队列中存储的消息,一旦消息被某个消费者接收了,消息队列就会把消息移除,其他消费者就不能接收这条消息了。消费者必须要等消费完一条消息后才可以准备接收下一条消息。
对于任务过重或者任务比较多的情况,使用工作队列可以提高任务处理速度

rabbitmq 工作队列模式,消息中间件,微服务,java-rabbitmq,rabbitmq,java,工作模式

2.轮询发送消息

1.如果一个队列中有多个消费者,那么消费者之间对于同一个消息是竞争关系
2.对于任务过重或者任务比较多的情况,使用工作队列可以提高任务处理速度,比如发送短信,我们可以部署多个短信服务,只要有一个节点发送成功即可。

2.1抽取工具类

public class ConnectUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置连接参数
        //服务器IP地址
        factory.setHost("192.168.88.133");
        //连接端口
        factory.setPort(5672);
        //设置连接的虚拟机名称
        factory.setVirtualHost("/myhost");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");

        //2.创建Connection对象
        Connection connection = factory.newConnection();
        return connection;
    }

    /**
     * 创建信道对象
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        return channel;

    }

}

2.2 生产者

public class Producer {
    static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //发送消息
            for (int i = 1; i <= 10; i++) {
                String msg="hello rabbitmq!"+i;
                /**
                 * 参数1:交换机名称,不填写交换机名称的话则使用默认的交换机
                 * 参数2:队列名称(路由key)
                 * 参数3:其他参数
                 * 参数4:消息内容
                 */
                channel.basicPublish("", QUEUE_NAME, null,msg.getBytes() );
            }
            System.out.println("消息已经发送完毕");


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

rabbitmq 工作队列模式,消息中间件,微服务,java-rabbitmq,rabbitmq,java,工作模式

rabbitmq 工作队列模式,消息中间件,微服务,java-rabbitmq,rabbitmq,java,工作模式

rabbitmq 工作队列模式,消息中间件,微服务,java-rabbitmq,rabbitmq,java,工作模式

2.3消费者

消费者1

public class Consumer1 {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) {

        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));

                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, true, consumer);


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

消费者2

public class Consumer2 {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) {

        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));

                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, true, consumer);


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

2.4测试

为了方便测试,我们需要先启动消费者,然后再启动生产者,不然生产者发送的消息会瞬间被某个消费者消费完
rabbitmq 工作队列模式,消息中间件,微服务,java-rabbitmq,rabbitmq,java,工作模式
rabbitmq 工作队列模式,消息中间件,微服务,java-rabbitmq,rabbitmq,java,工作模式

3.消息应答

3.1消息应答基本介绍

👀我们知道消费者完成一个任务是需要一定的时间的,如果消费者在处理一个长任务的时候,当它只处理一部分但是此时消费者却挂掉了,可能会出现下面的情况:
👀如果说RabbitMQ向消费者传递一条消息以后,不管消费者有没有处理完或者有没有接收到,就马上把消息标记为删除,那么,如果这个时候消费者挂掉了,就会导致丢失当前正在处理的消息,以及后续发送给消费者的消息,因为消费者不能接收到。
👀为了保证消息在发送过程中不会丢失,RabbitMQ引入了消息应答机制,消息应答就是消费者在接收到消息并且处理该消息以后,告诉RabbitMQ它已经处理了,RabbitMQ就可以把这个消息从消息 队列中删除了。

3.2消息自动应答

  • 消息发送后就马上认为已经传递成功了,这种模式需要在高吞吐量和数据传输安全性方面做权衡。因为如果使用这种模式,如果消息在被接收到之前,消费者那么出现连接或者信道关闭,那么消息就会丢失;不过,对于这种模式来说,消费者那里可以传递过载的消息,没有对传递的消息数量进行限制,这样就可能使得消费者这边因为接收了太多还来不及处理的消息,导致消息积压,最后使得内存耗尽,导致这些消费者线程被操作系统杀死,所以这种模式仅仅适用消费者可以高效并且以某种苏联能够处理这些消息的情况下使用。
  • 信息过载:是指社会信息超过了个人或系统所能接受、处理或有效利用的范围,并导致故障的状况。

3.3消息手动应答

  • 消费者从队列中消费消息,默认采用的是自动应答,自动应答可能导致消息没有完全消费而导致消息失效问题,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。而且,使用手动应答可以批量应答减少网络拥堵,下面三个方法可以用于手动应答消息:
  • Channel。basicAck():用于肯定确认,RabbitMQ已经知道消息被消费并且成功处理消息,可以把消息丢弃。
  • Channle.basicNack():用于否定确认
  • Channel.basicReject():用于否定确认,不处理该消息直接拒绝,然后把消息丢弃

3.4批量确认(Multiple)

批量确认的方法是channel.basicAck(deliverTag,true),参数2标识是否批量确认。如果为true,表示批量确认队列中没有应答的消息。
比如channel中传送tag的消息5,6,7,8,当前tag为8,如果参数2为true,那么此时5-8这些还没有被应答的消息都会被确认收到消息应答。如果为false,那么只会应答tag=8的消息。5,6,7这三个消息仍然不会被确认收到消息应答
image.png

3.5消息自动重新入队

如果一个消费者死了(它的通道被关闭,连接被关闭,或者TCP连接丢失)而没有发送一个ack,RabbitMQ就会明白一条消息没有被完全处理,并会重新排队。如果同时有其他消费者在线,它将迅速将其重新交付给另一个消费者。通过这种方式,您可以确保即使消费者偶尔死亡,也不会丢失任何消息。
image.png

3.6消息手动应答代码

生产者

public class Producer2 {
    static final String QUEUE_NAME="ack_work_queue";
    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            //发送消息
            for (int i = 1; i <= 10; i++) {
                String msg="你好,小兔子!"+i;
                /**
                 * 参数1:交换机名称,不填写交换机名称的话则使用默认的交换机
                 * 参数2:队列名称(路由key)
                 * 参数3:其他参数
                 * 参数4:消息内容
                 */
                channel.basicPublish("", QUEUE_NAME, null,msg.getBytes() );
            }
            System.out.println("消息已经发送完毕");


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

消费者1

public class Consumer3 {
    static final String QUEUE_NAME = "ack_work_queue";

    public static void main(String[] args) {

        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("消费者1-消费消息的时间比较短。");
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   //睡眠一秒
                    SleepUtil.sleep(1);
                    System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));
                    //手动确认
                    //每条消息都有对应的id,表明是第几条消息,false表示不批量
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, false, consumer);


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

消费者2

public class Consumer4 {
    static final String QUEUE_NAME = "ack_work_queue";

    public static void main(String[] args) {

        try {
            Channel channel = ConnectUtil.getChannel();
            //声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("消费者2-消费消息的时间比较长。");
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消费回调函数,当收到消息以后,会自动执行这个方法
                 * @param consumerTag 消费者标识
                 * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                 * @param properties   属性信息
                 * @param body         消息数据
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                   //睡眠一秒
                    SleepUtil.sleep(50);
                    System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));
                    //手动确认
                    //每条消息都有对应的id,表明是第几条消息,false表示不批量
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            //监听消息(队列名称,是否自动确认消息,消费对象)

            channel.basicConsume(QUEUE_NAME, false, consumer);


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }
}

3.7消息手动应答效果

第一次测试,两个消费者都睡眠1秒
image.png
image.png
image.png
第二次测试,让消费者2睡眠30秒,然后观察两个消费者的消费情况,
image.png
image.png
image.png
接着把消费者2停掉,再次观察消费者1控制台打印的消息,发现队列中没有被消费的消息重新进入到队列中,并且被消费者1进行消费
image.png

4.消息的持久化

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍将丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列在 RabbitMQ 节点重新启动后仍能存活下来。为此,我们需要将其声明为_持久:_
如果我们之前创建的队列是非持久化的,如果RabbitMQ重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把durable参数设置为持久化;

4.1队列持久化

**如果之前创建队列的时候,没有设置成持久化,我们需要把原来的队列先删除掉,或者说重新创建一个新的持久化队列,不然会报错。因为RabbitMQ 不允许我们使用不同的参数重新定义现有队列,并且会向任何尝试执行此操作的程序返回错误。但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列, **

// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

image.png

4.2消息持久化

我们需要将消息标记为持久性 - 通过将消息属性(实现基本属性)设置为PERSISTENT_TEXT_PLAIN的值。

//交换机名称,队列名称,消息持久化,消息
channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但当 RabbitMQ 接受消息但尚未保存消息时,仍有一个较短的时间窗口。另外, RabbitMQ 不会对每条消息都执行 fsync( fsync函数同步内存中所有已修改的文件数据到储存设备。) – 它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果您需要更强的保证,则可以使用发布者确认模式。

4.3公平调度

image.png文章来源地址https://www.toymoban.com/news/detail-816391.html

int prefetchCount = 1;
channel.basicQos(prefetchCount);

到了这里,关于RabbitMQ:工作队列模式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 消息队列中间件 - Docker安装RabbitMQ、AMQP协议、和主要角色

    不管是微服务还是分布式的系统架构中,消息队列中间件都是不可缺少的一个重要环节,主流的消息队列中间件有RabbitMQ、RocketMQ等等,从这篇开始详细介绍以RabbitMQ为代表的消息队列中间件。 AMQP协议 AMQP协议是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与

    2024年02月03日
    浏览(61)
  • 基于golang多消息队列中间件的封装nsq,rabbitmq,kafka

    场景 在创建个人的公共方法库中有这样一个需求,就是不同的项目会用到不同的消息队列中间件,我的思路把所有的消息队列中间件进行封装一个消息队列接口(MQer)有两个方法一个生产一个消费,那么在实例化对象的时候根据配置文件指定当前项目使用的那个消息队列中

    2024年02月14日
    浏览(64)
  • 消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

    目录 0.交换机种类和区别 1.声明队列和交换机以及RountKey 2.初始化循环绑定 3.声明交换机 4.监听队列 4.1 监听普通队列 4.2监听死信队列  5.削峰填谷的实现 Direct Exchange(直连交换机) : 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。

    2024年04月23日
    浏览(155)
  • 【云原生进阶之PaaS中间件】第四章RabbitMQ-1-简介及工作模式

            RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。AMQP(Advanced Message Queue:高级消息队列协议)它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。RabbitMQ 最初起源于

    2024年02月21日
    浏览(86)
  • 消息中间件学习笔记--RabbitMQ(二、模式,一次违反常规的Java大厂面试经历

    .Fanout:转发消息到所有绑定队列 比较常用的是Direct、Topic、Fanout. Fanout 这种Fanout模式不处理路由键,只·需要简单的将队列绑定到exchange上,一个发送到exchange的消息都会被转发到与该exchange绑定的所有队列上。很像广播子网,每台子网内的主机都获得了一份复制的消息。Fan

    2024年04月09日
    浏览(99)
  • 利用消息中间件RabbitMQ创建队列以及扇出(Fanout)、订阅(Direct)、主题(Topic)交换机来完成消息的发送和监听接收(完整版)

    目录 一、前期项目环境准备 1.1父项目以及子项目 1.2配置pom.xml 1.3配置application.yml 二、扇出(Fanout) 交换机实现消息的发送和接收 2.1编写子项目consumer(消费者,接收消息)的代码实现扇出(Fanout)交换机接收消息 2.1.1consumer子项目结构 2.1.2FanoutConfig类的实现扇出(Fanout)交

    2024年02月05日
    浏览(61)
  • 消息队列之RabbitMQ工作模式

    提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 消息队列之RabbitMQ工作模式 提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档 提示:这里可以添加本文要记录的大概内容: 在这篇博客中,我将深入探讨 RabbitMQ 的工作模式,带你

    2024年01月18日
    浏览(56)
  • RabbitMQ消息队列的工作模式

    官方文档地址:https://www.rabbitmq.com/getstarted.html 工作模式其实就是消息队列分发消息的路由方式。 RabbitMQ常用的几种工作模式: 简单模式 WorkQueues工作队列模式 PubSub生产者/PubSub消费者模式 Routing路由模式 Topics通配符模式 发布/订阅模式(Publish/Subscribe):该模式用于一对多的

    2024年02月15日
    浏览(46)
  • 消息队列——rabbitmq的不同工作模式

    目录 Work queues 工作队列模式  Pub/Sub 订阅模式 Routing路由模式 Topics通配符模式   工作模式总结 C1和C2属于竞争关系,一个消息只有一个消费者可以取到。  代码部分只需要用两个消费者进程监听同一个队里即可。 两个消费者呈现竞争关系。 用一个生产者推送10条消息 两个监

    2024年02月16日
    浏览(44)
  • 消息队列——RabbitMQ基本概念+容器化部署和简单工作模式程序

    目录 基本概念  MQ 的优势  1.应用解耦  2.异步提速  3.削峰填谷  MQ 的劣势 使用mq的条件  常见MQ产品  RabbitMQ简介 RabbitMQ的六种工作模式   JMS RabbitMQ安装和配置。 RabbitMQ控制台使用。 RabbitMQ快速入门——生产者 需求: RabbitMQ快速入门——消费者 小结  多个系统之间的通信方

    2024年02月16日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包