概要
kafka broker、consumer、和producer都有很多可配置的参数。本文主要总结日常开发中常用到的参数。其中producer端可以在org.apache.kafka.clients.producer.ProducerConfig
中找到配置项,consumer端可以在org.apache.kafka.clients.consumer.ConsumerConfig
中找到各配置项。
broker端参数
$KAFKA_HOME/config/server.properties文件中的配置。文章来源:https://www.toymoban.com/news/detail-696488.html
参数名称 | 描述 | 举例/默认值 |
---|---|---|
zookeeper.connect | 用于配置Kafka要连接的Zookeeper/集群的地址。 它的值是一个字符串,使用逗号分隔Zookeeper的多个地址 Zookeeper的单个地址是 host:port形式的,可以在最后添加Kafka在Zookeeper中的根节点路径 |
zookeeper.connect=node2:2181, node3:2181,node4:2181/myKafka |
listeners | 用于指定当前Broker向外发布服务的地址和端口。 | listeners=PLAINTEXT://0.0.0.0:9092 |
listener.security.protocol.map | 监听器名称和安全协议的映射配置 比如,可以将内外网隔离 |
listener.security.protocol.map=INTERNAL:SSL, EXTERNAL:SSL |
inter.broker.listener.name | 于配置broker之间通信使用的监听器名称,该名称必须在advertised.listeners列表中 | inter.broker.listener.name=EXTERNAL |
advertised.listeners | 将该地址发布到zookeeper供客户端使用 |
典型配置文章来源地址https://www.toymoban.com/news/detail-696488.html
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://192.168.100.101:9092,EXTERNAL:node1:9093
inter.broker.listener.name=EXTERNAL
advertised.listeners=PLAINTEXT://node1:9092
参数名称 | 描述 | 举例/默认值 |
---|---|---|
broker.id | 用于唯一标记一个Kafka的Broker,它的值是一个任意integer值 | broker.id=0 |
log.dir | 定Kafka在磁盘上保存消息的日志片段的目录 |
producer端参数
参数名称 | 描述 | 举例/默认值 |
---|---|---|
bootstrap.servers | 生产者客户端与broker集群建立初始连接需要的broker地址列表 | node1:9092,node2:9092 |
acks | acks=0:生产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息,retries设置也不起作用,因为客户端不关心消息是否发送失败。客户端收到的消息偏移量永远是-1。 acks=1:leader将记录写到它本地日志,就响应客户端确认消息,而不等待follower副本的确认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该消息。 acks=all:leader等待所有同步的副本确认该消息。保证了只要有一个同步副本存在,消息就不会丢失。这是最强的可用性保证。等价于acks=-1 |
默认值为1,字符串。可选值:[all, -1, 0, 1] |
compression.type | 生产者生成数据的压缩格式。默认是none(没有压缩)。允许的值:none,gzip,snappy和lz4。压缩是对整个消息批次来讲的。消息批的效率也影响压缩的比例。消息批越大,压缩效率越好。 | 默认是none |
retries | 设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消息,允许重试,但是不设置max.in.flight.requests.per.connection为1,存在消息乱序的可能,因为如果两个批次发送到同一个分区,第一个失败了重试,第二个成功了,则第一个消息批在第二个消息批后 | 默认:0,可选值:[0,…,2147483647] |
retry.backoff.ms | 在向一个指定的主题分区重发消息的时候,重试之间的等待时间。避免了密集循环的重新发送请求 | long型值,默认100。可选值:[0,…] |
request.timeout.ms | 客户端等待请求响应的最大时长。如果服务端响应超时,则会重发请求,除非达到重试次数。该设置应该比 replica.lag.time.max.ms (a broker configuration)要大,以免在服务器延迟时间内重发消息 | int类型值,默认:30000ms |
batch.size | 当多个消息发送到同一个分区的时候,生产者尝试将多个记录作为一个批来处理。批处理提高了客户端和服务器的处理效率。 该配置项以字节为单位控制默认批的大小。 所有的批小于等于该值 发送给broker的请求将包含多个批次,每个分区一个 如果该值设置的比较小,会限制吞吐量(设置为0会完全禁用批处理)。如果设置的很大,又有一点浪费内存,因为Kafka会永远分配这么大的内存来参与到消息的批整合中 |
16384 |
client.id | 生产者发送请求的时候传递给broker的id字符串。 用于在broker的请求日志中追踪什么应用发送了什么消息。 一般该id是跟业务有关的字符串 |
|
buffer.memory | 生产者可以用来缓存等待发送到服务器的记录的总内存字节。如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞 max.block.ms 的时间,此后它将引发异常。此设置应大致对应于生产者将使用的总内存 | long型数据。默认值:33554432 |
max.block.ms | 控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 阻塞的时长。当缓存满了或元数据不可用的时候,这些方法阻塞。在用户提供的序列化器和分区器的阻塞时间不计入 | long型值,默认:60000,可选值:[0,…] |
linger.ms | 一批消息发往broker的时机由两个参数控制,一个时batch.size,另一个时linger.ms,linger.ms表示这批消息的等待时间,消息不管有没达到batche.size的大小,达到了linger.ms的等待时间也会向broker端发送 | long型值,默认:0 |
max.request.size | 单个请求的最大字节数。该设置会限制单个请求中消息批的消息个数,以免单个请求发送太多的数据。服务器有自己的限制批大小的设置,与该配置可能不一样 | int类型值,默认1048576 |
max.in.flight.requests.per.connection | 单个连接上未确认请求的最大数量。达到这个数量,客户端阻塞。如果该值大于1,且存在失败的请求,在重试的时候消息顺序不能保证 | int类型值,默认5 |
consumer端参数
参数名称 | 描述 | 默认值 |
---|---|---|
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表 | |
client.id | 当从服务器消费消息的时候向服务器发送的id字符串,在ip/port基础上,提供应用的逻辑名称,记录在服务端的请求日志中,用于追踪请求的源。 | |
group.id | 用于唯一标志当前消费者所属的消费组的字符串 | |
auto.offset.reset | earliest:自动重置偏移量到最早的偏移量 latest:自动重置偏移量为最新的偏移量 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常 anything:向消费者抛异常 |
默认值,latest |
enable.auto.commit | 是否自动提交位移 | true |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为true,则该值定义了消费者偏移量向Kafka提交的频率 | 5000ms |
fetch.min.bytes | 服务器对每个拉取消息的请求返回的数据量最小值。如果数据量达不到这个值,请求等待,以让更多的数据累积,达到这个值之后响应请求。默认设置是1个字节,表示只要有一个字节的数据,就立即响应请求,或者在没有数据的时候请求超时。将该值设置为大一点儿的数字,会让服务器等待稍微长一点儿的时间以累积数据。如此则可以提高服务器的吞吐量,代价是额外的延时间。 | 1字节 |
fetch.max.wait.ms | 如果服务器端的数据量达不到 fetch.min.bytes 的话,服务器端不能立即响应请求。该时间用于配置服务器端阻塞请求的最大时长 | |
session.timeout.ms | 当使用Kafka的消费组的时候,消费者周期性地向broker发送心跳数表明自己的存在。如果经过该超时时间还没有收到消费者的心跳,则broker将消费者从消费组移除,并启动再平衡。该值必须在broker配置 group.min.session.timeout.ms 和group.max.session.timeout.ms 之间。 | 45000ms |
heartbeat.interval.ms | 当使用消费组的时候,该条目指定消费者向消费者协调器发送心跳的时间间隔。心跳是为了确保消费者会话的活跃状态,同时在消费者加入或离开消费组的时候方便进行再平衡。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的1/3。可以将其调整得更小,以控制正常重新平衡的预期时间。 | 3000ms |
max.poll.records | 一次调用poll()方法返回的记录最大数量 | 500 |
max.poll.interval.ms | 使用消费组的时候调用poll()方法的时间间隔。该条目指定了消费者调用poll()方法的最大时间间隔。如果在此时间内消费者没有调用poll()方法,则broker认为消费者失败,触发再平衡,将分区分配给消费组中其他消费者。 | 300000ms |
到了这里,关于Kafka常用参数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!