@KafkaListener 详解及消息消费启停控制

这篇具有很好参考价值的文章主要介绍了@KafkaListener 详解及消息消费启停控制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

参考:Kafka参数

一、@KafkaListener注解

@KafkaListener(id = "11111", groupId = "demo-group",topics = Constants.TOPIC)
    public void listen(String msgData) {
    LOGGER.info("收到消息" + msgData);
}  
 
@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
                                 topics = Constants.TOPIC)
    public void listen2(String msgData) {
    LOGGER.info("收到消息" + msgData);
}
 
@KafkaListener(id = "3333", groupId = "demo-group2", topics = Constants.TOPIC)
    public void listen3(String msgData) {
    LOGGER.info("收到消息" + msgData);
}
 
@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC)
    public void listen4(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

(1) id: 默认是每个Listener实例的重要标识。

对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。

@kafkalistener,kafka&logstash&flume&rabbitMq,KafkaListener,kafka,Powered by 金山文档

(2) goupId: 每个消费者所属的组。

每个消费者都有自己所属的组。一个组中可以有多个消费者。

@kafkalistener,kafka&logstash&flume&rabbitMq,KafkaListener,kafka,Powered by 金山文档

一个Topic的分区只能被同一个组下的某个消费者消费。从日志上来看,侧面也反映的消费模式是 Subscribed 订阅模式,不是手动的assign模式。

[Consumer clientId=consumer-1, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=consumer-2, groupId=demo-group] Subscribed to topic(s): COLA
[Consumer clientId=consumer-3, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=prefix-0, groupId=demo-group] Subscribed to topic(s): COLA

(3) clientIdPrefix: 消费者clientId前缀

@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix", topics = Constants.TOPIC)
public void listen2(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

如下图,共有4个消费者。有个消费者配置了clientIdPrefix属性为"prefix",所以该消费者的clientId以指定的"prefix"开头。如果没有配置,该实例的clientId默认为"consumer"。同时,每个实例的clientId后面挂了一个数字,来标示它在整个kafka集群中的消费者编号,编号从0开始。这里配置了4个消费者,所以消费者实例编号有0、 1、 2、 3。

@kafkalistener,kafka&logstash&flume&rabbitMq,KafkaListener,kafka,Powered by 金山文档

(4) autoStartup

public @interface KafkaListener ...
    /**
     * Set to true or false, to override the default setting in the container factory. May
     * be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or
     * a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to
     * obtain the value.
     * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
     * @return true to auto start, false to not auto start.
     * @since 2.2
     */
    String autoStartup() default "";

是否自动启动,如果是 false,默认不生效,需要手动唤醒。

看源代码上作者给的的注释:该注解指定的值优先级比工厂里指定的高。

另外可以使用 ${} 占位符的形式,支持配置。

application.yaml:
listener:
  auto:
    startup: true  
 
java :
    @KafkaListener(... containerFactory = "batchContainerFactory",
      autoStartup = "${listener.auto.startup}")
    public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment)...
注:每个消费者实例对象内部持有两个属性。
boolean running
boolean paused
有几个改变状态的方法:
调用start()方法后,running转为true
调用stop()方法后,running转为false
调用pause()方法后,paused转为true
调用resume()方法后,paused转为false

只有running=true 、 paused=false 的消费者实例才能正常消费数据。
注解上的autoStartup改变的是running属性。
    @KafkaListener(id = "11111", groupId = "demo-group", 
                    topics = Constants.TOPIC, autoStartup = "false")
    public void listen(String msgData) throws InterruptedException {
        LOGGER.info("收到消息" + msgData);
        Thread.sleep(1000);
    }

二、Kafka Listener任务暂停及恢复

2.1 唤醒消费者实例, 示例代码:

    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    // 获取到id="11111" 的消费实例对象
    MessageListenerContainer listenerContainer = 
                        this.registry.getListenerContainer("11111");
 
    listenerContainer.pause();  //paused ==> true
  // listenerContainer.stop(); //running==> false

2.2 暂停消费者实例, 示例代码:

    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    // 获取到id="11111" 的消费实例对象
    MessageListenerContainer listenerContainer = 
                        this.registry.getListenerContainer("11111");
 
    listenerContainer.pause();  //paused ==> true
    // listenerContainer.stop(); //running==> false

2.3 定时任务自动启动

    @Autowired
    private KafkaListenerEndpointRegistry registry;
 
    // 定时器,每天凌晨0点开启监听
    @Scheduled(cron = "0 0 0 * * ?")
    public void startListener() {
        log.info("开启监听");
        // 判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("11111").isRunning()) {
            registry.getListenerContainer("11111").start();
        }
        registry.getListenerContainer("11111").resume();
    }
 
    // 定时器,每天早上10点关闭监听
    @Scheduled(cron = "0 0 10 * * ?")
    public void shutDownListener() {
        log.info("关闭监听");
        registry.getListenerContainer("11111").pause();
    }

三、@KafkaListener注解方法参数汇总

@KafkaListener注解能够使用到如下8种方法上面。至于监听单条数据的前4种方法,与批量监听多条数据的后4种方法,主要依据kafka的具体配置。

    @KafkaListener(....)
    public void listen1(String data) 
 
    @KafkaListener(....)
    public void listen2(ConsumerRecord<K,V> data) 
 
    @KafkaListener(....)
    public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment) 
 
    @KafkaListener(....)
    public void listen4(ConsumerRecord<K,V> data,
                        Acknowledgment acknowledgment, Consumer<K,V> consumer)
 
    @KafkaListener(....)
    public void listen5(List<String> data) 
 
    @KafkaListener(....)
    public void listen6(List<ConsumerRecord<K,V>> data) 
 
    @KafkaListener(....)
    public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment) 
 
    @KafkaListener(....)
    public void listen8(List<ConsumerRecord<K,V>> data, 
                        Acknowledgment acknowledgment, Consumer<K,V> consumer)

四、KafkaListenerContainerFactory配置

在application.yaml中配置的kafka参数,以spring.kafka开头的参数族,全部用于kafka默认对象的创建。

4.1 kafka参数默认封装对象

所有kafka参数默认封装到对象:KafkaProperties对象中,可使用@Autowired自动注入。

    @Autowired
    private KafkaProperties properties;

4.2 @KakfkaListener注解标记监听实例对象

如不特殊指定,默认使用在yaml中的所有spring.kafka.consumer与spring.kafka.listener下的参数。

监听器实例对象自动绑定到上述配置文件,是由于它默认使用的"containerFactory" 是名为"kafkaListenerContainerFactory"的bean。

源码注释如下,如果不特殊指定,则默认的容器工厂将会被使用。

package org.springframework.kafka.annotation;
 
public @interface KafkaListener ...
    /**
     * The bean name of the {@link 
            org.springframework.kafka.config.KafkaListenerContainerFactory}
     * to use to create the message listener container 
               responsible to serve this endpoint.
     * <p>If not specified, the default container factory is used, if any.
     * @return the container factory bean name.
     */
    String containerFactory() default "";
默认的容器工厂代码如下,均为Springboot与Kafka框架提供的类。
这两个bean将spring.kafka.listener与spring.kafka.consumer下的参数全部组装到名为"kafkaListenerContainerFactory"这个bean中。该bean供@KafkaListener标记的监听实例使用。
因此可以得出结论:
如果不想使用默认的"kafkaListenerContainerFactory"容器工厂,则必须手动创建一个"ConcurrentKafkaListenerContainerFactory"类的实例,并且其bean name 不能叫"kafkaListenerContainerFactory"(不然与默认的工厂实例重名了),然后把该对象加入spring容器中。当在使用@KafkaListener标注的监听实例对象时,手动指定该注解"containerFactory"属性为刚才自定义的容器工厂实例bean name。
package org.springframework.boot.autoconfigure.kafka;
 
class KafkaAnnotationDrivenConfiguration {
 
    @Bean
    @ConditionalOnMissingBean
    ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer = 
                            new ConcurrentKafkaListenerContainerFactoryConfigurer();
        configurer.setKafkaProperties(this.properties);
        MessageConverter messageConverterToUse = 
                     (this.properties.getListener().getType().equals(Type.BATCH))
                                ? this.batchMessageConverter : this.messageConverter;
        configurer.setMessageConverter(messageConverterToUse);
        configurer.setReplyTemplate(this.kafkaTemplate);
        configurer.setTransactionManager(this.transactionManager);
        configurer.setRebalanceListener(this.rebalanceListener);
        configurer.setErrorHandler(this.errorHandler);
        configurer.setBatchErrorHandler(this.batchErrorHandler);
        configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
        configurer.setRecordInterceptor(this.recordInterceptor);
        return configurer;
    }
 
    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = 
                                    new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory
                .getIfAvailable(() -> 
           new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
        return factory;
    }

4.3 自定义容器工厂实例代码示例:

    @Autowired
    private KafkaProperties properties;
    
    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainer() {
        ConcurrentKafkaListenerContainerFactory<?, ?> container =
                new ConcurrentKafkaListenerContainerFactory<>();
 
        Map<String, Object> stringObjectMap = this.properties.buildConsumerProperties();
        stringObjectMap.put("enable.auto.commit", false);
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(stringObjectMap));
        // 没有topic是否禁止系统启动
        container.setMissingTopicsFatal(true);
        // 并发
        container.setConcurrency(1);
        // 批量接收
        container.setBatchListener(true);
        // 如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
        container.getContainerProperties().setPollTimeout(5000);
        // 设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
        container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // 设置kafka 异常重试次数 第一个参数等待重试时间,第二个参数数提交次数,这里设置不重试,默认重试10次 抛出异常后调用
        // factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 0L)));
        return container;
    }
  
    @KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC, containerFactory = "batchContainerFactory")
    public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment) {
        LOGGER.info("4444收到消息" + list.size());
        acknowledgment.acknowledge();
    }

五、吞吐量

如下,这里我只列出了影响本例的几条参数。

spring:
  kafka:
    consumer:
      enable-auto-commit: true
      # max-poll-records: 20
 
    listener:
      ack-mode: batch
      type: batch
      concurrency: 5

如果设置spring.kafka.listener.concurrency为5,共两个消费者,Topic名为"COLA",共8个分区。代码如下。

    @KafkaListener(id = "4444", groupId = "demo-group2", topics = "COLA")
    public void listen4(List<String> msgData) {
        LOGGER.info("收到消息" + msgData);
    }
 
    @KafkaListener(id = "5555", groupId = "demo-group2", topics = "COLA")
    public void listen5(List<String> msgData) {
        LOGGER.info("收到消息" + msgData);
    }
 
    @Bean
    public NewTopic newTopic() {
        return new NewTopic(Constants.TOPIC, 8, (short) 1);
    }
系统每个消费者都创建了5个线程,共10个线程。换句话说,每个消费者实例(@KafkaListener标记的方法)同时都会有5个线程在跑。每个线程接收的分区都不一样。
另外,这两个消费者属于同一个组,Topic只有8个分区,2个消费者共10个线程,一个线程消费一个分区,所以必然有两个线程最后属于空闲状态。
从实际结果上来看(下面的日志),没想到系统为id="4444"的消费者实际只分配到了3个分区,有两个线程处于空闲状态。id="5555"的消费者达到了预期,共消费了5个分区,分配到了5个线程!
[4444-2-C-1]: demo-group2: partitions assigned: []
[4444-3-C-1]: demo-group2: partitions assigned: []
[4444-4-C-1]: demo-group2: partitions assigned: [COLA-1]
[4444-1-C-1]: demo-group2: partitions assigned: [COLA-7]
[5555-2-C-1]: demo-group2: partitions assigned: [COLA-3]
[5555-4-C-1]: demo-group2: partitions assigned: [COLA-5]
[5555-3-C-1]: demo-group2: partitions assigned: [COLA-4]
[4444-0-C-1]: demo-group2: partitions assigned: [COLA-6]
[5555-0-C-1]: demo-group2: partitions assigned: [COLA-0]
[5555-1-C-1]: demo-group2: partitions assigned: [COLA-2]

六、 结论:

  1. concurrency值对应@KafkaListener的消费者实例线程数目,如果concurrency数量大于partition数量,多出的部分分配不到partition,会被闲置。

  1. 设置的并发量不能大于partition的数量,如果需要提高吞吐量,可以通过增加partition的数量达到快速提升吞吐量的效果。文章来源地址https://www.toymoban.com/news/detail-821931.html

到了这里,关于@KafkaListener 详解及消息消费启停控制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • @KafkaListener的配置使用

    @KafkaListener注解来自spring-kafka包。使用@KafkaListener消费消息,需要按照spring-kafka指定的格式填写kafka配置信息,即可自动装配生成相关的KafkaConsumer实例,然后使用@KafkaListener消费消息。这里需要注意,使用自动装载方式生成KafkaConsumer实例时,spring-kafka的配置参数与原生kafka的配

    2024年04月15日
    浏览(16)
  • @KafkaListener指定kafka集群

    基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载

    2024年01月21日
    浏览(43)
  • @KafkaListener 注解配置多个 topic

    主要见 @KafkaListener 中 topics 属性的配置 其中 ${xxxx.topic1} 为从springBoot 配置文件中读取的属性值 由于该注解 topic 为字符串数组的类型,所以可以如上加大括号来完成指定

    2024年02月21日
    浏览(24)
  • kafka @KafkaListener 动态接收topic

    @KafkaListener 里边的 topics 必须是常量,不可以是变量 但是某些业务场景 kafka定义的topic会不同这时候就需要传入变量才可以实现 具体实现方式如下: KafkaListener 监听方法 #{}  这里边是方法名称  这里是获取topic 其实可以在对应的@Bean里边写逻辑方法去处理 这里用到了获取配

    2024年02月13日
    浏览(26)
  • Spring Boot中KafkaListener的介绍、原理和使用方法

    Kafka是一个高性能的分布式消息队列,它被广泛应用于对实时数据进行处理和分析。在Spring Boot中,我们可以通过 @KafkaListener 注解来监听并处理Kafka消息。本文将介绍Spring Boot中 @KafkaListener 注解的介绍、原理和使用方法。 @KafkaListener 注解是Spring Kafka提供的一种消费消息的方式

    2024年02月10日
    浏览(30)
  • 深入理解Spring Kafka中@KafkaListener注解的参数与使用方式

    Apache Kafka作为一个强大的消息代理系统,与Spring框架的集成使得在分布式应用中处理消息变得更加简单和灵活。Spring Kafka提供了 @KafkaListener 注解,为开发者提供了一种声明式的方式来定义消息监听器。在本文中,我们将深入探讨 @KafkaListener 注解的各种参数以及它们的使用方

    2024年01月16日
    浏览(38)
  • 动态启停kafka消费者

    使用背景:在开发业务中需要根据具体逻辑选择开启还是关闭消费者 实现逻辑: 1、创建consumer配置类,自定义工厂、自定义消费者配置(省略) 还需要注入consumerService bean(改类里主要是控制动态启停的具体实现方法) 2、接口实现 消费监听方法上,@KafkaListener(topicPattern=“

    2024年01月16日
    浏览(23)
  • Kafka消息消费流程详解

    在分布式系统中,Kafka是一种常用的消息队列系统,用于实现高可靠性的消息传递。本文将介绍Kafka消息消费的流程,并提供相应的示例代码。 Kafka消费者的流程可以概括为以下几个步骤: 创建Kafka消费者实例; 订阅一个或多个主题; 拉取消息记录; 处理消息; 提交消费位

    2024年02月09日
    浏览(61)
  • MQ消息队列详解以及MQ重复消费问题

    https://blog.csdn.net/qq_44240587/article/details/104630567 核心的就是:解耦、异步、削锋 现有ABCDE五个系统,最初的时候BCD三个系统都要调用A系统的接口获取数据,一切都很正常,但是突然,D系统说:我不要了,你不用给我传数据了,A系统无奈,只能修改代码,将调用D系统的代码删除

    2024年04月13日
    浏览(43)
  • 消息队列Pulsar入门(一) 生产者/消费者/Topic详解,附源码演示

    部署问题 连接Pulsar 创建方式 简单方法创建 loadConf自定义配置创建 Pulsar官网 发送模式 同步发送 异步发送 访问方式/发送方式 Share模式(默认情况) 请注意: Exclusive WaitForExclusive 创建方式 简单方法创建 监听器方法创建 loadConf自定义配置创建 多主题订阅 传入List数组的多主题订阅

    2024年02月08日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包