目录
一.消息可靠性传递概述
二.生产者消息确认机制
三.publisher-comfirm
四.publisher-return
五.消息持久化
六.消费者消息确认机制
七.如何确保RabbitMQ消息的可靠性?
八.死信交换机
九.延迟队列
十.惰性队列
十一.MQ集群
一.消息可靠性传递概述
生产者发送消息到交换机,交换机将消息路由到队列,消费者从队列获取消息。哪些环节会导致消息丢失。
1.生产者发送消息丢失
- 生产者没有将消息发送到交换机
- 交换机没有成功将消息路由到队列
2.MQ宕机导致消息丢失
3.消费者处理消息丢失
消费者获取到消息后,未来得及处理,宕机
消费者获取到消息后,处理消息抛异常。
二.生产者消息确认机制
生产者消息确认机制一共有2种方式
- publisher-comfirm
- publisher-return
在publisher这个微服务的application.yml中添加配置
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
1.publish-confirm-type:
开启publisher-confirm,这里支持两种类型:
- simple:同步等待confirm结果,直到超时
- correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
2.publish-returns:
开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
3.template.mandatory:
定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
三.publisher-comfirm
作用:开启后,生产者发送消息到RabbitMQ交换机,RabbitMQ会进行结果返回。
- ack:生产者成功将消息发送到队列
- nack:生产者发送到交换机失败
如何使用
1.在生产者配置文件中开启
spring:
rabbitmq:
publisher-confirm-type: correlated # 异步回调
2.如何接受RabbitMQ结果返回
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
});
correlationData:消息的唯一标识
ack
- true:生产者成功将消息发送到交换机
- false:生产者发送到交换机失败
cause:失败原因
注意点:rabbitTemplate.setConfirmCallback方法的调用,只能调用一次。
3.如何保证rabbitTemplate.setConfirmCallback方法只会被调用一次
方案1:初始化方法
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
});
}
方案2:CommandLineRunner(推荐)
@Component
public class MyComandLineRunner implements CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void run(String... args) throws Exception {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
});
}
}
方案3:实现ApplicationContextAware实现类
@Component
public class MyApplicationContext implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
});
}
}
4.发送消息的时候,需要给消息指定一个唯一标识
rabbitTemplate.convertAndSend(exchangeName,routingKey,message,new CorrelationData(id));
5.逻辑问题:如果生产者发送消息到交换机失败了?怎么重发
- 在发送消息之前,将消息先存储到数据库(MySQL,Redis)
- 如果消息发送交换机失败,读取Redis中信息,重新发送
6.测试
1.成功向交换机发送消息:观察
- correlationData
- ack
- cause
2.发送消息到交换机失败:删除交换机
四.publisher-return
作用:开启后,交换机将消息路由到消息队列失败,RabbitMQ会进行结果返回。
如何使用
1.在生产者配置文件中开启
spring:
rabbitmq:
publisher-returns: true
template:
mandatory: true
template:mandatory: true定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
2.如何接受RabbitMQ结果返回
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
});
- message:交换机路由队列失败的那个消息
- replyCode:错误码
- replyText:错误信息
- exchange:交换机
- routingKey:路由key
3.注意点
rabbitTemplate.setReturnCallback方法的调用,只能调用一次。
方案1:初始化方法
@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
});
}
方案2:
@Component
public class MyComandLineRunner implements CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void run(String... args) throws Exception {
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
});
}
}
方案3:实现ApplicationContextAware实现类
@Component
public class MyApplicationContext implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
});
}
}
4.逻辑问题:交换机路由消息到队列失败了,如何重新发送
直接调用rabbitTemplate.convertAndSet发送
5.测试
将路由key故意改错
- message:交换机路由队列失败的那个消息
- replyCode:错误码
- replyText:错误信息
- exchange:交换机
- routingKey:路由key
五.消息持久化
交换机、队列、消息持久化(都默认持久化)
交换机
ExchangeBuilder.directExchange(ITCAST_DIRECT ).durable(true).build();
new DirectExchange(ITCAST_DIRECT,true,false);
队列
QueueBuilder.durable(DIRECT_QUEUE1).build()
new Queue(DIRECT_QUEUE2,true);
消息持久化
1.如果发送普通字符串,默认持久化
2.如果期望消息不持久化。
Message msg = MessageBuilder.withBody(message.getBytes("utf-8")) .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) .build();
六.消费者消息确认机制
解决问题:
- 消费者处理消息丢失
- 消费者获取到消息后,未来得及处理,宕机
- 消费者获取到消息后,处理消息抛异常。
1.开启消费者消息确认机制
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
作用:消费者获取到消息后,如果处理消息出现异常,会给MQ返回nack. MQ将消息放入队列头部再给消费者。
消费者获取到消息后,如果处理消息正常,会给MQ返回ack. MQ将消息从消息队列中删除。
2.消费者消息确认机制-问题
如果消费者代码写的有问题
无限重试,导致MQ压力过大
3.开启消费者消息重试机制
优势
1.重试在消费者本地重试。
2.重试可以有延迟时间。
3.重试有次数限制
如何使用
rabbitmq:
listener:
simple:
retry:
enabled: true #开启失败重试
initial-interval: 100 # 初次失败,间隔时间
multiplier: 2 # 间隔时间倍数
max-attempts: 3 #最大重试次数
stateless: true #是否是无状态,true无状态,和事务相关,有事务写false
重试耗尽
触发重试耗尽策略
MessageRecover
RejectAndDonotMessageRecover(默认)重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediaRequeueMessageRecover重试耗尽后,返回nack,消息重新入队
RepublishMessageRecover
1.创建错误交换机
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct");}
2.错误队列
@Bean public Queue errorQueue(){ return new Queue("error.queue", true); }
3.绑定
@Bean public Binding errorBinding(){ return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}
4.RepublishMessageRecover交由spring管理,进行重发。
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
5.重试耗尽后,将失败消息投递到指定的交换机
七.如何确保RabbitMQ消息的可靠性?
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
八.死信交换机
死信
死信满足之一
- 消息被消费者拒绝,不让重新入队
- 消息队列满了,溢出的消息。
- 消息在消息队列中超时过期
去哪里?
- 被丢弃
- 如果队列指定了死信交换机。
死信交换机
普通交换机
怎么给队列指定死信交换机
- 给队列设置dead-letter-exchange属性,指定一个交换机
- 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey
死信交换机 + 消息ttl实现延迟消息队列
延迟消息队列
生产者----->消息,消费者不能立即消费,需要等待一定时间才能消费。
如何实现
给消息设置ttl有2种方式
1.创建队列设置消息过期时间 ttl() x-message-ttl
2.创建消息的时候可以指定过期时间
Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)) .setContentType("text/plain") .setExpiration("5000").build();
实现方式
我们声明消费者一组死信交换机和队列,基于注解方式:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.queue", durable = "true"), exchange = @Exchange(name = "dl.direct"), key = "dl")) public void listenDlQueue(String msg){ log.info("接收到 dl.queue的延迟消息:{}", msg);}
消费者config中要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:
@Bean public DirectExchange ttlExchange(){ return new DirectExchange("ttl.direct"); } @Bean public Queue ttlQueue(){ return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化 .ttl(10000) // 设置队列的超时时间,10秒 .deadLetterExchange("dl.direct") // 指定死信交换机 .deadLetterRoutingKey("dl") // 指定死信RoutingKey .build();} @Bean public Binding simpleBinding(){ return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}
发送消息时,给消息本身设置超时时间
@Test public void testTTLMsg() { // 创建消息 Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)) .setExpiration("5000") .build(); // 消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 发送消息 rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);}
如何实现发送一个消息20秒后消费者才收到消息?
- 给消息的目标队列指定死信交换机
- 消费者监听与死信交换机绑定的队列
- 发送消息时给消息设置ttl为20秒
九.延迟队列
使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
延迟队列插件的使用步骤包括哪些?
- 声明一个交换机,添加delayed属性为true
- 发送消息时,添加x-delay头,值为超时时间
安装DelayExchange插件
DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。
1.基于注解方式:
2.基于java代码的方式:
然后我们向这个delay为true的交换机中发送消息,一定要给消息添加一个header:x-delay,值为延迟的时间,单位为毫秒:
十.惰性队列
消息堆积问题
- 生产者>消费者消费速度
- 如果消息堆积超过队列容量上限,溢出的消息就会称为死信。死信会被丢弃。
怎么解决
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积(使用惰性队列),提高堆积上限
惰性队列特点
- 将消息直接存入磁盘,不存储内存
- 支持海量消息存储
- 消费者要获取消息,MQ将消息加载到内容。
创建
- lazy()
- 注解
- 管理控制台
优点
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
缺点
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO
十一.MQ集群
普通集群(分布式)
- 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失
镜像集群(主从)数据存在延迟
1.主从架构集群,队列可以在多个节点上有
2.主节点:在那个节点上创建队列,那个节点就是主节点
3.镜像节点:备份主节点上队列的节点
4.创建备份策略:
exactly
all
nodes5.创建队列,根据队列名称,指定那些节点作为镜像节点。
仲裁队列代替镜像集群
1.生产者----->主节点队列------->镜像节点队列
2.与镜像队列一样,都是主从模式,支持主从数据同步
3.使用非常简单,没有复杂的配置
4.主从同步基于Raft协议,强一致
5.创建队列
指定类型quorum
java代码 quorum();
默认5个镜像节点
java代码怎么操作集群
和单机区别;文章来源:https://www.toymoban.com/news/detail-828470.html
spring:
rabbitmq:
addresses: 192.168.200.128:8071,192.168.200.128:8072,192.168.200.128:8073
username: itcast
password: 123
virtual-host: /
文章来源地址https://www.toymoban.com/news/detail-828470.html
到了这里,关于微服务RabbitMQ高级篇的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!