RabbitMQ 消费者

这篇具有很好参考价值的文章主要介绍了RabbitMQ 消费者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

  RabbitMQ的消费模式分两种:推模式和拉模式,推模式采用Basic.Consume进行消费,拉模式则是调用Basic.Get进行消费。
  消费者通过订阅队列从RabbitMQ中获取消息进行消费,为避免消息丢失可采用消费确认机制

拉模式

  顾名思义,拉模式就是消费者主动的从RabbitMQ中获取数据,通过拉模式每次获取数据只能获取一条。拉模式的时序图如下图所示。
RabbitMQ 消费者,中间件 - RabbitMQ,rabbitmq,分布式
  RabbitMQ每次接收到Get请求后会将队列中即将被消费的消息发送给消费者,消费者接收处理消息后向RabbitMQ发送消费应答,然后该消息将从队列中移除。
  需要注意的是拉模式普遍仅适用用从RabbitMQ中获取一条数据的场景,如果以循环的方式获取批量数据将影响RabbitMQ的性能。

拉模式的实现

  拉模式通过以下方法实现:

/**
* queue 队列名称
* autoAck 是否开启自动应答
*/
GetResponse basicGet(String queue,boolean autoAck)

  如上述代码所示channel.basicGet方法返回的是一个GetResponse,在GetResponse对象中包含了一条消息内容,消费者可以获取该消息并进行处理。

推模式

  推模式是指RabbitMQ将消息主动推送给订阅监听队列的消费者。在RabbitMQ推送消息的过程中其并不关心该消费者是否完成上一条消息的消费,只要队列中存在消息则向消费者推送,当然推送消息的个数会受Basic.Qos的限制。Basic.Qos指定了某个消费者可以保持的未应答的消息数量。

    /**
     * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
     * method.
     * Provide access to <code>basic.deliver(Broker推送消息)</code>, <code>basic.cancel</code>
     * and shutdown signal callbacks (which is sufficient
     * for most cases). See methods with a {@link Consumer} argument
     * to have access to all the application callbacks.
     * @param queue 队列名称
     * @param autoAck 是否自动确认
     * @param consumerTag 消费者标签,消费者的唯一标识符
     * @param noLocal 是否可以接收同Connection中生产者的消息(true不能接收)
     * @param exclusive 是否设置排他
     * @param arguments 其他参数
     * @param deliverCallback 消息接收回调
     * @param cancelCallback 消费取消回调
     * @param shutdownSignalCallback 连接或者信道关闭回调
     * @return the consumerTag associated with the new consumer
     */
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;

    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

  可以通过上述两种方法(设置参数最多的)实现声明消费者。其中Consumer的定义如下:


public interface Consumer {
    /**
     * 消费者通过basicConsume被注册后调用
     */
    void handleConsumeOk(String consumerTag);

    /**
     * 消费者通过basicCancel取消时调用
     */
    void handleCancelOk(String consumerTag);

    /**
     * 消费者不通过basicCancel取消时调用
     */
    void handleCancel(String consumerTag) throws IOException;

    /**
     * 通道或者连接关闭时调用
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    /**
     * 接收重新发送的未被确认的消息时调用
     */
    void handleRecoverOk(String consumerTag);

    /**
     * 接收消息时调用
     * @param consumerTag 消费者标签
     * @param envelope 打包消息的数据
     * @param properties 消息的内容标头数据
     * @param body 消息内容
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
}

消费确认与拒绝

  为了保障消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck为true时RabbitMQ会自动的把发送出去的消息设置为确认,然后从队列中删除;当autoACK为false时RabbitMQ会等待消费者显式回复确认信号后才从内存中移去消息(先标记再删除)。
  autoAck参数意为自动应答,但是如果该参数为true时则rabbitMQ将自动将发送的消息标记确认,无需消费者进行应答。

  当autoAck参数为false时,对于RabbitMQ服务器而言,队列中的消息分成两部分:一部分时等待投递给消费者的消息;一部分时已经投递给消费者,但是还未收到消费者确认消息的消息
  RabbitMQ不会为未确认的消息设置过期时间,如果一个消息一直未被消费者确认,那么这个消息再RabbitMQ中将一直保存为投递未确认状态,指导消费者确认或者消费者断开连接,如果消费者断开连接,则该消费者接收但未确认的消息将重新入队。

消息确认的实现

  消息的显式确认需要消费者再声明的过程中设置autoAck=false。然后该消费者消费的消息可以显式的进行确认应答。确认应答方法如下:

	 /**
     * @param 消息的标签,可通过Delivery.getEnvelope().getDeliveryTag()获取
     * @param 如果为true则将发送给该消费者的该消息之前的所有未应答的消息进行应答,如果为false则仅应答一条消息
     */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

  当进行消息的批量确认时,将所有发送给该消费者未确认的消息进行确认,而针对监听同一队列的其他消费者的未确认消息并不进行处理。

消息拒绝的实现

  RabbitMQ提供了两种消息拒绝的方法:Basic.Reject和Basic.Nack命令;其两者的区别时Nack可以进行批量拒绝。

    /**
     * @param deliveryTag 消息标签
     * @param requeue 为true时被拒绝的消息重新入队,否则将成为死信
     * @throws java.io.IOException if an error is encountered
     */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

    /**
     * @param deliveryTag 消息标签
     * @param multiple 如果为true则批量拒绝自该消息之前所有未确认的发送给该消费者的消息
     * @param requeue 为true时被拒绝的消息重新入队,否则将成为死信
     * @throws java.io.IOException if an error is encountered
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

basicRecover

该方法可以将某个消费者未应答(确认或者拒绝)的消息重新入队,该方法会导致:

  • 投递而未被应答的消息可以重新发送给消费者进行处理
  • 消费者的消息队列被清空,可以重新接收到其他消息
    /**
     * <p>
     *  Ask the broker to resend unacknowledged messages.  In 0-8
     * basic.recover is asynchronous; in 0-9-1 it is synchronous, and
     * the new, deprecated method basic.recover_async is asynchronous.
     * </p>
     * Equivalent to calling <code>basicRecover(true)</code>, messages
     * will be requeued and possibly delivered to a different consumer.
     * @see #basicRecover(boolean)
     */
     Basic.RecoverOk basicRecover() throws IOException;

    /**
     * Ask the broker to resend unacknowledged messages.  In 0-8
     * basic.recover is asynchronous; in 0-9-1 it is synchronous, and
     * the new, deprecated method basic.recover_async is asynchronous.
     * @param requeue If true, messages will be requeued and possibly
     * delivered to a different consumer. If false, messages will be
     * redelivered to the same consumer.
     */
    Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

basicQos 限制消费

  默认情况下,消费者对于接收的消息数量并未限制,也就是说,一旦RabbitMQ中接收到消息并且存在消费者,则RabbitMQ将把消息发送到相关的消费者中,并不关心消费者是否消息完信息。
  轮询的默认消息分发机制会导致消费者资源不能合理利用、消费者消息积压导致内存溢出等问题。为解决上述问题可以使用basicQos方法实现限制信道上消费者所能保持的最大未确认消息数量。该方法如下:

    /**
     * @param prefetchSize 消息大小
     * @param prefetchCount 消息数量
     * @param global 是否全局
     * @throws java.io.IOException if an error is encountered
     */
    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

针对global参数需要注意一下内容:

  • 当global=true时信道上所有的消费者都需要遵从消息数量限定值(某个信道上所有消费者未确认消息数量<=prefetchCount)
  • 等global=false时新的消费者需要遵从消息数量的限定值。
  • 可以调用两次basicQos方法,并使用不同的global参数,这种情况下两次配置都可以生效。

总结

  消费者就是针对某个队列进行消息监听和消息消费的。消费者消费消息存在拉模式和推模式,推模式的是使用场景相对比较多。
  为确保消息被合法的消费,RabbitMQ提供了消费确认机制,投递的消息并不能被理解完成了消费,仅消费者确认消费该消息才会被移除队列。
  默认的消息投递机制时轮询,轮询的消息分发并会关系消费者的性能以及消息积压的问题,因此需要限制每个消费者所能保持的最大未确认的消息数量。文章来源地址https://www.toymoban.com/news/detail-673112.html

到了这里,关于RabbitMQ 消费者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ多消费者实例时,保证只有一个消费者进行消费(单活消费者模式)

    有一种业务场景,当人员组织结构变更时,会有大量数据进行推送。这些数据类型有的是add,有的是update,并且必须先add,才能进行update。 这时,为了保证消费顺序,需要 只有一个实例进行按顺序消费,其他实例仅提供日常对外服务 ,不进行消息消费。当唯一消费实例无法

    2024年02月11日
    浏览(48)
  • RabbitMQ-消费者确认机制

    none:不做任何处理,消息投递到消费者了之后,立即返回ACK,并且从MQ将消息删除,非常不安全,不建议使用。 manual:手动模式,需要在业务中调用api,ack或者reject。 auto:自动模式,SpringAMQP利用AOP对我们的消息处理做了环绕增强,当业务正常执行时返回ACK,执行异常时,根

    2024年01月21日
    浏览(54)
  • RabbitMq同一队列多个消费者问题

    RabbitMQ只有Queue,如果多个消费者绑定同一个queue,那么一条消息,只能被其中一个消费者取走(轮询)。 本质上,RabbitMq的消费者的消息确认机制,就注定不可能让多个消费者同时去消费同一个队列中的同一条消息,只能轮询的方式去消费。 我感觉我们的目的是想用rabbitmq

    2024年02月04日
    浏览(36)
  • RabbitMQ消费者的可靠性

    目录 一、消费者确认 二、失败重试机制 2.1、失败处理策略 三、业务幂等性 3.1、唯一消息ID  3.2、业务判断 3.3、兜底方案 一、消费者确认 RabbitMQ提供了消费者确认机制( Consumer Acknowledgement )。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息

    2024年02月07日
    浏览(41)
  • rabbitmq消费者与生产者

    在第一次学习rabbitmq的时候,遇到了许多不懂得 第一步导包 第二步新增生产者 在这里中: connectionFactory.setVirtualHost(\\\"my_vhost\\\");//填写自己的队列名称,如果你的为”/“则填写\\\'\\\'/\\\'\\\' 第三步新增消费者 消息获取成功 注意如果你用的云服务器需要打开这两个端口 5672 15672 如果你使

    2024年02月11日
    浏览(46)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(43)
  • 【RabbitMQ实战】 03 SpringBoot RabbitMQ生产者和消费者示例

    上一节我们写了一段原生API来进行生产和消费的例子。实际上SpringBoot对原生RabbitMQ客户端做了二次封装,让我们使用API的代价更低。 依赖引入 RabbitMQ的配置如下 每个配置的具体含义,详见配置 代码说明 使用RabbitTemplate可以发送消息 这个Controller定义了一个发送的接口,调用

    2024年02月07日
    浏览(39)
  • 消息中间件RabbitMQ

    1.1.1. 什么是MQ MQ(message queue) ,从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,

    2024年01月17日
    浏览(85)
  • RabbitMQ消息中间件

    RabbitMQ消息中间件 RabbitMQ简介 windows下安装RabbitMQ RabbitMQ基本概念 RabbitMQ简单模式 RabbitMQ工作队列模式 RabbitMQ发布订阅模式 RabbitMQ路由模式 RabbitMQ主题模式 RabbitMQ RPC模式 RabbitMQ发布确认模式

    2024年02月10日
    浏览(60)
  • RabbitMQ原理(五):消费者的可靠性

    当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如: 消息投递的过程中出现了网络故障 消费者接收到消息后突然宕机 消费者接收到消息后,因处理不当导致异常 … 一旦发生

    2024年02月08日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包