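1. Understanding the MQ startup configuration flow from the source code
1.1 The configuration entry point
1.1.1 The factories file (spring.factories) lists the auto-configuration class for MQ
1.1.2 Next, open RabbitAutoConfiguration. It imports the RabbitAnnotationDrivenConfiguration configuration class.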
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
}
1.1.3 Open RabbitAnnotationDrivenConfiguration and scroll to the bottom. It applies the @EnableRabbit annotation, which in turn imports the RabbitBootstrapConfiguration configuration class.
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {
}
//---------------------------------------------
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
1.1.4 RabbitBootstrapConfiguration defines two beans. One of them, RabbitListenerEndpointRegistry, is the class that registers listener containers.
@Configuration
public class RabbitBootstrapConfiguration {
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
}
1.1.5 RabbitListenerEndpointRegistry provides getListenerContainerIds, which returns the ids of all registered containers, and registerListenerContainer, which registers a listener container.
/**
 * Create a message listener container for the given {@link RabbitListenerEndpoint}.
 * <p>This create the necessary infrastructure to honor that endpoint
 * with regards to its configuration.
 * @param endpoint the endpoint to add
 * @param factory the listener factory to use
 * @see #registerListenerContainer(RabbitListenerEndpoint, RabbitListenerContainerFactory, boolean)
 */
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
    registerListenerContainer(endpoint, factory, false);
}
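To make the registry's role concrete, here is a minimal Spring-free sketch of an id-keyed registry whose two-argument overload delegates with startImmediately set to false, as in the excerpt above. Every name in it (EndpointRegistrySketch, Registry, Container, register, getContainerIds) is a hypothetical stand-in rather than Spring AMQP API, and the real registry does considerably more work when it creates and starts a container.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified, Spring-free model of an id-keyed listener registry.
// All class and method names below are hypothetical stand-ins.
class EndpointRegistrySketch {

    // Stand-in for a message listener container.
    static class Container {
        final String queue;
        boolean running;

        Container(String queue) {
            this.queue = queue;
        }
    }

    static class Registry {
        private final Map<String, Container> containers = new LinkedHashMap<>();

        // Two-argument overload: registers without starting the container.
        void register(String id, String queue) {
            register(id, queue, false);
        }

        void register(String id, String queue, boolean startImmediately) {
            if (id == null || id.isEmpty()) {
                throw new IllegalArgumentException("Endpoint id must not be empty");
            }
            if (containers.containsKey(id)) {
                throw new IllegalStateException(
                        "Another endpoint is already registered with id '" + id + "'");
            }
            Container container = new Container(queue);
            containers.put(id, container);
            if (startImmediately) {
                container.running = true; // simplification: start unconditionally
            }
        }

        Set<String> getContainerIds() {
            return containers.keySet();
        }

        Container getContainer(String id) {
            return containers.get(id);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("deferred", "testQueue");
        registry.register("eager", "otherQueue", true);
        System.out.println(registry.getContainerIds());
        System.out.println(registry.getContainer("eager").running);
    }
}
```

The point of the sketch is only that a container is addressed by its endpoint id, so each dynamically registered endpoint needs a unique id.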
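1.1.6 Listener container registration is triggered in RabbitListenerEndpointRegistrar, inside afterPropertiesSet, the hook that runs once the bean has been initialized; it registers all listener containers. PS: RabbitListenerEndpointRegistrar is a field of RabbitListenerAnnotationBeanPostProcessor, and RabbitListenerAnnotationBeanPostProcessor is the RabbitMQ listener-annotation post-processor created in 1.1.4.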
@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}
//---------------------------------------
public class RabbitListenerAnnotationBeanPostProcessor
        implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
        SmartInitializingSingleton {
    /**
     * The bean name of the default {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}.
     */
    public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";
    private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
}
2. Creating a custom message handler and listener container
2.1 Inject RabbitListenerEndpointRegistry into a custom bean
@Component
public class RabbitmqCustomerConfiguration {
    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
}
2.2 Create a MyMessageListener class that implements the MessageListener interface
@Component
public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println(new String(message.getBody()));
    }
}
2.3 Create a SimpleRabbitListenerEndpoint and set the queue name to listen on and the custom message listener
@Component
public class RabbitmqCustomerConfiguration {
    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    @Autowired
    MyMessageListener myMessageListener;
    public void registryCustomerContain() {
        SimpleRabbitListenerEndpoint simpleRabbitListenerEndpoint = new SimpleRabbitListenerEndpoint();
        simpleRabbitListenerEndpoint.setMessageListener(myMessageListener);
        simpleRabbitListenerEndpoint.setQueueNames("testQueue");
        // The third argument controls whether the listener container is started immediately
        rabbitListenerEndpointRegistry.registerListenerContainer(simpleRabbitListenerEndpoint, null, true);
    }
}
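PS: when the second argument is null, the default factory is used. RabbitListenerEndpointRegistrar checks whether a factory was supplied and, if none was, finally looks one up by containerFactoryBeanName. Note that this fallback lives in the registrar; in some Spring AMQP versions RabbitListenerEndpointRegistry.registerListenerContainer itself asserts a non-null factory and a non-empty endpoint id, so if the call in 2.3 fails, inject the rabbitListenerContainerFactory bean and pass it explicitly, and call setId on the endpoint.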
private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
    if (descriptor.containerFactory != null) {
        return descriptor.containerFactory;
    }
    else if (this.containerFactory != null) {
        return this.containerFactory;
    }
    else if (this.containerFactoryBeanName != null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        this.containerFactory = this.beanFactory.getBean(
                this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
        return this.containerFactory; // Consider changing this if live change of the factory is required
    }
    else {
        throw new IllegalStateException("Could not resolve the " +
                RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
                descriptor.endpoint + "] no factory was given and no default is set.");
    }
}
containerFactoryBeanName comes from this constant in RabbitListenerAnnotationBeanPostProcessor:
public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";
This is the name of the bean defined in RabbitAnnotationDrivenConfiguration, the class mentioned in 1.1.2:
@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer,
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}
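The fallback order in resolveContainerFactory can be modeled without Spring. The following is a minimal sketch under stated assumptions: FactoryResolutionSketch, ContainerFactory, Registrar and resolve are hypothetical names, and a plain Map stands in for the BeanFactory. It only illustrates the order of the checks (factory given with the endpoint, then the registrar's default, then a lookup by bean name, otherwise an exception).

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, Spring-free model of the factory fallback order.
// Every type and method name here is a hypothetical stand-in.
class FactoryResolutionSketch {

    // Stand-in for RabbitListenerContainerFactory.
    interface ContainerFactory {
        String name();
    }

    // Stand-in for the registrar; the map plays the role of the BeanFactory.
    static class Registrar {
        private final Map<String, ContainerFactory> beans;
        private ContainerFactory defaultFactory;
        private String defaultFactoryBeanName;

        Registrar(Map<String, ContainerFactory> beans) {
            this.beans = beans;
        }

        void setDefaultFactoryBeanName(String beanName) {
            this.defaultFactoryBeanName = beanName;
        }

        ContainerFactory resolve(ContainerFactory endpointFactory) {
            if (endpointFactory != null) {
                return endpointFactory; // 1. factory supplied with the endpoint
            }
            if (defaultFactory != null) {
                return defaultFactory; // 2. default already set or cached
            }
            if (defaultFactoryBeanName != null) {
                ContainerFactory found = beans.get(defaultFactoryBeanName);
                if (found == null) {
                    throw new IllegalStateException("No bean named " + defaultFactoryBeanName);
                }
                defaultFactory = found; // 3. look up by bean name, then cache
                return found;
            }
            throw new IllegalStateException("no factory was given and no default is set");
        }
    }

    public static void main(String[] args) {
        Map<String, ContainerFactory> beans = new HashMap<>();
        beans.put("rabbitListenerContainerFactory", () -> "default factory bean");
        Registrar registrar = new Registrar(beans);
        registrar.setDefaultFactoryBeanName("rabbitListenerContainerFactory");

        System.out.println(registrar.resolve(null).name());
        System.out.println(registrar.resolve(() -> "explicit factory").name());
    }
}
```

Because the bean-name lookup caches its result, a factory bean replaced later would not be picked up, which is what the "Consider changing this" comment in the original source hints at.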
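2.4 Idea: call stop and then start on rabbitListenerEndpointRegistry to restart all listener containers. Refreshing the application context also causes the containers to be started, because start then ignores each container's autoStartup setting.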
@Override
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        startIfNecessary(listenerContainer);
    }
}
//----------------
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}
//----------------
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext().equals(this.applicationContext)) {
        this.contextRefreshed = true;
    }
}
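The interaction between contextRefreshed and isAutoStartup is easy to misread, so here is a minimal Spring-free sketch of the start logic above. RegistryRestartSketch, Registry, Container and onContextRefreshed are hypothetical names; only the condition inside start mirrors the startIfNecessary excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, Spring-free model of the registry's start/stop behavior.
// All names are hypothetical stand-ins for the Spring AMQP classes.
class RegistryRestartSketch {

    static class Container {
        final boolean autoStartup;
        boolean running;

        Container(boolean autoStartup) {
            this.autoStartup = autoStartup;
        }

        void start() {
            running = true;
        }

        void stop() {
            running = false;
        }
    }

    static class Registry {
        private final List<Container> containers = new ArrayList<>();
        private boolean contextRefreshed;

        void register(Container container) {
            containers.add(container);
        }

        // Models the ContextRefreshedEvent handler.
        void onContextRefreshed() {
            contextRefreshed = true;
        }

        // Same condition as startIfNecessary in the excerpt above.
        void start() {
            for (Container container : containers) {
                if (contextRefreshed || container.autoStartup) {
                    container.start();
                }
            }
        }

        void stop() {
            for (Container container : containers) {
                container.stop();
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Container manual = new Container(false);
        registry.register(manual);

        registry.start();
        System.out.println("before refresh: " + manual.running);

        registry.onContextRefreshed();
        registry.stop();
        registry.start();
        System.out.println("after refresh and restart: " + manual.running);
    }
}
```

In this model, a stop followed by a start after the context has been refreshed starts every registered container, including those with autoStartup disabled.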