springboot-rabbitmq 实现动态配置监听容器

这篇具有很好参考价值的文章主要介绍了springboot-rabbitmq 实现动态配置监听容器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.从源码了解mq启动配置流程

1.1配置启动入口

1.1.1从factories我们可以看到mq的启动配置类
springboot-rabbitmq 实现动态配置监听容器

1.1.2然后我们找到 RabbitAutoConfiguration,发现它引入了RabbitAnnotationDrivenConfiguration这个配置类

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
}

1.1.3进入RabbitAnnotationDrivenConfiguration滑到最低部看到这里引入了@EnableRabbit这个注解,找个注解里面又引出RabbitBootstrapConfiguration这个配置类

@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {

}

//---------------------------------------------


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

1.1.4这里定义了两个bean,其中RabbitListenerEndpointRegistry就是监听容器的注册操作实现类

@Configuration
public class RabbitBootstrapConfiguration {

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
		return new RabbitListenerAnnotationBeanPostProcessor();
	}

	@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
	public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
		return new RabbitListenerEndpointRegistry();
	}

}

1.1.5RabbitListenerEndpointRegistry里面有获取所有容器的方法getListenerContainerIds和注册监听容器的方法registerListenerContainer


	/**
	 * Create a message listener container for the given {@link RabbitListenerEndpoint}.
	 * <p>This create the necessary infrastructure to honor that endpoint
	 * with regards to its configuration.
	 * @param endpoint the endpoint to add
	 * @param factory the listener factory to use
	 * @see #registerListenerContainer(RabbitListenerEndpoint, RabbitListenerContainerFactory, boolean)
	 */
	public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
		registerListenerContainer(endpoint, factory, false);
	}

1.1.6触发监听容器的位置是在RabbitListenerEndpointRegistrar类里面的bean初始化完成调用的钩子方法里面,注册所有listenercontain。ps:而RabbitListenerEndpointRegistrar类是RabbitListenerAnnotationBeanPostProcessor的属性对象,RabbitListenerAnnotationBeanPostProcessor是在1.1.4哪里初始化的rabbitmq监听注解拓展对象

	@Override
	public void afterPropertiesSet() {
		registerAllEndpoints();
	}


//---------------------------------------
public class RabbitListenerAnnotationBeanPostProcessor
		implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
		SmartInitializingSingleton {

	/**
	 * The bean name of the default {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}.
	 */
	public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";


	private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
}

2.创建自定义消息处理对象和监听容器

2.1在自定义bean引入RabbitListenerEndpointRegistry

@Component
public class RabbitmqCustomerConfiguration {
    
    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    
 
}

2.2创建MyMessageListener类,实现消息监听接口


@Component
public class MyMessageListener implements  MessageListener {


    @Override
    public void onMessage(Message message) {
        System.out.println(new String(message.getBody()));

    }
}

2.3新建SimpleRabbitListenerEndpoint,设置需要监听的队列名和自定义消息接收处理器


@Component
public class RabbitmqCustomerConfiguration {

    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    @Autowired
    MyMessageListener myMessageListener;

    public void registryCustomerContain() {
        SimpleRabbitListenerEndpoint simpleRabbitListenerEndpoint = new SimpleRabbitListenerEndpoint();
        simpleRabbitListenerEndpoint.setMessageListener(myMessageListener);
        simpleRabbitListenerEndpoint.setQueueNames("testQueue");
        
		//第三个参数是否马上启动监听容器        
		rabbitListenerEndpointRegistry.registerListenerContainer(simpleRabbitListenerEndpoint,null,true);


    }

}

ps:第二个参数的null为空的时候会调用default的factory。
从RabbitListenerEndpointRegistrar判断工厂是否为空,最后会根据containerFactoryBeanName获取

	private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
		if (descriptor.containerFactory != null) 
		//*return descriptor.containerFactory;
		}
		else if (this.containerFactory != null) {
			//*	return this.containerFactory;
		}
		else if (this.containerFactoryBeanName != null) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			this.containerFactory = this.beanFactory.getBean(
					this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
			return this.containerFactory;  // Consider changing this if live change of the factory is required
		}
		else {
			throw new IllegalStateException("Could not resolve the " +
					RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
					descriptor.endpoint + "] no factory was given and no default is set.");
		}
	}

而containerFactoryBeanName是来自RabbitListenerAnnotationBeanPostProcessor的

public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";

也就是1.1.2里面提到的RabbitAnnotationDrivenConfiguration里面定义的bean文章来源地址https://www.toymoban.com/news/detail-407311.html

	@Bean
	@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
			SimpleRabbitListenerContainerFactoryConfigurer configurer,
			ConnectionFactory connectionFactory) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		configurer.configure(factory, connectionFactory);
		return factory;
	}

2.4思路:运行rabbitListenerEndpointRegistry的stop和start重启刷新所有监听器,或者刷新容器会导致刷新

	@Override
	public void start() {
		for (MessageListenerContainer listenerContainer : getListenerContainers()) {
			startIfNecessary(listenerContainer);
		}
	}



//----------------

	private void startIfNecessary(MessageListenerContainer listenerContainer) {
		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
			listenerContainer.start();
		}
	}

//----------------


	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		if (event.getApplicationContext().equals(this.applicationContext)) {
			this.contextRefreshed = true;
		}
	}

到了这里,关于springboot-rabbitmq 实现动态配置监听容器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

    1、ContentUtil 先定义常量 2、RabbitMQConfig 创建队列的两种方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。 就是在配置类中创建一个生成消息队列的@Bean。 问题: 用 @Configuration 注解声明为配置类,但是项目启动

    2024年02月06日
    浏览(57)
  • rabbitMq动态创建和监听队列

    在与第三方业务员系统对接时,需要根据第三方的信息的进行队列的创建,且个数不定,这就造成使用@RabbitListener来添加监听不方便。本文采用了当业务需要时,动态的创建队列和监听队列的方式,适合某个任务为一组的队列方式,需要考虑队列使用完成后的处理方式。 封装

    2024年02月13日
    浏览(50)
  • 【Java项目】使用Nacos实现动态线程池技术以及Nacos配置文件更新监听事件

    真诚的希望能给我项目一个stars!!! 项目源码 项目视频演示 线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如Tomcat。 线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性

    2024年02月09日
    浏览(39)
  • SpringBoot整合Canal+RabbitMQ监听数据变更

    需求 步骤 环境搭建 整合SpringBoot Canal实现客户端 Canal整合RabbitMQ SpringBoot整合RabbitMQ   我想要在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。 经过调研发现,使用Canal来监听MySQL的binlog变化可

    2024年02月11日
    浏览(46)
  • RabbitMQ之动态创建队列与绑定交换机和监听器

    为什么需要动态创建队列与绑定交换机?我在写项目的时候遇到这么个问题,我数据库中存在一个字段messageType指定为消息类型,消息类型存在三种,一种是通知类,一种是验证码类,一种是活动类。并且对应的,要将消息进行不同渠道的分发,还存在一个channelType,而他又存

    2024年02月03日
    浏览(37)
  • springboot - 实现动态刷新配置

    这里不加@Component,是因为: FilePropertiesSource filePropertiesSource = new FilePropertiesSource(); // 属性源是按照添加的顺序进行合并的,后添加的属性源中的属性会覆盖前面添加的属性源中的同名属性。 // 因此,为了确保我们自定义的属性源中的属性优先级最高,我们需要将它添加到属

    2024年02月01日
    浏览(30)
  • Spring项目配置文件中RabbitMQ监听器各个参数的作用

    spring.rabbitmq.listener.simple.concurrency :设置监听器容器的并发消费者数量,默认为1,即单线程消费。 spring.rabbitmq.listener.simple.max-concurrency :设置监听器容器的最大并发消费者数量。 spring.rabbitmq.listener.simple.prefetch :设置每个消费者从RabbitMQ服务器获取的消息数量,即每次从队列

    2024年02月16日
    浏览(40)
  • SpringCloud 整合 Canal+RabbitMQ+Redis 实现数据监听

    Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x Canal是如何同步数据库

    2024年02月03日
    浏览(52)
  • Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

    关于Canal的介绍及原理不在此赘述,可自行查阅。笔者在使用Canal同步Mysql实时操作记录至RabbitMQ的过程中,也翻阅了一些大牛们的文章,可能是我使用的Canal版本与文中版本不一致,出现了一些问题,在此总结记录一下可行的方案。 注:本文使用的Canal为 v1.1.7 先查看目标数据

    2024年04月10日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包