(18)不重启服务动态停止、启动RabbitMQ消费者

这篇具有很好参考价值的文章主要介绍了(18)不重启服务动态停止、启动RabbitMQ消费者。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        我们在消费RabbitMQ消息的过程中,有时候可能会想先暂停消费一段时间,然后过段时间再启动消费者,这个需求怎么实现呢?我们可以借助RabbitListenerEndpointRegistry这个类来实现,它的全类名是org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry,通过这个类可以实现全部队列消息的启动、停止消费,也可以实现指定队列消息的启动、停止消费。具体的原因感兴趣的话可以参考一下我前面的这篇博客(17)不重启服务动态调整RabbitMQ消费者数量,里面有相应的源码分析。

停止、启动全部队列消费

        RabbitListenerEndpointRegistry类提供了start()方法和stop()方法,可以看到底层都是通过调用getListenerContainers()获取到所有队列的消费监听容器列表,然后遍历挨个调用对应的start()方法和stop()方法。

	@Override
	public void start() {
		for (MessageListenerContainer listenerContainer : getListenerContainers()) {
			startIfNecessary(listenerContainer);
		}
	}

	@Override
	public void stop() {
		for (MessageListenerContainer listenerContainer : getListenerContainers()) {
			listenerContainer.stop();
		}
	}

        我们只需要获取到RabbitListenerEndpointRegistry对象,然后调用其start()方法和stop()方法即可实现启动/停止所有队列消费。

        实现代码如下所示:

@Resource
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

@RequestMapping(value = "/startStopAllConsumer")
@ApiOperation(value = "启动/暂停全部队列消息消费")
public Response startStopAllConsumer(@RequestParam(value = "consumeSwitch", required = true) boolean consumeSwitch) {
	log.info("启动/暂停全部队列消息消费,consumeSwitch:{}",consumeSwitch);
	if(consumeSwitch){
		rabbitListenerEndpointRegistry.start();
	}else {
		rabbitListenerEndpointRegistry.stop();
	}
	return Response.success();
}

        传入开关参数为false,会停止所有队列消费者消费,调用后控制台看到如下日志

2023-09-04 19:43:11.480 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO  c.b.t.m.p.w.PayCashierMockController:67 - 启动/暂停全部队列消息消费,consumeSwitch:false
2023-09-04 19:43:11.556 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO  o.s.a.r.l.SimpleMessageListenerContainer:586 - Waiting for workers to finish.
2023-09-04 19:43:12.352 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO  o.s.a.r.l.SimpleMessageListenerContainer:589 - Successfully waited for workers to finish.
可以看到消息监听容器关闭的日志,然后再传入开关参数为true,调用后会启动所有队列消息消费。

停止、启动指定队列消费

        上面提到了RabbitListenerEndpointRegistry.getListenerContainers()可以获取到所有队列的消费监听容器列表,我们可以使用MessageListenerContainer中获取消费的队列名进行判断,以实现指定队列的停止、启动消费。

        实现代码如下所示:

@Resource
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

@RequestMapping(value = "/startStopConsumer")
@ApiOperation(value = "启动/暂停指定队列消息消费")
public Response startStopConsumer(@RequestParam(value = "queueName", required = false) String queueName,
									@RequestParam(value = "consumeSwitch", required = true) boolean consumeSwitch) {
	log.info("启动/暂停指定队列消息消费,consumeSwitch:{},queueName:{}",consumeSwitch,queueName);
	//获取所有消息监听容器
	Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
	for (MessageListenerContainer container : listenerContainers) {
		SimpleMessageListenerContainer con = (SimpleMessageListenerContainer) container;
		//消息监听容器要消费的队列名称集合
		List<String> queueNamesList = Arrays.asList(con.getQueueNames());
		//判断容器中的队列名称是否包含需要调整的队列名参数
		if (queueNamesList.contains(queueName)) {
			if(consumeSwitch){
				con.start();
			}else{
				con.stop();
			}
		}
	}
	return Response.success();
}

传入开关参数为false,停止pay_work_notify队列消费者消费,调用后控制台看到如下日志

2023-09-04 19:51:37.130 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  c.b.t.m.p.w.PayCashierMockController:80 - 启动/暂停指定队列消息消费,consumeSwitch:false,queueName:pay_work_notify
2023-09-04 19:51:37.200 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  o.s.a.r.l.SimpleMessageListenerContainer:586 - Waiting for workers to finish.
2023-09-04 19:51:37.903 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  o.s.a.r.l.SimpleMessageListenerContainer:589 - Successfully waited for workers to finish.
可以看到消息监听容器关闭的日志,然后再传入开关参数为true,调用后会启动pay_work_notify队列消息消费。文章来源地址https://www.toymoban.com/news/detail-707658.html

到了这里,关于(18)不重启服务动态停止、启动RabbitMQ消费者的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • WindowsPowerShell 停止、启动、暂停和重启服务、卸载服务

    PowerShell 停止、启动、暂停和重启服务 官文 powershell卸载服务 官文 所有 Service cmdlet 都具有相同的一般形式。 可以按公用名或显示名称指定服务,并使用列表和通配符作为值。 停止服务 若要停止打印后台处理程序,请使用: 启动服务 若要在打印后台处理程序停止后启动它,

    2024年02月05日
    浏览(34)
  • 【ELK解决方案】ELK集群+RabbitMQ部署方案以及快速开发RabbitMQ生产者与消费者基础服务...

    前言: 大概一年多前写过一个部署ELK系列的博客文章,前不久刚好在部署一个ELK的解决方案,我顺便就把一些基础的部分拎出来,再整合成一期文章。大概内容包括:搭建ELK集群,以及写一个简单的MQ服务。 如果需要看一年多之前写的文章,可以详见下列文章链接(例如部署

    2023年04月08日
    浏览(38)
  • Windows系统编写bat脚本启动,停止,重启Java服务jar包

    创建一个以bat后缀结束的文件,写入一下代码: 在Windows系统上面创建 start.bat 启动jar包脚本编辑以下内容: 给窗口命名:“jeeplus-gateway”,如果是微服务的话方便查看 在Windows系统上面创建 stop.bat 停止jar包脚本编辑以下内容: 重启jar包就是将两个合并到一起: 双击 bat文件 确

    2024年02月03日
    浏览(55)
  • 在linux上启动、重启、查询、停止java服务脚本,并且设置定时任务自动执行

    以下代码来源于另一位博主,在实践过程中需要设置定时任务,所以遇到一些问题,把没有写清楚的地方优化了一下。 我们把上面的脚本复制到以下目录的脚本文件/data/www/shtools/startmanage.sh,执行命令可以得到不同的结果。 设置的三个定时任务运行脚本, 第一个是0 3 * * *

    2024年02月07日
    浏览(47)
  • 如何一键启动、停止或重启运行在服务器内的幻兽帕鲁游戏服务进程?

    如果你是用腾讯云轻量应用服务器一键部署的幻兽帕鲁服务器,那么可以在面板一键启动、停止或重启运行在服务器内的幻兽帕鲁游戏服务进程(注意并非对服务器整机进行操作),无需手动在服务器内部运行命令。 详细教程地址:https://docs.qq.com/doc/DQnBvck1Jb2Vud2NE

    2024年04月09日
    浏览(47)
  • RabbitMQ多消费者实例时,保证只有一个消费者进行消费(单活消费者模式)

    有一种业务场景,当人员组织结构变更时,会有大量数据进行推送。这些数据类型有的是add,有的是update,并且必须先add,才能进行update。 这时,为了保证消费顺序,需要 只有一个实例进行按顺序消费,其他实例仅提供日常对外服务 ,不进行消息消费。当唯一消费实例无法

    2024年02月11日
    浏览(35)
  • RabbitMQ 消费者

      RabbitMQ的消费模式分两种:推模式和拉模式,推模式采用Basic.Consume进行消费,拉模式则是调用Basic.Get进行消费。   消费者通过订阅队列从RabbitMQ中获取消息进行消费,为避免消息丢失可采用消费确认机制   顾名思义,拉模式就是消费者主动的从RabbitMQ中获取数据,通

    2024年02月11日
    浏览(28)
  • RabbitMQ-消费者确认机制

    none:不做任何处理,消息投递到消费者了之后,立即返回ACK,并且从MQ将消息删除,非常不安全,不建议使用。 manual:手动模式,需要在业务中调用api,ack或者reject。 auto:自动模式,SpringAMQP利用AOP对我们的消息处理做了环绕增强,当业务正常执行时返回ACK,执行异常时,根

    2024年01月21日
    浏览(46)
  • docker—启动、停止、重启容器实例

    先查看已经暂停的容器实例信息 docker ps -a 通过 docker start xxx 启动容器 通过 docker ps 查看当前启动的容器 1、docker stop 此方式常常被翻译为优雅的停止容器 docker stop 容器ID或容器名 参数 -t:关闭容器的限时,如果超时未能关闭则用kill强制关闭,默认值10s,这个时间用于容器的

    2024年02月09日
    浏览(38)
  • rabbitmq消费者与生产者

    在第一次学习rabbitmq的时候,遇到了许多不懂得 第一步导包 第二步新增生产者 在这里中: connectionFactory.setVirtualHost(\\\"my_vhost\\\");//填写自己的队列名称,如果你的为”/“则填写\\\'\\\'/\\\'\\\' 第三步新增消费者 消息获取成功 注意如果你用的云服务器需要打开这两个端口 5672 15672 如果你使

    2024年02月11日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包