8. springboot + rabbitmq 消息发布确认机制

这篇具有很好参考价值的文章主要介绍了8. springboot + rabbitmq 消息发布确认机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


在 RabbitMQ之生产者发布确认原理章节已经介绍了rabbitmq生产者是如何对消息进行发布确认保证消息不丢失的。本章节继续看下springboot整合rabbitmq后是如何保证消息不丢失的。

1. 消息需要发布确认的原因

消息正常是通过生产者生产消息传递到交换机,然后经过交换机路由到消息队列中,最后消费者消费,如下图所示
8. springboot + rabbitmq 消息发布确认机制
上述为正常情况,但也有情况会导致消息丢失的情况:第一种情况,交换机重启,当消息经过消费者投递后,恰巧交换机正在重启,会导致生产者投递消息失败,从而导致消息丢失;第二种情况,交换机没问题,消息投递到交换机后,交换机路由到消息队列过程中出了问题,比如routingKey错误导致路由不到队列中。
针对上述两种导致消息丢失的情况,下面采用消息确认发布机制,分别采取消息正确投递到交换机后回调接口来确认消息正确被投递,消息经交换机正确路由到队列中回调接口来确认消息正确被路由。

2. 消息发送交换机后回调接口ConfirmCallback ,保证消息在发送交换机处不丢失

当消息经生产者投递到交换机后,为避免消息丢失,需要回调RabbitTemplate.ConfirmCallback接口,回调接口后,尤其是要对投递失败的消息进行处理或者记录下来保证消息不丢失。该接口不管消息投递到交换机成功或者失败都会进行回调,未避免消息丢失,可以选择在回调接口中只处理或者登记投递失败的消息,达到消息不丢失的目的。

下面通过案例演示生产者投递消息到交换机后回调ConfirmCallback接口情况。

8. springboot + rabbitmq 消息发布确认机制
`

2.1 在application.properties中添加spring.rabbitmq.publisher-confirm-type=correlated配置,开启回调机制

spring.rabbitmq.host=192.168.xx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

spring.rabbitmq.publisher-confirm-type=correlated

2.2 声明交换机和队列并进行绑定
声明confirm_exchange交换机,声明confirm_queue队列,并通过routingKey=confirm绑定交换机和队列。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    public static final String ROUTING_KEY = "confirm";

    /*声明交换机*/
    @Bean
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    /*声明队列*/
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    /*绑定交换机和队列*/
    @Bean
    public Binding exchangeBindingQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
    }
}

2.3 实现RabbitTemplate.ConfirmCallback接口
当消息由生产者发到交换机后会回调该接口中的confirm方法

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback {
	/* correlationData 内含消息内容
	 * ack 交换机接受成功或者失败。 true表示交换机接受消息成功, false表示交换机接受失败
	 * cause 表示失败原因
	*/
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("hello world");
        String id = correlationData.getId();
        String message = new String(correlationData.getReturnedMessage().getBody());
        if (ack){
            log.info("交换机收到消息id为{}, 消息内容为{}", id, message);
        }else {
            log.info("交换机未收到消息id为{}, 消息内容为{}, 原因为{}", id, message, cause);
        }
    }
}

2.4 创建生产者
创建生产者生产消息,本案例中生产者发送了2个消息,分别为hello rabbitmq 1hello rabbitmq 2
在创建生产者时要设置一下需要回调的接口ExchangeCallback ,在设置回调接口时用了java的@PostConstruct注解,该注解作用用来指定bean初始化的顺序,Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)。也就是说在初始化Producer3对象时,先调了Producer3的默认无参构造函数;然后执行Autowired注解部分,从Spring IOC容器中寻找rabbitTemplate和exchangeCallback对象分别注入到Producer3的rabbitTemplate和exchangeCallback属性中;最后执行PostConstruct注解部分,把exchangeCallback设置到rabbitTemplate中。

在发送hello rabbitmq 2消息时故意把routingKey写错导致hello rabbitmq 2消息不能从交换机发送到队列中,为下一节做铺垫。

import com.lzj.config.ConfirmConfig;
import com.lzj.config.ExchangeCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Slf4j
@Component
public class Producer3 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ExchangeCallback exchangeCallback;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(exchangeCallback);
    }

    public void produceMessage(){
        String message = "hello rabbitmq 1";
        CorrelationData correlationData1 = new CorrelationData("1");
        correlationData1.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, correlationData1);
        message = "hello rabbitmq 2";
        CorrelationData correlationData2 = new CorrelationData("2");
        correlationData2.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
    }
}

2.5 启动测试

@SpringBootApplication
@Slf4j
public class SpringbootDemo {
    public static void main(String[] args) {
        ConfigurableApplicationContext app = SpringApplication.run(SpringbootDemo.class, args);
        Producer3 producer = app.getBean(Producer3.class);
        producer.produceMessage();
    }
}

启动上面测试案例,生产者发送了id为1的hello rabbitmq 1消息和id为2的hello rabbitmq 2,交换机收到消息也回调了ExchangeCallback 接口对2条消息都进行了确认,尤其是对于失败的消息要在此步保存失败的消息,避免消息在交换机这一步丢失。

2022-08-04 00:43:28.817  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为1, 消息内容为hello rabbitmq 1
2022-08-04 00:43:28.823  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为2, 消息内容为hello rabbitmq 2
2022-08-04 00:43:28.827  INFO 13512 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交换机收到消息id为1, 消息内容为hello rabbitmq 1
2022-08-04 00:43:28.828  INFO 13512 --- [nectionFactory2] com.lzj.config.ExchangeCallback          : 交换机收到消息id为2, 消息内容为hello rabbitmq 2

但上面还有一个问题,虽然回调ExchangeCallback接口,可以保证消息到交换机一步不会丢失,但如果交换机到队列的过程中出现了问题,消息一样会丢失。比如上面生产者把routingKey写错了,就会导致hello rabbitmq 2消息从交换机路由不到队列中。下面创建消费者程序,看消费者消费confirm_queue队列中消息情况

import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Consumer3 {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void consuemrMessage(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("消费者消费消息{}", msg);
    }
}

然后重新启动测试代码,输出如下,可以看出消费端只消费了hello rabbitmq 1,而hello rabbitmq 2消息则丢失了。

2022-08-04 00:45:28.817  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为1, 消息内容为hello rabbitmq 1
2022-08-04 00:45:28.823  INFO 13512 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为2, 消息内容为hello rabbitmq 2
2022-08-04 00:45:28.827  INFO 13512 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交换机收到消息id为1, 消息内容为hello rabbitmq 1
2022-08-04 00:45:28.828  INFO 13512 --- [nectionFactory2] com.lzj.config.ExchangeCallback          : 交换机收到消息id为2, 消息内容为hello rabbitmq 2
2022-08-04 00:45:28.831  INFO 13512 --- [ntContainer#2-1] com.lzj.consumer.Consumer3   			   : 消费者消费消息hello rabbitmq 1

3. 消息经交换机路由到队列后回调接口ReturnCallback ,保证消息在发送队列处不丢失

为解决上一节中,消息由交换机和消息队列中异常,导致消息丢失问题,解决办法就是在添加消息从交换机路由到队列中失败后回调的接口,在回调接口中把失败的消息保存下来就可以避免消息丢失了。

在回调接口之前还需为RabbitMQ设置Mandatory标志,只有当该标志为true时,消息由交换机到队列失败后才会回调接口;如果该标志设置false时,消息由交换机路由到队列失败后自动丢弃消息,会导致消息丢失,这也是默认设置,所以如需保证消息不丢失,要打开Mandatory标志。

下面继续进行上面案例,在上面案例的基础上添加ReturnCallback接口实现

package com.lzj.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class QueueCallback implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息 {} 经交换机 {} 通过routingKey={} 路由到队列失败,失败code为:{}, 失败原因为:{}",
                new String(message.getBody()), exchange, routingKey, replyCode, replyText);
    }
}

然后修改上面的生产者Producer3 ,只需修改2点:第一点,为RabbitTemplate 设置Mandatory标志;第二点,把ReturnCallback的实现加入监听。

import com.lzj.config.ConfirmConfig;
import com.lzj.config.ExchangeCallback;
import com.lzj.config.QueueCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Slf4j
@Component
public class Producer3 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ExchangeCallback exchangeCallback;

    @Autowired
    private QueueCallback queueCallback;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(exchangeCallback);
        /**
		* true:交换机无法将消息进行路由时,会将该消息返回给生产者
		* false:如果发现消息无法进行路由,则直接丢弃
		*/
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(queueCallback);
    }

    public void produceMessage(){
        String message = "hello rabbitmq 1";
        CorrelationData correlationData1 = new CorrelationData("1");
        correlationData1.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, correlationData1);
        log.info("生产者发送id为{}, 消息内容为{}", correlationData1.getId(), message);
        message = "hello rabbitmq 2";
        CorrelationData correlationData2 = new CorrelationData("2");
        correlationData2.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
        log.info("生产者发送id为{}, 消息内容为{}", correlationData2.getId(), message);
    }
}

下面重新启动上面的测试案例,可以得出如下测试结果,生产者生产的hello rabbitmq 1hello rabbitmq 2消息都已发到交换机,也都被交换机确认了,但hello rabbitmq 2被交换机路由到队列时由于routingKey错误导致路由失败,已在ReturnCallback接口回调中被记录下来,最终正确被路由到队列中的消息只有hello rabbitmq 1,从打印日志看也只有hello rabbitmq 1被消费了。

2022-08-05 00:39:29.670  INFO 6832 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为1, 消息内容为hello rabbitmq 1
2022-08-05 00:39:29.683  INFO 6832 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为2, 消息内容为hello rabbitmq 2
2022-08-05 00:39:29.702  INFO 6832 --- [nectionFactory3] com.lzj.config.ExchangeCallback          : 交换机收到消息id为1, 消息内容为hello rabbitmq 1
2022-08-05 00:39:29.704  INFO 6832 --- [nectionFactory1] com.lzj.config.QueueCallback             : 消息 hello rabbitmq 2 经交换机 confirm_exchange 通过routingKey=error_routingkey 路由到队列失败,失败code为:312, 失败原因为:NO_ROUTE
2022-08-05 00:39:29.705  INFO 6832 --- [nectionFactory2] com.lzj.config.ExchangeCallback          : 交换机收到消息id为2, 消息内容为hello rabbitmq 2
2022-08-05 00:39:29.707  INFO 6832 --- [ntContainer#2-1] com.lzj.consumer.Consumer3               : 消费者消费消息hello rabbitmq 1

结论:从上面2个案例可以看出,通过ConfirmCallback和ReturnCallback接口的回调,保证了消息在交换机和队列处消息不丢失。

4. 备份交换机

备份交换机目的是解决消息交换机路由到队列中失败时保证消息不丢失的一种方法,与上面第3节中回调ReturnCallback达到的目的一致,但不管是回调ConfirmCallback还是回调ReturnCallback都增加了生产者的负担,当消息异常时会增加生产者的处理时间,导致生产端生产消息的吞吐量下降。另外对异常的消息也不可以用死信队列(死信队列详解参考此篇文章),因为消息在投递到交换机和队列过程中异常时无法被投递到死信交换机,因为不满足死信队列的3个条件(参见死信队列篇章)。因此备份交换机就是一种处理交换机路由队列过程中失败时保证消息不丢失的不错选择。

以下图为例,消息正常是有Producer生产者到confirm_exchange交换机,再由confirm_exchange交换机路由到confirm_queue队列中,最后被confirm_consumer消费掉。如果消息从Producer到confirm_exchange过程中出现了异常会回调ConfirmCallback接口保证消息不丢失。如果消息在confirm_exchange交换机到confirm_queue出现了异常,在前面案例中是回调ReturnCallback接口保证消息不丢失的,而此处是通过备用交换机实现,异常消息会被投递到back_exchange交换机中,该交换机可以是fanout类型交换机,可以把消息分别路由到backup_queue队列和warn_queue队列中,backup_queue队列中消息由backup_consumer消费者消费,warn_queue队列中消息由warn_consumer消费,其中backup_consumer用于备份失败的消息,比如失败的消息存储到数据库中,warn_consumer用于告警运维或开发人员有失败的消息。
8. springboot + rabbitmq 消息发布确认机制
1. 首先修改上述案例中ConfirmConfig类,用于声明confirm_exchange、backup_exchange交换机,声明confirm_queue、backup_queue、warn_queue队列,并对交换机和队列进行绑定。

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    public static final String WARN_QUEUE_NAME = "warn_queue";

    /*声明确认交换机*/
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                              .durable(true)
                              .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)     //设置备份交换机
                              .build();
    }

    /*声明确认队列*/
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    /*绑定确认交换机和确认队列*/
    @Bean
    public Binding confirmExchangeBindingQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                               @Qualifier("confirmQueue") Queue confirmQueue){
       return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("confirm");
    }

    /*声明备份交换机*/
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    /*声明备份队列*/
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    /*绑定备份交换机和备份队列*/
    @Bean
    public Binding backupExchangeBindingBackupQueue(@Qualifier("backupExchange")FanoutExchange backupExchange,
                                                    @Qualifier("backupQueue")Queue backupQueue){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    /*声明警告队列*/
    @Bean("warnQueue")
    public Queue warnQueue(){
        return QueueBuilder.durable(WARN_QUEUE_NAME).build();
    }

    /*绑定备份交换机和警告队列*/
    @Bean
    public Binding backupExchangeBindingWarnQueue(@Qualifier("backupExchange")FanoutExchange backupExchange,
                                                  @Qualifier("warnQueue")Queue warnQueue){
        return BindingBuilder.bind(warnQueue).to(backupExchange);
    }
}

2. 创建ConfirmConsumer消费者用于消费confirm_queue中的消息

import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmConsumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void consuemrMessage(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("消费者消费消息{}", msg);
    }
}

3. 创建BackupConsumer消费者用于消费backup_queue中的消息

import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class BackupConsumer {

    @RabbitListener(queues = ConfirmConfig.BACKUP_QUEUE_NAME)
    public void backupMessage(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("消息{} 备份到数据库", msg);
    }
}
  1. 创建WarnConsumer消费者用于消费warn_queue中消息
import com.lzj.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WarnConsumer {

    @RabbitListener(queues = ConfirmConfig.WARN_QUEUE_NAME)
    public void warnMessage(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("消息{} 消费失败,请尽快处理", msg);
    }
}

5. 修改上述案例中的生产者Producer3中发消息时的routingKey为confirm,修改后代码如下

import com.lzj.config.ConfirmConfig;
import com.lzj.config.ExchangeCallback;
import com.lzj.config.QueueCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Slf4j
@Component
public class Producer3 {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ExchangeCallback exchangeCallback;

    @Autowired
    private QueueCallback queueCallback;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(exchangeCallback);
        /**
         * true:
         * 交换机无法将消息进行路由时,会将该消息返回给生产者
         * false:
         * 如果发现消息无法进行路由,则直接丢弃
         */
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(queueCallback);
    }

    public void produceMessage(){
        String message = "hello rabbitmq 1";
        CorrelationData correlationData1 = new CorrelationData("1");
        correlationData1.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "confirm", message, correlationData1);
        log.info("生产者发送id为{}, 消息内容为{}", correlationData1.getId(), message);
        message = "hello rabbitmq 2";
        CorrelationData correlationData2 = new CorrelationData("2");
        correlationData2.setReturnedMessage(new Message(message.getBytes()));
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, "error_routingkey", message, correlationData2);
        log.info("生产者发送id为{}, 消息内容为{}", correlationData2.getId(), message);
    }
}

然后重新启动测试程序进行测试,输出代码如下所示,生产者生产了hello rabbitmq 1hello rabbitmq 2消息,并且2条消息偶成功发到了confirm_exchange交换机中,由于hello rabbitmq 2的routingKey写的有问题,导致hello rabbit 1消息被confirm_consumer正常消费,而hello rabbit 2则被投递到了backup_exchange备份交换机中,然后改消息被backup_consumer消费者保存,并同时被warn_consumer发出警告。

2022-08-18 00:56:29.549  INFO 7088 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为1, 消息内容为hello rabbitmq 1
2022-08-18 00:56:29.559  INFO 7088 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交换机收到消息id为1, 消息内容为hello rabbitmq 1
2022-08-18 00:56:29.563  INFO 7088 --- [           main] com.lzj.producer.Producer3               : 生产者发送id为2, 消息内容为hello rabbitmq 2
2022-08-18 00:56:29.578  INFO 7088 --- [ntContainer#0-1] com.lzj.consumer.BackupConsumer          : 消息hello rabbitmq 2 备份到数据库
2022-08-18 00:56:29.578  INFO 7088 --- [ntContainer#1-1] com.lzj.consumer.ConfirmConsumer         : 消费者消费消息hello rabbitmq 1
2022-08-18 00:56:29.580  INFO 7088 --- [ntContainer#4-1] com.lzj.consumer.WarnConsumer            : 消息hello rabbitmq 2 消费失败,请尽快处理
2022-08-18 00:56:29.584  INFO 7088 --- [nectionFactory1] com.lzj.config.ExchangeCallback          : 交换机收到消息id为2, 消息内容为hello rabbitmq 2

可见,备份队列f方式保证了消息不丢失。如果同时设置了备份队列以及第二个案例中的ReturnCallback回调接口,失败消息会先进备份队列,因为备份队列优先级高于回调RetrunCallback接口。文章来源地址https://www.toymoban.com/news/detail-424314.html

到了这里,关于8. springboot + rabbitmq 消息发布确认机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • rabbitmq消息确认机制

    (1) publish === broker 只要broker收到消息,就会执行 confirmCallback (2) exchange === queue 如果exchange有消息没有成功发送至queue,就会执行RuturnCallback,例:routing key错误导致发送消息到队列失败 (3)RabbitmqConfig (1) queue === consumer 默认是ack,consumer只要拿到消息就会自动确认,服务端

    2024年02月13日
    浏览(42)
  • RabbitMQ 消息确认机制

    为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之

    2024年02月15日
    浏览(44)
  • RabbitMq 消息确认机制详解

    目录 1.消息可靠性 1.1.生产者消息确认 1.1.1.修改配置 1.1.2.定义Return回调 1.1.3.定义ConfirmCallback 1.2.消息持久化 1.2.1.交换机持久化 1.2.2.队列持久化 1.2.3.消息持久化 1.3.消费者消息确认 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消费失败重试机制 1.4.1.本地重试 1.4.2.失败策略 1.5.总结

    2024年01月21日
    浏览(45)
  • RabbitMQ 发布确认机制

    发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段   生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。   confirm模式下的信道所发送的消息都将被应带ack或者nack一次

    2024年02月13日
    浏览(40)
  • RabbitMQ--基础--8.1--消息确认机制--接受确认机制(ACK)

    代码位置 消费者收到Queue中的消息,但没有处理完成就宕机的情况,这种情况下就可能会导致消息丢失。 为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除。 如果RabbitMQ没有收

    2024年02月10日
    浏览(50)
  • Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

    确认消息是否发送给交换机 配置 编码RabbitTemplate.ConfirmCallback ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器, 也就是只确认是否正确到达 Exchange 中。 在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true); 配置类

    2024年02月20日
    浏览(47)
  • RabbitMQ的几种消息确认机制详细介绍

    前言:大家好,我是小威,24届毕业生,在一家满意的公司实习。本篇文章将详细介绍RabbitMQ的几种消息确认机制。 如果文章有什么需要改进的地方还请大佬不吝赐教 👏👏。 小威在此先感谢各位大佬啦~~🤞🤞 🏠个人主页:小威要向诸佬学习呀 🧑个人简介:大家好,我是

    2023年04月25日
    浏览(55)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(89)
  • RabbitMQ消息可靠性投递与ACK确认机制

    什么是消息的可靠性投递 保证消息百分百发送到消息队列中去 保证MQ节点成功接收消息 消息发送端需要接收到MQ服务端接收到消息的确认应答 完善的消息补偿机制,发送失败的消息可以再感知并二次处理 RabbitMQ消息投递路径 生产者–交换机–队列–消费者 通过两个节点控制

    2024年02月20日
    浏览(51)
  • RabbitMQ:第一章:6 种工作模式以及消息确认机制

    } System.out.println(“发送数据成功”); channel.close(); connection.close(); } } 消费者一: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** 消费者1 */ public class ConsumerOne { public static void main(String[] args) throws Exception { Con

    2024年04月12日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包