RabbitMQ发送方确认机制

这篇具有很好参考价值的文章主要介绍了RabbitMQ发送方确认机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、前言

RabbitMQ消息首先发送到交换机,然后通过路由键【routingKey】和【bindingKey】比较从而将消息发送到对应的队列【queue】上。在这个过程有两个地方消息可能会丢失:

  1. 消息发送到交换机的过程。
  2. 消息从交换机发送到队列的过程。

而RabbitMQ提供了类似于回调函数的机制来告诉发送方消息是否发送成功。这里针对上述的两种情况,RabbitMQ也是给出了以下的应对策略:

  • publisher-confirm:消息到达交换机时会触发。
  • publisher-return:到达交换机但是没有路由到队列,会返回ack以及失败原因。

2、publisher-confirm

在SpringBoot项目的properties文件中加上

spring.rabbitmq.publisher-confirm-type=correlated

该配置有三个值:

  1. none:是禁用发布确认模式,是默认值
  2. correlated:是发布消息成功到交换器后会触发回调方法
  3. simple:有两种效果,第一种和correlated值一样会触发回调方法;第二种在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。

RabbitMQ的配置类实现ConfirmCallback

/**
 * @author LoneWalker
 * @date 2023/4/8
 * @description
 */
@Slf4j
@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        //设置给rabbitTemplate
        rabbitTemplate.setConfirmCallback(this);
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange getExchange(){
        return new DirectExchange("directExchange",false,false);
    }

    @Bean
    public Queue getQueue(){
        return new Queue("publisher.addUser",true,false,false);
    }

    @Bean
    public Binding getBinding(DirectExchange exchange,Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    }

    /**
     * 消息成功到达交换机会触发
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("交换机收到消息成功:" + correlationData.getId());
        }else {
            log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
        }
    }
}

而需要这个correlationData是因为确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。所以我们改写一下发送消息的方法:

@RequiredArgsConstructor
@Service
public class PublisherServiceImpl implements PublisherService{

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void addUser(User user) {

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("directExchange","publisher.addUser",user,correlationData);
    }
}

然后发送消息:

mq怎么确定发送成功了,RabbitMQ单排日记,rabbitmq,java,Springboot,原力计划

再模拟一下失败的情况——把交换机名称改成错的:

mq怎么确定发送成功了,RabbitMQ单排日记,rabbitmq,java,Springboot,原力计划

温馨提示:测试完把交换机名称改回去。

3、publisher-return

在SpringBoot项目的properties文件中添加:

spring.rabbitmq.publisher-returns=true
###消息在没有被队列接收时是否强行退回还是直接丢弃
spring.rabbitmq.template.mandatory=true

RabbitMQ的配置类再实现ReturnsCallback

@Slf4j
@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        //设置给rabbitTemplate
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange getExchange(){
        return new DirectExchange("directExchange",false,false);
    }

    @Bean
    public Queue getQueue(){
        return new Queue("publisher.addUser",true,false,false);
    }

    @Bean
    public Binding getBinding(DirectExchange exchange,Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("publisher.addUser");
    }

    /**
     * 消息成功到达交换机会触发
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("交换机收到消息成功:" + correlationData.getId());
        }else {
            log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
        }
    }

    /**
     * 消息未成功到达队列会触发
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
    }
}

把路由键改为错误的值:

mq怎么确定发送成功了,RabbitMQ单排日记,rabbitmq,java,Springboot,原力计划

正常来说消息到达交换机就一定可以到达队列,到不了队列基本上就是代码写错了。文章来源地址https://www.toymoban.com/news/detail-708759.html

到了这里,关于RabbitMQ发送方确认机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

    代码位置 消费者收到Queue中的消息,但没有处理完成就宕机的情况,这种情况下就可能会导致消息丢失。 为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。 如果RabbitMQ没有收

    2024年02月10日
    浏览(51)
  • RabbitMq生产者发送消息确认

    一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitM

    2024年02月04日
    浏览(40)
  • RabbitMQ 发布确认机制

    发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段   生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。   confirm模式下的信道所发送的消息都将被应带ack或者nack一次

    2024年02月13日
    浏览(41)
  • rabbitmq消息确认机制

    (1) publish === broker 只要broker收到消息,就会执行 confirmCallback (2) exchange === queue 如果exchange有消息没有成功发送至queue,就会执行RuturnCallback,例:routing key错误导致发送消息到队列失败 (3)RabbitmqConfig (1) queue === consumer 默认是ack,consumer只要拿到消息就会自动确认,服务端

    2024年02月13日
    浏览(43)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(46)
  • RabbitMQ 消息确认机制

    为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之

    2024年02月15日
    浏览(45)
  • RabbitMQ的确认机制

    生产者确认 生产者事务版确认 消费者确认 自动确认 消费后确认

    2024年02月16日
    浏览(26)
  • RabbitMQ-消费者确认机制

    none:不做任何处理,消息投递到消费者了之后,立即返回ACK,并且从MQ将消息删除,非常不安全,不建议使用。 manual:手动模式,需要在业务中调用api,ack或者reject。 auto:自动模式,SpringAMQP利用AOP对我们的消息处理做了环绕增强,当业务正常执行时返回ACK,执行异常时,根

    2024年01月21日
    浏览(54)
  • Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

    确认消息是否发送给交换机 配置 编码RabbitTemplate.ConfirmCallback ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器, 也就是只确认是否正确到达 Exchange 中。 在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true); 配置类

    2024年02月20日
    浏览(47)
  • 8. springboot + rabbitmq 消息发布确认机制

    在 RabbitMQ之生产者发布确认原理章节已经介绍了rabbitmq生产者是如何对消息进行发布确认保证消息不丢失的。本章节继续看下springboot整合rabbitmq后是如何保证消息不丢失的。 消息正常是通过生产者生产消息传递到交换机,然后经过交换机路由到消息队列中,最后消费者消费,

    2023年04月25日
    浏览(64)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包