RabbitMQ 消息确认机制

这篇具有很好参考价值的文章主要介绍了RabbitMQ 消息确认机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ 消息确认机制

为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。

当autoAck 参数为 false 时,对于 RabbitMQ 服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 服务器端一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。

在rabbitmq中,我们可以通过持久化数据,解决rabbitmq服务器异常的数据丢失问题。生产者将消息发送出去之后,消息到底有没有到达 rabbitmq服务器,默认的情况是不知道的,有两种方式:事务机制和confirm 模式 可以做到消息的确认。

事务机制

txSelect: 用户将当前的channel设置成transaction模式
txCommit:用于提交事务
txRollback:回滚事务

当我们使用txSelect提交开始事务之后,我们就可以发布消息给Broke代理服务器,如果txCommit提交成功了,则消息一定到达了Broke了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。

  1. 消息生产者:Send
/**
 * 类描述:
 * 事务机制,消息生产者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 17:07
 */
public class Send {

    public static final String QUEUE_NAME = "test_queue_tx";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello tx message";

        try{
            // 开启事务
            channel.txSelect();
            // 发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // 手动触发异常
            System.out.println(1/0);
            //提交事务
            channel.txCommit();
        } catch(Exception e){
            // 回滚事务
            channel.txRollback();
            System.out.println("send message txRollback");
        } finally{
            channel.close();
            connection.close();
        }
    }
}
  1. 消息消费者:Recv
/**
 * 类描述:
 * 事务机制,消息消费者
 * @author cfl
 * @version 1.0
 * @date 2022/10/10 17:08
 */
public class Recv {

    public static final String QUEUE_NAME="test_queue_tx";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("recv[tx] msg:" + msg);
            }
        });
    }
}

此种模式还是很耗时的,采用了这种方式,降低了Rabbitmq的消息吞吐量

Confirm 模式

生产者消息确认

生产者端模式的实现原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始)。一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID) 。这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker 回传给生产者的确认消息中deliver-tag 域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域。表示到这个序列号之前的所有消息都已经得到了处理。

Confirm 模式最大的好处在于他是异步

开启confirm模式

// 消息生产者使用
channel.confirmSelect();

编程模式:

  1. 普通Confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端Confirm。实际上是一种串行Confirm了,每publish一条消息之后就等待服务端Confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传;
  2. 批量Confirm模式:批量Confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端Confirm,这种批量确认的模式极大的提高了Confirm效率,但是如果一旦出现Confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;
  3. 异步Confirm模式:提供一个回调方法,服务端Confirm了一条或者多条消息后Client端会回调这个方法。

boolean flag= channel.waitForConfirms();会堵塞线程等带服务器来返回确认消息。可以为这个函数指定一个毫秒值用于等待服务器的确认超时时间。如果抛出异常表示服务器出了问题,需要补发消息。无论是返回false还是抛出异常都有可能消息发送成功或者没有发送成功。

普通 confirm 单条普通

  1. 消息生产者:Send1
/**
 * 普通模式
 */
public class Send1 {
    public static final String QUEUE_NAME = "test_queue_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 生产者调用confirmSelect 将chancel设置为confirm模式。注意(事务机制改为这个会出异常)
        channel.confirmSelect();

        String msg = "hello confirm message";

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("send message txRollback");

        /*
            确认消息
            堵塞线程等待服务器返回响应
            如果服务器确认消费者已经发送完成则返回true
         */
        if(channel.waitForConfirms()){
            System.out.println("message send ok");
        } else {
            System.out.println("message send failed");
        }

        channel.close();
        connection.close();
    }
}

普通Confirm模式最简单,publish一条消息后,等待服务器端Confirm,如果服务端返回false或者超时时间内未返回,客户端就可以在else中进行消息重传。

  1. 消息生产者:Send2
    批量的 发一批 waitForConfirms
/**
 * 批量模式
 */
public class Send2 {
    public static final String QUEUE_NAME = "test_queue_confirm2";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 生产者调用confirmSelect 将chancel设置为confirm模式。注意(事务机制改为这个会出异常)
        channel.confirmSelect();

        // 批量发送
        for (int i = 0; i < 10; i++) {
            String msg = "hello confirm message";

            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }

        System.out.println("send message txRollback");

        // 确认是否发送成功
        if(channel.waitForConfirms()){
            System.out.println("message send ok");
        } else {
            System.out.println("message send failed");
        }

        channel.close();
        connection.close();
    }
}

还可以调用channel.waitForConfirmsOrDie()方法,该方法会等到最后一条消息得到确认或者得到nack才会结束,也就是说在waitForConfirmsOrDie处会造成当前程序的阻塞

异步模式

Channel 对象提供的ConfirmListener()回调方法只包含deliveryTag (当前Channel发出的消息序号)。我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1。每回调一次handleAck方法,unconfirm 集合删掉相应的一条(multiple=false) 或多条( multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。

  1. 消息生产者:Send3
/**
 * 异步
 */
public class Send3 {
    public static final String QUEUE_NAME = "test_queue_confirm3";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false,false, null);

        // 生产者调用confirmSelect 将channel设置为confirm模式
        channel.confirmSelect();

        // 存放未确认的消息标识
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // 监听通道 当收到Broker发送过来的ack消息时就会调用handleAck方法,收到nack时就会调用handleNack方法。
        channel.addConfirmListener(new ConfirmListener(){

            // 没问题的handleAck
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if(multiple){
                    System.out.println("【handleAck】-----multiple true");
                    confirmSet.headSet(deliveryTag+1).clear();
                } else {
                    System.out.println("【handleAck】-----multiple false");
                    confirmSet.remove(deliveryTag);
                }
            }

            // handleNack 有问题的
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {

                if(multiple){
                    System.out.println("【N】handleNack-----multiple");
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    System.out.println("【N】handleNack-----multiple false");
                    confirmSet.remove(deliveryTag);
                }
            }
        });

        String msgStr = "ssss";

        while(true){
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msgStr.getBytes());
            confirmSet.add(seqNo);
        }

    }
}

消费者消息确认

首先介绍消息消费的前提,rabbitmq 消费消息有两种模式,一个是推送 push ,一个是自己拉取pull。

推模式:消息中间件主动将消息推送给消费者
拉模式:消费者主动从消息中间件拉取消息。

但实际使用中,拉取消息是会降低系统吞吐量的,以及消费者很难实时获取消息,因此,一般使用的是push 模式。
在 mq 推消息给消费者不是等消费者消费完一个再推一个,而是根据prefetch_count 参数来决定可以推多个消息到消费者的缓存里面。
在消费者确认中,为了保证数据不会丢失,RabbitMQ 支持消息确定ACK。ACK 机制是消费者从 RabbitMQ 收到消息并处理完成后,返回给RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列中删除。

自动确认

自动确认是指消费者在消费消息的时候,当消费者收到消息后,消息就会被 RabbitMQ 从队列中删除掉。这种模式认为 “发送即成功”。这是不安全的,因为消费者可能在业务中并没有成功消费完就中断了。

//autoAck=true 表示自动确认,autoAck=false表示手动确认
channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);
手动确认 autoAck:false

消费者首先需要将其信道设置成手动确认:

boolean autoAck = false; //false 手动回执
channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);

手动确认又分为肯定确认否定确认

肯定确认 BasicAck

// false 表示只确认 b.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
channel.basicAck(b.getEnvelope().getDeliveryTag(), false);

否定确认: BasicNack、BasicReject

否定确认的场景不多,但有时候某个消费者因为某种原因无法立即处理某条消息时,就需要否定确认了.
否定确认时,需要指定是丢弃掉这条消息,还是让这条消息重新排队,过一会再来,又或者是让这条消息重新排队,并尽快让另一个消费者接收并处理它.

丢弃:

// requeue=false表示直接丢弃不放入队列。
channel.basicNack(deliveryTag, multiple, requeue)

重新排序

// requeue=true表示放入队列。
channel.basicNack(deliveryTag, multiple, requeue)

一般来说,如果出现异常,就使用channel.BasicNack 把消费失败的消息重新放入到队列中去。文章来源地址https://www.toymoban.com/news/detail-614467.html

到了这里,关于RabbitMQ 消息确认机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rabbitmq消息确认机制

    (1) publish === broker 只要broker收到消息,就会执行 confirmCallback (2) exchange === queue 如果exchange有消息没有成功发送至queue,就会执行RuturnCallback,例:routing key错误导致发送消息到队列失败 (3)RabbitmqConfig (1) queue === consumer 默认是ack,consumer只要拿到消息就会自动确认,服务端

    2024年02月13日
    浏览(42)
  • RabbitMQ 消息确认机制

    为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之

    2024年02月15日
    浏览(44)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(45)
  • 8. springboot + rabbitmq 消息发布确认机制

    在 RabbitMQ之生产者发布确认原理章节已经介绍了rabbitmq生产者是如何对消息进行发布确认保证消息不丢失的。本章节继续看下springboot整合rabbitmq后是如何保证消息不丢失的。 消息正常是通过生产者生产消息传递到交换机,然后经过交换机路由到消息队列中,最后消费者消费,

    2023年04月25日
    浏览(63)
  • Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

    确认消息是否发送给交换机 配置 编码RabbitTemplate.ConfirmCallback ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器, 也就是只确认是否正确到达 Exchange 中。 在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true); 配置类

    2024年02月20日
    浏览(47)
  • RabbitMQ的几种消息确认机制详细介绍

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的几种消息确认机制。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小威要向诸佬学习呀 🧑个人简介:大家好,我是

    2023年04月25日
    浏览(55)
  • 「RabbitMQ」实现消息确认机制以确保消息的可靠发送、接收和拒收

    目录 介绍 方案 配置手动确认 使用 「Bean 」 配置RabbitMQ的属性 确定消费、拒绝消费、拒绝消费进入死信队列 模拟生产者发送消息①         RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电

    2024年02月08日
    浏览(39)
  • RabbitMQ消息可靠性投递与ACK确认机制

    什么是消息的可靠性投递 保证消息百分百发送到消息队列中去 保证MQ节点成功接收消息 消息发送端需要接收到MQ服务端接收到消息的确认应答 完善的消息补偿机制,发送失败的消息可以再感知并二次处理 RabbitMQ消息投递路径 生产者–交换机–队列–消费者 通过两个节点控制

    2024年02月20日
    浏览(51)
  • RabbitMQ:第一章:6 种工作模式以及消息确认机制

    } System.out.println(“发送数据成功”); channel.close(); connection.close(); } } 消费者一: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws Exception { Con

    2024年04月12日
    浏览(37)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(89)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包