RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

这篇具有很好参考价值的文章主要介绍了RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ–基础–8.1–消息确认机制–接受确认机制(ACK)


代码位置

https://gitee.com/DanShenGuiZu/learnDemo/tree/master/rabbitMq-learn/rabbitMq-03

1、场景和问题

1.1、需求

消费者收到Queue中的消息,但没有处理完成就宕机的情况,这种情况下就可能会导致消息丢失。

为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。

如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。

这里不存在Timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。

1.2、消息确认消息引发的问题

如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的问题,Queue中堆积的消息会越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑。

2、channel.basicConsume(queueName,autoAck,callback)方法

RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

2.1、参数

  1. queueName:队列名称
  2. autoAck:设置是否自动确认
    1. true:自动确认,消息一旦被消费者接收,队列中的消息就会被删除
    2. false:手动确认
  3. callback:设置消费者的回调函数,用来处理 RabbitMQ 推送过来的消息

3、消息确认机制 ACK

  1. 为了保证消息从队列可靠地达到消费者
  2. 当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。
  3. ACK分两种情况
    1. 自动 ACK
    2. 手动 ACK

3.1、自动 ACK

  1. autoAck=false。
  2. RabbitMQ 会自动把发送出去的消息设置为确认,然后从内存或磁盘中删除,而不管消费者是否真正地消费了这些消息

3.2、手动 ACK

  1. autoAck=false。
  2. RabbitMQ 会等待消费者显示地回复确认信号后才从内存或磁盘中移去消息

3.2.1、好处

  1. autoAck=false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程挂掉后消息丢失的问题。因为,RabbitMQ 会一直等待持有消息,直到消费者显示调用 Basic.Ack 命令为止。

3.2.2、原理

  1. autoAck=false,队列中的消息分成了两部分
    1. 等待投递给消费者的消息
    2. 已经投递给消费者,但还没有收到消费者确认信号的消息。
  2. 如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者。这样就保证消息不丢失了。

3.3、使用场景

  1. 如果消息不太重要,丢失也没有影响,那么autoAck=ture。
  2. 如果消息非常重要,不容丢失,那么autoAck=ture。

4、代码实现(手动 ACK)

4.1、代码结构

RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

4.2、生产者


package com.example.rabbitmq03.business.test7;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
    // 简单模式
    public static void main(String[] args) {
        // 1. 获取连接
        Connection connection = null;
        try {
            connection = RabbitMqUtil.getConnection("生产者");
        } catch (Exception e) {
            System.out.println("获取连接时,出现异常");
        }
        
        Channel channel = null;
        try {
            // 2. 通过连接获取通道 Channel
            channel = connection.createChannel();
            String queueName = "code_simple_queue1";
            // 3. 通过通道创建声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 4. 准备消息内容
            String message = "你好";
            // 5. 发送消息给队列 Queue
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("消息发送完成~~~发送的消息为:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                RabbitMqUtil.close(connection, channel);
            } catch (Exception e) {
                System.out.println("关闭时,出现异常");
            }
        }
    }
}

4.3、消费者

修改的地方
RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)


package com.example.rabbitmq03.business.test7;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.*;
public class Consumer {

    public static void main(String[] args) throws Exception{
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");

        final Channel channel = connection.createChannel();
        String queueName = "code_simple_queue1";

        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 消息体
                String msg = new String(body,"utf-8");
                System.out.println("收到消息:" + msg);
                /**
                 * @param1:deliveryTag:用来标识消息的id
                 * @param2:multiple:是否批量。true:将一次性 ACK 所有小于 deliveryTag 的消息
                 */
                // 手动确认
                channel.basicAck(deliveryTag, false);
            }
        };

        // 监听队列  手动 ACK
        channel.basicConsume(queueName, false, consumer);

        System.out.println("开始接收消息~~~");
        System.in.read();

        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}

4.4、测试

RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)
RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

5、自动 ACK 带来的问题

5.1、执行生产者,产生一条记录

RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

5.2、设置修改消费者为自动ACK

package com.example.rabbitmq03.business.test7;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.*;

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        // 获取连接
        Connection connection = RabbitMqUtil.getConnection("消费者");
        
        final Channel channel = connection.createChannel();
        String queueName = "code_simple_queue1";
        
        // 定义消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                int result = 1 / 0;
                // body 消息体
                String msg = new String(body, "utf-8");
                System.out.println("收到消息:" + msg);
                
            }
        };
        // 监听队列 自动 ACK
        channel.basicConsume(queueName, true, consumer);
        
        System.out.println("开始接收消息~~~");
        System.in.read();
        
        // 关闭信道、连接
        RabbitMqUtil.close(connection, channel);
    }
}

5.3、执行消费者

RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)
RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

消费者代码报错,没有收到消息,但是队列的消息少了,原因就是,MQ将异常内部消化了。
RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)文章来源地址https://www.toymoban.com/news/detail-499046.html

到了这里,关于RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot-RabbitMQ06-持久化和ACK确认机制

    1.什么是消息确认ACK? 如果在处理消息的过程中,消费者的服务器在处理消息时出现异常,那么可能这条正在处理的消息刘没有完成消息消费,数据就会丢失,为了确保数据不会丢失RabbitMQ支持消息确认-ACK 2.ACK的消息确认机制 ACK机制是消费者从RabbitMQ收到消息并处理完成后,反

    2024年04月15日
    浏览(26)
  • springboot整合rabbitmq的发布确认,消费者手动返回ack,设置备用队列,以及面试题:rabbitmq确保消息不丢失

    目录 1.生产者发消息到交换机时候的消息确认 2.交换机给队列发消息时候的消息确认 3.备用队列 3.消费者手动ack   rabbitmq的发布确认方式,可以有效的保证我们的数据不丢失。   消息正常发送的流程是:生产者发送消息到交换机,然后交换机通过路由键把消息发送给对应的队

    2024年02月09日
    浏览(60)
  • RabbitMQ 消息确认机制

    为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之

    2024年02月15日
    浏览(31)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(32)
  • rabbitmq消息确认机制

    (1) publish === broker 只要broker收到消息,就会执行 confirmCallback (2) exchange === queue 如果exchange有消息没有成功发送至queue,就会执行RuturnCallback,例:routing key错误导致发送消息到队列失败 (3)RabbitmqConfig (1) queue === consumer 默认是ack,consumer只要拿到消息就会自动确认,服务端

    2024年02月13日
    浏览(31)
  • 8. springboot + rabbitmq 消息发布确认机制

    在 RabbitMQ之生产者发布确认原理章节已经介绍了rabbitmq生产者是如何对消息进行发布确认保证消息不丢失的。本章节继续看下springboot整合rabbitmq后是如何保证消息不丢失的。 消息正常是通过生产者生产消息传递到交换机,然后经过交换机路由到消息队列中,最后消费者消费,

    2023年04月25日
    浏览(51)
  • Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

    确认消息是否发送给交换机 配置 编码RabbitTemplate.ConfirmCallback ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器, 也就是只确认是否正确到达 Exchange 中。 在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true); 配置类

    2024年02月20日
    浏览(34)
  • RabbitMQ的几种消息确认机制详细介绍

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的几种消息确认机制。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小威要向诸佬学习呀 🧑个人简介:大家好,我是

    2023年04月25日
    浏览(39)
  • 「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

    目录 介绍 方案 配置手动确认 使用 「Bean 」 配置RabbitMQ的属性 确定消费、拒绝消费、拒绝消费进入死信队列 模拟生产者发送消息①         RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电

    2024年02月08日
    浏览(28)
  • RabbitMQ:第一章:6 种工作模式以及消息确认机制

    } System.out.println(“发送数据成功”); channel.close(); connection.close(); } } 消费者一: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws Exception { Con

    2024年04月12日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包