RabbitMQ系列(7)--RabbitMQ消息应答及消息未应答后重新入队

这篇具有很好参考价值的文章主要介绍了RabbitMQ系列(7)--RabbitMQ消息应答及消息未应答后重新入队。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概念:消费者消费完一条消息可能需要等待一段时间,但如果这段时间内消费者在未完成消费信息的情况下时就挂掉了,这时候会怎么样?RabbitMQ一旦向消费者传递一条消息,该消息就会被标记为删除,这种情况下消费者挂掉了正在处理的消息就会丢失,为了保证消息在发送的过程中不会丢失,RabbitMQ引入了应答机制,即在消费者接收并处理了该条消息后告诉RabbitMQ它已经把该条消息处理了,RabbitMQ可以把这条消息删除了

1、自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,这种模式下万一消费者的连接或信道关闭,消息就丢失了,不过这种模式对传递的消息数量没有限制,但如果消息太多太大,消费者来不及消费,也可能出现消息的堆积导致内存耗尽,最终消费者程序被操作系统杀死的情况,所以这种模式只能在消费者可以高效的、高速率的处理消息的前提下使用

2、手动应答

以下方法用于手动应答

(1) channel.basicAck()(用于肯定确认,即向RabbitMQ表示该消息已经发送并处理成功了,可以将其丢弃)

(2)channel.basicNack()(用于否定确认,即不处理该信息直接丢弃)

(3)channel.basicReject()(用于否定确认,即不处理该信息直接丢弃,比basicNack方法少一个Multiple参数)

3、Multiple参数解释

channel.basicNack(deliveryTag,true)(第二个参数就是Multiple参数)

multiple的true和false的区别:

(1)true表示批量应答channel上未应答的消息,比如channel上有传送tag为5,6,7,8的消息,当前tag是8,那么此时5-8还未应答的消息就会被确认收到消息应答,但如果处理6或7消息失败了,5也会被应答,导致5消息丢失,所以一般情况下multiple为false。

(2)false表示只会应答tag=8的消息,5,6,7这三个消息依然不会被确认收到消息应答

4、消息重新入队

如果消费者由于某些原因失去连接,导致消费者未成功发送ACK确认应答,RabbitMQ将会对未完全处理完的消息重新入队,如果其他消费者可以处理,则该消息将被分配到另一个消费者,从而保证消息未丢失。

5、在utils包下新建一个名为SleepUtils的类,该类的方法能让线程睡眠指定的时间,用于模拟业务的处理时间,代码如下

package com.ken.utils;

/**
 * 睡眠工具类,用于模拟执行业务时间的长短
 */
public class SleepUtils {

    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

效果图:

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

6、使用代码实现消息手动应答,为此先新建一个名为ack的包,用于装消息手动应答的代码

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

效果图:

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

7、新建一个名为Task02的类,用作充当生产者,代码如下

注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考

https://blog.csdn.net/m0_64284147/article/details/129465871

package com.ken.ack;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class Task02 {

    //队列名称(用于指定往哪个队列接收消息)
    public  static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台读取要发送的信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /**
             * 用信道对消息进行发布
             * 第一个参数:发送到哪个交换机
             * 第二个参数:路由的Key值是哪个,本次是队列名
             * 第三个参数:其他参数信息
             * 第四个参数:发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送成功:" + message);
        }
    }

}

效果图:

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

8、新建一个名为Worker03的类,用作充当消费者一号,代码如下

package com.ken.ack;

import com.ken.utils.RabbitMqUtils;
import com.ken.utils.SleepUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 *  手动应答的第一个消费者
 */
public class Worker03 {

    //队列名称(用于指定往哪个队列接收消息)
    public  static final String QUEUE_NAME = "ack_queue";

    //进行接收操作
    public static void main(String[] args) throws Exception{
        //通过工具类获取信道
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            //沉睡1S,用于模拟业务处理需要1S的时间
            SleepUtils.sleep(1);
            System.out.println("接收的消息:" + new String(message.getBody()));
            /**
             * 手动应答
             *  第一个参数:表示消息的标记Tag(每个消息都有标记Tag)
             *  第二个参数:是否批量应答,true表示批量,false表示不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        /**
         * 声明消费者取消接收消息后的回调方法(由于回调方法CancelCallback是函数式接口,所以需要给CancelCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数cancelCallback的类型CancelCallback用 @FunctionalInterface注解规定CancelCallback是一个函数式接口,所以要往cancelCallback参数传的值要是一个函数
         *
         *  @FunctionalInterface
         *  public interface CancelCallback {
         *      void handle (String consumerTag) throws IOException;
         *  }
         *
         */
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费消息:" + consumerTag);
        };

        /**
         * 用信道对消息进行接收(采用手动应答)
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        System.out.println("Work03等待接收消息...");
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }

}

效果图:

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

9、新建一个名为Worker04的类,用作充当消费者二号,代码如下

package com.ken.ack;

import com.ken.utils.RabbitMqUtils;
import com.ken.utils.SleepUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 *  手动应答的第二个消费者
 */
public class Worker04 {

    //队列名称(用于指定往哪个队列接收消息)
    public  static final String QUEUE_NAME = "ack_queue";

    //进行接收操作
    public static void main(String[] args) throws Exception{
        //通过工具类获取信道
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            //沉睡30S,用于模拟业务处理需要30S的时间
            SleepUtils.sleep(30);
            System.out.println("接收的消息:" + new String(message.getBody()));
            /**
             * 手动应答
             *  第一个参数:表示消息的标记Tag(每个消息都有标记Tag)
             *  第二个参数:是否批量应答,true表示批量,false表示不批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        /**
         * 声明消费者取消接收消息后的回调方法(由于回调方法CancelCallback是函数式接口,所以需要给CancelCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数cancelCallback的类型CancelCallback用 @FunctionalInterface注解规定CancelCallback是一个函数式接口,所以要往cancelCallback参数传的值要是一个函数
         *
         *  @FunctionalInterface
         *  public interface CancelCallback {
         *      void handle (String consumerTag) throws IOException;
         *  }
         *
         */
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消费消息:" + consumerTag);
        };

        /**
         * 用信道对消息进行接收(采用手动应答)
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        System.out.println("Work04等待接收消息...");
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }

}

效果图:

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

10、分别先后启动Task02、Worker03、Worker04

例:

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

11、正常的在Task02输入消息,观察消息的被消费情况

(1)在Task02分别输入第一条和第二条消息

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

(2)等待1秒后第一条消息被Work03消费

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

(3)等待30秒后第二条消息被Work04消费

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

12、再次在Task02输入消息,然后手动暂停Worker04用以模拟Worker04消费者宕机的情况,观察消息的被消费情况

(1)在Task02分别输入第三条和第四条消息

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

(2)手动停掉Worker04,模拟Worker04宕机的情况

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档
rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

(3)Worker04宕机后没有成功消费掉第四条消息,然后没有对消息进行应答,导致第四条消息重新入队,然后被Worker03消费掉文章来源地址https://www.toymoban.com/news/detail-743447.html

rabbitmq重新入队,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

到了这里,关于RabbitMQ系列(7)--RabbitMQ消息应答及消息未应答后重新入队的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ 消息应答与发布

    目录 一、消息应答 1、自动应答(默认) 2、手动消息应答的方法  ​编辑 3、消息重新入队 4、手动应答案列与效果演示 二、RabbitMQ持久化 1、队列持久化 2、消息持久化 三、不公平分发(能者多劳,弱者少劳) 1、介绍 2、效果演示 四、预取值分发 1、介绍 五、发布确认 六

    2024年02月06日
    浏览(37)
  • RabbitMQ基于Java实现消息应答

    概念 RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站, 一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不

    2024年04月10日
    浏览(39)
  • 消息队列-RabbitMQ:workQueues—工作队列、消息应答机制、RabbitMQ 持久化、不公平分发(能者多劳)

    Work Queues— 工作队列 (又称任务队列) 的主要思想是避免立即执行资源密集型任务,而不得不等待它完成 。 我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务 。 轮训分发消

    2024年02月21日
    浏览(56)
  • RabbitMQ (HelloWord 消息应答 持久化 不公平分发 预取值)

    在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列-RabbitMO.代表使用者保留的消息缓冲区 第一步:导入依赖 第二步:创建生产者 第三步:创建消费者 因为你为了确保同一条消息被其中一个工作线程接收到了之后,其它工作就不能消费的到了 三者

    2023年04月14日
    浏览(43)
  • rabbitMQ手动应答与自动应答

    手动应答模式(manual) 解释:         手动应答:既是当消费者消费了队列中消息时需要给队列一个应答,告诉队列这条消息我已经消费了,可以删除了;         若是不应答,即使消费了 队列没收到消费成功的提示 所有消息会一直在队列中;      注意 注意 注意 :重要的事情说三

    2024年02月10日
    浏览(35)
  • RabbitMQ系列(5)--使用Java实现RabbitMQ的消费者接收消息

    前言:先简单了解RabbitMQ的工作过程,方便后续开发理清思路 简略: 详细: 1、新建消费者类 效果图: 2、编写消费者消费消息的代码 例: 3、查看代码运行结果 运行代码后如果有输出生产者发送的”Hello World”信息,则证明消费者消费消息成功 4、在web页面上查看队列的消

    2024年02月06日
    浏览(43)
  • RabbitMQ系列(8)--实现RabbitMQ队列持久化及消息持久化

    概念:在上一章文章中我们演示了消费者宕机的情况下消息没有被消费成功后会重新入队,然后再被消费,但如何保障RabbitMQ服务停掉的情况下,生产者发过来的消息不会丢失,这时候我们为了消息不会丢失就需要将队列和消息都标记为持久化。 1、实现RabbitMQ队列持久化 只需

    2024年02月09日
    浏览(39)
  • RabbitMQ中的手动应答和自动应答

    当使用RabbitMQ来处理消息时,消息确认是一个重要的概念。RabbitMQ提供了两种不同的消息确认方式:自动应答(Automatic Acknowledgment)和手动应答(Manual Acknowledgment)。这两种方式适用于不同的应用场景,本文将通过Java代码示例来演示它们的区别以及如何在实际应用中使用它们

    2024年02月06日
    浏览(35)
  • RabbitMQ系列(19)--实现在RabbitMQ宕机的情况下对消息进行处理

    前言:在生产环境中由于一些不明原因,导致RabbitMQ重启的情况下,在RabbitMQ重启期间生产者投递消息失败,生产者发送的消息会丢失,那这时候就需要去想在极端的情况下,RabbitMQ集群不可用的时候,如果去处理投递失败的消息。 1、在config包里新建一个名为ConfirmConfig的类用

    2024年02月15日
    浏览(52)
  • RabbitMQ系列教程消息中间件技术精讲

    作者:禅与计算机程序设计艺术 消息中间件(Message Queue,MQ)是一种分布式应用间通信的组件。它可以在不同的系统之间传递消息、数据或指令。在现代IT架构中,越来越多的应用需要相互通信,所以出现了消息队列的概念。RabbitMQ是一个开源的AMQP实现,是一个可靠、可扩展

    2024年02月06日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包