kafka 消费者相关参数

这篇具有很好参考价值的文章主要介绍了kafka 消费者相关参数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.enable.auto.commit

ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";

是否自动提交,默认是true,通常为了保证消费的数据不异常,设置成false。设置false时,配合max.poll.interval.ms参数,根据自身消费者处理消息的能力,进行设值,消费消息后手动提交。

2.max.poll.interval.ms

ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";

使用消费者组管理时调用poll()之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间设置了一个上限。如果在超时之前没有调用poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。根据自身消费消息的能力设值,默认值30*1000ms。

3.max.poll.records

ConsumerConfig.MAX_POLL_RECORDS_CONFIG = "max.poll.records";

在一次调用poll()中返回的最大记录数,默认值为500。根据自身业务消费消息能力设值,不要超过max.poll.interval.ms时间内最大处理消息个数。

4.auto.commit.interval.ms

ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";

如果enable.auto.commit设置为true,消费者偏移量自动提交到Kafka的频率(单位为毫秒),默认值5000ms。

5.session.timeout.ms

ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";

当使用Kafka的组管理功能时,用来检测消费者故障的超时。使用者定期向代理发送心跳以指示其活动状态。如果在此会话超时到期之前代理没有接收到心跳,则代理将从组中删除此消费者并启动重新平衡。该值必须在group.min.session.timeout.ms和group.max.session.timeout.ms在代理配置中配置的允许范围内。默认值10*1000ms(单位为毫秒)。

6.heartbeat.interval.ms

ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";

使用Kafka的组管理工具时,从心跳到消费者协调器之间的预期时间。心跳用于确保消费者会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须小于session.timeout。但通常不应设置为高于该值的1/3。它可以被调整得更低,以控制正常再平衡的预期时间。默认值3*1000ms(单位为毫秒)。

7.auto.offset.reset

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";

参数为"latest"表示从分区末尾开始消费消息,参数为"earliest"表示从起始处开始消费消息。默认值是latest。

8.max.partition.fetch.bytes

ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";

从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。此参数要配合broker的max.message.size设置,如果设置的比max.message.size小,那么消费者就可能无法正常读取到消息,比如broker能够读取到了2M的数据,但是消费者最大只能接受1M,就会有问题。

9.fetch.min.bytes

ConsumerConfig.FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";

服务器为获取请求应返回的最小数据量。如果可用数据不足,则请求将等待足够多的数据积累后再响应请求。默认设置为1字节意味着只要有一个字节的数据可用,或者读取请求就会超时等待数据到达。将该值设置为大于1的值将导致服务器等待更大量的数据积累,这可以以一些额外的延迟为代价提高服务器吞吐量。默认值为1(B)。

10.fetch.max.bytes

ConsumerConfig.FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";

服务器应为获取请求返回的最大数据量,这不是绝对的最大值,如果获取的第一个非空分区中的第一条消息大于此值,则仍将返回该消息以确保使用者能够取得进展。broker接受的最大消息大小是通过message.max.bytes(broker配置)或max.message.bytes(topic配置)定义的。请注意,消费者并行执行多个fetches。默认值为52428800(B),也就是50MB。

11.fetch.max.wait.ms

ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";

如果没有足够的数据来立即满足fetch.min.bytes给出的要求,服务器在响应获取请求之前阻塞的最大时间。默认值:500ms(毫秒)。

12.request.timeout.ms

ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";

配置控制客户端等待请求响应的最大时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者在重试次数用尽时失败请求。默认值为30*1000(毫秒)。文章来源地址https://www.toymoban.com/news/detail-449289.html

注意:以上参数是kafka1.0.2版本。

到了这里,关于kafka 消费者相关参数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(31)
  • java:Kafka生产者推送数据与消费者接收数据(参数配置以及案例)

    bootstrap.servers :Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。 acks :Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认

    2024年02月16日
    浏览(32)
  • kafka消费者报错Offset commit ......it is likely that the consumer was kicked out of the group的解决

    2022年10月份接到一个小功能,对接kafka将数据写到数据库,开始的需求就是无脑批量insert,随着时间的推移,业务需求有变更,kafka的生产消息频次越来越高,到今年7月份为止就每秒会有几十条甚至上百条,然后消费消息的代码就报错: Caused by: org.apache.kafka.clients.consumer.Com

    2024年02月07日
    浏览(42)
  • Kafka中的enable-auto-commit和auto-commit-interval配置

    当前kafka的版本为2.8.11,Spring Boot的版本为2.7.6,在pom.xml中引入下述依赖: 提前说明:当前Kafka的使用是与Spring Boot做了整合,不是使用原生的Kafka,因此Kafka的某些功能Spring Boot是做了二次封装,使其更加符合于实际情况。  1、Kafka客户端自动提交offset Windosw环境下面使用下述

    2024年01月19日
    浏览(28)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(33)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(33)
  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(53)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(33)
  • Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后。使用Skywalking+Kafka+ES进行应用监控。 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UAT环境一开始还正常,后面接入了更多的应用后出现了问题:OAP服务正常但是ES里不再有数据。 排查: 通过

    2023年04月14日
    浏览(35)
  • Kafka-消费者组消费流程

    消费者向kafka集群发送消费请求,消费者客户端默认每次从kafka集群拉取50M数据,放到缓冲队列中,消费者从缓冲队列中每次拉取500条数据进行消费。   

    2024年02月12日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包