延迟和死信队列的配置
- 延迟队列有效期一分钟,后进入死信队列,如果异常就进入异常队列
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交换机
*/
private String orderEventExchange="order.event.exchange";
/**
* 延迟队列, 不能被监听消费
*/
private String orderCloseDelayQueue="order.close.delay.queue";
/**
* 关单队列, 延迟队列的消息过期后转发的队列,被消费者监听
*/
private String orderCloseQueue="order.close.queue";
/**
* 进入延迟队列的路由key
*/
private String orderCloseDelayRoutingKey="order.close.delay.routing.key";
/**
* 进入死信队列的路由key,消息过期进入死信队列的key
*/
private String orderCloseRoutingKey="order.close.routing.key";
/**
* 过期时间 毫秒,临时改为1分钟定时关单
*/
private Integer ttl=1000*60;
/**
* 消息转换器
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 创建交换机 Topic类型,也可以用dirct路由
* 一般一个微服务一个交换机
* @return
*/
@Bean
public Exchange orderEventExchange(){
return new TopicExchange(orderEventExchange,true,false);
}
/**
* 延迟队列
*/
@Bean
public Queue orderCloseDelayQueue(){
Map<String,Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange",orderEventExchange);
args.put("x-dead-letter-routing-key",orderCloseRoutingKey);
args.put("x-message-ttl",ttl);
return new Queue(orderCloseDelayQueue,true,false,false,args);
}
/**
* 死信队列,普通队列,用于被监听
*/
@Bean
public Queue orderCloseQueue(){
return new Queue(orderCloseQueue,true,false,false);
}
/**
* 第一个队列,即延迟队列的绑定关系建立
* @return
*/
@Bean
public Binding orderCloseDelayBinding(){
return new Binding(orderCloseDelayQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseDelayRoutingKey,null);
}
/**
* 死信队列绑定关系建立
* @return
*/
@Bean
public Binding orderCloseBinding(){
return new Binding(orderCloseQueue,Binding.DestinationType.QUEUE,orderEventExchange,orderCloseRoutingKey,null);
}
}
异常队列配置类
public class RabbitMQErrorConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 异常交换机
*/
private String orderErrorExchange = "order.error.exchange";
/**
* 异常队列
*/
private String orderErrorQueue = "order.error.queue";
/**
* 异常routing.key
*/
private String orderErrorRoutingKey = "order.error.routing.key";
/**
* 异常交换机
* @return
*/
@Bean
public TopicExchange errorTopicExchange(){
return new TopicExchange(orderErrorExchange,true,false);
}
/**
* 异常队列
* @return
*/
@Bean
public Queue errorQueue(){
return new Queue(orderErrorQueue,true);
}
/**
* 队列交换机进行绑定
* @param errorQueue
* @return
*/
@Bean
public Binding BindingErrorQueueAndExchange(Queue errorQueue,TopicExchange errorTopicExchange){
return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(orderErrorRoutingKey);
}
/**
* 配置 RepublishMessageRecoverer
* 用途:消息重试一定次数后,用特定的routingKey转发到指定的交换机中,方便后续排查和告警
*
* 顶层是 MessageRecoverer接口,多个实现类
*
* @return
*/
@Bean
public MessageRecoverer messageRecoverer(){
return new RepublishMessageRecoverer(rabbitTemplate,orderErrorExchange,orderErrorRoutingKey);
}
}
文章来源地址https://www.toymoban.com/news/detail-623612.html
文章来源:https://www.toymoban.com/news/detail-623612.html
到了这里,关于RabbitMQ延迟队列,死信队列配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!