RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位
项目场景:
使用 rabbitMQ 测试 topic 交换机的案例
关键信息
RabbitMQ、 try、消费者无法接收消息
一、 写在前面
想要直接参考解决方案,看最后一部分
想要看问题原因,看第三部分
想看如何分析,顺序浏览
- 个人认为,交换机相对于队列数量更少,且与生产者更加相关,因此交给生产者声明更佳。
- 一次声明之后,只要它还在,就不必重复声明。队列亦是如此,消费者声明后,只要还在,无需重复声明。
- 如有错误,欢迎留言批评指正。
二、问题描述
生产者声明topic交换机正常,消息发布正常
消费者运行无异常,但是无法消费消息
RabbitMQUtil源码
public class RabbitMQUtil {
private static final int MESSAGE_COUNT=20;
public static Channel getChannel() {
Channel channel = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setPassword("guest");
connectionFactory.setUsername("guest");
Connection connection = connectionFactory.newConnection();
channel = connection.createChannel();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return channel;
}
}
消费者源码:
public class TopicConsumer1 {
private static String EXCHANGE_NAME = "topic_logs";
private static String QUEUE_NAME = "Q1";
public static void main(String[] args) {
try (Channel channel = RabbitMQUtil.getChannel();){
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "q1.#");
System.out.println("waiting for message ......");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "UTF-8"));
System.out.println("接收队列:" + QUEUE_NAME + " 绑定键:" + message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
生产者源码
public class TopicPublisher {
private static String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) {
Channel channel = RabbitMQUtil.getChannel();
try{
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("q1.q2.rabbit","q2.q1.rabbit");
bindingKeyMap.put("aa.q2.elephant","aa.q1.elephant");
bindingKeyMap.put("q1.orange.fox","q2.orange.fox");
bindingKeyMap.put("lazy.brown.q3","lazy.brown.q3");
bindingKeyMap.put("q1.q2.q3","q1.q2.q3");
for (Map.Entry<String, String> bindingKeyEntry:bindingKeyMap.entrySet()){
String bindingKey =bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
-
先启动消费者,声明交换机,但是这一次
消息会丢失
进入 页面管理界面 http://127.0.0.1:15672/
运行后没有队列
-
然后关闭生产者,这次运行只是声明交换机,关闭之后,通道消失。这也是重点
-
接下来启动消费者,等待消息消费
-
注释生产者中交换机声明,再次运行,发布消息
// 这次运行把它注释掉
//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
那么问题来了,消费者处于运行状态,但是接收不到消息,why?
三、原因分析:
消息是否能够被消费,需要从以下几点考虑:
- 通道已建立
- 队列有消息(从上图看,没问题)
- 该队列消息从归属问题来看,可交给消费者。(队列就是它声明的,没问题)
- 服务没挂(后端能进去,没挂)
这么来看,只能去排查第一点,通道是否还在开启
看似有一个通道,可是问题是我们生产者也在运行,那么应该需要两个通道,从通道信息也可以看出,这是生产者的通道。关闭生产者即可确认,此时消费者的通道已经被关闭。
为什么会自己关掉呢,回看源码,我们将通道建立写在了 try() 中,这就是问题所在。
try (Channel channel = RabbitMQUtil.getChannel();){
在try() 圆括号中的内容一般是需要自动关闭的资源,那么可以猜测,这是被这种机制自动关闭了。
四、解决方案:
取消 try 自动关闭通道,把它写在外面。文章来源:https://www.toymoban.com/news/detail-639684.html
public static void main(String[] args) {
Channel channel = RabbitMQUtil.getChannel();
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "q1.#");
System.out.println("waiting for message ......");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "UTF-8"));
System.out.println("接收队列:" + QUEUE_NAME + " 绑定键:" + message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
} catch (IOException e) {
e.printStackTrace();
}
}
文章来源地址https://www.toymoban.com/news/detail-639684.html
到了这里,关于RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!