Kafka:消费者参数配置

这篇具有很好参考价值的文章主要介绍了Kafka:消费者参数配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

maven配置

// 消费者
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.129:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

springboot配置类

@Configuration
public class KafkaConfig {
 
    // 配置全局admin
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers","192.168.25.129:9092");
        KafkaAdmin admin = new KafkaAdmin(configs);
        return admin;
    }
}

配置文件

# 用于建立初始连接的broker地址
spring.kafka.bootstrap-servers=192.168.25.129:9092
# producer用到的key和value的序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 默认的批处理记录数 16k
spring.kafka.producer.batch-size=16384
# 32MB的总发送缓存
spring.kafka.producer.buffer-memory=33554432
# consumer用到的key和value的反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# consumer的消费组id
spring.kafka.consumer.group-id=spring-kafka-consumer-group
# 是否自动提交消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
# 每隔100ms向broker提交一次偏移量
spring.kafka.consumer.auto-commit-interval=100
# 如果该消费者的偏移量不存在,则自动设置为最早的偏移量
spring.kafka.consumer.auto-offset-reset=earliest

参数配置列表文章来源地址https://www.toymoban.com/news/detail-492598.html

属性 说明
bootstrap.servers 向Kafka集群建立初始连接用到的host/port列表。 客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管 是否指定了哪个服务器用作引导。 这个列表仅影响用来发现集群所有服务器的初始主机。 字符串形式:host1:port1,host2:port2,... 由于这组服务器仅用于建立初始链接,然后发现集群中的所有服务器,因 此没有必要将集群中的所有地址写在这里。 一般最好两台,以防其中一台宕掉。
key.deserializer key的反序列化类,该类需要实现 org.apache.kafka.common.serialization.Deserializer 接口。
value.deserializer 实现了 org.apache.kafka.common .serialization.Deserializer 接口的反序列化器, 用于对消息的value进行反序列化。
client.id 当从服务器消费消息的时候向服务器发送的id字符串。在ip/port基础上 提供应用的逻辑名称,记录在服务端的请求日志中,用于追踪请求的源。
group.id 用于唯一标志当前消费者所属的消费组的字符串。 如果消费者使用组管理功能如subscribe(topic)或使用基于Kafka的偏移量 管理策略,该项必须设置。
auto.offset.reset 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被 删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量 latest:自动重置偏移量为最新的偏移量 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异 常 anything:向消费者抛异常
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率。
enable.auto.commit 如果设置为true,消费者会自动周期性地向服务器提交偏移量。
fetch.min.bytes 服务器对每个拉取消息的请求返回的数据量最小值。 如果数据量达不到这个值,请求等待,以让更多的数据累积, 达到这个值之后响应请求。 默认设置是1个字节,表示只要有一个字节的数据, 就立即响应请求,或者在没有数据的时候请求超时。 将该值设置为大一点儿的数字,会让服务器等待稍微 长一点儿的时间以累积数据。 如此则可以提高服务器的吞吐量,代价是额外的延迟时间。
fetch.max.wait.ms 如果服务器端的数据量达不到 fetch.min.bytes 的话, 服务器端不能立即响应请求。 该时间用于配置服务器端阻塞请求的最大时长。
fetch.max.bytes 服务器给单个拉取请求返回的最大数据量。 消费者批量拉取消息,如果第一个非空消息批次的值比该值大, 消息批也会返回,以让消费者可以接着进行。 即该配置并不是绝对的最大值。 broker可以接收的消息批最大值通过 message.max.bytes (broker配置) 或 max.message.bytes (主题配置)来指定。 需要注意的是,消费者一般会并发拉取请求。
connections.max.idle.ms 在这个时间之后关闭空闲的连接。
check.crcs 自动计算被消费的消息的CRC32校验值。 可以确保在传输过程中或磁盘存储过程中消息没有被破坏。 它会增加额外的负载,在追求极致性能的场合禁用。
exclude.internal.topics 是否内部主题应该暴露给消费者。如果该条目设置为true, 则只能先订阅再拉取。
isolation.level 控制如何读取事务消息。 如果设置了 read_committed ,消费者的poll()方法只会 返回已经提交的事务消息。 如果设置了 read_uncommitted (默认值), 消费者的poll方法返回所有的消息,即使是已经取消的事务消息。 非事务消息以上两种情况都返回。 消息总是以偏移量的顺序返回。 read_committed 只能返回到达LSO的消息。 在LSO之后出现的消息只能等待相关的事务提交之后才能看到。 结果, read_committed 模式,如果有为提交的事务, 消费者不能读取到直到HW的消息。 read_committed 的seekToEnd方法返回LSO。
heartbeat.interval.ms 当使用消费组的时候,该条目指定消费者向消费者协调器 发送心跳的时间间隔。 心跳是为了确保消费者会话的活跃状态, 同时在消费者加入或离开消费组的时候方便进行再平衡。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。 可以将其调整得更小,以控制正常重新平衡的预期时间。
session.timeout.ms 当使用Kafka的消费组的时候,消费者周期性地向broker发送心跳数 表明自己的存在。 如果经过该超时时间还没有收到消费者的心跳, 则broker将消费者从消费组移除,并启动再平衡。 该值必须在broker配置 group.min.session.timeout.ms 和 group.max.session.timeout.ms 之间。
max.poll.records 一次调用poll()方法返回的记录最大数量。
max.poll.interval.ms 使用消费组的时候调用poll()方法的时间间隔。 该条目指定了消费者调用poll()方法的最大时间间隔。 如果在此时间内消费者没有调用poll()方法, 则broker认为消费者失败,触发再平衡, 将分区分配给消费组中其他消费者。
max.partition.fetch.bytes 对每个分区,服务器返回的最大数量。消费者按批次拉取数据。 如果非空分区的第一个记录大于这个值,批处理依然可以返回, 以保证消费者可以进行下去。 broker接收批的大小由 message.max.bytes (broker参数)或 max.message.bytes (主题参数)指定。 fetch.max.bytes 用于限制消费者单次请求的数据量。
send.buffer.bytes 用于TCP发送数据时使用的缓冲大小(SO_SNDBUF), -1表示使用OS默认的缓冲区大小。
retry.backoff.ms 在发生失败的时候如果需要重试,则该配置表示客户端 等待多长时间再发起重试。 该时间的存在避免了密集循环。
request.timeout.ms 客户端等待服务端响应的最大时间。如果该时间超时, 则客户端要么重新发起请求,要么如果重试耗尽,请求失败。
reconnect.backoff.ms 重新连接主机的等待时间。避免了重连的密集循环。 该等待时间应用于该客户端到broker的所有连接。
reconnect.backoff.max.ms 重新连接到反复连接失败的broker时要等待的最长时间 (以毫秒为单位)。 如果提供此选项,则对于每个连续的连接失败, 每台主机的退避将成倍增加,直至达到此最大值。 在计算退避增量之后,添加20%的随机抖动以避免连接风暴。
receive.buffer.bytes TCP连接接收数据的缓存(SO_RCVBUF)。 -1表示使用操作系统的默认值。
partition.assignment.strategy 当使用消费组的时候,分区分配策略的类名。
metrics.sample.window.ms 计算指标样本的时间窗口。
metrics.recording.level 指标的最高记录级别。
metrics.num.samples 用于计算指标而维护的样本数量
interceptor.classes 拦截器类的列表。默认没有拦截器 拦截器是消费者的拦截器,该拦截器需要实现 org.apache.kafka.clients.consumer .ConsumerInterceptor 接口。 拦截器可用于对消费者接收到的消息进行拦截处理。

到了这里,关于Kafka:消费者参数配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

    kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一

    2024年01月22日
    浏览(66)
  • 【Kafka】【十七】消费者poll消息的细节与消费者心跳配置

    默认情况下,消费者⼀次会poll500条消息。 代码中设置了⻓轮询的时间是1000毫秒 意味着: 如果⼀次poll到500条,就直接执⾏for循环 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s 如果多次poll都没达到500条,且1秒时间到了,那么直接执

    2024年02月09日
    浏览(14)
  • 关于kafka消费者超时配置

    在Kafka中,消费者超时配置是指消费者在等待服务器响应时的超时时间。如果消费者在超时时间内未收到服务器的响应,它将重新发起请求或执行其他逻辑。 以下是关于Kafka消费者超时配置的一些常见选项: session.timeout.ms :该配置定义了消费者与Kafka集群之间的会话超时时间

    2024年02月16日
    浏览(9)
  • Kafka消费者常用超时时间配置

    https://blog.csdn.net/BHSZZY/article/details/126757295 //心跳超时时间(session超时时间)增加成25秒(之前项目设置了15秒) spring.kafka.properties.session.timeout.ms = 25000 //每次拉取的消息减少为20(之前是默认值500) spring.kafka.consumer.max-poll-records=20 //消息消费超时时间增加为10分钟 spring.kafka.p

    2024年02月03日
    浏览(12)
  • Kafka系列——详解创建Kafka消费者及相关配置

    参考自kafka系列文章——消费者创建与配置 在读取消息之前,需要先创建一个 KafkaConsumer 对象。 创建 KafkaConsumer 对象与创建 KafkaProducer 对象非常相似——把想要传给消费者的属性放在 Properties 对象里,后面深入讨论所有属性。这里我们只需要使用 3 个必要的属性: bootstrap.

    2024年02月09日
    浏览(8)
  • kafka生产者和消费者配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月12日
    浏览(12)
  • 笔记:配置多个kafka生产者和消费者

    如果只有一个kafka,那么使用自带的KafkaAutoConfiguration配置类即可,对应已有属性类KafkaProperties,属性前缀为spring.kafka.xxx; 本文记录配置多个kafka的情况,即在KafkaAutoConfiguration的基础上,自定义额外的kafka生产者和消费者。 适用场景:需要消费来源于不同kafka的消息、需要在不

    2024年02月15日
    浏览(18)
  • kafka配置大全broker、topic、生产者和消费者等配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月05日
    浏览(16)
  • Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

    Kafka消费者提交Offset的策略有 自动提交Offset: 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了 手动提交Offset 消费者在消费消息时/后,再提交o

    2024年02月08日
    浏览(8)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包