消息监听器和消息监听容器

这篇具有很好参考价值的文章主要介绍了消息监听器和消息监听容器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。

一、消息监听器

1.1、消息监听器接口

消息监听器顾名思义用来接收消息,它是使用消息监听容器的必须条件。目前有8个消息监听器:

  • 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
    public interface MessageListener<K, V> {
      void onMessage(ConsumerRecord<K, V> data);
    }
    
  • 使用手动提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。
    public interface AcknowledgingMessageListener<K, V> {
      void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
    }
    
  • 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。 提供对 Consumer 对象的访问。
    public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
      void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
    }
    
  • 使用手动提交方法之一,使用此接口处理从 Kafka 消费者 poll() 操作接收到的各个 ConsumerRecord 实例。 提供对 Consumer 对象的访问。
    public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
      void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
    }
    
  • 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
    (使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。)(注意:这个接收所有)
    public interface BatchMessageListener<K, V> {
       void onMessage(List<ConsumerRecord<K, V>> data);
    }
    
  • 使用手动提交方法之一,使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。(注意:这个接收所有)
    public interface BatchAcknowledgingMessageListener<K, V> {
       void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
    }
    
  • 使用自动提交或容器管理的提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
    (使用此接口时不支持 AckMode.RECORD,因为侦听器会获得完整的批次。) 提供对 Consumer 对象的访问。(注意:这个接收所有)
    public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
      void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
    }
    
  • 使用手动提交方法之一,处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
    提供对 Consumer 对象的访问。(注意:这个接收所有)
    public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
        void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
    }
    

1、Consumer对象不是线程安全的;2、不要在这个过程中操作消费者位置和/或监听器中已提交偏移量。

1.2、消息监听器容器

消息监听容器有两种,一种是单线程消费,一种是多线程消费。

1.2.1、单线程

单线程实现为 KafkaMessageListenerContainer,KafkaMessageListenerContainer在单个线程上接收来自所有主题或分区的所有消息。

  • 使用 KafkaMessageListenerContainer
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

通过 ContainerProperties 可以对主题和分区以及其他信息进行配置

//主题设置
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
//监听器设置
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
//消费者工厂配置
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());

权限配置:authorizationExceptionRetryInterval。这是一个容器属性,它会从KafkaConsumer中获取获取信息,当配置的用户被拒绝读取特定主题时,
就会发生触发 AuthorizationException。

1.2.2、多线程

多线程实现为 ConcurrentMessageListenerContainer ,ConcurrentMessageListenerContainer实际上是在给一个或多个KafkaMessageListenerContainer实例提供多线程消费,
本质上最后进行工作的还是KafkaMessageListenerContainer,故此它的实现和 KafkaMessageListenerContainer类似,

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它有一个concurrency属性,通过设置这个属性可以指定创建几个 KafkaMessageListenerContainer实例。

假设我们有3个主题,每个主题有5个分区。同时设置 container.setConcurrency(15),我们希望的是有15个线程活动着,实际上只有5个活着,
这是因为 Kafka 中默认的 PartitionAssignor是RangeAssignor。 在SpringBoot中可以设置: spring.kafka.consumer.properties.partition.assignment.strategy=
org.apache.kafka.clients.consumer.RoundRobinAssignor 来更改策略。

1.3、偏移量

  • 自动提交
    设置 enable.auto.commit 消费者属性为 true 即可。这个也是默认状态。

  • 手动提交
    设置 enable.auto.commit 消费者属性为 false 即可;同时注意设置 AckMode。以下是spring支持的集中类型说明(无事务)

    • RECORD:当侦听器处理记录后返回时提交偏移量。
    • BATCH:当返回的所有记录都poll()处理完毕后提交偏移量。
    • TIMEpoll():只要超出了ackTime自上次提交以来的记录,则在处理完返回的所有记录后提交偏移量。
    • COUNTpoll():只要ackCount自上次提交以来已收到记录,则在处理完返回的所有记录后提交偏移量。
    • COUNT_TIMETIME:与和类似COUNT,但如果任一条件为 ,则执行提交true。
    • MANUAL:消息监听者对acknowledge()负责Acknowledgment。之后,BATCH应用与 相同的语义。
    • MANUAL_IMMEDIATEAcknowledgment.acknowledge():当监听器调用该方法时立即提交偏移量。
  • 如何提交

public interface Acknowledgment {

    void acknowledge();

}

如果要提交部分批次,使用nack(),使用事务时,设置AckMode为MANUAL; 调用nack()会将成功处理的记录的偏移量发送到事务。
nack()只能在调用侦听器的消费者线程上调用。文章来源地址https://www.toymoban.com/news/detail-726156.html

到了这里,关于消息监听器和消息监听容器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spring中最简单的过滤器和监听器

            Filter也称之为过滤器,它是Servlet技术中最实用的技术,Web开发人员通过Filter技术,对web服务器管理的所有web资源:例如Jsp, Servlet, 静态图片文件或静态 html 文件等进行拦截,从而实现一些特殊的功能。例如实现URL级别的权限访问控制、过滤敏感词汇、压缩响应信息

    2024年02月14日
    浏览(42)
  • Spring高手之路15——掌握Spring事件监听器的内部逻辑与实现

    在阅读本文之前需要你已经对事件监听器有了简单的了解,或去阅读前面的文章《 Spring高手之路7——事件机制与监听器的全面探索 》   在 Spring 中, ApplicationContext 可以形成一个层次结构,通常由主容器和多个子容器组成。一个常见的疑问是:当一个事件在其中一个容器

    2024年02月06日
    浏览(47)
  • Spring高手之路7——事件机制与监听器的全面探索

      观察者模式是一种行为设计模式,它定义了对象之间的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都会得到通知并被自动更新。在这个模式中,改变状态的对象被称为主题,依赖的对象被称为观察者。 举个实际的例子: 事件源(Event Source) :可以视

    2024年02月11日
    浏览(43)
  • Spring监听器用法与原理详解(带ApplicationListener模型图)

    相信大家都或多或少知道Spring中的监听器,有些人还能说出它采用了 观察者模式 ,但其实它还用到了 适配器模式 , 工厂模式 等。当然,仍有不少人是完全不了解Spring的监听及其机制的,本次我们就来深入学习一下 Spring监听器 Spring监听器是一种 特殊的类,它们能帮助开发

    2024年02月06日
    浏览(59)
  • Spring项目配置文件中RabbitMQ监听器各个参数的作用

    spring.rabbitmq.listener.simple.concurrency :设置监听器容器的并发消费者数量,默认为1,即单线程消费。 spring.rabbitmq.listener.simple.max-concurrency :设置监听器容器的最大并发消费者数量。 spring.rabbitmq.listener.simple.prefetch :设置每个消费者从RabbitMQ服务器获取的消息数量,即每次从队列

    2024年02月16日
    浏览(40)
  • Spring Boot实战:拦截器和监听器的应用指南

    当使用Spring Boot时,我们可以通过拦截器(Interceptor)和监听器(Listener)来实现对请求和响应的处理。拦截器和监听器提供了一种可插拔的机制,用于在请求处理过程中进行自定义操作,例如记录日志、身份验证、权限检查等。下面通过提供一个示例,展示如何使用拦截器和

    2024年02月09日
    浏览(49)
  • Apche Kafka + Spring的消息监听容器

    消息的接收:可以通过配置MessageListenerContainer并提供消息侦听器或使用@KafkaListener注释来接收消息。本章我们主要说明通过配置MessageListenerContainer并提供消息侦听器的方式接收消息。 当使用消息监听容器时,就必须提供一个监听器来接收数据。目前有八个支持消息侦听器的接

    2024年02月12日
    浏览(37)
  • HttpSessionListener监听器和HttpSessionAttributeListener监听器

    1.作用:监听Session创建或销毁,即生命周期监听 2.相关方法: 3.使用场景: 和前面的ServletContextListener等一样,可以用于监控用户上线和离线 4.代码 HttpSessionListener监听器 实现类 HttpSessionAttributeListener监听器 1.作用:监听Session属性的变化,使用少 2.相关方法: 3.代码 监听器 实

    2024年02月04日
    浏览(50)
  • camunda执行监听器和任务监听器有什么区别

    Camunda的执行监听器和任务监听器是用于添加自定义逻辑的监听器,它们的区别在于作用对象和触发事件的不同。 执行监听器是与BPMN流程中的各种流程元素(例如开始事件、用户任务、服务任务、网关等)相关联的。执行监听器可以在流程元素执行前、执行后或抛出异常时添

    2024年02月04日
    浏览(55)
  • Listener监听器----HttpServletRequest对象的生命周期监听器

    一、HttpServletRequest对象的生命周期监听器         ServletRequestListener接口定义了ServletRequest(是HttpServletRequest接口的父接口类型)对象生命周期的监听行为。 void requestInitialized(ServletRequestEvent sre)         HttpServletRequest对象创建后会触发该监听器方法,并将已创建HttpServletR

    2024年01月23日
    浏览(61)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包