RabbitMQ消息应答

这篇具有很好参考价值的文章主要介绍了RabbitMQ消息应答。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况,RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息以及后续发送给该消费者的消息,因为它无法接收到,为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,消息应答就是,消费者在接收到消息并且处理该消息之后,告诉rabbimq它已经处理了,rabbitmq可以把该消息删除了

2.自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能对消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅使用在消费者可以高效并且以某种速率能够处理这些消息的情况下使用

3.消息应答的方法

(1)channel.basicAck 用于肯定确认,RabbitMQ已经知道该消息并且成功地处理消息,可以将其丢弃了

(2) channel.basicNack 用于否定确认

(3)channel.basicReject 用于否定确认,与channel.basicNack相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了

4.Multiple的解释

手动应答的好处是可以批量应答并且减少网络拥堵

channel.basicAck(deliveryTag,true)

(1)multiple的true和false代表不同意思

  • true代表批量应答channel上未应答的消息,比如说channel上有传送tag的消息5,6,7,8,当前tag是8,那么此时5-8的这些还未应答的消息都会被确认收到消息应答
  • false同上面相比,只会应答tag=8的消息,5,6,7这3个消息依然会不被确认收到消息应答

RabbitMQ消息应答

5.消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认 ,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样即使某个消费者偶尔死亡,也可以确保不丢失任何消息

RabbitMQ消息应答

 6.代码

package com.rabbitmq.three;

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.MqUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * 生产者
 */
public class Task2 {

    // 队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = MqUtils.getChannel();
        // 声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        // 控制台输入生产消息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }
    }
}
package com.rabbitmq.three;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.MqUtils;
import com.rabbitmq.utils.SleepUtils;
import com.sun.org.apache.xpath.internal.operations.Bool;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者,手动应答,消息未被成功处理重新入队
 */
public class Worker03 {

    // 队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = MqUtils.getChannel();
        System.out.println("C1等待接收消息处理时间较短");
        boolean autoAck = false;
        DeliverCallback deliverCallback = (deliveryTag, message) -> {
            // 睡眠1秒
            SleepUtils.sleep(1);
            System.out.println("接收到消息:" + new String(message.getBody(), "UTF-8"));
            // 手动应答
            /**
             * 参数说明
             * 1.消息标记tag
             * 2.是否批量应答,false-不批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        CancelCallback cancelCallback = deliveryTag -> {
            System.out.println("消费者取消消费接口回调");
        };
        channel.basicConsume(TASK_QUEUE_NAME,false, deliverCallback, cancelCallback);
    }
}
package com.rabbitmq.three;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.MqUtils;
import com.rabbitmq.utils.SleepUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者,手动应答,消息未被成功处理重新入队
 */
public class Worker04 {

    // 队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = MqUtils.getChannel();
        System.out.println("C2等待接收消息处理时间较长");
        boolean autoAck = false;
        DeliverCallback deliverCallback = (deliveryTag, message) -> {
            // 睡眠1秒
            SleepUtils.sleep(30);
            System.out.println("接收到消息:" + new String(message.getBody(), "UTF-8"));
            // 手动应答
            /**
             * 参数说明
             * 1.消息标记tag
             * 2.是否批量应答,false-不批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        CancelCallback cancelCallback = deliveryTag -> {
            System.out.println("消费者取消消费接口回调");
        };
        channel.basicConsume(TASK_QUEUE_NAME,false, deliverCallback, cancelCallback);
    }
}

7.演示

RabbitMQ消息应答

 RabbitMQ消息应答

 RabbitMQ消息应答

 RabbitMQ消息应答

 RabbitMQ消息应答

 文章来源地址https://www.toymoban.com/news/detail-470625.html

到了这里,关于RabbitMQ消息应答的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包