SpringBoot如何与RabbitMQ建立连接

这篇具有很好参考价值的文章主要介绍了SpringBoot如何与RabbitMQ建立连接。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一. 引言

本文主要描述SpringBoot如何与RabbitMQ建立连接,建立多少个连接,以及如何接收消息

二. 相关概念

  • ConnectionFactory接口,是客户端与RabbitMQ服务器的tcp socket连接工厂,负责根据服务器地址创建Connection
  • Connection是客户端与RabbitMQ服务器的socket连接,它封装了socket协议相关的逻辑(比如接收和发送消息)
  • Channel是RabbitMQ客户端与服务器交互的最重要的一个接口,大部分的业务操作是在Channel接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息、接收消息等。Channel之间是完全隔离的

Channel与Connection的区别?

  • 职责不同。Channel负责封装业务,Connection负责封装Socket连接
  • 多线程和性能因素。建立和销毁一个Connection的开销比较大,而Channel是一个建立在Connection内部的逻辑连接,建立和销毁的成本比较低。

注意:在Connection底层,当有多个Channel发送消息时,也是排队发送的。

三. 连接代码实现

  • 在https://blog.csdn.net/qq_43216019/article/details/128824328?spm=1001.2014.3001.5501中介绍了如何实现消息接收功能,其中RabbitListenerAnnotationBeanPostProcessor中类中,没扫描到一个消息接收方法,就定义为如下实例:

    RabbitListenerEndpoint(接口)
    SimpleRabbitListenerEndpoint(实现:表示RabbitListenerConfigurer接口所实现类的endPoint
    MethodRabbitListenerEndpoint(实现:表示@RabbitListener+@RabbitHandler注解方法对应的endpoint))
    
  • 同时在每个RabbitListenerAnnotationBeanPostProcessor类中,对于每一个endpoint,最终都会回调RabbitListenerEndpointRegistery的doStart方法,伪代码:

    public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory, boolean startImmediately) {
            synchronized(this.listenerContainers) {
          		//创建SimpleMessageListenerContainer实例
                MessageListenerContainer container = this.createListenerContainer(endpoint, factory);
                this.listenerContainers.put(id, container);
                if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                    Object containerGroup;
                    if (this.applicationContext.containsBean(endpoint.getGroup())) {
                        containerGroup = (List)this.applicationContext.getBean(endpoint.getGroup(), List.class);
                    } else {
                        containerGroup = new ArrayList();
                        this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                    }
    
                    ((List)containerGroup).add(container);
                }
    
                if (this.contextRefreshed) {
                    container.lazyLoad();
                }
    						//启动调用SimpleMessageListenerContainer的start方法
                if (startImmediately) {
                    this.startIfNecessary(container);
                }
    
            }
        }
    
  • 在SimpleMessagelistenerContainer类中,start方法最终会调用的dostart方法,伪代码:

    protected void doStart() {
            Assert.state(!this.consumerBatchEnabled || this.getMessageListener() instanceof BatchMessageListener || this.getMessageListener() instanceof ChannelAwareBatchMessageListener, "When setting 'consumerBatchEnabled' to true, the listener must support batching");
            this.checkListenerContainerAware();
            super.doStart();
            synchronized(this.consumersMonitor) {
                if (this.consumers != null) {
                    throw new IllegalStateException("A stopped container should not have consumers");
                } else {
                //根据@RabbitListener注解中定义的concurrency数量初始化consumer的个数
                对于每个consumer生成一个BlockingQueueConsumer实例
                    int newConsumers = this.initializeConsumers();
                    if (this.consumers == null) {
                        this.logger.info("Consumers were initialized and then cleared (presumably the container was stopped concurrently)");
                    } else if (newConsumers <= 0) {
                        if (this.logger.isInfoEnabled()) {
                            this.logger.info("Consumers are already running");
                        }
    
                    } else {
                        Set<SimpleMessageListenerContainer.AsyncMessageProcessingConsumer> processors = new HashSet();
                        Iterator var4 = this.consumers.iterator();
    
                        while(var4.hasNext()) {
                        //对于每个BlockingQueueConsumer,生成AsyncMessageProcessingConsumer,并启动一个线程启动这个consumer,实际上调用了BlockingQueueConsumer实例的start方法
                            BlockingQueueConsumer consumer = (BlockingQueueConsumer)var4.next();
                            SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
                            processors.add(processor);
                            this.getTaskExecutor().execute(processor);
                            if (this.getApplicationEventPublisher() != null) {
                                this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                            }
                        }
    
                        this.waitForConsumersToStart(processors);
                    }
                }
            }
        }
    
  • BlockingQueueConsumer的start方法启动consumer

    public void start() throws AmqpException {
            
            try {
                this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory, this.transactional);
                this.channel = this.resourceHolder.getChannel();
                ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel);
            } catch (AmqpAuthenticationException var2) {
                throw new FatalListenerStartupException("Authentication failure", var2);
            }
    
        }
    

​ 注意:每个监听线程都会调用以上方法创建connection和channel,那么connection和channel的数量岂不是一致?

​ CachingConnectionFactory bean的createConnection负责创建connection:

public final Connection createConnection() throws AmqpException {
    ......
        Object var1 = this.connectionMonitor;
        synchronized(this.connectionMonitor) {
            if (this.cacheMode == CachingConnectionFactory.CacheMode.CHANNEL) {
                //当第一个线程创建connection时,this.connection.target为空,所以调用父类方法创建真实的connection。
                //当第二个以上的线程创建connection时,直接返回之前创建好的connection。
                if (this.connection.target == null) {
                    this.connection.target = super.createBareConnection();
                    ......
                }
                return this.connection;
            } 
            ......
        }
    }
}

所以系统即使有多个线程,也仅仅创建了一个connection,而每个线程创建了自己独立的channel。

后续继续学习记录AMQP协议相关知识文章来源地址https://www.toymoban.com/news/detail-650899.html

到了这里,关于SpringBoot如何与RabbitMQ建立连接的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ保证消息的可靠投递,Java实现RabbitMQ消息的可靠投递,Springboot实现RabbitMQ消息的可靠投递

    我们先看一串代码,并思考一下为什么要先入库然后发MQ: 如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。 ① 消息从生产者发送到Broker 生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接

    2024年02月11日
    浏览(39)
  • 微服务 02-rabbitmq在springboot中如何使用(上篇)

    目录 前言: 上文传送 - 安装rabbitmq传送门: - rabbitmq使用出现问题解决传送门: 1. rabbitmq的六大模式:  1.1 简单模式: (一对一) - 业务场景:  1.2 工作模式: (一对多) - 业务场景:  1.3 发布与订阅模式: (广播) 1.4 路由模式:   - 业务场景 1.5 主题模式: (路由升级版) - 业务场景 1.6 RPC异步

    2024年02月13日
    浏览(24)
  • 微服务: 03-rabbitmq在springboot中如何使用(下篇)

    目录 前言: 上文传送 4.六大模式实际操作(续) 4.4  路由模式:  --- 4.4.1 消费者配置类 --- 4.4.2 消费者代码 ---4.4.3 生产者代码 4.5  主题模式: (路由升级版) --- 4.5.1 消费者配置类 --- 4.5.2 消费者代码 --- 4.5.3 生产者代码 --- 4.5.4 测试效果 4.6  RPC异步调用模式(用的少)  --- 4.6.0 找了

    2024年02月12日
    浏览(26)
  • Springboot实战16 消息驱动:如何使用 RabbitTemplate 集成 RabbitMQ?

    15 讲我们介绍了基于 ActiveMQ 和 JmsTemplate 实现消息发送和消费,并重构了 SpringCSS 案例系统中的 account-service 和 customer-service 服务。 今天,我们将介绍另一款主流的消息中间件 RabbitMQ,并基于 RabbitTemplate 模板工具类为 SpringCSS 案例添加对应的消息通信机制。 AMQP 规范与 RabbitM

    2024年02月02日
    浏览(37)
  • springboot连接rabbitmq报错:Failed to check/redeclare auto-delete queue(s).

    springboot项目使用 spring-boot-starter-amqp 连接rabbitmq时出现报错: 这类问题是因为没有连接上rabbitmq导致的,一般可以的原因有如下几种: 1、springboot中的配置文件配置的不对,这其中又分为: (1)配置项格式出错,比如yml格式常见的配置项错位 (2)rabbitmq地址错误 (3)端口错

    2023年04月08日
    浏览(27)
  • 15年大牛用140多个实战案例深入讲解Java微服务架构实战:SpringBoot +SpringCloud +Docker +RabbitMQ

    第一部分,springboot篇; 第1章SpringBoot编程起步; 1.SpringBoot提倡的是一种简洁的开发模式,可保证用户不被大量的配置文件和依赖关系所困扰。 2.SpringBoot开发需要Maven或 Gradle构建工具支持。 3.SpringBoot使用一系列的注解来简化开发过程。 第2章SpringBoot程序开发; 1. SpringBoot的依赖

    2024年04月09日
    浏览(40)
  • RabbitMQ: SpringBoot 整合 RabbitMQ

    重点是这个依赖 通过              和上一个一样  

    2024年02月09日
    浏览(36)
  • 【RabbitMQ】RabbitMQ整合SpringBoot案例

    【RabbitMQ】消息队列-RabbitMQ篇章 RabbitMQ实现流程 2.1 实现架构总览 实现步骤: 1:创建生产者工程:sspringboot-rabbitmq-fanout-producer 2:创建消费者工程:springboot-rabbitmq-fanout-consumer 3:引入spring-boot-rabbitmq的依赖 4:进行消息的分发和测试 5:查看和观察web控制台的状况 2.2 具体实现

    2024年02月12日
    浏览(35)
  • RabbitMQ--03--SpringAMQP(SpringBoot集成RabbitMQ)

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 SpringBoot 已经提供了对 AMQP 协议完全支持的 spring-boot-starter-amqp 依赖,引入此依赖即可快速方便的在 SpringBoot 中使用 RabbitMQ。 https://spring.io/projects/spring-amqp RabbitTemplate 是 SpringBoot AMQP 提供的快速发 Rabbit

    2024年03月21日
    浏览(32)
  • RabbitMQ详细教程以及SpringBoot集成RabbitMQ

    目录 一、RabbitMQ简介 1.1、RabbitMQ主流的原因 1.2、RabbitMQ特点 1.3、常见MQ对比 1.3.1、ActiveMQ 1.3.2、RabbitMQ 1.3.3、RocketMQ 1.3.4、Kafka 1.4、AMQP协议 1.4.1、AMQP协议介绍 1.4.2、AMQP协议图解​编辑  1.4.3、AMQP协议的核心概念- Exchange 1.5、Exchange详细解析 1.5.1、Exchange的作用 1.5.2、Exchange四种

    2024年02月05日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包