RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位

这篇具有很好参考价值的文章主要介绍了RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位

项目场景:

使用 rabbitMQ 测试 topic 交换机的案例
关键信息
RabbitMQ、 try、消费者无法接收消息


一、 写在前面

想要直接参考解决方案,看最后一部分
想要看问题原因,看第三部分
想看如何分析,顺序浏览

  • 个人认为,交换机相对于队列数量更少,且与生产者更加相关,因此交给生产者声明更佳。
  • 一次声明之后,只要它还在,就不必重复声明。队列亦是如此,消费者声明后,只要还在,无需重复声明。
  • 如有错误,欢迎留言批评指正

二、问题描述

生产者声明topic交换机正常,消息发布正常
消费者运行无异常,但是无法消费消息
RabbitMQUtil源码

public class RabbitMQUtil {
    private static final int MESSAGE_COUNT=20;

    public static Channel getChannel()  {
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setPassword("guest");
            connectionFactory.setUsername("guest");
            Connection connection = connectionFactory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        return channel;
    }
}

消费者源码

public class TopicConsumer1 {
    private static String EXCHANGE_NAME = "topic_logs";
    private static String QUEUE_NAME = "Q1";

    public static void main(String[] args) {
        try (Channel channel = RabbitMQUtil.getChannel();){
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "q1.#");
            System.out.println("waiting for message ......");
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody(), "UTF-8"));
                System.out.println("接收队列:" + QUEUE_NAME + "  绑定键:" + message.getEnvelope().getRoutingKey());
            };
            //接收消息
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

生产者源码

public class TopicPublisher {
    private static String EXCHANGE_NAME="topic_logs";
    public static void main(String[] args) {
        Channel channel = RabbitMQUtil.getChannel();
            try{
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("q1.q2.rabbit","q2.q1.rabbit");
            bindingKeyMap.put("aa.q2.elephant","aa.q1.elephant");
            bindingKeyMap.put("q1.orange.fox","q2.orange.fox");
            bindingKeyMap.put("lazy.brown.q3","lazy.brown.q3");
            bindingKeyMap.put("q1.q2.q3","q1.q2.q3");

            for (Map.Entry<String, String> bindingKeyEntry:bindingKeyMap.entrySet()){
                String bindingKey =bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,bindingKey, null,message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}
  1. 先启动消费者,声明交换机,但是这一次消息会丢失
    进入 页面管理界面 http://127.0.0.1:15672/

rabbitmq消费者收不到消息,技术问题,java-rabbitmq,rabbitmq,java
运行后没有队列

  1. 然后关闭生产者,这次运行只是声明交换机,关闭之后,通道消失。这也是重点
    rabbitmq消费者收不到消息,技术问题,java-rabbitmq,rabbitmq,java

  2. 接下来启动消费者,等待消息消费
    rabbitmq消费者收不到消息,技术问题,java-rabbitmq,rabbitmq,javarabbitmq消费者收不到消息,技术问题,java-rabbitmq,rabbitmq,java

  3. 注释生产者中交换机声明,再次运行,发布消息

// 这次运行把它注释掉
 //channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); 

rabbitmq消费者收不到消息,技术问题,java-rabbitmq,rabbitmq,java
那么问题来了,消费者处于运行状态,但是接收不到消息,why?
rabbitmq消费者收不到消息,技术问题,java-rabbitmq,rabbitmq,java


三、原因分析:

消息是否能够被消费,需要从以下几点考虑:

  1. 通道已建立
  2. 队列有消息(从上图看,没问题)
  3. 该队列消息从归属问题来看,可交给消费者。(队列就是它声明的,没问题)
  4. 服务没挂(后端能进去,没挂)

这么来看,只能去排查第一点,通道是否还在开启
rabbitmq消费者收不到消息,技术问题,java-rabbitmq,rabbitmq,java
看似有一个通道,可是问题是我们生产者也在运行,那么应该需要两个通道,从通道信息也可以看出,这是生产者的通道。关闭生产者即可确认,此时消费者的通道已经被关闭。

为什么会自己关掉呢,回看源码,我们将通道建立写在了 try() 中,这就是问题所在。

 try (Channel channel = RabbitMQUtil.getChannel();){

在try() 圆括号中的内容一般是需要自动关闭的资源,那么可以猜测,这是被这种机制自动关闭了。


四、解决方案:

取消 try 自动关闭通道,把它写在外面。

public static void main(String[] args) {
        Channel channel = RabbitMQUtil.getChannel();
        try {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "q1.#");
            System.out.println("waiting for message ......");
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody(), "UTF-8"));
                System.out.println("接收队列:" + QUEUE_NAME + "  绑定键:" + message.getEnvelope().getRoutingKey());
            };
            //接收消息
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

rabbitmq消费者收不到消息,技术问题,java-rabbitmq,rabbitmq,java文章来源地址https://www.toymoban.com/news/detail-639684.html

到了这里,关于RabbitMQ中channel与try()结合导致的消息不消费或消息无法接收的问题分析与定位的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Rabbitmq 消费者 : Exception (504) Reason: “channel/connection is not open“

    启动微服务 Rabbitmq 时,定位 ch.consume() 出现异常 一、问题分析 分析日志:no exchange ‘log_topic’ 二、检查代码 consumer.go 中,ch.QueueBind 绑定在 “log_topic” event.go 中,ch.ExchangeDeclare 配置的是 “logs_topic” 三、优化代码 定义一个常量 channelName,代码中使用这个常量

    2024年02月13日
    浏览(25)
  • 90、RabbitMQ如何确保消息发送?消息接收?

    信道需要设置为 confirm 模式,则所有在信道上发布的消息都会分配一个唯一 ID。 一旦消息被投递到queue(可持久化的消息需要写入磁盘),信道会发送一个确认给生产者(包含消息唯一 ID) 如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack (未确认) 消息给生产者。

    2024年02月16日
    浏览(34)
  • 【RabbitMQ】RabbitMQ如何确认消息被消费、以及保证消息的幂等

    目录 一、如何保证消息被消费 二、如何保证消息幂等性 RabbitMQ提供了消息补偿机制来保证消息被消费,当一条消费被发送后,到达队列后发给消费者。消费者消费成功后会给MQ服务器的队列发送一个确认消息,此时会有一个回调检测服务监听该接收确认消息的队列,然将消费

    2024年02月16日
    浏览(30)
  • RabbitMQ消费端消费能力不足,消息消费慢问题解决思路

    思路:(易-难) 1.新增服务器,增加消费端数量。 2.服务器端开启多线程消费消息(默认单线程监听队列),需要注意多线程可能带来的并发问题。 3.优化消费端处理逻辑,提升消息消费速率即系统优化。 背景: 本系统为小型支付系统,商户618活动期间,用户购买商品付款

    2024年02月13日
    浏览(74)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制_rabbitmq 生产者消息确认

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新大数据全套学习资料》,

    2024年04月26日
    浏览(74)
  • Centos安装RabbitMQ,JavaSpring发送RabbitMQ延迟延时消息,JavaSpring消费RabbitMQ消息

    erlang 和 rabbitmq 版本说明 https://www.rabbitmq.com/which-erlang.html 确认需要安装的mq版本以及对应的erlang版本。 RabbitMQ下载地址: https://packagecloud.io/rabbitmq/rabbitmq-server Erlang下载地址: https://packagecloud.io/rabbitmq/erlang RabbitMQ延迟消息插件下载 https://github.com/rabbitmq/rabbitmq-delayed-message-exc

    2024年02月08日
    浏览(41)
  • SpringBoot: RabbitMQ消息队列之同时消费多条消息

    1. basicQos预取方法参数解析 basicQos(int prefetchCount) basicQos(int prefetchCount, boolean global) basicQos(int prefetchSize, int prefetchCount, boolean global) 参数: prefetchSize:可接收消息的大小 prefetchCount:处理消息最大的数量。 global:是不是针对整个Connection的,因为一个Connection可以有多个Channel,如

    2024年02月01日
    浏览(56)
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

    在现代分布式应用程序中,消息队列扮演了至关重要的角色,允许系统中的各个组件之间进行异步通信。这种通信模式提供了高度的灵活性和可伸缩性,但也引入了一系列的挑战,其中最重要的之一是消息的可靠性。 首先让我们来了解一下,在消息队列中,消息从生产者发送

    2024年02月05日
    浏览(40)
  • 如何使用RabbitMQ发送和接收消息

    本文介绍了如何使用RabbitMQ的Python客户端库pika来发送和接收消息,并提供了示例代码。读者可以根据自己的需求修改代码,例如修改队列名称、发送不同的消息等。 RabbitMQ 是一个开源的消息队列软件,可以用于在应用程序之间传递消息。下面是一个使用 RabbitMQ 的流程和代码

    2024年02月15日
    浏览(36)
  • 消息队列-RabbitMQ:MQ作用分类、RabbitMQ核心概念及消息生产消费调试

    1)什么是 MQ MQ (message queue),从字面意思上看, 本质是个队列,FIFO 先入先出 ,只不过队列中存放的内容是 message 而已,还是一种 跨进程的通信机制 , 用于上下游传递消息 。在互联网架构中,MQ 是一种非常常见的上下游 “ 逻辑解耦 + 物理解耦” 的消息通信服务 。 使用了

    2024年02月20日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包