Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡)

这篇具有很好参考价值的文章主要介绍了Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、RoundRobin 分区分配策略原理

  • RoundRobin 针对集群中所有Topic而言。
  • RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka

二、RoundRobin分区分配策略代码案例

2.1、创建带有7个分区的sixTopic主题

  • 在 Kafka 集群控制台,创建带有7个分区的sixTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 7 --replication-factor 1 --topic sixTopic
    

    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka

2.3、创建三个消费者 组成 消费者组

  • 复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test1”,设置分区分配策略为 RoundRobin。

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer1 {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
            // 设置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 sixTopic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sixTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

2.3、创建生产者

  • 创建CustomProducer生产者。

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallback {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 200; i++) {
                kafkaProducer.send(new ProducerRecord<>("sixTopic", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    
    

2.4、测试

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka

  • 然后,在 IDEA中分别启动生产者代码
    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:

    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka
    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka
    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka

2.5、RoundRobin分区分配策略代码案例说明

  • 由上述测试输出结果截图可知: 消费者1消费1、4分区的数据;消费者2消费2和5分区的数据;消费者3消费0、3、6分区的数据。
  • 说明:Kafka 采用修改后的RoundRobin分区分配策略。

三、RoundRobin 分区分配再平衡案例

3.1、停止某一个消费者后,(45s 以内)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 2、5号分区数据。
    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka
  • 由下图控制台输出可知:3号消费者 消费到 0、3、6号分区数据。
    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka

3.2、停止某一个消费者后,(45s 以后)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 1、3、5号分区数据。
    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka

  • 由下图控制台输出可知:3号消费者 消费到 0、2、4、6号分区数据。
    Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡),kafka,kafka文章来源地址https://www.toymoban.com/news/detail-733573.html

3.3、RoundRobin 分区分配再平衡案例说明

  • 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 消费者 1 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

到了这里,关于Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Kafka3.0.0版本——消费者(消费者总体工作流程图解)

    角色划分:生产者、zookeeper、kafka集群、消费者、消费者组。如下图所示: 生产者发送消息给leader,followerr主动从leader同步数据,一个消费者可以消费某一个分区数据或者一个消费者可以消费多个分区数据。如下图所示: 每个分区的数据只能由消费者组中一个消费者消费。如下

    2024年02月09日
    浏览(53)
  • Kafka3.0.0版本——消费者(消费者组初始化流程图解)

    每个consumer都发送JoinGroup请求,如下图所示: 选出一个consumer作为leader,如下图所示: 把要消费的topic情况发送给leader 消费者,如下图所示: leader会负责制定消费方案,并把消费方案发给coordinator,如下图所示: Coordinator就把消费方案下发给各个consumer,如下图所示: 每个消

    2024年02月09日
    浏览(36)
  • Kafka3.0.0版本——消费者(消费者组详细消费流程图解及消费者重要参数)

    创建一个消费者网络连接客户端,主要用于与kafka集群进行交互,如下图所示: 调用sendFetches发送消费请求,如下图所示: (1)、Fetch.min.bytes每批次最小抓取大小,默认1字节 (2)、fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms (3)、Fetch.max.bytes每批次最大抓取大小,默

    2024年02月09日
    浏览(47)
  • Kafka3.0.0版本——消费者(自动提交 offset)

    官网文档 参数解释 参数 描述 enable.auto.commi 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 图解分析 消费者自动提交 offset代码 消费者自动提交

    2024年02月09日
    浏览(38)
  • Kafka3.0.0版本——消费者(手动提交offset)

    1.1、手动提交offset的两种方式 commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。 commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。 1.2、手动提交offset两种方式的区别 相同点:都会将本次提交的一批数据最高的偏移量提交。 不

    2024年02月09日
    浏览(46)
  • Kafka3.0.0版本——消费者(独立消费者消费某一个主题数据案例__订阅主题)

    1.1、案例需求 创建一个独立消费者,消费firstTopic主题中数据,所下图所示: 注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。 1.2、案例代码 代码 1.3、测试 在 Kafka 集群控制台,创建firstTopic主题 在 IDEA中

    2024年02月09日
    浏览(39)
  • 【Kafka-Consumer分区分配策略】Kafka 消费者组三种分区分配策略 Range Assignor、RoundRobin Assignor、Sticky Assignor 详细解析

    1、一个 consumer group 中有多个 consumer 组成,一个 topic 有多个 partition 组成,现在的问题是,到底由哪个 consumer 来消费哪个 partition 的数据。 2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数 partition.assignment.strategy ,修改分区的分配

    2024年02月22日
    浏览(48)
  • Kafka3.0.0版本——手动调整分区副本示例

    四台服务器 原始服务器名称 原始服务器ip 节点 centos7虚拟机1 192.168.136.27 broker0 centos7虚拟机2 192.168.136.28 broker1 centos7虚拟机3 192.168.136.29 broker2 centos7虚拟机4 192.168.136.30 broker3 2.1、先启动zookeeper集群 启动zookeeper集群 2.2、再启动kafka集群 启动kafka集群 3.1、手动调整分区副本的前提

    2024年02月11日
    浏览(56)
  • Kafka学习---4、消费者(分区消费、分区平衡策略、offset、漏消费和重复消费)

    1.1 Kafka消费方式 1、pull(拉)模式:consumer采用从broker中主动拉取数据。 2、push(推)模式:Kafka没有采用这种方式。因为broker决定消息发生速率,很难适应所有消费者的消费速率。例如推送的速度是50M/s,Consumer1、Consumer2就来不及处理消息。 pull模式不足之处是如果Kafka没有数

    2024年02月16日
    浏览(45)
  • 10、Kafka ------ 消费者组 和 消费者实例,分区 和 消费者实例 之间的分配策略

    形象来说:你可以把主题内的多个分区当成多个子任务、多个子任务组成项目,每个消费者实例就相当于一个员工,假如你们 team 包含2个员工。 同理: 同一主题下,每个分区最多只会分给同一个组内的一个消费者实例 消费者以组的名义来订阅主题,前面的 kafka-console-consu

    2024年01月19日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包