RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现

这篇具有很好参考价值的文章主要介绍了RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

概念:虽然我们可以设置队列和队列中的消息持久化,但任然存在消息在持久化的过程中,即在写入磁盘的过程中,消息未完全写入,然后服务器宕机导致消息丢失的情况,发布确认就是为了解决这种情况的概念,在消息完全写入磁盘后才确认消息完全持久化了

1、发布确认模式:

(1)单个确认发布模式(简单,但吞吐量有限)

(2)批量确认发布模式(简单,吞吐量合理,但出现问题很难找出是那条消息出现的问题)

(3)异步确认发布模式(最佳性能和资源使用,在出现错误的情况下能很好的控制,推荐使用)

2、实现开启发布确认

在生产者的代码中在channel调用confirmSelect方法,即channel.confirmSelect()

注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考

https://blog.csdn.net/m0_64284147/article/details/129465871

package com.ken.ack;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class Task02 {

    //队列名称(用于指定往哪个队列接收消息)
    public  static final String QUEUE_NAME = "my_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();

        //开启发布确认
        channel.confirmSelect();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台读取要发送的信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /**
             * 用信道对消息进行发布
             * 第一个参数:发送到哪个交换机
             * 第二个参数:路由的Key值是哪个,本次是队列名
             * 第三个参数:其他参数信息
             * 第四个参数:发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送成功:" + message);
        }
    }

}

3、新建包,用于装实现确认发布模式的代码

(1)新建一个名为confirm的包,用于装发布确认的代码

RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

效果图:

RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

(2)新建一个名为ConfirmMessage的类

RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

4、单个确认发布模式

单个确认发布是一种同步确认发布的方式,在发布一个消息后并且该条消息被确认发布了,后续的消息才能继续发布,不过这种确认方式的最大缺点就是发布速度特别慢

代码如下:

package com.ken.confirm;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

/**
 * 发布确认模式:
 * 1、单个确认发布模式
 * 2、批量确认发布模式
 * 3、异步确认发布模式
 */
public class ConfirmMessage {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //批量发布消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{

        //1、单个确认发布模式
        ConfirmMessage.publishMessageIndividually();

    }

    public static void publishMessageIndividually() throws  Exception{
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //开启发布确认
        channel.confirmSelect();

        //开始时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            //单个消息发布确认
            channel.waitForConfirms();
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单个确认消息,耗时" + (end - begin) + "ms");
    }

}

运行代码,查看单个确认发布模式消耗的时间

RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

5、批量确认发布模式

批量确认发布是一种能极大的提高吞吐量的发布模式,在发布一批消息后一起确认,不过这种确认方式的缺点是当发送故障导致发布出现问题时,不知道是哪个消息出现的问题

代码如下:

package com.ken.confirm;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

/**
 * 发布确认模式:
 * 1、单个确认模式
 * 2、批量确认模式
 * 3、异步确认模式
 */
public class ConfirmMessage {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //批量发布消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{
        
        //2、批量确认模式
        ConfirmMessage.publishMessageBatch();

    }

    public static void publishMessageBatch() throws  Exception{
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //开启发布确认
        channel.confirmSelect();

        //开始时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize = 100;

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            //达到100条消息的时候,批量确认一次
            if(i % batchSize == 0) {
                //批量消息发布确认
                channel.waitForConfirms();
            }

        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
    }

}

运行代码,查看批量确认发布模式消耗的时间

RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

6、异步确认发布模式

(1)代码实现

异步确认发布实现逻辑比上面两种要复杂,但性价比高,无论是可靠性还是效率都非常突出,异步确认发布通过回调函数来达到消息可靠性传递,消息的结构类似于map,都是key-value的结构,当相应的消息被消费了或消费失败了,都可以通过对应的key值来确认消费或消费失败的是哪一条消息,所以可靠性很高

RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

代码如下:

package com.ken.confirm;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;

/**
 * 发布确认模式:
 * 1、单个确认模式
 * 2、批量确认模式
 * 3、异步批量确认模式
 */
public class ConfirmMessage {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //批量发布消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{

        //3、异步批量确认模式
        ConfirmMessage.publishMessageAsync();

    }

    //异步发布确认
    public static void  publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //开启发布确认
        channel.confirmSelect();

        //开始时间
        long begin = System.currentTimeMillis();

        /**
         * 消息确认成功回调函数
         * 第一个参数:消息的标记
         * 第二个参数:是否确立确认
         */
        ConfirmCallback ackCallback = (deliveryTag,  multiple) -> {
            System.out.println("确认的消息:" + deliveryTag);
        };

        /**
         * 消息确认失败回调函数
         * 第一个参数:消息的标记
         * 第二个参数:是否确立确认
         */
        ConfirmCallback nackCallback = (deliveryTag,  multiple) -> {
            System.out.println("未确认的消息:" + deliveryTag);
        };

        /**
         * 消息监听器,用于监听消息发送是否成功
         * 第一个参数:消息确认成功回调函数
         * 第二个参数:消息确认失败回调函数
         */
        channel.addConfirmListener(ackCallback,nackCallback);

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个异步发布确认消息,耗时" + (end - begin) + "ms");

    }

}

(2)运行代码,查看异步确认发布模式消耗的时间

这里因为是异步的,即代码执行完并输出耗时时间了,但消息监听器还在运行,所以还在时间输出后还在输出确认的消息

RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档
RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

(3)处理异步未确认消息

已确认的消息没必要处理,而未确认的消息需要进行重新入队的处理,但由上述步骤(2)的效果图可看出程序在执行完后监听器还在监听消息是否确认成功,而要怎么做才能在程序执行完后再处理监听器监听出来未确认的消息呢?最好的解决方案便是把未确认的消息放在一个基于内存的能被发布线程访问到的队列里,例如就用ConcurrentSkipListMap这个集合在confirm(发布确认)、 callbacks(回调)与发布线程之间进行消息的传递;实现的思路是先用ConcurrentSkipListMap记录发送的所有消息,然后监听器监听消息,确认消息成功后会执行消息确认成功的回调函数,而回调函数执行删除ConcurrentSkipListMap集合里当前被确认的消息的操作,最后ConcurrentSkipListMap里剩下的就是未确认成功的消息

代码如下:

package com.ken.confirm;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * 发布确认模式:
 * 1、单个确认模式
 * 2、批量确认模式
 * 3、异步批量确认模式
 */
public class ConfirmMessage {

    //队列名称(用于指定往哪个队列接收消息)
    public static final String QUEUE_NAME = "my_queue";

    //批量发布消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception{

        //3、异步批量确认模式
        ConfirmMessage.publishMessageAsync();

    }

    //异步发布确认
    public static void  publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        //开启发布确认
        channel.confirmSelect();

        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况下
         * 功能:
         * 1、轻松的把序号(key)与消息(value)进行关联
         * 2、只要给了序号(key)就能批量删除条目
         * 3、支持高并发(多线程)
         */
        ConcurrentSkipListMap<Long,String> outStandingConfirms = new ConcurrentSkipListMap<>();

        //开始时间
        long begin = System.currentTimeMillis();

        /**
         * 消息确认成功回调函数
         * 第一个参数:消息的标记
         * 第二个参数:是否确立确认
         */
        ConfirmCallback ackCallback = (deliveryTag,  multiple) -> {
            //删除队列里所有已经确认的消息,剩下的就是未确认的消息
            if(multiple) {
                //multiple为true时将一次性ack所有小于deliveryTag的消息,headMap是用于获取第一个key到传入key的所有的key
                ConcurrentNavigableMap<Long, String> confirmed = outStandingConfirms.headMap(deliveryTag);
                System.out.println("确认的消息:" + deliveryTag);
                confirmed.clear();
            }else {
                outStandingConfirms.remove(deliveryTag);
            }
        };

        /**
         * 消息确认失败回调函数
         * 第一个参数:消息的标记
         * 第二个参数:是否确立确认
         */
        ConfirmCallback nackCallback = (deliveryTag,  multiple) -> {
            //打印未确认的消息
            String message = outStandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息:" + message + "未确认消息的tag:" + deliveryTag);
        };

        /**
         * 消息监听器,用于监听消息发送是否成功
         * 第一个参数:消息确认成功回调函数
         * 第二个参数:消息确认失败回调函数
         */
        channel.addConfirmListener(ackCallback,nackCallback);

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = "消息" + i;
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            //收集所有发送的消息(channel.getNextPublishSeqNo()用于获取下一次发布的序号)
            outStandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个异步发布确认消息,耗时" + (end - begin) + "ms");

    }

}

效果图:文章来源地址https://www.toymoban.com/news/detail-534203.html

RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档
RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现,rabbitmq,java-rabbitmq,rabbitmq,Powered by 金山文档

到了这里,关于RabbitMQ系列(10)--RabbitMQ发布确认模式的概念及实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 完全二叉树——堆的概念及实现

    堆(heap):是堆内存的简称,堆是动态分配内存,内存大小不固定,也不会自动释放,堆——数据结构是一种无序的树状结构,同时它还满足key-value键值对的存储方式。 如果有一个关键码的集合K = { , , ,…, },把它的所有元素按完全二叉树的顺序存储方式存储在一个一维

    2024年02月07日
    浏览(37)
  • 消息队列-RabbitMQ:发布确认—发布确认逻辑和发布确认的策略

    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者 (包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列

    2024年02月21日
    浏览(39)
  • 数据结构 栈的概念及栈的实现

    目录 1.栈的概念及结构 2.栈的实现  2.1  初始化栈 2.2 入栈  2.3 出栈  2.4 获取栈顶元素 2.5 获取栈中有效元素个数   2.6  检测栈是否为空,如果为空返回非零结果,如果不为空返回0 2.7 销毁栈  3. 完整代码 test.c  Stack.h Stack.c   栈(后进先出,先进后出) : 一种 特殊

    2024年01月21日
    浏览(49)
  • 数据结构:栈的概念及栈的实现

    目录 1.栈的概念及结构 2.栈的实现  2.1  初始化栈 2.2 入栈  2.3 出栈  2.4 获取栈顶元素 2.5 获取栈中有效元素个数   2.6  检测栈是否为空,如果为空返回非零结果,如果不为空返回0 2.7 销毁栈  3. 完整代码 test.c  Stack.h Stack.c   栈(后进先出,先进后出) : 一种 特殊

    2024年02月01日
    浏览(41)
  • RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认

    SpringBoot与RabbitMQ整合后,对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致; 生产者给交换机发送消息后、若是不管了,则会出现消息丢失; 解决方案1: 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失,因此重新发送消息;

    2024年02月14日
    浏览(48)
  • rabbitmq的发布确认

    生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式, 所有在该信道上面发布的 消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队

    2024年02月12日
    浏览(52)
  • RabbitMQ 发布确认机制

    发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段   生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。   confirm模式下的信道所发送的消息都将被应带ack或者nack一次

    2024年02月13日
    浏览(41)
  • 【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认

                                                                       💧 【 R a b b i t M Q 教程】第三章—— R a b b i t M Q − 发布确认 color{#FF1493}{【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认} 【 R abbi tMQ 教程】第三章 —— R abbi tMQ − 发布确认

    2024年02月08日
    浏览(90)
  • 【RabbitMQ教程】第七章 —— RabbitMQ - 发布确认高级

                                                                       💧 【 R a b b i t M Q 教程】第七章—— R a b b i t M Q − 发布确认高级 color{#FF1493}{【RabbitMQ教程】第七章 —— RabbitMQ - 发布确认高级} 【 R abbi tMQ 教程】第七章 —— R abbi tMQ −

    2024年02月09日
    浏览(84)
  • springboot整合rabbitmq发布确认高级

    在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。   在配置文件当中添加   spring.rabbitmq.publisher-confirm-type=correlated   NONE:禁用发布确认模式

    2024年02月11日
    浏览(88)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包