springboot RabbitMQ客户端连接故障恢复

这篇具有很好参考价值的文章主要介绍了springboot RabbitMQ客户端连接故障恢复。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

最近做RabbitMQ故障演练发现RabbitMQ服务器停止后,基于springboot的消费端不可以自动的恢复,队列的消费者消失,消息一直积压到队列中,这种情况肯定是不可接收的;通过研究源代码找到了解决方案。

一、添加自动恢复配置automaticRecovery
 CachingConnectionFactory factory = new CachingConnectionFactory(connectionFactory);
cachingConnectionFactoryConfigurer.configure(factory);

//设置TCP连接超时时间,默认:60000ms
factory.getRabbitConnectionFactory().setConnectionTimeout(properties.getConnectionTimeout());
//启用或禁用连接自动恢复,默认:false
factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(properties.isAutomaticRecovery());
//设置连接恢复时间间隔,默认:5000ms
factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(properties.getNetworkRecoveryInterval());
//启用或禁用拓扑恢复,默认:true【拓扑恢复功能可以帮助消费者重新声明之前定义的队列、交换机和绑定等拓扑结构】
factory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(properties.isTopologyRecovery());
//替换默认异常处理DefaultExceptionHandler
factory.getRabbitConnectionFactory().setExceptionHandler(new DefaultMqExceptionHandler());
//添加连接监听器
factory.addConnectionListener(new DefaultMqConnectionListener(factory));

通过上述配置如果RabbitMQ服务器发生故障,则会自动重启恢复连接及队列的消费者,如果恢复失败则会间隔5000ms再次重试;在这里提一个问题,如果服务重试一直失败,重试的上限是多少?带着这个问题我们分析下源码。

二、RabbitMQ客户端实现连接的自动恢复功能

AutorecoveringConnection#beginAutomaticRecovery是在 RabbitMQ 客户端库层面实现的连接的自动恢复功能。当 RabbitMQ 连接出现故障时,它会尝试重新建立连接,以确保消息传递的可靠性。

  private synchronized void beginAutomaticRecovery() throws InterruptedException {
        //获取故障恢复连接的间隔时间,实际是设置的:networkRecoveryInterval
        final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
        if (delay > 0)  {
           //等待指定的间隔时间
            this.wait(delay);
        }
				//调用恢复通知监听器
        this.notifyRecoveryListenersStarted();
				//获取恢复建立的连接对象
        final RecoveryAwareAMQConnection newConn = this.recoverConnection();
        //如果为null则直接返回
        if (newConn == null) {
            return;
        }
    //连接已经恢复建立,恢复监听器、channel等资源
        LOGGER.debug("Connection {} has recovered", newConn);
        this.addAutomaticRecoveryListener(newConn);
	    this.recoverShutdownListeners(newConn);
	    this.recoverBlockedListeners(newConn);
	    this.recoverChannels(newConn);
	    // don't assign new delegate connection until channel recovery is complete
	    this.delegate = newConn;
     //判断是否恢复拓扑结构,如果开启则开启拓扑结构恢复
	    if (this.params.isTopologyRecoveryEnabled()) {
	        notifyTopologyRecoveryListenersStarted();
	        recoverTopology(params.getTopologyRecoveryExecutor());
	    }
		this.notifyRecoveryListenersComplete();
    }

addAutomaticRecoveryListener自动恢复监听器

private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newConn) {
    final AutorecoveringConnection c = this;
    // this listener will run after shutdown listeners,
    // see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
    RecoveryCanBeginListener starter = cause -> {
        try {
            if (shouldTriggerConnectionRecovery(cause)) {
                //开始自动回复
                c.beginAutomaticRecovery();
            }
        } catch (Exception e) {
            newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);
        }
    };
    synchronized (this) {
        newConn.addRecoveryCanBeginListener(starter);
    }
}

init初始化

public void init() throws IOException, TimeoutException {
    //建立连接,否则抛出异常
    this.delegate = this.cf.newConnection();
    //自动回复监听器
    this.addAutomaticRecoveryListener(delegate);
}
三、消费端实现消息的消费和处理

SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run是应用程序层面实现消息的消费和处理,它负责从RabbitMQ中接收消息并进行相应的逻辑处理:

@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
  ...

  try {
    //消费端初始化方法
    initialize();
    //当消费端是活跃状态,或者队列非空,或者消费端未被关闭则进入主循环
    while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
      mainLoop();
    }
  }
  catch (InterruptedException e) {
    ...
    }
}

消费端initialize初始化方法:

	private void initialize() throws Throwable { // NOSONAR
			try {
				redeclareElementsIfNecessary();
        //启动消费端初始化
				this.consumer.start();
				this.start.countDown();
			}
			catch (QueuesNotAvailableException e) {
				if (isMissingQueuesFatal()) {
					throw e;
				}
				else {
					this.start.countDown();
          //消费端启动异常等待处理
					handleStartupFailure(this.consumer.getBackOffExecution());
					throw e;
				}
			}
			catch (FatalListenerStartupException ex) {
				if (isPossibleAuthenticationFailureFatal()) {
					throw ex;
				}
				else {
					Throwable possibleAuthException = ex.getCause().getCause();
					if (!(possibleAuthException instanceof PossibleAuthenticationFailureException)) {
						throw ex;
					}
					else {
						this.start.countDown();
            //消费端启动异常等待处理
						handleStartupFailure(this.consumer.getBackOffExecution());
						throw possibleAuthException;
					}
				}
			}
			catch (Throwable t) { //NOSONAR
				this.start.countDown();
        //消费端启动异常等待处理
				handleStartupFailure(this.consumer.getBackOffExecution());
				throw t;
			}

			if (getTransactionManager() != null) {
				/*
				 * Register the consumer's channel so it will be used by the transaction manager
				 * if it's an instance of RabbitTransactionManager.
				 */
				ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());
			}
		}

消费端异常等待处理处理:

protected void handleStartupFailure(BackOffExecution backOffExecution) {
   //获取等待时间间隔,参考FixedBackOff类实现
		long recoveryInterval = backOffExecution.nextBackOff();
		if (BackOffExecution.STOP == recoveryInterval) {
			synchronized (this) {
				if (isActive()) {
					logger.warn("stopping container - restart recovery attempts exhausted");
					stop();
				}
			}
			return;
		}
		try {
			if (logger.isDebugEnabled() && isActive()) {
				logger.debug("Recovering consumer in " + recoveryInterval + " ms.");
			}
      //当前时间加上等待时间
			long timeout = System.currentTimeMillis() + recoveryInterval;
      //如果当前时间小于等待时间,则休眠200毫秒,再次尝试
			while (isActive() && System.currentTimeMillis() < timeout) {
				Thread.sleep(RECOVERY_LOOP_WAIT_TIME);
			}
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException("Irrecoverable interruption on consumer restart", e);
		}
	}

FixedBackOff回退等待时间类实现:

public class FixedBackOff implements BackOff {
   // 默认恢复重试间隔
    public static final long DEFAULT_INTERVAL = 5000L;
   //最大重试次数,可以认为无限大
    public static final long UNLIMITED_ATTEMPTS = Long.MAX_VALUE;
   // 默认恢复重试间隔
    private long interval = 5000L;
   //最大重试次数,可以认为无限大
    private long maxAttempts = Long.MAX_VALUE;

    public FixedBackOff() {
    }

    public FixedBackOff(long interval, long maxAttempts) {
        this.interval = interval;
        this.maxAttempts = maxAttempts;
    }

    public void setInterval(long interval) {
        this.interval = interval;
    }

    public long getInterval() {
        return this.interval;
    }

    public void setMaxAttempts(long maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    public long getMaxAttempts() {
        return this.maxAttempts;
    }

    public BackOffExecution start() {
        return new FixedBackOffExecution();
    }

    private class FixedBackOffExecution implements BackOffExecution {
        private long currentAttempts;

        private FixedBackOffExecution() {
            this.currentAttempts = 0L;
        }
				//获取下一次尝试的时间间隔,可以认为一直都是5000ms
        public long nextBackOff() {
            ++this.currentAttempts;
            return this.currentAttempts <= FixedBackOff.this.getMaxAttempts() ? FixedBackOff.this.getInterval() : -1L;
        }

        public String toString() {
            String attemptValue = FixedBackOff.this.maxAttempts == Long.MAX_VALUE ? "unlimited" : String.valueOf(FixedBackOff.this.maxAttempts);
            return "FixedBackOff{interval=" + FixedBackOff.this.interval + ", currentAttempts=" + this.currentAttempts + ", maxAttempts=" + attemptValue + '}';
        }
    }
}

总结:综上源码分析可知消费端故障恢复重试等待时间是5000ms,重试次数可以认为是无限制(Long最大值)

mainloop主循环逻辑:

		private void mainLoop() throws Exception { // NOSONAR Exception
			try {
				if (SimpleMessageListenerContainer.this.stopNow.get()) {
					this.consumer.forceCloseAndClearQueue();
					return;
				}
        //接收客户端发送过来的消息,至少获取一条
				boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
				if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
					checkAdjust(receivedOk);
				}
				long idleEventInterval = getIdleEventInterval();
				if (idleEventInterval > 0) {
					if (receivedOk) {
						updateLastReceive();
					}
					else {
						long now = System.currentTimeMillis();
						long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
						long lastReceive = getLastReceive();
						if (now > lastReceive + idleEventInterval
								&& now > lastAlertAt + idleEventInterval
								&& SimpleMessageListenerContainer.this.lastNoMessageAlert
								.compareAndSet(lastAlertAt, now)) {
							publishIdleContainerEvent(now - lastReceive);
						}
					}
				}
			}
			catch (ListenerExecutionFailedException ex) {
				// Continue to process, otherwise re-throw
				if (ex.getCause() instanceof NoSuchMethodException) {
					throw new FatalListenerExecutionException("Invalid listener", ex);
				}
			}
			catch (AmqpRejectAndDontRequeueException rejectEx) {
				/*
				 *  These will normally be wrapped by an LEFE if thrown by the
				 *  listener, but we will also honor it if thrown by an
				 *  error handler.
				 */
			}
		}

receiveAndExecute接收和处理消息:

private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR

		PlatformTransactionManager transactionManager = getTransactionManager();
		if (transactionManager != null) {
			try {
				if (this.transactionTemplate == null) {
					this.transactionTemplate =
							new TransactionTemplate(transactionManager, getTransactionAttribute());
				}
				return this.transactionTemplate
						.execute(status -> { // NOSONAR null never returned
							RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
									new RabbitResourceHolder(consumer.getChannel(), false),
									getConnectionFactory(), true);
							// unbound in ResourceHolderSynchronization.beforeCompletion()
							try {
                //接收处理消息
								return doReceiveAndExecute(consumer);
							}
							catch (RuntimeException e1) {
								prepareHolderForRollback(resourceHolder, e1);
								throw e1;
							}
							catch (Exception e2) {
								throw new WrappedTransactionException(e2);
							}
						});
			}
			catch (WrappedTransactionException e) { // NOSONAR exception flow control
				throw (Exception) e.getCause();
			}
		}
		//接收处理消息
		return doReceiveAndExecute(consumer);

	}

调用具体的消息监听器消费消息:

	private void doExecuteListener(Channel channel, Object data) {
		if (data instanceof Message) {
			Message message = (Message) data;
			if (this.afterReceivePostProcessors != null) {
				for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
					message = processor.postProcessMessage(message);
					if (message == null) {
						throw new ImmediateAcknowledgeAmqpException(
								"Message Post Processor returned 'null', discarding message");
					}
				}
			}
			if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) {
				this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment));
			}
			else {
				invokeListener(channel, message);
			}
		}
		else {
			invokeListener(channel, data);
		}
	}

GitHub代码:https://github.com/mingyang66/spring-parent文章来源地址https://www.toymoban.com/news/detail-672700.html

到了这里,关于springboot RabbitMQ客户端连接故障恢复的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot集成Milo库实现OPC UA客户端:连接、遍历节点、读取、写入、订阅与批量订阅

    前面我们搭建了一个本地的 PLC 仿真环境,并通过 KEPServerEX6 读取 PLC 上的数据,最后还使用 UAExpert 作为OPC客户端完成从 KEPServerEX6 这个OPC服务器的数据读取与订阅功能。在这篇文章中,我们将通过 SpringBoot 集成 Milo 库实现一个 OPC UA 客户端,包括连接、遍历节点、读取、写入

    2024年02月09日
    浏览(44)
  • 13、MongoDB--通过 SpringBoot 整合 Spring Data MongoDB(【连接多个 MongoDB 服务器】、【高级定制 MongoDB 客户端】的简单介绍)

    放弃 Spring Boot 为 MongeDB 提供的自动配置,接下来同样要干如下事情: 手动配置多组 ReactiveMongoDatabaseFactory 和 ReactiveMongoTemplate,要连几个 MongoDB 服务器就配置几组。 同步 API 则使用 MongoDatabaseFactory 和 MongoTemplate。 针对不同 MongoDB 服务器,分别开发相应的 DAO 组件类,建议将它

    2024年03月19日
    浏览(48)
  • rabbitmq笔记-rabbitmq客户端开发使用

    1.创建ConnectionFactory,给定参数ip地址,端口号,用户名和密码等 2.创建ConnectionFactory,使用uri方式实现,创建channel。 注意: Connection可以用来创建多个channel实例,但channel实例不能在线程间共享,应用程序为每个线程开辟一个channel。多线程间共享channel实例是非线程安全的。

    2024年02月11日
    浏览(34)
  • RabbitMQ客户端清空所有消息

    打开RabbitMq管理页面,进入队列。 点击 Get messages Requeue 改成No Mesaages 设置一个值 点击Get messages 打开RabbitMq管理页面,进入队列。 点击Purge 点击按钮Purge Messages

    2024年02月08日
    浏览(40)
  • 【RabbitMQ】golang客户端教程1——HelloWorld

    本教程假设RabbitMQ已安装并运行在本机上的标准端口(5672)。如果你使用不同的主机、端口或凭据,则需要调整连接设置。如果你未安装RabbitMQ,可以浏览我上一篇文章Linux系统服务器安装RabbitMQ RabbitMQ是一个消息代理:它接受并转发消息。你可以把它想象成一个邮局:当你把

    2024年02月14日
    浏览(34)
  • RabbitMQ 教程 | 第3章 客户端开发向导

    👨🏻‍💻 热爱摄影的程序员 👨🏻‍🎨 喜欢编码的设计师 🧕🏻 擅长设计的剪辑师 🧑🏻‍🏫 一位高冷无情的编码爱好者 大家好,我是 DevOps 工程师 欢迎分享 / 收藏 / 赞 / 在看! 这篇 RabbitMQ 教程为学习者提供了全面的内容,从 RabbitMQ 的简介开始,涵盖了消息中间件的

    2024年02月15日
    浏览(49)
  • java socket Server TCP服务端向指定客户端发送消息;可查看、断开指定连接的客户端;以及设置客户端最大可连接数量。

    首先需要知道java里如何创建一个Socket服务器端。 提示:注意server.accept()方法调用会阻塞,只有新的客户端连接后才返回一个新的socket对象。如果一直未连接那么会一直处于阻塞状态 了解了如何创建一个socket服务器端后。那么如何实现给指定的连接客户端发送消息呢?首先我

    2024年02月11日
    浏览(46)
  • (四)「消息队列」之 RabbitMQ 路由(使用 .NET 客户端)

    先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。 在上一篇教程中

    2024年02月17日
    浏览(29)
  • (五)「消息队列」之 RabbitMQ 主题(使用 .NET 客户端)

    先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口( 5672 )上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。 在上一篇教程中

    2024年02月16日
    浏览(46)
  • MongoDB之用户与权限管理、备份与恢复管理以及客户端工具的使用

    MongoDB默认不使用权限认证方式启动,但是需要设置权限以保证数据安全。 MongoDB是没有默认管理员账号,所以要先添加管理员账号,并且MongoDB服务器需要在运行的时候开启验证模式 语法格式: 创建root用户,角色为root 在使用的数据库上创建普通用户 若需要连接Mongodb进行认

    2024年02月07日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包