KafKa 3.x(二、Broker,消费者)

这篇具有很好参考价值的文章主要介绍了KafKa 3.x(二、Broker,消费者)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

4. Kafka Broker

4.1 kafka Broker工作流程

4.1.1 Zoopkeeper存储的Kafka信息

  1. 启动Zookeeper客户端
  2. 通过ls命令查看kafka相关信息
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式
    在Zookeeper的服务端存储的Kafka相关信息
  1. /kafka/brokers/ids [0,1,2] 记录那些服务器
  2. /kafka/brokers/topics/first/partitions/0/state {“leader”:1,“isr”:[1,0,2]} 记录谁是leader,有哪些服务器可用
  3. /kafka/controller {“brokerid”:0} 辅助选择Leader
    可借助prettyZoo可视化工具查看具体信息:
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式

4.1.2 Kafka Broker总体工作流程

KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式

4.2 生产经验-节点服役与退役

4.2.1 服务新节点

  1. 新节点准备
  • 关闭一台旧服务器,执行克隆操作
  • 开启新服务器的kafka,并i需改IP地址
  • 在新服务器上修改主机名。
  • 重启新旧服务器上的ksfka
  • 修改新服务器kafka的broker.id
  • 删除新服务器上的logs和datas文件
  • 启动旧服务器上kafka集群
  • 单独启动新服务器的kafka
  1. 执行负载均衡操作
  • 创建一个要均衡的主题
  • 生成一个负载均衡的计划
  • 创建副本存储计划(所有副本存储在broker0,broker1,broker2,broker3中)。
  • 执行副本计划
  • 验证

4.2.2 退役旧节点

  1. 执行负载均衡操作:先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
  • 创建一个要均衡的主题
  • 创建执行计划
  • 创建副本存储计划(所有副本存储在broker0,broker1,broker2中)。
  • 关闭新的kafka服务

4.3 Kafka副本

4.3.1 副本基本信息

  1. Kafka副本作用:提高数据可靠性
  2. Kafka默认副本1个,生成环境一般配置2个,保证数据可靠性;多副本会增加磁盘空间,网络速率,降低效率
  3. Kafka副本分为:Leader和Follower。生产者只会把数据发往Leader,然后同步给Follower
  4. Kafka分区中所有副本统称AR(Assigned Repllicas)
    AR = ISR + OSR
    ISR:表示和leader保持同步的follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR,由replicas.lag.time.max.ms设置,默认30s。Leader发送故障,就会从ISR中选举新Leader。
    OSR:表示Follower与Leader副本同步时,延迟过多的副本。

4.3.2 Leader选举流程

Kafka集群中有一个broker的Controller会被选举未Controller Leader,负责管理集群broker的上下限,所有topic的分区副本分配Leader选举等工作。
Controller的信息同步工作是依赖于Zookeeper的。

4.3.3 Leader和Follower故障处理细节

LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset+1
HW(High Watermark):所有副本中最小的LEO

  1. Follower故障
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式
  • Follower故障发生后会被临时踢出ISR
  • 这个期间LeaderheFollower继续接收数据
  • 待该Follower恢复,Follower会读取本地磁盘记录上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。
  • 等该Follower的LEO>=该Partition的HW,即Follower追上Leader之后,就可重新加入ISR了。
  1. Leader故障
  • Leader发生故障后,会从ISR中选出新的Leader
  • 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。
    注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或不重复

4.3.4 分区副本配置

如果kafka服务器只有4个节点,那么设置kafka的分区数>服务器数,在kafka中如何分配?

  1. 创建16个分区3个副本:/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic second --partitions 16 -replication -factor 3
  2. 查看分区和副本:bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic second

4.3.5 生成经验-手动调整分区副本

在生产环境中,每台服务器配置和性能不一样,但是kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器压力大。所以需要手动调整分区副本存储。
需求:创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上。

  1. 创建新的topic
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --parititions4 --replication-factor 2 --topic three
  2. 查看分区副本存储情况
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic three
  3. 创建副本存储计划(所有副本都指定存储在broker0,broker1中)
    vim increase-replication-factor.json
    {
    “version”:1
    “partitions”:[{“topic”:“three”,“partition”:0,“replicas”:[0,1]},
    {“topic”:“three”,“partition”:1,“replicas”:[0,1]},
    {“topic”:“three”,“partition”:2,“replicas”:[1,0]},
    {“topic”:“three”,“partition”:3,“replicas”:[1,0]},]
    }
  4. 执行副本存储计划
    bin/kafka-reassign-partitions,sh --bootstrap-server localhost:9092 --reassignment-json-fileincrease-replication-factor.json --execute
  5. 验证副本存储计划
    bin/kafka-reassign-partitions,sh --bootstrap-server localhost:9092 --reassignment-json-fileincrease-replication-factor.json --verify
  6. 查看分区副本存储情况
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic three

4.3.6 生成经验-Leader Partition 负载均衡

如果有broker宕机,会导致LP集中在几台broker上,其他宕机broker重启后都是follower partition,读写请求低,造成集群负载不平衡。
KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式

  • auto.Leader.rebalance.enable,默认true。 自动LeaderPartition平衡。
  • leader.imbalance.per.broker.percentage,默认10%。每个broker允许的不平衡的leader的比率。如果每个broker超值,控制器会触发leader的平衡。
  • leader.imbalance.check.interval.seconds,默认300s,检查leader负载是否平衡的间隔时间。
    如下,假设集群只有一个主题:
    Topic:xuyu partition:0 Leader:0 Replicas:3,0,2,1 Isr:3,0,2,1
    Topic:xuyu partition:1 Leader:1 Replicas:1,2,3,0 Isr:1,2,3,0
    Topic:xuyu partition:2 Leader:2 Replicas:0,3,1,2 Isr:0,3,1,2
    Topic:xuyu partition:3 Leader:3 Replicas:2,1,0,3 Isr:2,1,0,3
    针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是leader,所以不平衡数+1,AR副本总数是4,所以broker0节点不平衡率为1/4>10%,需要再平衡。
    broker1的不平衡为0,不需要再平衡。

4.3.7 生成经验-增加副本因子

生产环境中,由于某个主题重要等级需要提升,需要增加副本,副本数需要先指定计划,然后根据计划执行。

  1. 创建topic
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic four
  2. 手动增加副本存储
    创建副本存储计划
    vim increase-replication-factor.json
    {“version”:1,“partitions”:[{“topic”:“four”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“four”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“four”,“partition”:2,“replicas”:[0,1,2]}]}
    执行:
    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file-increase-replication-factor.json --execute

4.4 Kafka文件存储

4.4.1 文件存储机制

  1. Topic数据的存储机制
    topic是逻辑上的概念,partition是物理上的概念,每个partition对应一个log文件,该文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低,kafka采用分片和索引机制,将每个partition分为多个segment。每个segment包括:“index文件、log文件、timeindex文件等”。这些文件在一个文件下,该文件命名规则:topic名+分区序号。
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式
  2. 思考:topic数据到底存储在什么位置?
  • 启动生产者发送消息
    bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
  • 查看localhost的/opt/module/kafka/datas/first路径上的文件,windows看log文件
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式
  • 直接查看log日志,是乱码
  • 通过工具查看index和log信息
    kafka-run-classs.sh kafka.tools.DumpLogSegments --files ./000000.index
  1. index文件和log文件详解
    注意:
  • index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。参数log.index.interval.bytes默认4kb。
  • index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小。
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式

4.4.2 文件清楚策略

kafka中日志默认保存7天,可通过一下参数调整:log.retenion.hours(保存时间),log.retention.check.interval.ms(检测时间)。如果超时,kafka会提供delete和compact两种删除方式。

  1. delete:
  • log.cleanup.policy=delete所有数据启用
    基于时间:默认打开,以segment中所有记录中最大时间戳作为该文件时间戳。
    基于大小:默认关闭,超过设置大小,删除最早的segment,log.retention.bytes。默认-1,表示无穷大。
  1. compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
  • log.cleanup.policy=compact所有数据启用压缩策略。
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式
    压缩后offset不连续,需要拿到大于一个当前offset的对应消息进行消费。
    注意:此种只适合特殊场景,比如key为id,val为值,通过压缩,整个消息集群种保存了所有用户最新的资料。

4.5 Kafka高效读写数据

  1. kafka本身是分布式集群,可采用分区技术,并行度高
  2. 读数据采用稀疏索引,可快读定位要消费的数据
  3. 顺序写磁盘:kafka的producer生产数据,要写入到log文件中,写的过程是追加到文件末端,为顺序写,
  4. 页缓存+零拷贝
    零拷贝:kafka的数据加工处理操作交由kafka生产者和kafka消费者处理。kafkaBroker应用层不关心存储的数据,所以不走应用层,效率高。
    PageCache页缓存:kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读时,从PageCacha中查找,找不到再去磁盘。
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式

5. Kafka消费者

5.1 Kafka消费方式

pull拉模式(Kafka采用):consumer采用从broker中主动拉取数据,因为每个消费者处理能力不同。
push推模式:由于由broker决定消息频率,消费者难适应,处理不足。
pull缺点:如果没有数,消费者进入循环空数据。

5.2 Kafka消费者工作流程

5.2.1 消费者总体工作流程

KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式

5.2.2 消费者组原理

消费者组Consumer Group(GC):消费者组由多个consumer组成。组中消费者groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
  • 消费者组之间互不影响。所有消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    消费者组初始化流程:
  1. coordinator:辅助实现消费者组的初始化和分区分配
    coordinator节点选择=groupid的hashcode值%50(__consumer_offsets的分区数量)。eg:groupid的哈希为1,1%50=1,那么其主题1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有消费者提交offset的时候就往这个分区去提交offset。
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式
    消费者组详细消费流程:
    KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式

5.3.1 独立消费者案例(订阅主题)

  1. 需求:创建独立消费者,消费first主题数据。
    注意:在消费者API中必须配置消费者组id,命令行启动消费者不填写消费者组id会被自动填写随机消费者组id
  2. 实现步骤
public class CustomConsumer {
    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //1 创建一个消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        //2 定义消费主题first
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        //3 消费数据
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

5.3.1 独立消费者案例(订阅分区)

需求:创建一个独立消费者,消费first主题0号分区的数据。
实现步骤:

//2 订阅主题对应的分区
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0)); //0号分区
kafkaConsumer.assign(topicPartitions);

5.3.1 消费者组案例

需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。
不同消费者配置相同消费者组id即可,消费者底层会自动分区。
properties.put(ConsumerConfig.GROUP_ID_CONFIG,“test”);

5.4 生产经验-分区的分配以及再平衡

  1. 一个消费者组有消费者组成,一个topic有多个partition组成,到底由哪个消费者消费哪个partition的数据呢?
  2. kafka有4种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky。可配置partition.assignment.strategy修改。默认Range+CooperativeSticky。可多选。KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式

5.4.1 Range以及再平衡

  1. 分区分配策略之range
    首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。通过partition数/consumer数来决定每个消费者应该消费几个分区。如果除不尽,那么全面几个消费者将会多消费1个分区
    注意:如果只是针对一个topic而言,c0消费者多消费1个分区影响不大。但是如果有N个topic,那么每个topic,消费者C0都将多消费1个分区,topic越多,C0消费的分区会比其他消费者明显多消费N个分区。 容易产生数据倾斜
  2. Range分区分配策略案例
  • 修改主题first为7个分区
  • bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic first --partitions 7
    注意: 分区数只能增加,不能减少。
  • 复制CustomConsumer类,创建CustomConsumer2,3。这样可由三个消费者CustomConsumer1,2,3组成消费者组,组名都为"test",同时启动三个消费者。
  • 启动CustomProducer,发送500个消息。
    consumer0消费到0,1,2分区的随数据,consumer1消费到3,4分区的数据,consumer2消费到5,6分区数据
  • 关闭CustomConsumer,在心跳(45s)内再次发送数据
    consumer1先消费3,4分区数据,consumer2消费到5,6分区数据,在心跳断开后consumer1继续消费0,1,2分区数据
  • 关闭CustomConsumer心跳(45s)后,再次发送数据
    consumer1先消费0,1,2,3,4分区数据,consumer2消费到5,6分区数据
    说明:消费者0已经被踢出消费者组,所以重新按照range方式分配

5.4.2 RoundRobin以及再平衡

  1. RoundRobin分区策略原理
    RoundRobin针对集群种所有topic而言的。
    RoundRobin轮询分区策略是把所有的consumer都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配partition给到各个消费者。
    2.RoundRobin分区分配策略案例
  • 同上,将CustomConsumer,1,2中消费者代码中修改策略为RoundRobin。
    //properties.put(ConsumerConifg.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, “org.apache.kafka.clients.consumer.RoundRobinAssignor”);
  • 重启3个消费者,发消息
    consumer0消费到0,3,6分区的随数据,consumer1消费到1,4分区的数据,consumer2消费到2,5分区数据
  • 关闭CustomConsumer,在心跳(45s)内再次发送数据
    consumer1先消费1,4分区数据,consumer2消费到2,5分区数据,在断开后consumer1继续消费0,6分区数据,consumer2消费到3分区数据
  • 关闭CustomConsumer心跳(45s)后,再次发送数据
    consumer1先消费1,3,5分区数据,consumer2消费到0,2,4,6分区数据

5.4.3 Sticky以及再平衡

粘性分区定义:即在执行一次新分配之前,考虑上一次分配,尽量少的调整分配变动。
粘性分区从0.11.x引入,首先会尽量均衡的放置分区到消费者上面,在出现统一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变
需求:设置first主题,7个分区,3个消费者,采用粘性分区消费,再停掉一个观察。

  1. 修改分区策略
ArrayList<String> startegys = new ArrayList<>();
startegys。add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);
  1. 同上
    consumer0消费到0,1分区的随数据,consumer1消费到4,5,6分区的数据,consumer2消费到2,3分区数据,但是重启后会改变。
    关闭CustomConsumer,在心跳(45s)内再次发送数据
    consumer1先消费1,4分区数据,consumer2消费到2,5分区数据,在断开后consumer1继续消费0,6分区数据,consumer2消费到3分区数据
    关闭CustomConsumer心跳(45s)后,再次发送数据
    consumer1先消费1,0,4,6分区数据,consumer2消费到2,3,5分区数据

5.5 offset位移

5.5.1 offset的默认维护位置 **

0.9版本之后,consumer默认将offset保存在卡夫卡一个内置的topic中,该topic为__consumer_offsets
0.9之前,consumer默认将offset保存在Zookeeper

KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式
consumer_offsets主题采用k-v存储数据,key为group.id+topic+分区号,value为当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,即每个group.id+topic+分区号保留最新数据。

  1. 消费offset案例
  • consumer_offsets为kafka的topic,那就可以通过消费者消费
  • 在配置文件中config/consumer.properties中添加exclude.internal.topics=false,默认true,表示不能消费系统主题。为了查看,设为false。
  • 创建新的topic:bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic offsetTest --partitions 2 --replcation-factor 2
  • 启动生产者往offsetTest发数据。bin/kafka-console-producer.sh --topic offsetTest --bootstrap-server node1:9092
  • 启动消费者消费offsetTest数据:bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic offsetTest --group test(指定消费者组名称,更好观察数据存储位置)
  • 查看消费者消费主题consumer_offsets:bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --consumer.config config/consumer.properties --formatter “kafka.coordinatior.group.GroupMetadataManager$OffsetsMessageFromatter” --from-beginning

5.5.2 自动提交offset

  • enable.auto.commit:是否开启自动提交offset功能,默认true
  • auto.commit.interval.mx:自动提交offset的时间间隔。默认5s
//自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

5.5.3 手动提交offset

  1. commitSync(同步提交):必须等待offset提交完成,再去消费下一批数据
  2. commitAsync(异步提交):发送offset请求就开启下一批数据消费
    相同点:都会将本次提交的一批数据最高的偏移量提交
    不同点:同步阻塞,直到提交成功,有重试机制,异步没有失败重试,会提交失败。
//手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//手动提交offset
kafkaConsumer.commitSync();
kafkaConsumer.commitASync();

5.5.4 指定offset进行消费

auto.offset.reset=earliest | latest | none 默认latest
当kafka中没有初始偏移量(第一次)或不存在(被删除)时怎么办?

  1. earliest:自动将偏移量重置为最早的偏移量 --from-beginning
  2. latest:自动重置为最新值
  3. none:没找到先前偏移量,向消费者抛出异常
  4. 任意指定offset位置开始消费(注:每次执行完,要修改消费者组名)
        //指定位置offset消费
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        //保证分区分配方案已经指定完成
        while(assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition, 100);
        }

5.5.5 指定时间进行消费

需求:在生产环境中,会遇到消费数据异常,想重新按照时间消费。

        //指定位置offset消费
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        //保证分区分配方案已经指定完成
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        //时间转offset
        HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<TopicPartition, Long>();
        for (TopicPartition topicPartition : assignment) {
            topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);  //一天前
        }
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
        }

5.5.6 漏消费和重复消费

重复消费:已经消费了数据,但是offset没提交(consumer挂了,再次重启会从上次offset消费)
漏消费:先提交offset后消费(手动消费后,当offset被提交,数据还没落库,消费者线程被kill,导致内存数据丢失)

5.6 生产经验-消费者事务

消费者事务需要将kafka消费端将消费过程和提交offset过程做原子绑定。此时需要将kafka的offset保存到支持事务的自定义介质(mysql)
KafKa 3.x(二、Broker,消费者),消息中间件,kafka,linq,分布式

5.7 生产经验-数据积压(消费者提高吞吐量)

  1. 如果消费能力不足,则增加topic分区数,并且提升消费者组的消费者数量,消费者数=分区数。(两者缺一不可)
  2. 如果下游数据处理不及时:提高每批次拉取数量(500+)。批次拉取数据少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据(50m),也会积压。

回顾总结

1.zk存储的信息:broker.ids、leaders、controller
2.工作理财
3.服役:
4.退役
5.副本:
副本好处:提高可靠性,生产环境一般2个默认1个,有ledaer和follower
isr,ar,controller选举(第一次随机),leader挂了(leo,hw多删少补),follow挂了
副本分配:负载均衡,保证数据分配
手动副本分配:指定计划、执行计划,验证计划
leader partition的负载均衡 10%
手动增加副本因子
6.存储机制
broker topic partitions log segment 稀疏索引(4KB) 时间戳
7.删除数据
默认7天 删除策略(删除,压缩)
8.高效读写
集群 分区 (提高生产和消费同步,海量数据打散) 稀疏索引 顺序读写 零拷贝和页缓存(Linux内核的缓存)
9.消费者
消费流程 消费者组 分区分配策略(range、轮询、粘性) 再平衡(45s) 订阅主题(可多个)
10.offset
存储在系统主题 自动提交5s 手动提交(同异步) 指定offset消费 按照时间消费 漏,重复消费
11.事务
生产-集群-消费-下游(mysql)
12.数据积压
增加分区,消费者 生产-集群(4个参数) 消费者(2个参数)文章来源地址https://www.toymoban.com/news/detail-528069.html

到了这里,关于KafKa 3.x(二、Broker,消费者)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • kafka配置大全broker、topic、生产者和消费者等配置介绍

    每个kafka broker中配置文件 server.properties 默认必须配置的属性如下: **bootstrap.servers** - 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 **key

    2024年02月05日
    浏览(48)
  • 13、Kafka ------ kafka 消费者API用法(消费者消费消息代码演示)

    消费者API的核心类是 KafkaConsumer,它提供了如下常用方法: 下面这些方法都体现了Kafka是一个数据流平台,消费者通过这些方法可以从分区的任意位置、重新开始读取数据。 根据KafkaConsumer不难看出,使用消费者API拉取消息很简单,基本只要几步: 1、创建KafkaConsumer对象,创建

    2024年04月11日
    浏览(49)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(45)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(45)
  • 【Kafka】【十七】消费者poll消息的细节与消费者心跳配置

    默认情况下,消费者⼀次会poll500条消息。 代码中设置了⻓轮询的时间是1000毫秒 意味着: 如果⼀次poll到500条,就直接执⾏for循环 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s 如果多次poll都没达到500条,且1秒时间到了,那么直接执

    2024年02月09日
    浏览(48)
  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(50)
  • kafka-python 消费者消费不到消息

    使用 group_id=”consumer_group_id_001“ 和  auto_offset_reset=\\\"earliest\\\"    生产者发完消息后,在close中  先执行 producer.flush() ,再执行 producer.close() 使用offset 观看消息是否写到kafka中。    

    2024年02月10日
    浏览(39)
  • 分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

    最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数: enable.auto.commit:是否开启自动提交 offset 功能,默认为 true; auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒; 如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返

    2024年02月12日
    浏览(48)
  • 分布式消息队列Kafka(四)- 消费者

    1.Kafka消费方式 2.Kafka消费者工作流程 (1)总体工作流程 (2)消费者组工作流程 3.消费者API (1)单个消费者消费 实现代码 (2)单个消费者指定分区消费 代码实现: (3)消费者组消费 复制上面CustomConsumer三个,同时去订阅统一个主题,消费数据,发现一个分区只能被一个

    2023年04月26日
    浏览(49)
  • Kafka系列:查看Topic列表、消息消费情况、模拟生产者消费者

    执行topic删除命令时,出现提示 这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。 解决办法: a)在server.properties中设置delete.topic.enable参数为ture b)如下操作: 1.登

    2023年04月26日
    浏览(59)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包