Kafka重要生产参数配置建议

这篇具有很好参考价值的文章主要介绍了Kafka重要生产参数配置建议。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、消费者参数配置

1. max.poll.records

单次poll()的调用可返回的最大消息总数,默认是500条

循环拉取并放入List集合就返回,可以看出这个取值的大小,将会影响一次poll()所需消耗的时间。

private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
    // Error when fetching the next record before deserialization.
    if (corruptLastRecord)
        throw new KafkaException("Received exception when fetching the next record from " + partition
                                     + ". If needed, please seek past the record to "
                                     + "continue consumption.", cachedRecordException);
    if (isConsumed)
        return Collections.emptyList();
    List<ConsumerRecord<K, V>> records = new ArrayList<>();
    try {
        for (int i = 0; i < maxRecords; i++) {
            // Only move to next record if there was no exception in the last fetch. Otherwise we should
            // use the last record to do deserialization again.
            if (cachedRecordException == null) {
                corruptLastRecord = true;
                lastRecord = nextFetchedRecord();
                corruptLastRecord = false;
            }
            if (lastRecord == null)
                break;
            records.add(parseRecord(partition, currentBatch, lastRecord));
            recordsRead++;
            bytesRead += lastRecord.sizeInBytes();
            nextFetchOffset = lastRecord.offset() + 1;
            // In some cases, the deserialization may have thrown an exception and the retry may succeed,
            // we allow user to move forward in this case.
            cachedRecordException = null;
        }
    } catch (SerializationException se) {
        cachedRecordException = se;
        if (records.isEmpty())
            throw se;
    } catch (KafkaException e) {
        cachedRecordException = e;
        if (records.isEmpty())
            throw new KafkaException("Received exception when fetching the next record from " + partition
                                         + ". If needed, please seek past the record to "
                                         + "continue consumption.", e);
    }
    return records;
}

2. max.poll.interval.ms

这个参数就规定了,当调用poll()之后,如果在max.poll.interval.ms指定的时间内未消费完消息,也就是未再调用poll()方法,则Consumer会主动发起离开组的请求,从而产生Rebalance。

因此max.poll.interval.ms参数是会对Rebalance产生影响的,默认是5分钟,可根据实际消费能力适当调整。

3. session.timeout.ms

这个参数是用于检测当多长时间内没有收到心跳,就认为其是有故障的,从而将其中Group中移除,产生Reblance,需要注意,该值必须在broker配置group.min.session.timeout.msgroup.max.session.timeout.ms的范围内。

显然session.timeout.ms参数也会对Rebalance产生影响,其默认值是10秒

4. heartbeat.interval.ms

Consumer的心跳频率,设置的越小频率就越高,对带宽的消耗也就越大,相反则不能快速发现实例是否已经“挂”了,通常情况下建议heartbeat.interval.ms设置为session.timeout.ms的三分之一,默认是3秒,如果session.timeout.ms有调整,记得heartbeat.interval.ms也需要一起调整。

5. enable.auto.commit

这个参数表示Consumer会自动定期的提交位移,默认为自动提交,自动提交虽然方便,但如果控制不好,很容易造成消息丢失、或者消息重复,所以一般建议改为手动提交。

6. auto.commit.interval.ms

结合enable.auto.commit开启自动提交,auto.commit.interval.ms就用来控制自动提交的频率,默认是5秒

if (autoCommitEnabled)
    this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;

private void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}

7. fetch.min.bytes

这是一个调优相关的参数,默认为1字节,表示如果请求的数据量小于1字节,broker就是攒一攒,等足够1字节了再一起返回给Consumer,这个值建议可以适当调大一点,以提高服务的吞吐量。

8. fetch.max.wait.ms

如果在fetch.max.wait.ms指定的时间内,数据量依然没有达到fetch.min.bytes所设置的值,那broker也不会再等了,将直接返回数据给Consumer,此值默认为500ms

二、生产者参数配置

1. batch.size

当多条消息被发送到同一个分区时,生产者会尝试把多条消息变成批量发送。这有助于提高客户端和服务器的性能。此配置以字节为单位设置默认批处理大小。如果消息大于此配置的大小,将直接发送。发送到broker的请求将包含多个批处理,每个分区一个批处理,其中包含可发送的数据。
此参数默认值为:16KB
如果此参数值设置的太小,可能会降低吞吐量(批量大小为零将完全禁用批处理)。
如果此参数设置的太大,可能会更浪费内存,并增加消息发送的延迟时间。

2. linger.ms

这个参数一般会配合batch.size一起使用,可以通过设置linger.ms的值来表示,如果消息的大小一直达不到batch.size设置的值,那么等待多久后任然允许发送消息,默认是不等待,即消息到来就发送。
当我们发送的消息都比较小的时候,可以通过设置linger.ms来减少请求的次数,批次中累积更多的消息后再发送,提高了吞吐量,减少了IO请求。
如果设置的太大,则消息会被延迟更长的时间发送。

3. max.request.size

设置请求消息的最大大小,避免发送大量的请求,限制了单条消息的size与批次消息的size,如果改变此值,需要注意服务器也需要进行相应设置,因为服务器也有接收消息的大小限制。

此参数默认值为:1M

设置大小为10M,发送2M大小的消息

properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024*1024*10);
ProducerRecord<String, String> record = new ProducerRecord("test_topic", 1,"1", Arrays.toString(new byte[1024*1024*2]));

超过了服务器的限制

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
	at cn.enjoyedu.hellokafka.HelloKafkaProducer.lambda$main$0(HelloKafkaProducer.java:30)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

设置大小为1M,发送2M大小的消息,报错超过max.request.size限制。

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 6291545 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1269)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:933)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:743)
	at cn.enjoyedu.hellokafka.HelloKafkaProducer.lambda$main$0(HelloKafkaProducer.java:27)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 6291545 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

4. retries

生产者发送时如果遇到的是可重试的异常时,则可进行发送的重试,此参数规定了重试的次数,默认值为Integer.MAX_VALUE
需要注意如果max.in.flight.requests.per.connection设置大于1,将有可能造成同一个分区内的消息顺序颠倒,因为如果两个批被发送到一个分区,第一批失败并重试,但第二批成功,那么第二批中的消息就有可能先出现。

5. max.in.flight.requests.per.connection

一个消息发送后在得到服务端响应之前,生产者还可以发送的消息条数,配合retries使用,可以保证消息的顺序性,假设有两条消息A、B,A先发送但失败了在执行重试时,B发送且成功了,之后A也重试成功了,此时A、B消息顺序就反了,如果将此参数设置为1,则可以保证A在重试时,B消息无法进行发送,必须等A收到broker响应后B才能发送,设置较高可以提升吞吐量,但会占用更多的内存,此参数值默认是5条

6. acks

生产者在确认请求完成之前要求leader已收到的确认数。这控制了发送的消息的持久性。

这个参数一共有3个值,默认为1

0:生产者只要把消息发送出去即可,不用等待broker的处理结果,消息将立即添加到socket buffer并被视为已发送。在这种情况下,无法保证服务器已收到消息,并且retries配置将不会生效(因为客户端通常不会知道任何故障)。为每条消息返回的偏移量将始终设置为-1。
设置为0,吞吐量最高,同样消息的丢失率也最高。
1:生成者需要等分区leader将消息写入成功后才认为此消息发送成功,兼顾了吞吐量和消息丢失的问题,但是同样有消息丢失的风险,比如当leader写入成功后突然挂了,其他分区跟随者并为能够将此消息同步,则此消息丢失。
all(等同于-1):生产者会等待所有的副本都写入成功后才认为此消息发送成功,只要至少有一个同步副本保持活跃状态,消息就不会丢失,这是最安全的保障,是吞吐量最低的。

7. buffer.memory

生产者可用于缓冲等待发送到服务器的记录的内存总字节数,如果客户端send的速度大于发送到broker的速度,且积压的消息大于这个设置的值,就会造成send阻塞,阻塞时间为max.block.ms设置的值,如果超过时间就抛出异常。

8. max.block.ms

当执行KafkaProducer.send() 或KafkaProducer.partitionsFor()时阻塞等待的时间,之所以会阻塞时因为可能buffer满了或者获取元数据异常,那么超过这个时间就会抛出异常,改值默认是60秒

指定把消息发送到一个不存在的分区,模拟获取元数据异常,设置阻塞时间为1秒,报错信息如下。

properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic test_topic not present in metadata after 1000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1269)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:933)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:743)
	at cn.enjoyedu.hellokafka.HelloKafkaProducer.lambda$main$0(HelloKafkaProducer.java:26)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic test_topic not present in metadata after 1000 ms.

三、生产者、消费者通用参数配置

1. send.buffer.bytes和receive.buffer.bytes

send.buffer.bytes和receive.buffer.bytes分别指定了发送数据和接收数据时要使用的TCP发送缓冲区(SO_SNDBUF)和接收缓冲区(SO_RCVBUF)的大小,send.buffer.bytes默认是128KB,receive.buffer.bytes默认是32KB,如果指定为-1,则表示使用操作系统默认大小。

2. request.timeout.ms

配置控制客户端等待请求响应的最长时间。如果在超时时间过去之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败,默认值为:30秒

3. retry.backoff.ms

主要用来控制请求发送异常后retries的请求频率,默认是100ms

4. metadata.max.age.ms

Kafka客户端定期刷新元数据的频率,默认是5分钟

四、Broker参数配置

1. auto.create.topics.enable

是否允许自动创建topic,默认为自动创建,一般建议设置为不允许自动创建,否则可能会生成很多无效的Topic,造成资源浪费,也增加了运维成本。

2. unclean.leader.election.enable

这个参数表明了什么样的副本才有资格竞争Leader,如果设置成false,则表示对于落后太多的副本是没有资格竞选Leader的,这样做的后果是可能永远选不出Leader来了,如果是true,那么则有可能选出一个差很多数据的副本成为Leader,从而造成了数据丢失,此值默认是false,建议也设置成false。

3. replica.lag.time.max.ms

这个参数就定义了上面副本落后的标准,其表示Follower副本能够落后Leader副本的最长时间间隔,默认值为10秒,只要不超过10秒,就不算是落后的副本。

4. auto.leader.rebalance.enable

如果把auto.leader.rebalance.enable设置为true,则Kafka会定期的对Topic的分区进行Leader选举,他很有可能会莫名其妙的把原本好好的Leader换掉,此参数值默认为true,一般建议改为false。

5. message.max.bytes

这个参数表示Broker能够处理的最大消息大小,默认为1M,建议适当调大一些,一般这个参数还会结合max.message.bytesreplica.fetch.max.bytes以及Consumer端的fetch.message.max.bytes一起使用。

6. replication.factor

这个参数用来表示分区的副本数,建议大于等于3,确保Kafka的高可用性,默认是1

7. min.insync.replicas

这个参数定义了消息至少要被写入多少个副本才算是“已提交”,默认是1,建议设置成大于1。
注意:min.insync.replicas只有在ack设置成all(-1)时才会生效。

8. num.io.threads

表示每台Broker启动后自动创建的I/O线程数量,I/O线程是真正处理请求的线程,默认是8个

9. num.network.threads

表示每台Broker启动时专门用于从网络接收请求并向网络发送响应的线程数量,默认是3个

Kafka会通过使用网络线程来专门接收客户端的请求与和发送响应,但是当收到请求之后网络线程并不是真正的由自己来处理,而是丢到一个共享请求队列中,此时就会由另一个线程池中的线程来处理,也就是I/O线程,它会专门负责从共享队列中取出请求,执行真正的处理。文章来源地址https://www.toymoban.com/news/detail-407642.html

到了这里,关于Kafka重要生产参数配置建议的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka配置大全broker、topic、生产者和消费者等配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月05日
    浏览(38)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(31)
  • Kafka重要生产参数配置建议

    单次poll()的调用可返回的最大消息总数, 默认是500条 循环拉取并放入List集合就返回,可以看出这个取值的大小,将会影响一次poll()所需消耗的时间。 这个参数就规定了,当调用poll()之后,如果在 max.poll.interval.ms 指定的时间内未消费完消息,也就是未再调用poll()方法,则C

    2023年04月09日
    浏览(63)
  • Kafka 之生产者与消费者基础知识:基本配置、拦截器、序列化、分区器

    kafaf集群地址列表:理论上写一个节点地址,就相当于绑定了整个kafka集群了,但是建议多写几个,如果只写一个,万一宕机就麻烦了 kafka消息的key和value要指定序列化方法 kafka对应的生产者id 使用java代码表示则为以下代码:  可使用 retries 参数 进行设置,同时要注意记住两

    2024年02月05日
    浏览(46)
  • kafka生产者消费者练习

    需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中 生产的数据格式: 造数据 {“guid”:1,“eventId”:“pageview”,“timestamp”:1637868346789} isNew = 1 {“guid”:1,“eventId”:“addcard”,“timestamp”:1637868347625} isNew = 0 {“guid”:2,“eventId”:“collect”,“timestamp”

    2024年02月08日
    浏览(34)
  • Kafka系列之:Kafka生产者和消费者

    batch.size:只有数据积累到batch.size之后,sender才会发送数据,默认16K。 linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。 0:生产者发送过来的数据,不需要等数据罗盘应答。 1:生产者发送过来的

    2023年04月09日
    浏览(33)
  • kafka消费者详解,根据实际生产解决问题

    1.首先kafka每创建一个消费者就是一个消费者组,必须指定groupip 2.两个消费者组之间不相互影响,消费同一个主题的同一个分区,两个消费者组不相互影响,各自记录自己的offset 3.在开发中如果没有指定每个消费者去消费特定的分区,那么kafka默认是按照roundRobin轮询的方式分

    2024年02月10日
    浏览(38)
  • Kafka生产者与消费者api示例

      一个正常的生产逻辑需要具备以下几个步骤 配置生产者参数及创建相应的生产者实例 构建待发送的消息 发送消息 关闭生产者实例 采用默认分区方式将消息散列的发送到各个分区当中    对于properties配置的第二种写法,相对来说不会出错,简单举例:   1.kafka的生产者可

    2024年02月07日
    浏览(41)
  • Java轻松使用Kafka生产者,消费者

    Java轻松使用Kafka生产者,消费者 一、环境说明 项目中需要下面的依赖: ( 版本自定义 ) 2. yml配置文件设置 1. 简单生产者的书写: 1. 简单消费者的书写:   注:多消费者时,需要对应kafka中配置的分区;多少的Partition就有多少个消费者,以免资源浪费

    2024年02月15日
    浏览(43)
  • kafka生产者和消费者(python版)

    生产者 消费者 消费者中的组名主要用户针对主题的偏移量进行更改,也涉及到主题中分区的问题, kafka工具类 此工具类基本上拿过去就可以用 疑问 当消费者链接kafka时发现topic没有未读的消息怎样退出呢,默认是在一直等待,但是我期望没有要读的消息的时候直接退出即可

    2024年02月16日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包