Spring初始化顺序- RabbitMq 无法自动创建队列
项目中使用了RabbitMq, 并配置了自动创建topic, exchange,binding 等,但是通过测试发现,有一个队列始终无法自动创建,在对spring 源码以及rabbitmq 源码debug 后发现问题。
rabbitmq 配置了两套环境 , 以下为代码示例
@Configuration
public class RabbitMqConfiguration {
/**
* one mq配置
*/
@Bean(name = "oneRabbitMQProperties")
@ConfigurationProperties(prefix = "spring.rabbitmq.one")
public RabbitMQProperties oneRabbitMQProperties() {
return new RabbitMQProperties();
}
@Primary
@Bean(name = "oneRabbitConnectionFactory")
public ConnectionFactory oneRabbitConnectionFactory(@Qualifier("oneRabbitMQProperties") RabbitMQProperties rabbitMQProperties) {
return connectionFactory(rabbitMQProperties, false);
}
@Primary
@Bean("oneRabbitMqAdmin")
public RabbitAdmin oneRabbitMqAdmin(@Qualifier("oneRabbitConnectionFactory") ConnectionFactory oneRabbitConnectionFactory){
return new RabbitAdmin(oneRabbitConnectionFactory);
}
@Primary
@Bean(name = "oneRabbitTemplate")
public RabbitTemplate oneRabbitTemplate(@Qualifier("oneRabbitConnectionFactory") ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
/**
* two mq配置
*/
@Bean(name = "twoRabbitMQProperties")
@ConfigurationProperties(prefix = "spring.rabbitmq.two")
public RabbitMQProperties twoRabbitMQProperties() {
return new RabbitMQProperties();
}
@Bean(name = "twoRabbitConnectionFactory")
public ConnectionFactory twoRabbitConnectionFactory(@Qualifier("twoRabbitMQProperties") RabbitMQProperties rabbitMQProperties) {
return connectionFactory(rabbitMQProperties, false);
}
@Bean("twoRabbitMqAdmin")
public RabbitAdmin twoRabbitMqAdmin(@Qualifier("twoRabbitConnectionFactory") ConnectionFactory twoRabbitConnectionFactory){
return new RabbitAdmin(twoRabbitConnectionFactory);
}
@Bean(name = "twoRabbitTemplate")
public RabbitTemplate twoRabbitTemplate(@Qualifier("twoRabbitConnectionFactory") ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
private ConnectionFactory connectionFactory(RabbitMQProperties rabbitMQProperties, boolean transaction) {
CachingConnectionFactory factory = new CachingConnectionFactory(rabbitMQProperties.getHost(), rabbitMQProperties.getPort());
factory.setUsername(rabbitMQProperties.getUsername());
factory.setPassword(rabbitMQProperties.getPassword());
factory.setPublisherConfirms(!transaction);
factory.setPublisherReturns(true);
factory.setVirtualHost(rabbitMQProperties.getVirtualHost());
return factory;
}
Queue, Exchange, Binding 自动生成配置:
@Configuration
public class RabbitMqQueueConfiguration {
@Bean
public Queue oneQueue(@Qualifier("oneRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
Queue queue = new Queue("one_auto_test");
queue.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
return queue;
}
@Bean
public DirectExchange oneExchange(@Qualifier("oneRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
DirectExchange exchange = new DirectExchange("one_auto_test");
exchange.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
return exchange;
}
@Bean
public Binding oneBinding(Queue oneQueue, DirectExchange oneExchange, @Qualifier("oneRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
Binding binding = BindingBuilder.bind(oneQueue).to(oneExchange).with("one_auto_test");
binding.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
return binding;
}
@Bean
public Queue twoQueue(@Qualifier("twoRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
Queue queue = new Queue("two_auto_test");
queue.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
return queue;
}
@Bean
public DirectExchange twoExchange(@Qualifier("twoRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
DirectExchange exchange = new DirectExchange("two_auto_test");
exchange.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
return exchange;
}
@Bean
public Binding twoBinding(Queue twoQueue, DirectExchange twoExchange, @Qualifier("twoRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
Binding binding = BindingBuilder.bind(twoQueue).to(twoExchange).with("two_auto_test");
binding.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
return binding;
}
}
通过运行项目,发现队列,交换机,绑定关系创建有问题
2023-03-21 17:54:06.860 TRACE 16056 --- \[ main] o.s.b.f.s.DefaultListableBeanFactory : Ignoring match to currently created bean 'twoExchange': Error creating bean with name 'twoExchange': Requested bean is currently in creation: Is there an unresolvable circular reference?
这个问题非常头疼,日志级别是TRACE 才能看到(Spring 5.0.6 日志级别为DEBUG), 而且在我们配置代码中的断点都可以进入,但是就是无法成功创建。下面我根据自己排查问题的步骤做一次分享记录。
首先当队列、交换机、绑定关系无法创建时,首先我怀疑是关于rabbitmq 的配置出现问题,导致无法创建。而配置类中@Bean 声明后按常理会创建队列,那么肯定在rabbitmq 中有一个步骤就是获取IOC容器中的 Queue,Exchange,Binding 来进行相关处理。
第一个关键角色登场:RabbitAdmin
// 在RabbitAdmin 实现了 InitializingBean ,afterPropertiesSet 会在 属性填充后执行
@Override
public void afterPropertiesSet() {
synchronized (this.lifecycleMonitor) {
if (this.running || !this.autoStartup) {
return;
}
if (this.connectionFactory instanceof CachingConnectionFactory &&
((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
return;
}
// Prevent stack overflow...
final AtomicBoolean initializing = new AtomicBoolean(false);
this.connectionFactory.addConnectionListener(connection -> {
if (!initializing.compareAndSet(false, true)) {
// If we are already initializing, we don't need to do it again...
return;
}
try {
/*
* ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
* one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
* declared for every connection. If anyone has a problem with it: use auto-startup="false".
*/
initialize();
}
finally {
initializing.compareAndSet(true, false);
}
});
this.running = true;
}
}
public void initialize() {
if (this.applicationContext == null) {
this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
return;
}
this.logger.debug("Initializing declarations");
Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
this.applicationContext.getBeansOfType(Exchange.class).values());
Collection<Queue> contextQueues = new LinkedList<Queue>(
this.applicationContext.getBeansOfType(Queue.class).values());
Collection<Binding> contextBindings = new LinkedList<Binding>(
this.applicationContext.getBeansOfType(Binding.class).values());
@SuppressWarnings("rawtypes")
Collection<Collection> collections = this.declareCollections
? this.applicationContext.getBeansOfType(Collection.class, false, false).values()
: Collections.emptyList();
for (Collection<?> collection : collections) {
if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
for (Object declarable : collection) {
if (declarable instanceof Exchange) {
contextExchanges.add((Exchange) declarable);
}
else if (declarable instanceof Queue) {
contextQueues.add((Queue) declarable);
}
else if (declarable instanceof Binding) {
contextBindings.add((Binding) declarable);
}
}
}
}
final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
final Collection<Queue> queues = filterDeclarables(contextQueues);
final Collection<Binding> bindings = filterDeclarables(contextBindings);
for (Exchange exchange : exchanges) {
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
+ exchange.getName()
+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
+ "reopening the connection.");
}
}
for (Queue queue : queues) {
if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
+ queue.getName()
+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
+ queue.isExclusive() + ". "
+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
+ "alive, but all messages will be lost.");
}
}
if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
this.logger.debug("Nothing to declare");
return;
}
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
});
this.logger.debug("Declarations finished");
}
在RabbitAdmin 中可以看到暴露队列、交换机、绑定关系的相关处理逻辑。此处打断点后,可以发现无法全量获取到的交换机、绑定关系、队列。
那为什么没有全量获取呢?下面我们对getBeansOfType 方法进行剖析。重点关注下面代码:
此处是根据类型在IOC 容器中获取相关类型的Bean。
@Override
@SuppressWarnings("unchecked")
public <T> Map<String, T> getBeansOfType(
@Nullable Class<T> type, boolean includeNonSingletons, boolean allowEagerInit) throws BeansException {
String[] beanNames = getBeanNamesForType(type, includeNonSingletons, allowEagerInit);
Map<String, T> result = CollectionUtils.newLinkedHashMap(beanNames.length);
for (String beanName : beanNames) {
try {
Object beanInstance = getBean(beanName);
if (!(beanInstance instanceof NullBean)) {
result.put(beanName, (T) beanInstance);
}
}
catch (BeanCreationException ex) {
Throwable rootCause = ex.getMostSpecificCause();
if (rootCause instanceof BeanCurrentlyInCreationException) {
BeanCreationException bce = (BeanCreationException) rootCause;
String exBeanName = bce.getBeanName();
if (exBeanName != null && isCurrentlyInCreation(exBeanName)) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring match to currently created bean '" + exBeanName + "': " +
ex.getMessage());
}
onSuppressedException(ex);
// Ignore: indicates a circular reference when autowiring constructors.
// We want to find matches other than the currently created bean itself.
continue;
}
}
throw ex;
}
}
return result;
}
getBeansOfType 是先根据类型获取BeanName ,在通过BeanName去实例化,初始化Bean, 但是在这一步会出现错误。
2023-03-21 17:54:06.860 TRACE 16056 --- [ main] o.s.b.f.s.DefaultListableBeanFactory : Ignoring match to currently created bean 'twoExchange': Error creating bean with name 'twoExchange': Requested bean is currently in creation: Is there an unresolvable circular reference?
通过debug 不难发现上述错误是在 DefaultSingletonBeanRegistry.beforeSingletonCreation 方法中抛出的异常。
protected void beforeSingletonCreation(String beanName) {
if (!this.inCreationCheckExclusions.contains(beanName) && !this.singletonsCurrentlyInCreation.add(beanName)) {
throw new BeanCurrentlyInCreationException(beanName);
}
}
beforeSingletonCreation 就是在校验是否在重复创建bean 。表明现在有bean 正在重复创建。
此处大概已经明确了是bean的创建顺序导致有部分bean在重复创建,导致rabbitadmin 无法拿到全部的队列、交换机、绑定关系从而无法自动创建队列等。
接着,我们从Spring开始创建bean 的代码入手,
DefaultListableBeanFactory.preInstantiateSingletons
public void preInstantiateSingletons() throws BeansException {
if (logger.isTraceEnabled()) {
logger.trace("Pre-instantiating singletons in " + this);
}
// Iterate over a copy to allow for init methods which in turn register new bean definitions.
// While this may not be part of the regular factory bootstrap, it does otherwise work fine.
List<String> beanNames = new ArrayList<>(this.beanDefinitionNames);
// Trigger initialization of all non-lazy singleton beans...
for (String beanName : beanNames) {
RootBeanDefinition bd = getMergedLocalBeanDefinition(beanName);
if (!bd.isAbstract() && bd.isSingleton() && !bd.isLazyInit()) {
if (isFactoryBean(beanName)) {
Object bean = getBean(FACTORY_BEAN_PREFIX + beanName);
if (bean instanceof FactoryBean) {
FactoryBean<?> factory = (FactoryBean<?>) bean;
boolean isEagerInit;
if (System.getSecurityManager() != null && factory instanceof SmartFactoryBean) {
isEagerInit = AccessController.doPrivileged(
(PrivilegedAction<Boolean>) ((SmartFactoryBean<?>) factory)::isEagerInit,
getAccessControlContext());
}
else {
isEagerInit = (factory instanceof SmartFactoryBean &&
((SmartFactoryBean<?>) factory).isEagerInit());
}
if (isEagerInit) {
getBean(beanName);
}
}
}
else {
getBean(beanName);
}
}
}
// Trigger post-initialization callback for all applicable beans...
for (String beanName : beanNames) {
Object singletonInstance = getSingleton(beanName);
if (singletonInstance instanceof SmartInitializingSingleton) {
StartupStep smartInitialize = this.getApplicationStartup().start("spring.beans.smart-initialize")
.tag("beanName", beanName);
SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
if (System.getSecurityManager() != null) {
AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
smartSingleton.afterSingletonsInstantiated();
return null;
}, getAccessControlContext());
}
else {
smartSingleton.afterSingletonsInstantiated();
}
smartInitialize.end();
}
}
}
可以看到Spring 是获取到所有的beanNames 然后循环创建bean
哦嚯,原来Queue, Exchange,Binding的顺序在RabbitAdmin之前,而创建队列的方法是需要依赖RabbitAdmin ,这就导致,在创建Queue时,会触发RabbitAdmin 的创建,而RabbitAdmin 初始化过程中,又会去扫描所有的Queue, Exchange,Binding,这时部分bean在创建中,导致RabbitAdmin 无法拿到bean, 也就无法进行自动创建。文章来源:https://www.toymoban.com/news/detail-543790.html
改进:将RabbitAdmin 改变为类属性注入,这样就避免了重复创建问题。但是并不是所有的项目都会出现此问题,此问题依赖于项目中bean的加载顺序。文章来源地址https://www.toymoban.com/news/detail-543790.html
到了这里,关于Spring初始化顺序- RabbitMq 无法自动创建队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!