详解Kafka分区机制原理|Kafka 系列 二

这篇具有很好参考价值的文章主要介绍了详解Kafka分区机制原理|Kafka 系列 二。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka 系列第二篇,详解分区机制原理。为了不错过更新,请大家将本号“设为星标”。

点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达

上一篇文章介绍了 Kafka 的基本概念和术语,里面有个概念是 分区(Partition)。

kafka 将 一个Topic 中的消息分成多份,分别存储在不同的 Broker 里,这每一段消息被 kafka 称为分区,其中每条消息只会保存在一个分区中。

如果不太理解请回顾上一篇:

详解Kafka分区机制原理|Kafka 系列 二,kafka,分布式

开始学习 Kafka,一文掌握基本概念|Kafka 系列 一

 

为什么有分区?

为什么要有分区呢?

Kafka 的分区机制的本质就是将一个大的 Topic 进行拆分,将一组很大的队列拆分成了多组队列。这样做有以下几个好处:

  1. 因为一个 Topic 中的消息可能非常多,多到一台Broker存不下,因此需要拆分成多段存储在不同的机器里,实现负载均衡。

  2. 拆分成多个队列,可以在多个生产者和消费者的情况下发挥多机性能,可以分流和并行处理消息,从而提高读写性能,提升系统的吞吐力。

  3. 有利于系统扩缩容,提高系统的可扩展性。不同分区在不同的broker上,可以通过增加新机器提高吞吐,并且增加新机器的时候可以通过调整分区的分布来调配负载。

详解Kafka分区机制原理|Kafka 系列 二,kafka,分布式

但是分区数不是越多越好,需要根据系统具体情况来设置。比如3个Broker就应该至少有3个分区,如果broker性能之间有差异,可以调大分区数进行调配。也可以通过broker的倍数来设置分区数,并且进行性能压测,测试集群的吞吐量。

分区数过多会带来资源管理上的消耗,清除日志时间变长,集群broker故障后分区leader重选时间变长,客户端消费端线程数需求增加,甚至导致连接所需的socket消耗增加。

分区策略

分区策略就是决定生产者将会把消息发送到具体哪个分区的算法,分区策略由 Partitioner 接口实现。

自定义分区策略

用于分区的 partition 方法定义如下:

/**
     * Compute the partition for the given record.
     *
     * @param topic topic名 The topic name
     * @param key 用于分区的key The key to partition on (or null if no key)
     * @param keyBytes 用于分区的序列号key The serialized key to partition on( or null if no key)
     * @param value 用于分区的值 The value to partition on or null
     * @param valueBytes 用于分区的序列号值 The serialized value to partition on or null
     * @param cluster 当前集群元数据 The current cluster metadata
     */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

可以看出,这里提供了 Topic 和一些跟消息有关的key参数,cluster 是集群信息,包含Kafka 当前的Node 数据以及Topic、partition数据等。有了这些数据,具体拿到一条消息该发往哪个分区,我们就可以根据已有信息制定自己的分区策略。

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

我们实现了自定义的 Partition 类之后,就可以设置 partitioner.class 为目标策略类,Producer 就会按照我们的自定义策略来对消息进行分区。

默认分区策略

Kafka 提供了默认分区策略 DefaultPartitioner,策略内容如下:

  1. 如果在消息中指定了分区,优先使用指定的分区。

  2. 如果没有指定分区,但存在分区键,则根据序列化key使用murmur2哈希算法对分区数取模。

  3. 如果没有指定分区或分区键,则会使用粘性分区策略。(关于粘性分区策略后面讲解)

在实际生产中,我们一般都默认使用此策略,无需修改。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

注意,这里指的分区键是序列化后的key,也就是变量 keyBytes,其他key、value、valueBytes 并没用到。

byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
default byte[] serialize(String topic, Headers headers, T data) {
  // data 变量
    return serialize(topic, data);
}

看到 key 等序列化方法我们可以明白,key 的序列号值只受到 record.key() 的影响,所以同样的key会被固定分配到同样的partition中。(注意这里的key是指用于分区的key,而不是topic)

详解Kafka分区机制原理|Kafka 系列 二,kafka,分布式

粘性分区策略

实现类为 UniformStickyPartitioner ,他与默认分区策略的区别是:

  • DefaultPartitionerd 默认分区策略:如果有分区键的话,会按照分区键来决定分区,这个时候并不会使用粘性分区策略。

  • UniformStickyPartitioner粘性分区策略:无论有没有分区键,都用粘性分区来分配。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return stickyPartitionCache.partition(topic, cluster);
}

什么是粘性分区策略?

我们需要知道,在Producer在发送消息的时候,会将消息放到一个ProducerBatch中, 然后多条消息批量发送。这样可以减少网络请求次数,提高消息的发送效率。

所以批量发送消息有两个条件:

  1. 一个batch满了,与 batch.size有关,一般大小是16k。

  2. linger.ms时间到了。

满足任意一个条件,都会触发sender线程的发送。如果生产的消息较少,batch没有满,就必须等到等待时间到了,这就导致了较长的延迟。

因为ProducerBatch是多个,为了让消息尽可能快的发送,就需要让其中一个ProducerBatch先变满。

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

注意:一个分区对应一个双端队列Deque<ProducerBatch>>

粘性分区策略就是在相同的分区中,优先填满一个ProducerBatch,发送,再去填充另一个ProducerBatch。参见下图,第一个分区会被优先塞满并发送。

详解Kafka分区机制原理|Kafka 系列 二,kafka,分布式

在一个 ProducerBatch 发送结束,选择新分区的时候,是随机选择的,之后便会继续优先填满新的分区。

  • 可用分区<1 ,所有分区中随机选择。

  • 可用分区=1,选择这个分区。

  • 可用分区>1,所有可用分区中随机选择。

public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // Check that the current sticky partition for the topic is either not set or that the partition that 
        // triggered the new batch matches the sticky partition that needs to be changed.
        if (oldPart == null || oldPart == prevPartition) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }

轮询分区策略

Kafka 中提供了轮训策略的实现 RoundRobinPartitioner。当用户希望将写操作均匀地分发到所有分区时,可以使用此分区策略。

举例,有三个分区,针对于同一个producer,第一条消息发送到partition1,第二条消息发送到partition2,第三条发送到partition3,以此类推。

详解Kafka分区机制原理|Kafka 系列 二,kafka,分布式
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    // 分区数
    int numPartitions = partitions.size();
    // 下一个自增值
    int nextValue = nextValue(topic);
    // 获取此主题的可用分区列表
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) {
        // topic可用分区不为空,取余
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // 没有可用的分区,给出一个不可用的分区
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}

hash 键的值并不会影响到数据的分布,这应该是数据均匀度最好的策略,可以保证消息最大程度的平均分配到所有分区。

除了官方提供的策略,我们还可以实现自己的分区策略,比如随机策略,实现起来也很简单;比如按照业务键去分区的策略;比如按照ip分区的策略等。

最后,欢迎大家提问和交流。

加入讨论群是升职加薪第一步!

回复:加群

点赞是一种美德,如对您有帮助,欢迎评论和分享,感谢阅读!

实战总结|记一次消息队列堆积的问题排查

2023-07-18

详解Kafka分区机制原理|Kafka 系列 二,kafka,分布式

从二叉查找树到B*树,一文搞懂搜索树的演进!|原创

2023-05-23

详解Kafka分区机制原理|Kafka 系列 二,kafka,分布式

CAP、BASE理论真的很重要!|分布式事务系列(一)

2023-05-06文章来源地址https://www.toymoban.com/news/detail-633193.html

详解Kafka分区机制原理|Kafka 系列 二,kafka,分布式

到了这里,关于详解Kafka分区机制原理|Kafka 系列 二的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis集群(分布式缓存):详解持久化、主从同步原理、哨兵机制、Cluster分片集群,实现高并发高可用

            单机式Redis存在以下问题,因此需要Redis集群化来解决这些问题        Redis数据快照,简单来说就是 把内存中的所有数据都记录到磁盘中 。当Redis实例故障重启后,从 磁盘读取快照文件,恢复数据 。快照文件称为RDB文件,默认是保存在当前运行目录。     (1)

    2024年02月08日
    浏览(38)
  • 【大数据工具】Kafka伪分布式、分布式安装和Kafka-manager工具安装与使用

    Kafka 安装包下载地址:https://archive.apache.org/dist/kafka/ 1. Kafka 伪分布式安装 1. 上传并解压 Kafka 安装包 使用 FileZilla 或其他文件传输工具上传 Kafka 安装包: kafka_2.11-0.10.0.0.tgz 解压安装包 2. 编辑配置文件 3. 拷贝并修改配置文件 分别修改 server2.properties、server3.properties 4. 创建日志

    2024年02月14日
    浏览(32)
  • 【分布式应用】kafka集群、Filebeat+Kafka+ELK搭建

    主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。 我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队

    2024年02月16日
    浏览(26)
  • 分布式 - 消息队列Kafka:Kafka 消费者的消费位移

    01. Kafka 分区位移 对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。 每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一

    2024年02月12日
    浏览(34)
  • Kafka 原理以及分区分配策略剖析

    一、简介 Apache Kafka 是一个分布式的流处理平台(分布式的基于发布/订阅模式的消息队列【Message Queue】)。 流处理平台有以下3个特性: 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。 可以储存流式的记录,并且有较好的容错性。 可以在流式

    2023年04月08日
    浏览(23)
  • 分布式消息服务kafka

    什么是消息中间件? 消息中间件是分布式系统中重要的组件,本质就是一个具有接收消息、存储消息、分发消息的队列,应用程序通过读写队列消息来通信。 例如:在淘宝购物时,订单系统处理完订单后,把订单消息发送到消息中间件中,由消息中间件将订单消息分发到下

    2024年02月01日
    浏览(32)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

    生产者发送消息流程参考图1: 先从创建一个ProducerRecord对象开始,其中需要包含目标主题和要发送的内容。另外,还可以指定键、分区、时间戳或标头。在发送ProducerRecord对象时,生产者需要先把键和值对象序列化成字节数组,这样才能在网络上传输。 接下来,如果没有显式

    2024年02月13日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka消费者和消费者组

    1. Kafka 消费者是什么? 消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者

    2024年02月13日
    浏览(31)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包