Kafka消费端concurrency参数

这篇具有很好参考价值的文章主要介绍了Kafka消费端concurrency参数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

首先说一下结论,这个参数用来增加消费者实例,或者可以理解为@KafkaListener注解实例的数量。当消费者服务数量小于topic的分区数的时候使用此参数可以提升消费能力,spring-kafka在初始化的时候会启动concurrency个Consumer线程来执行@KafkaListener里面的方法。

Consumer线程

用来直接调用kafka-client的poll()方法获取消息。如果是自动提交offset,poll()方法获取消息后会直接给到listener线程执行。

Listener线程

真正调用处理我们代码中标有@KafkaListener注解方法的线程。具体实现在KafkaMessageListenerContainer 类中。

KafkaMessageListenerContainer

protected void pollAndInvoke() {
            if (!this.autoCommit && !this.isRecordAck) {
                processCommits();
            }
            processSeeks();
            checkPaused();
            ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
            this.lastPoll = System.currentTimeMillis();
            checkResumed();
            debugRecords(records);
            if (records != null && records.count() > 0) {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    this.lastReceive = System.currentTimeMillis();
                }
                // 这里可以看到如果是自动提交offset,会直接把consumer poll下来的消息给到listener执行,
// 即kafka consumer所在线程会直接调用我们的@KafkaListener方法
                invokeListener(records);
            }
            else {
                checkIdle();
            }
        }
如果是手动提交offset,即enable-auto-commit设置为false,则是将消息投放到阻塞队列中,另一边由Listener线程取出执行。

ConcurrentMessageListenerContainer

当使用了concurrency参数是,在consumer启动过程会通过这个类去初始化。其实就是根据concurrency的值for循环调用KafkaMessageListenerContainer的dostart方法创建实例文章来源地址https://www.toymoban.com/news/detail-721266.html

到了这里,关于Kafka消费端concurrency参数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Kafka】Java实现数据的生产和消费

    Kafka 是由 LinkedIn 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的 基于发布订阅模式的消息引擎系统 。 Broker:消息中间件处理节点,一个Kafka节点就是一个Broker,一个或者多个Broker可以组成一个Kafka集群; T

    2023年04月19日
    浏览(46)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(42)
  • kafka 消费者相关参数

    是否自动提交,默认是true,通常为了保证消费的数据不异常,设置成false。设置false时,配合max.poll.interval.ms参数,根据自身消费者处理消息的能力,进行设值,消费消息后手动提交。 使用消费者组管理时调用poll()之间的最大延迟。这为消费者在获取更多记录之前可以空闲的

    2024年02月05日
    浏览(38)
  • Kafka:消费者参数配置

    maven配置 springboot配置类 配置文件 参数配置列表 属性 说明 bootstrap.servers 向Kafka集群建立初始连接用到的host/port列表。 客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管 是否指定了哪个服务器用作引导。 这个列表仅影响用来发现集群所有服务器的初始

    2024年02月09日
    浏览(48)
  • 实战Flink Java api消费kafka实时数据落盘HDFS

    在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。 flink版本1.13 kafka版本0.8 hadoop版本3.1.4 为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件: 确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。 确保 Kafka Serve

    2024年01月24日
    浏览(48)
  • @KafkaListener指定kafka集群

    基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载

    2024年01月21日
    浏览(56)
  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(45)
  • kafka @KafkaListener 动态接收topic

    @KafkaListener 里边的 topics 必须是常量,不可以是变量 但是某些业务场景 kafka定义的topic会不同这时候就需要传入变量才可以实现 具体实现方式如下: KafkaListener 监听方法 #{}  这里边是方法名称  这里是获取topic 其实可以在对应的@Bean里边写逻辑方法去处理 这里用到了获取配

    2024年02月13日
    浏览(39)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(80)
  • java 集成kafka(支持单条消费和批量消费)

    1、下载安装zk,kafka...(大把教程,不在这里过多阐述) 2、引入pom 3、Kafka配置 4、生产者配置 5、生产者发消息的工具类 6、消费着配置 7、消费者配置类(配置批量消费) 8、测试类分别测试单条消费以及批量消费 9、消费者消费 完结。。。

    2024年02月12日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包