Kafka中的max-poll-records和listener.concurrency配置

这篇具有很好参考价值的文章主要介绍了Kafka中的max-poll-records和listener.concurrency配置。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、max-poll-records是什么

max-poll-records是Kafka consumer的一个配置参数,表示consumer一次从Kafka broker中拉取的最大消息数目,默认值为500条。在Kafka中,一个消费者组可以有多个consumer实例,每个consumer实例负责消费一个或多个partition的消息,每个consumer实例一次从broker中可以拉取一个或多个消息。

max-poll-records参数的作用就是控制每次拉取消息的最大数目,以实现消费弱化和控制内存资源的需求。

2、max-poll-records解决的问题

避免一次性加载大量数据:

一次性拉取数量过大,会导致拉取消息时间过长,对broker和网络资源造成过度压力,同时consumer实例应用内存消耗过大,从而影响应用性能。如果要通过增加consumer实例数量或增加机器内存来解决该问题,则会增加成本;而通过控制每次拉取的消息数目,可以实现内存资源控制和应用性能优化。

更好地控制消息轮询的间隔时间:

当consumer实例消费消息的速度比broker生产消息的速度慢时,consumer会产生轮询时间间隔。如果轮询时间跨度过长,则会严重地延迟消息消费。而通过设置max-poll-records,可以控制consumer拉取消息的频率,进而控制消息消费的时间。

3、max-poll-records的最佳实践

max-poll-records的最佳实践共有下述三个核心思想:

3.1 根据机器内存和consumer实例数量调整参数

在设置max-poll-records参数时,应根据机器内存和实例数量来调整参数值,从而实现更好的性能和内存控制。如果消费数据量不大,可以设置较小的值,反之,如果消费数据量很大,则可以设置更大的值。

3.2 注意正确理解和使用max-poll-records

max-poll-records参数不是为了减少消息延迟而设置的,而是为了控制内存和消费弱化而设置的。在设置参数时应该明确这一点,从而更好地利用这个参数。

3.3 尽可能使用手动提交offset的方式

使用自动提交offset的方式,可能存在一些问题。如果一个消息批次在服务端已经被消费掉,但是由于客户端宕机或重启而没有及时提交offset,则可能导致消息重复消费的情况。因此, 建议在设置max-poll-records的同时,使用手动提交offset的方式。

4、案例实践

当前kafka的版本为2.8.11,Spring Boot的版本为2.7.6,在pom.xml中引入下述依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.11</version>
</dependency>

在yml配置文件进行如下配置:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: 0
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      max-poll-records: 20
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      ack-mode: manual_immediate
      type: batch
      concurrency: 2

以下为相关配置的说明: 

  • spring.kafka.listener.type的值为batch表示开启批量消费,默认值为single(单条)。
  • spring.kafka.consumer.enable-auto-commit的值为false表示关闭Kafka客户端的自动提交offSet。
  • spring.kafka.consumer.max-poll-records的值为20表示在开启了批量消费以后,每次从Kafka服务端拉取的数据最大条数为20。
  • spring.kafka.listener.ack-mode的值为manual_immediate表示关闭Spring的自动提交offSet,我们需要在代码中进行手动提交。spring.kafka.listener.ack-mode的取值有两个比较常见的选项值 MANUAL  MANUAL_IMMEDIATEMANUAL表示处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交。MANUAL_IMMEDIATE表示每次处理完业务,手动调用Acknowledgment.acknowledge()后立即提交。

在项目中创建一个生产者用于往主题 topic0 中投递消息,如下所示:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {

    // 自定义的主题名称
    public static final String TOPIC_NAME="topic0";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam("msg")String msg) {
        log.info("准备发送消息为:{}",msg);
        // 1.发送消息
        ListenableFuture<SendResult<String,String>> future=kafkaTemplate.send(TOPIC_NAME,msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // 2.发送失败的处理
                log.error("生产者 发送消息失败:"+throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, String> stringObjectSendResult) {
                // 3.发送成功的处理
                log.info("生产者 发送消息成功:"+stringObjectSendResult.toString());
            }
        });
        return "接口调用成功";
    }
}

接着再在项目中创建一个消费者用于批量消费主题 topic0 中的消息,如下所示: 

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;

@Slf4j
@Component
public class KafkaConsumer {

    // 自定义主题名称,这里要注意的是主题名称中不能包含特殊符号:“.”、“_”
    public static final String TOPIC_NAME = "topic0";

    @KafkaListener(topics = TOPIC_NAME, groupId = "ONE")
    public void topic_one(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowledgment) {
        log.info("消费者组One批量消费的数据量 = {}", records == null ? 0 : records.size());
        for(ConsumerRecord<?, ?> record : records){
            Optional message = Optional.ofNullable(record.value());
            if (message.isPresent()) {
                //Object msg = message.get();
                //log.info("消费者组One消费了消息:Topic:" + TOPIC_NAME + ",Record:" + record + ",Message:" + msg);
            }
        }
        acknowledgment.acknowledge();
    }
}

启动整个项目,这时控制台中会打印下述信息:

ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [127.0.0.1:9092]
client.id = consumer-ONE-1
enable.auto.commit = false
group.id = ONE
max.poll.records = 20
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

紧接着使用Apipost的压测工具调用 /kafka/send?msg=1 接口往主题 topic0 中生产100条消息,稍微等了一会后可以看到在控制台中该消息已经被批量消费了,如下所示:

消费者组One批量消费的数据量 = 20
消费者组One批量消费的数据量 = 20
消费者组One批量消费的数据量 = 20
消费者组One批量消费的数据量 = 20
消费者组One批量消费的数据量 = 20

再次使用Apipost的压测工具调用 /kafka/send?msg=1 接口往主题topic0中生产100条消息,可以看到后面的消息都是即时批量拉取、即时批量消费,每次批量的拉取的数据量都没有超过最大限制数:

消费者组One批量消费的数据量 = 2
消费者组One批量消费的数据量 = 20
消费者组One批量消费的数据量 = 10
消费者组One批量消费的数据量 = 20
消费者组One批量消费的数据量 = 8
消费者组One批量消费的数据量 = 10
消费者组One批量消费的数据量 = 6
消费者组One批量消费的数据量 = 10
消费者组One批量消费的数据量 = 11
消费者组One批量消费的数据量 = 3

5、listener.concurrency

上述yml配置文件中 spring.kafka.listener.concurrency 的值为2,这个表示在代码中标记了@KafkaListener注解的方法处会启动两个消费者线程任务并发处理。 但是如果一个主题只有一个分区的话,消息只能被一个消费者组里面的一个消息者所消费,所以即使开了多个并发线程也没有用的。

一个消费者可以消费同一个topic的多个分区,但是一个分区不能被同一个组下的多个消费者消费。同一个组下有多个消费者并发消费同一个topic时,要注意设置的消费者并发个数一定要小于等于topic的分区数,不然会有空置的线程没有分区可以消费。

设置并发的时候根据分区数和消费者的个数来分配每个消费者消费几个分区,消费者可以消费一个或多个分区。例如两个分区的话,如果想增强消息的消费速度,在没有进行消费者服务的横向扩展时,可以考虑采用增加消费者的并发数量,将并发数量修改为2。

项目中总的消费者线程数量为: concurrency * 标记了@KafkaListener注解方法的数量(默认监听全部的partition)文章来源地址https://www.toymoban.com/news/detail-797974.html

  • 当concurrency < partition 的数量,会出现消费不均的情况,一个消费者的线程可能消费多个partition 的数据
  • 当concurrency = partition 的数量,最佳状态,一个消费者的线程消费一个 partition 的数据
  • 当concurrency > partition 的数量,会出现有的消费者的线程没有可消费的partition, 造成资源的浪费

到了这里,关于Kafka中的max-poll-records和listener.concurrency配置的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解KAFKA_ADVERTISED_LISTENERS

    (1)第一句话========= KAFKA_LISTENERS:负责绑定网卡 KAFKA_ADVERTISED_LISTENERS:负责发布外网地址,这个地址会发布到zookeeper中。 (2)第二句话======== 内网部署的程序,读的地址是KAFKA_LISTENERS写的。 外网部署的程序,读的地址是KAFKA_ADVERTISED_LISTENERS zookeeper中存的地址是KAFKA_ADVERTISED_LIST

    2024年02月02日
    浏览(29)
  • 【Kafka】【十七】消费者poll消息的细节与消费者心跳配置

    默认情况下,消费者⼀次会poll500条消息。 代码中设置了⻓轮询的时间是1000毫秒 意味着: 如果⼀次poll到500条,就直接执⾏for循环 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s 如果多次poll都没达到500条,且1秒时间到了,那么直接执

    2024年02月09日
    浏览(44)
  • JavaScript中的Concurrency并发:异步操作下的汉堡制作示例

    这篇文章想讲一下JavaScript中同步与异步操作在一个简单的示例中的应用。我们将以制作汉堡为例,展示如何使用同步方法、回调函数(callbacks)和Promise与async/await来实现该过程。 1. Get ingredients 获取原料(比如beef) 2. Cook the beef  烹饪牛肉 3. Get burger buns 获得面包片 4. Put th

    2024年02月02日
    浏览(39)
  • 使用Java 17中的record替代Lombok的部分功能

    在DD长期更新的Java新特性专栏中,已经介绍过Java 16中开始支持的新特性:record的使用。 之前只是做了介绍,但没有结合之前的编码习惯或规范来聊聊未来的应用变化。最近正好因为互相review一些合作伙伴的代码,产生了一些讨论话题,主要正针对于有了 record 之后,其实之前

    2024年02月02日
    浏览(41)
  • Kafka - TimeoutException: Expiring 1 record(s) for art-0:120001 ms has passed since batch creation

    报错如下: 这种情况,肯定要先看网络问题嘛 首先查看本机防火墙的配置 结果都是关闭的 (建议开放特定端口) 不关闭防火墙,但是建议本机防火墙开放特定端口,可以使用如下命令 (使用root账户) 比如 接着看看kafka中间件的配置, 问题就在这里 我并没有大改配置,具

    2024年02月04日
    浏览(43)
  • Kafka篇——Kafka消费者端常见配置,涵盖自动手动提交offset、poll消息细节、健康状态检查、新消费组消费offset规则以及指定分区等技术点配置,全面无死角,一篇文章拿下!

    一、自动提交offset 1、概念 Kafka中默认是自动提交offset。消费者在poll到消息后默认情况下,会自动向Broker的_consumer_offsets主题提交当前 主题-分区消费的偏移量 2、自动提交offset和手动提交offset流程图 3、在Java中实现配置 4、自动提交offset问题 自动提交会丢消息。因为如果消费

    2024年01月22日
    浏览(52)
  • JDK8-JDK17中的新特性(var类型推断、模式匹配、Record、密封类)

    新的语法结构,勾勒出了 Java 语法进化的一个趋势,将开发者从 复杂、繁琐 的低层次抽象中逐渐解放出来,以更高层次、更优雅的抽象,既 降低代码量 ,又避免意外编程错误的出现,进而提高代码质量和开发效率。 1.1 Java的REPL工具: jShell命令 JDK9的新特性 Java 终于拥有了

    2024年02月06日
    浏览(54)
  • 理解3ds max中的容器的概念

    在场景中创建一个容器 把这个容器保存为一个文件,在文件夹中可看到此容器文件,其大小为892KB,同时可看到生成一个同名的lock类型文件。 将场景中的某一个物体(面加多一点的)添加到容器中,容器文件的大小没有变化。 文件-重置,提示: 按不保存,提示: 按确定。

    2024年02月09日
    浏览(32)
  • 3d max软件中的缓存垃圾该如何清理?

    使用3d max建模到渲染操作,来回对效果图调整的次数过多时,就会出现一下看不到的垃圾缓存,影响保存的速度,影响效率! 对于这类的3d垃圾清理的有什么高效方法呢? 3dmax垃圾清理的常规操作如下: 1、打开文件,在max菜单中选择max锝擄絻锝掞綁锝愶綌 下拉菜单中选择侦

    2024年02月04日
    浏览(67)
  • 3ds max软件中的一些常用功能分享!

    3ds max软件有很多小伙伴反馈说,明明有很多3ds max教程资料。却不知道如何入门3dmax。 掌握3dmax基本功能是开始使用3dmax的基础之一,所以,小编带大家盘点一下3dmax常用操作。 3dmax常用功能介绍如下,快快跟着小编一起看起来。 1、物体的创建 一般来说,对象的创建是通过从

    2024年02月02日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包