RabbitMq生产者发送消息失败现象
一般情况下RabbitMq的生产者能够正常的把消息投递到交换机Exchange,Exchange能够根据路由键routingKey把消息投递到队列Queue,但是一旦出现消息无法投递到交换机Exchange,或无法路由到Queue的这种特殊情况下,则需要对生产者的消息进行缓存或者保存到数据库,后续在调查完RabbitMq服务器的问题之后,待RabbitMq服务器正常之后,需要对这些消息进行重新投递。正常来说RabbitMq做了集群之后是不会出现这种问题,整个集群挂断的概率也是非常小。
错误信息
当项目启动后,然后把交换机Exchange删除后,然后生产者发送消息时会提示交换机不存在。Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)
SpringBoot代码示例
SpringBoot的application.properties需要新增spring.rabbitmq.publisher-confirm-type配置要求值是correlated。默认值是none表示无需触发交换机收到消息的回调接口。correlated表示消息发布后会触发交换机收到消息的回调接口。
# springboot整合rabbitMq的配置
spring.rabbitmq.host=192.168.15.200
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
队列和交换机配置类
package springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ConfirmConfig {
// 普通交换机名称
public static final String EXCHANGE_NAME = "confirm_exchange";
// 队列名称
public static final String QUEUE_NAME = "confirm_queue";
public static final String ROUTING_KEY = "key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);
}
}
生产者消息发送确认配置类
package springbootrabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
// 1.先实现RabbitTemplate.ConfirmCallback接口,从写confirm回调函数
@Slf4j
@Component
public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback {
// 2.注入
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*
* @param correlationData 消息
* @param b 发送成功是true,失败是false
* @param s 发送失败时的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("交换机已经收到id为{}的消息", id);
} else {
log.error("交换机未收到id为{}的消息, 原因是:{}", id, s);
// 消息缓存或入库,邮件提醒运维
}
}
// 3.然后在springBoot对象初始化之后再执行rabbitTemplate.setConfirmCallback(this);设置回调函数,避免使用默认的ConfirmCallback
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
}
生产者类
package springbootrabbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springbootrabbitmq.config.ConfirmConfig;
import springbootrabbitmq.config.TtlQueueConfig;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public String sendMsg(@PathVariable String message) {
log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message);
return "success";
}
@GetMapping("/sendMsg2/{message}")
public String sendMsg2(@PathVariable String message) {
log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
CorrelationData data = new CorrelationData();
data.setId("1111");
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
return "success";
}
}
消费者类
package springbootrabbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import springbootrabbitmq.config.ConfirmConfig;
import java.util.Date;
@Component
@Slf4j
public class ConfirmConsumer {
//监听器接收消息
@RabbitListener(queues = ConfirmConfig.QUEUE_NAME)
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{}, 收到一条消息:{} ", new Date().toString(), msg);
}
}
首先正常发送,然后再删除交换机然后再发送。测试结果如下
2023-01-29 21:07:12.367 INFO 79848 --- [nio-8080-exec-1] s.controller.ConfirmController : 当前时间:Sun Jan 29 21:07:12 CST 2023, 发送一条消息:12 到队列
2023-01-29 21:07:12.399 INFO 79848 --- [nectionFactory1] s.config.RabbitMqCallBack : 交换机已经收到id为1111的消息
2023-01-29 21:07:12.403 INFO 79848 --- [ntContainer#0-1] s.consumer.ConfirmConsumer : 当前时间:Sun Jan 29 21:07:12 CST 2023, 收到一条消息:12
2023-01-29 21:08:01.282 INFO 79848 --- [nio-8080-exec-2] s.controller.ConfirmController : 当前时间:Sun Jan 29 21:08:01 CST 2023, 发送一条消息:123 到队列
2023-01-29 21:08:01.289 ERROR 79848 --- [168.15.200:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)
2023-01-29 21:08:01.290 ERROR 79848 --- [nectionFactory2] s.config.RabbitMqCallBack : 交换机未收到id为1111的消息, 原因是:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange' in vhost '/', class-id=60, method-id=40)
消息回退
如果不开启消息回退,默认是消息即使无法发送到队列(如路由键错误等场景),也不会进行提醒,生产者不知道消息能否成功发送到队列。
解决方案
当消息无法到达队列的时候进行提醒
消息回退代码示例
配置,开启消息不可达目的地时的回调
spring.rabbitmq.publisher-returns=true
配置类,实现RabbitTemplate.ReturnCallback接口
package springbootrabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
// 1.先实现RabbitTemplate.ConfirmCallback接口,从写confirm回调函数
@Slf4j
@Component
public class RabbitMqCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
// 2.注入
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*
* @param correlationData 消息
* @param b 发送成功是true,失败是false
* @param s 发送失败时的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("交换机已经收到id为{}的消息", id);
} else {
log.error("交换机未收到id为{}的消息, 原因是:{}", id, s);
// 消息缓存或入库,邮件提醒运维
}
}
// 3.然后在springBoot对象初始化之后再执行rabbitTemplate.setConfirmCallback(this);设置回调函数,避免使用默认的ConfirmCallback
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
// 当消息传递过程中不可达到目的地时将消息返回给生产者,只有不可达到目的地时才会调用这个方法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息无法被写入队列:{}, 退回原因:{}, 路由Key: {}", message, replyText, routingKey);
// 邮件发送,缓存或存到数据库
}
}
生产者
@GetMapping("/sendMsg3/{message}")
public String sendMsg3(@PathVariable String message) {
log.info("当前时间:{}, 发送一条消息:{} 到队列", new Date().toString(), message);
CorrelationData data = new CorrelationData();
data.setId("1111");
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY, message, data);
rabbitTemplate.convertAndSend(ConfirmConfig.EXCHANGE_NAME, ConfirmConfig.ROUTING_KEY+"222", message +"222", data);
return "success";
}
消费者与上一个消费者相同文章来源:https://www.toymoban.com/news/detail-757698.html
测试结果如下:调用:http://127.0.0.1:8080/confirm/sendMsg3/123生产者的接口可以看到当路由键错误导致交换机无法把消息投递到队列时会回调returnedMessage方法。文章来源地址https://www.toymoban.com/news/detail-757698.html
2023-01-29 21:27:48.910 INFO 74512 --- [nio-8080-exec-1] s.controller.ConfirmController : 当前时间:Sun Jan 29 21:27:48 CST 2023, 发送一条消息:123 到队列
2023-01-29 21:27:48.934 INFO 74512 --- [nectionFactory1] s.config.RabbitMqCallBack : 交换机已经收到id为1111的消息
2023-01-29 21:27:48.941 ERROR 74512 --- [nectionFactory1] s.config.RabbitMqCallBack : 消息无法被写入队列:(Body:'123222' MessageProperties [headers={spring_returned_message_correlation=1111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), 退回原因:NO_ROUTE, 路由Key: key1222
2023-01-29 21:27:48.943 INFO 74512 --- [nectionFactory2] s.config.RabbitMqCallBack : 交换机已经收到id为1111的消息
2023-01-29 21:27:48.946 INFO 74512 --- [ntContainer#0-1] s.consumer.ConfirmConsumer : 当前时间:Sun Jan 29 21:27:48 CST 2023, 收到一条消息:123
到了这里,关于RabbitMq生产者发送消息确认的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!