rabbitmq笔记-rabbitmq进阶-数据可靠性,rabbitmq高级特性

这篇具有很好参考价值的文章主要介绍了rabbitmq笔记-rabbitmq进阶-数据可靠性,rabbitmq高级特性。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

消息何去何从

mandatory和immediate是channel.basicPublish方法的两个参数,都有消息传递过程中不可达目的地时将消息返回给生产者的功能。

mandatory参数
  • true:交换器无法根据自身的类型 和路由键找到符合条件的队列,rabbitmq调用Basic.Return命令将消息返回给生产者
    • 生产者调用channel.addReturnListener添加ReturnListener监听器实现
  • false:消息直接丢弃
immediate参数

告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。

Rabbitmq3.0去掉了对immediate参数支持,建议采用TTL和DLX方法替代

  • true:如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,这条消息将不会存入队列中,当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。
备份交换器(AE)

生产者发送消息不设置mandatory,消息未被路由会丢失,设置了,需要添加ReturnListener。如果不想编程复杂也不想消息丢失使用备份交换器。

使用:

  • 声明交换器时添加alternate-exchange参数实现
    • channel.exchangeDeclare(“myAe”,“fanout”,true,false,null);
    • channel.queueDeclare(“unroutedQueue”,true,false,false,null);
    • channel.queueBind(“unroutedQueue”,“myAe”,“”);
  • 通过策略(Policy)实现
    • rabbitmqctl set_policy AE “^normalExchange$” ‘{“alternate-exchange”:“myAE”}’

特殊情况:

  • 如果设置了备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失
  • 如果备份交换器没有绑定任何队列,客户端和rabbitmq服务端都不会有异常出现,此时消息会丢失
  • 如果备份交换器没有任何匹配的队列,客户端和rabbitmq服务端都不会有异常出现,此时消息会丢失
  • 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效

过期时间(Time to Live ,TTL)

设置消息TTL
  • 通过队列属性设置,队列中所有消息都有相同的过期时间(一旦过期,就从队列中抹去,消息已经在队列头部,只要定期从队列头部开始扫描即可)

    • channel.queueDeclare方法中加入x-message-ttl参数实现,单位ms

      • Map<String,Object> args = new HashMap<String,Object>();
        args.put("x-message-ttl",6000);
        channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);
        
    • 通过Policy方式设置ttl

      • rabbitmqctl set_policy TTL ".*" '{"message-ttl":6000}' --apply-to queue
        
    • 通过调用http api接口设置

  • 通过对消息本身单独设置,每条消息的ttl可以不同(即使过期,也不会马上抹去,是否过期是在即将投递到消费者之前判定的)

    • 代码设置
      • 设置AMQP.BasicProperties属性
      • set属性:deliveryMode(持久化消息),expiration(ttl时间)
    • 通过 http api接口设置
  • 如果两个方法一起使用,消息的ttl以两者之间较小的数值为准,消息在队列中一旦超过设置的ttl时,就会变成死信,消费者将无法再收到该消息

  • 不设置ttl,表示此消息不会过期,ttl=0,表示除非此时可以直接将消息投递到消费者,否则立即丢弃。

设置队列TTL

channel.queueDeclare方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间(未使用:队列上没有任何消费者,队列也没有被重新声明,并在过期时间段内也未调用过Basic.Get命令)

Map<String, Object> args = new HashMap<String,Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue",false,false,false,args);

死信队列

当消息在一个队列中变成死信后,能被重新被发送到另一个交换器中,这个交换器就是DLX,死信交换器,绑定DLX的队列就是死信队列

消息变成死信情况

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

当队列中存在死信时,rabbitmq会自动将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,死信队列,可以监听这个队列的消息进行相应处理

设置方法

  • 代码设置:
    • channel.queueDeclare方法中设置x-dead-letter-exchange为队列添加DLX
  • 通过Policy方式设置

延迟队列

延迟队列存储的对象是对应的延迟消息(当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费)。

场景:

  • 订单系统,30min内未支付,进行异常处理
  • 手机遥控设备指定时间工作

通过DLX 和TTL模拟延迟队列的功能。

假设一个应用中需要将每条消息都设置为10秒延迟,生产者通过exchange.normal交换器将发送的消息存储在queue.normal队列,消费者订阅的是queue.dlx队列,当消息从queue.normal整个队列中过期之后被存入queue.dlx队列,消费者恰巧消费到了延迟10秒的这条消息。

优先级队列

实现:通过设置队列的x-max-priority参数实现

默认最低优先级为0,越高越优先消费

前提:如果在消费速度大于生成者的速度且broker中没有消息堆积的情况下,对发送的消息设置优先级就没什么意义了。

RPC实现

客户端发送请求消息,服务端回复响应的消息,为了接收响应的消息,需要在请求消息中发送一个回调队列

String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("","rpc_queue",props,message.getBytes());
  • replyTo:用来设置一个回调队列
  • correlationId:用来关联请求和其调用RPC之后的回复,每一个请求设置一个唯一的correlationId

可以为每一个客户端创建一个单一的回调队列。

持久化

  • 交换器持久化:通过在声明队列是将durable参数置为true实现的。如果不持久化,rabbitmq服务重启后,相关的交换器元数据会丢失,消息不丢失,只是不能将消息发送到这个交换器中了。
  • 队列持久化:通过在声明队列时将durable置为true,如果不设置持久化,rabbitmq重启后,相关队列元数据会丢失,此时数据也会丢失。
  • 消息持久化:通过将消息的投递模式BasicProperties中的diliveryMode属性设置为2即可实现消息的持久化
  • 设置了队列和消息的持久化,rabbitmq服务重启后,消息依旧存在。

将交换器、队列、消息都设置持久化后不能保证数据百分百丢失。

生产者确认

确定消息到底有没有正确到达服务器。可以通过事务机制和发送方确认机制

事务机制

rabbitmq客户端与事务机制相关方法

  • channel.txSelect:用于当前的信道设置成事务模式
  • channel.txCommit:用于提交事务
  • channel.txRollback:用于事务回滚

开启事务流程

  • 客户端发送Tx.select,将信道置为事务模式
  • Broker回复Tx.Select-Ok,确认已将信道置为事务模式
  • 在发送完消息后,客户端发送Tx.Commit提交事务
  • Broker回复Tx.Commit-Ok,确认提交事务
  • 如果发生异常,在捕获异常后,channel.txRollback()回滚

缺点:会有性能损失

发送方确认机制
  • 生产者将信道设置成confirm模式(channel.confirmSelect),rabbitmq同意:Confirm.Select-Ok;
  • 一旦信道进入confirm模式,所有在该信道上发布的消息都会被指派一个唯一id
  • 一旦消息被投递到匹配的队列后,rabbitmq会发送一个确认给生产者(包含消息唯一id),使得生产者知晓消息已经正确到达了目的地。

事务机制在一条消息发送后会使发送端阻塞,等待rabbitmq回应后才发下一条消息,而发送发确认机制最大好处是异步的。生产者通过回调方法处理该确认消息。如果rabbitmq因自身内部错误导致消息丢失,会发送一条nack命令,生产者应用程序同样可以在回调方法中处理nack命令。

publisher confirm优势

  • 批量confirm方法,每发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回
  • 异步confirm方法:提供一个回调方法,服务端确认了一条或多条消息后客户端会回调这个方法进行处理。

消费端要点介绍

消息分发

rabbitmq队列拥有多个消费者时,队列收到的消息将以轮询的分发方式发送给消费者,每条消息只会发送给订阅列表里的一个消费者。

  • 问题:如果某些空闲,某些忙碌造成整体下降
  • 方法:channel.basicQos方法允许限制信道上的消费者所能保持的最大未确认消息的数量。如果达到上限,就不会向这个消费者再发送任何消息,知道消费者确认了某条消息后,相应计数减1,之后消费者可以继续接受消息。
消息顺序性

指消费者消费到的消息和发送者发布的消息的顺序是一致的。

打破顺序性的情形

  • 如果生产者使用了事务机制,发送消息遇到异常进行了事务回滚,需重新补偿发送,如果是另一个线程实现,则出现乱序。
  • 如果生产者发送的消息设置了不同的超时时间,并设置了死信队列,顺序不一致。
  • 设置了优先级,也不是顺序的。

要保证消息的顺序性,需要业务方使用rabbitmq之后进一步处理,例如在消息体内添加全局有序标识实现。

弃用QueueingConsumer

缺陷

  • 内存溢出问题:队列堆积较多的消息,导致消费者客户端内存溢出假死,不断堆积
    • 使用Basic.Qos限制某个消费者所保持未确认消息的数量。
  • 会拖累同一个connection下的所有信道,性能降低
  • 同步递归调用QueueingConsumer会产生死锁
  • rabbitmq的自动连接恢复机制不支持Queueing Consumer这种形式
  • QueueingConsumer不是事件驱动的

消息传输保障

一般消息中间件消息传输保障分为三个层级

  • 最多一次
  • 最少一次
  • 恰好一次

rabbitmq支持其中的最多一次和最少一次,其中最少一次投递实现需要考虑

  • 消息生产者需要开启事务机制或publisher confirm机制,以确保消息可以可靠地传输到rabbitmq中
  • 消息生产者需要配合使用mandatory参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃
  • 消息和队列都需要进行持久化处理,以确保rabbitmq服务器在遇到异常情况时不会造成消息丢失
  • 消费者在消费消息的同时需要将autoAck设置为false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。

参考:《RabbitMQ实战指南》文章来源地址https://www.toymoban.com/news/detail-687828.html

到了这里,关于rabbitmq笔记-rabbitmq进阶-数据可靠性,rabbitmq高级特性的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ --- 消息可靠性

    消息队列在使用过程中,面临着很多实际问题需要思考:      消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 co

    2024年02月14日
    浏览(46)
  • RabbitMQ-保证消息可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月07日
    浏览(48)
  • RabbitMQ消息的可靠性

    面试题: Rabbitmq怎么保证消息的可靠性? 1.消费端消息可靠性保证: 消息确认(Acknowledgements) : 消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费

    2024年04月14日
    浏览(69)
  • RabbitMQ之MQ可靠性

    RabbitMQ实现数据持久化包括3个方面 (1)交换机持久化 (2)队列持久化 (3)消息持久化 注:开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执 从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列 注:从3.12版本后,所有队

    2024年01月21日
    浏览(35)
  • RabbitMQ保证消息的可靠性

    消息从发送,到消费者接收,会经理多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括: 发送时丢失: 生产者发送的消息未送达exchange 消息到达exchange后未到达queue MQ宕机,queue将消息丢失 consumer接收到消息后未消费就宕机 针对这些问题,RabbitMQ分别给出了

    2024年02月19日
    浏览(45)
  • RabbitMQ如何保证消息可靠性

    目录 1、RabbitMQ消息丢失的可能性 1.1 生产者消息丢失场景 1.2 MQ导致消息丢失 1.3 消费者丢失 2、如何保证生产者消息的可靠性 2.1 生产者重试机制 2.2 生产者确认机制 2.3 实现生产者确认 2.3.1 配置yml开启生产者确认 2.3.2 定义ReturnCallback 2.3.3 定义ConfirmCallback 3、MQ消息可靠性 3.1

    2024年02月20日
    浏览(51)
  • RabbitMQ----生产者可靠性

    生产者可靠性主要分为两个方面: 生产者重连 生产者确认         有的时候由于网络波动,可能会出现客户端连接MO失败的情况。通过配置我们可以开启连接失败后的重连机制: 注意:         当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不

    2024年01月21日
    浏览(46)
  • RabbitMQ-生产者可靠性

            由于网络波动导致客户端无法连接上MQ,这是可以开启MQ的失败后重连机制。         注意:                 是连接失败的重试,而不是消息发送失败后的重试。         这种超时重连的方式是 阻塞式 的,后面的代码没办法执行,如果说业务要求比较严格,则需

    2024年01月21日
    浏览(40)
  • RabbitMQ消息可靠性问题及解决

    说明:在RabbitMQ消息传递过程中,有以下问题: 消息没发到交换机 消息没发到队列 MQ宕机,消息在队列中丢失 消息者接收到消息后,未能正常消费(程序报错),此时消息已在队列中移除 针对以上问题,提供以下解决方案: 消息确认:确认消息是否发送到交换机、队列;

    2024年02月16日
    浏览(43)
  • 【RabbitMQ】之消息的可靠性方案

    一、数据丢失场景 二、数据可靠性方案 1、生产者丢失消息解决方案 2、MQ 队列丢失消息解决方案 3、消费者丢失消息解决方案 MQ 消息数据完整的链路为 :从 Producer 发送消息到 RabbitMQ 服务器中,再由 Broker 服务的 Exchange 根据 Routing_Key 路由到指定的 Queue 队列中,最后投送到消

    2024年02月14日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包