rabbitmq消息可靠性之消息回调机制
rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制
-
生产者确认机制
-
消息持久化存储
-
消费者确认机制
-
失败重试机制
本文主要讲解生产者确认机制,也是rabbitmq提供的消息回调机制,这个机制可以解决生产者发送消息到交换机和交换机路由到队列过程中的消息丢失问题
这种机制必须给每个消息指定一个唯一ID,消息发送到rabbitmq之后会返回结果给生产者,表示消息是否发送成功,返回结果有以下两种
-
publisher-confirm:发送者确认:消息成功投递到交换机,返回 ack;消息未投递到交换机,返回 nack
-
publisher-return:发送者回执:消息成功投递到交换机,但是没有路由到队列。返回 ack,及路由失败原因
spring:
rabbitmq:
# rabbitMQ的ip地址
host: 127.0.0.1
# 端口
port: 5672
# 集群模式配置
# addresses: 127.0.0.1:8071, 127.0.0.1:8072, 127.0.0.1:8073
username: admin
password: 123456
virtual-host: /
# 消费者确认机制相关配置
# 开启publisher-confirm,
# 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-confirm-type: correlated
# publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
publisher-returns: true
# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
template:
mandatory: true
然后定义 ReturnCallback 回调,每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置
package com.gitee.small.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitMQConfig implements ApplicationContextAware {
//绑定键
public final static String DOG = "topic.dog";
public final static String CAT = "topic.cat";
/**
* Queue构造函数参数说明
* new Queue(SMS_QUEUE, true);
* 1. 队列名
* 2. 是否持久化 true:持久化 false:不持久化
*/
@Bean
public Queue firstQueue() {
return new Queue(DOG);
}
@Bean
public Queue secondQueue() {
return new Queue(CAT);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
/**
* 将firstQueue和topicExchange绑定,而且绑定的键值为topic.dog
* 这样只要是消息携带的路由键是topic.dog,才会分发到该队列
*/
@Bean(name = "binding.dog")
public Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(DOG);
}
/**
* 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
* 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
*/
@Bean(name = "binding.cat")
public Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判断是否是延迟消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
// 是一个延迟消息,忽略这个错误提示
return;
}
// 记录日志
log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的话,重发消息
});
}
}
接着定义 ConfirmCallback,ConfirmCallback 可以在发送消息时指定,因为每个业务处理 confirm 成功或失败的逻辑不一定相同,上面已经定义好exchange 和 queue,新建RabbitMqTest测试类
package smallJ;
import com.gitee.small.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.UUID;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class RabbitMqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() throws InterruptedException {
// 1.准备CorrelationData
// 1.1.消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 1.2.准备ConfirmCallback
correlationData.getFuture().addCallback(result -> {
// 判断结果
if (result.isAck()) {
// ACK
log.info("消息成功投递到交换机!消息ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投递到交换机失败!消息ID:{},原因:{}", correlationData.getId(), result.getReason());
// 重发消息
}
}, ex -> {
// 记录日志
log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());
// 可以重发消息
});
rabbitTemplate.convertAndSend("topicExchange", "topic.dog", "路由模式测试-dog", correlationData);
// 程序休眠两秒等待回调
Thread.sleep(2000);
}
}
加两个监听器进行测试文章来源:https://www.toymoban.com/news/detail-717511.html
package com.gitee.small.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class TopicRabbitReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.dog"),
exchange = @Exchange(value = "bindingExchangeMessage", type = ExchangeTypes.TOPIC)
))
public void process(String msg) {
log.info("dog-收到消息:{}", msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.cat"),
exchange = @Exchange(value = "bindingExchangeMessage2", type = ExchangeTypes.TOPIC)
))
public void process2(String msg){
log.info("cat-收到消息:{}", msg);
}
}
测试结果如下文章来源地址https://www.toymoban.com/news/detail-717511.html
smallJ.RabbitMqTest : 消息成功投递到交换机!消息ID: 83f057fa-042d-4f56-872d-9d31a0444b82
c.g.small.rabbitmq.TopicRabbitReceiver : dog-收到消息:路由模式测试-dog
c.g.small.rabbitmq.TopicRabbitReceiver : cat-收到消息:路由模式测试-dog
到了这里,关于rabbitmq消息可靠性之消息回调机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!