【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认

这篇具有很好参考价值的文章主要介绍了【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认

                                                                  💧 【 R a b b i t M Q 教程】第三章—— R a b b i t M Q − 发布确认 \color{#FF1493}{【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认} RabbitMQ教程】第三章——RabbitMQ发布确认💧          


🌷 仰望天空,妳我亦是行人.✨
🦄 个人主页——微风撞见云的博客🎐
🐳 《数据结构与算法》专栏的文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺
💧 《Java学习笔记》专栏的文章是本人在Java学习中总结的一些知识点~ 💐
🥣 《每天一点小知识》专栏的文章可以丰富你的知识库,滴水成河~ 🌊
🥕 《RabbitMQ》专栏的文章是在学习尚硅谷课程时整理的笔记,方便复习巩固~ 🍑
🪁 希望本文能够给读者带来一定的帮助~🌸文章粗浅,敬请批评指正!🐥



🌊RabbitMQ - 发布确认

发布确认逻辑

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者应用程序同样可以在回调方法中处理该 nack 消息。

发布确认的策略

开启发布确认的方法:

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

//开启发布确认
channel.confirmSelect();

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

/**
 * 单个发送
 */
public static void publishMessageIndividually() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();

    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        //服务端返回 false 或超时时间内未返回,生产者可以消息重发
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("消息发送成功");
        }
    }

    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");

}

批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

/**
 * 批量
 */
public static void publishMessageBatch() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, true, false, false, null);
    //开启发布确认
    channel.confirmSelect();
    //批量确认消息大小
    int batchSize = 100;
    //未确认消息个数
    int outstandingMessageCount = 0;
    long begin = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        outstandingMessageCount++;
        if (outstandingMessageCount == batchSize) {
            channel.waitForConfirms();
            outstandingMessageCount = 0;
        }
    }
    //为了确保还有剩余没有确认消息 再次确认
    if (outstandingMessageCount > 0) {
        channel.waitForConfirms();
    }
    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}

异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的。
【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认
如何处理异步未确认消息?

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

以上 3 种发布确认速度对比 :

  • 单独发布消息

    同步等待确认,简单,但吞吐量非常有限。

  • 批量发布消息

    批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。

  • 异步处理

    最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些


【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认


🐳结语

🐬初学一门技术时,总有些许的疑惑,别怕,它们是我们学习路上的点点繁星,帮助我们不断成长。

🐟文章粗浅,希望对大家有帮助!

💧下一篇 -->【RabbitMQ教程】第四章 —— RabbitMQ - 交换机文章来源地址https://www.toymoban.com/news/detail-482892.html

到了这里,关于【RabbitMQ教程】第三章 —— RabbitMQ - 发布确认的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Linux第三章

    无论是Windows、MacOS、Linux均采用多用户的管理模式进行权限管理。在Linux系统中,拥有最大权限的账户名为:root(超级管理员) root用户拥有最大的系统操作权限,而普通用户在许多地方的权限是受限的(普通用户的权限,一般在其HOME目录内是不受限的,一旦出了HOME目录,大

    2023年04月26日
    浏览(51)
  • 第三章:函数

    1.定义 设 x , y 是两个变量,x的变化范围是实数集D,如果对于任何的x∈D,按照一定的法则都有唯一确定的y值与之对应。则称变量y是变量x的函数。记为 y = f(x) 。称D为函数的 定义域 ,x为自变量,y为因变量。全体函数值的集合称为函数y的 值域 。 2.函数的表示方法 1. 公式

    2024年02月01日
    浏览(76)
  • 第三章 Elasticsearch简介

    Elasticsearch (后称为 ES )是一个天生支持分布式的搜索、聚合分析和存储引擎。 搜索引擎 全文检索引擎 分布式文档系统 分布式数据库 OLAP系统 分布式搜索中间件 不要去死背概念,概念应该作为一种辅助的手段帮助我们去理解一项技术或知识,总之,等你真正会用了,你就

    2024年02月06日
    浏览(40)
  • 第三章-上网行为安全

    1)宽带滥用 2)上网难监管 3)信息泄露 4)网络违法 5)安全威胁 1)上网行为三要素:用户、流量、行为 2)功能需求 (AC的功能)-- 重点 用户认证 应用控制 网页过滤 行为审计 流量管理 应用选路 互联网上网行为管控 一体化网关 无线Wi-Fi管控营销 无线防共享上网 全网上

    2024年01月23日
    浏览(47)
  • 第三章nginx详解

    特点: 1,稳定性高。(没有apache稳定) 2,系统资源消耗地较低。(处理http请求的并发能力非常高,单台物理服务器可以处理30000-50000个并发请求) 稳定:一般在企业中,为了保持服务器的稳定,并发量的设置在20000个左右。占用内存2M左右。 nginx主要功能: 1,静态文件服

    2024年02月12日
    浏览(47)
  • 【计组】第三章练习

    4、设有一个具有20位地址和32位字长的存储器,问: (1)该存储器能存储多少个字节的信息? 220 × 32 bits = 1M × 4B = 4MB (220是2的20次方,上标打不出来…) (2)如果存储器由512K * 8位SRAM芯片组成,需要多少片? (1024K * 32)/(512K * 8) = 8 片 (3)需要多少位地址做芯片选择? 存

    2024年02月04日
    浏览(76)
  • 第三章 选择与循环

    程序员必备技能(思想):增量编写法。每写一部分代码要及时运行看结果是否正确,对于复杂程序很重要。 常用的运算符优先级: 逻辑非 ! 算术运算符 关系运算符 || 赋值运算符 单目运算符 逻辑非 ! 算术运算符 +、-、×、/、% 关系运算符 、、=、=、==、!= 逻辑运算符 、|

    2024年02月09日
    浏览(49)
  • 第三章-运输层

    运输层协议为运行在不同主机上的进程之间提供逻辑通信,即从应用程序角度看两个主机好像直连一样,实际可能相隔万里 运输层协议是在端系统上实现的,而不是路由器,为什么这么强调,因为运输层会将应用报文划分为较小的块然后加上一个运输层首部来生成运输层报文

    2024年02月14日
    浏览(39)
  • 第三章 decimal模块

    decimal 模块是 Python 提供的用于进行十进制定点和浮点运算的内置模块。使用它可以快速正确地进行十进制定点和浮点数的舍入运算,并且可以控制有效数字的个数。 使用 decimal 模块主要是因为它与 Python 自带的浮点数相比,有以下优点 : 基于浮点模型,提供与数字计算相同

    2024年02月09日
    浏览(54)
  • 408数据结构第三章

    特性后进先出 只允许在 一端 进行插入或删除操作的线性表 每接触一种新的数据结构类型,都应该分别从逻辑结构、存储结构和对数据的运算三方面入手 操作 initstack(s)初始化一个空栈s stackempty(s)判断一个栈是否为空 push(s,x)进栈,未满成为新栈顶 pop(s,x)出栈,非空弹出栈顶元

    2024年02月09日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包