Rabbitmq - rabbitmq Listener监听

这篇具有很好参考价值的文章主要介绍了Rabbitmq - rabbitmq Listener监听。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Listener监听

Listener的yml配置参数形式如下:

   listener:
     simple:
       prefetch: 1 # 一次拉取的数量
       concurrency: 5 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)
       max-concurrency: 10 # 消费端的监听最大个数
       acknowledge-mode: manual
       retry:
         multiplier: 1
         max-attempts: 3
         enabled: true
     direct:
       retry:
         enabled: true
         max-attempts: 3
       acknowledge-mode: manual
       auto-startup: true
     type: simple

在消费端,配置prefetch和concurrency参数便可以实现消费端MQ并发处理消息,下面详细叙述下listener下的几个参数的意思

listener.type 表示监听的类型 主要有两种 simple和direct 对于的监听容器是 
SimpleMessageListenerContainer和DirectMessageListenerContainer

listener.simple.prefetch: 
每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中进行消费,这个值越高,消息传递的越快,
但非顺序处理消息的风险更高。如果ack模式为none,则忽略。如有必要,将增加此值以匹配txSize或messagePerAck。
不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。
acknowledge-mode:表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto"

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
concurrency:
concurrency =1,即每个Listener容器将开启一个线程去处理消息。
可以在 @RabbitListener(concurrency = "3")直接配置当前监听器启动线程,如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。
如果配置了exclusive=true,但是concurrency>1则会抛错
prefetch和concurrency
若一个消费者配置prefetch=10,concurrency=2,即会开启2个线程去消费消息,每个线程都会抓取10个线程到内存中(注意不是两个线程去共享内存中抓取的消息)

 max-concurrency:: 表示最大能启动的线程数
retry:
表示消息被消费者拒收后重试发送或者因为异常原因消息重试发送
multiplier:重试间隔乘法策略
max-attempts: 是最大重试次数,默认是三
enabled: 是否开启重试
initial-interval: 初始化重试次数间隔
max-interval 最大重试间隔
stateless:重试是无状态的还是有状态的。
autoStartup:
当autoStartup为false的时候,监听容器就不会自动启动,然后我们可以通过使用单个容器的ID,调用RabbitListenerEndpointRegistry类的getListenerContainer(String id)方法来获得对单个容器的引用,并执行strat方法,启动容器。

举一个例子:


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "${rabbitmq.queue.routing.beijing}", durable = "true", autoDelete = "false"),
                    exchange = @Exchange(
                            value = "${rabbitmq.exchange.routing}",
                            durable = "true",
                            type = ExchangeTypes.TOPIC),
                    key = "china.#")}, concurrency = "3", exclusive = false,id = "autoStart",autoStartup = "false")
    public void receive(@Payload String msg, Message message,Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        System.out.println(String.format("mesage:%s", message.getBody()));
        System.out.println("路由监听接受到发送者发送的信息:" + msg);
        // 确认消息
//        channel.basicAck(deliveryTag, false);
    }

手动开启/关闭容器

public void stopContainer(String containerId){
        //得到容器的对象
        MessageListenerContainer container = registry.getListenerContainer(containerId);
        //判断容器状态
        if(container.isRunning()){
            //开启容器
            container.stop();
            System.out.println("关闭容器");
        }

    }

    public void startContainer(String containerId){
        //得到容器的对象
        MessageListenerContainer container = registry.getListenerContainer(containerId);
        //判断容器状态
        if(!container.isRunning()){
            //开启容器
            container.start();
            System.out.println("开启容器");
        }

    }

Rabbitmq listener监听Message消息,其中Message主要包含两部分

public class Message implements Serializable {
	private final MessageProperties messageProperties;

	private final byte[] body;
	
	public Message(byte[] body, MessageProperties messageProperties) { //NOSONAR
		this.body = body; //NOSONAR
		this.messageProperties = messageProperties;
	}
}
MessageProperties // 消息属性

byte[] body // 消息内容

当监听者监听到队列中有消息时则会进行接收并处理,MessageConvert 会直接转换成消息类型,并绑定在对应被注解的方法中。默认实现类为SimpleMessageConverter文章来源地址https://www.toymoban.com/news/detail-543506.html

public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware {
	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		Object content = null;
		MessageProperties properties = message.getMessageProperties();
		if (properties != null) {
			String contentType = properties.getContentType();
			if (contentType != null && contentType.startsWith("text")) {
				String encoding = properties.getContentEncoding();
				if (encoding == null) {
					encoding = this.defaultCharset;
				}
				try {
					content = new String(message.getBody(), encoding);
				}
				catch (UnsupportedEncodingException e) {
					throw new MessageConversionException(
							"failed to convert text-based Message content", e);
				}
			}
			else if (contentType != null &&
					contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
				try {
					content = SerializationUtils.deserialize(
							createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
				}
				catch (IOException | IllegalArgumentException | IllegalStateException e) {
					throw new MessageConversionException(
							"failed to convert serialized Message content", e);
				}
			}
		}
		if (content == null) {
			content = message.getBody();
		}
		return content;
	}

	/**
	 * Creates an AMQP Message from the provided Object.
	 */
	@Override
	protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		byte[] bytes = null;
		if (object instanceof byte[]) {
			bytes = (byte[]) object;
			messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
		}
		else if (object instanceof String) {
			try {
				bytes = ((String) object).getBytes(this.defaultCharset);
			}
			catch (UnsupportedEncodingException e) {
				throw new MessageConversionException(
						"failed to convert to Message content", e);
			}
			messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
			messageProperties.setContentEncoding(this.defaultCharset);
		}
		else if (object instanceof Serializable) {
			try {
				bytes = SerializationUtils.serialize(object);
			}
			catch (IllegalArgumentException e) {
				throw new MessageConversionException(
						"failed to convert to serialized Message content", e);
			}
			messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
		}
		if (bytes != null) {
			messageProperties.setContentLength(bytes.length);
			return new Message(bytes, messageProperties);
		}
		throw new IllegalArgumentException(getClass().getSimpleName()
				+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
	}

}

到了这里,关于Rabbitmq - rabbitmq Listener监听的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • RabbitMQ默认监听的ip地址

    RabbitMQ 默认监听所有可用 ip 地址,当Rabbitmq 所在的服务端节点上存在多 ip 时,只要客户端能与服务端任一 ip 通信,即可向 RabbitMQ 发送消息

    2024年02月11日
    浏览(40)
  • SpringBoot整合Canal+RabbitMQ监听数据变更

    需求 步骤 环境搭建 整合SpringBoot Canal实现客户端 Canal整合RabbitMQ SpringBoot整合RabbitMQ   我想要在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。 经过调研发现,使用Canal来监听MySQL的binlog变化可

    2024年02月11日
    浏览(36)
  • linux部署rabbitmq开启mqtt插件由于监听1883端口导致重启rabbitmq失败的解决方法

    第一步:部署rabbitmq 部署rabbitmq请移步(在这里可以找到erlang和rabbitmq适配的版本并下载安装包): 通过移步的地址中执行以下步骤 1. 安装erlang环境 2. 下载完rabbitmq的安装包并执行命令 yum localinstall 安装包的名称 3. 开启rabbitmq插件 rabbitmq-plugins enable rabbitmq_management rabbitmq_man

    2024年02月09日
    浏览(39)
  • springboot-rabbitmq 实现动态配置监听容器

    1.1.1从factories我们可以看到mq的启动配置类 1.1.2然后我们找到 RabbitAutoConfiguration ,发现它引入了 RabbitAnnotationDrivenConfiguration 这个配置类 1.1.3进入 RabbitAnnotationDrivenConfiguration 滑到最低部看到这里引入了 @EnableRabbit 这个注解,找个注解里面又引出 RabbitBootstrapConfiguration 这个配置类

    2023年04月09日
    浏览(63)
  • SpringCloud 整合 Canal+RabbitMQ+Redis 实现数据监听

    Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x Canal是如何同步数据库

    2024年02月03日
    浏览(40)
  • SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

    RabbitMQ是一种常用的消息队列,Spring Boot对其进行了深度的整合,可以快速地实现消息的发送和接收。在RabbitMQ中,消息的发送和接收都是异步的,因此需要使用监听器来监听消息的到来。Spring Boot中提供了默认的监听器容器,但是有时候我们需要自定义监听器容器,来满足一

    2024年02月16日
    浏览(40)
  • Spring项目配置文件中RabbitMQ监听器各个参数的作用

    spring.rabbitmq.listener.simple.concurrency :设置监听器容器的并发消费者数量,默认为1,即单线程消费。 spring.rabbitmq.listener.simple.max-concurrency :设置监听器容器的最大并发消费者数量。 spring.rabbitmq.listener.simple.prefetch :设置每个消费者从RabbitMQ服务器获取的消息数量,即每次从队列

    2024年02月16日
    浏览(26)
  • RabbitMQ之动态创建队列与绑定交换机和监听器

    为什么需要动态创建队列与绑定交换机?我在写项目的时候遇到这么个问题,我数据库中存在一个字段messageType指定为消息类型,消息类型存在三种,一种是通知类,一种是验证码类,一种是活动类。并且对应的,要将消息进行不同渠道的分发,还存在一个channelType,而他又存

    2024年02月03日
    浏览(27)
  • 207、SpringBoot 整合 RabbitMQ 实现消息的发送 与 接收(监听器)

    1、ContentUtil 先定义常量 2、RabbitMQConfig 创建队列的两种方式之一: 配置式: 在容器中配置 org.springframework.amqp.core.Queue 类型的Bean,RabbitMQ将会自动为该Bean创建对应的队列。 就是在配置类中创建一个生成消息队列的@Bean。 问题: 用 @Configuration 注解声明为配置类,但是项目启动

    2024年02月06日
    浏览(41)
  • Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

    关于Canal的介绍及原理不在此赘述,可自行查阅。笔者在使用Canal同步Mysql实时操作记录至RabbitMQ的过程中,也翻阅了一些大牛们的文章,可能是我使用的Canal版本与文中版本不一致,出现了一些问题,在此总结记录一下可行的方案。 注:本文使用的Canal为 v1.1.7 先查看目标数据

    2024年04月10日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包