Kafka源码解析之索引

这篇具有很好参考价值的文章主要介绍了Kafka源码解析之索引。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka源码解析之索引

索引结构

kafka索引文件,kafka,java

Kafka有两种类型的索引:

  • TimeIndex: 根据时间戳索引,可以通过时间查找偏移量所在位置,目录下以.timeindex结尾
  • Index: 根据偏移量索引,.index结尾

构建索引时机

由log.index.interval.bytes 参数控制,默认4KB构建一条索引

为什么默认值是4kb呢?这里认为与基于磁盘的读写单位是 block(一般大小为 4KB)还有内存管理与分配的最小单位是4kb有关

def append(largestOffset: Long,
           largestTimestamp: Long,
           shallowOffsetOfMaxTimestamp: Long,
           records: MemoryRecords): Unit = {
  	.....
    // 判断是否写入索引文件
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      offsetIndex.append(largestOffset, physicalPosition)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}

在源码的LogSegment.append方法中,会对当前segement写入大小与上次构建索引时大小差值进行判断,如果超过log.index.interval.bytes,会构建timeIndex以及offsetIndex索引

AbstractIndex

kafka索引文件,kafka,java

AbstractIndexl类时TimeIndex以及OffsetIndex文件的父类,其中有一个很重要的成员变量 mmap:

protected var mmap: MappedByteBuffer = {
  val newlyCreated = file.createNewFile()
  val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
  try {
    //提前进行文件的创建
    if(newlyCreated) {
      if(maxIndexSize < entrySize)
        throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
      raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
    }

    /* memory-map the file */
    _length = raf.length()
    val idx = {
      if (writable)
        raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
      else
        raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
    }
    /* set the position in the index for the next entry */
    if(newlyCreated)
      idx.position(0)
    else
      // if this is a pre-existing index, assume it is valid and set position to last entry
      idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
    idx
  } finally {
    CoreUtils.swallow(raf.close(), AbstractIndex)
  }
}

这里用到了Memory Mapped Files即内存映射

mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。

kafka索引文件,kafka,java

内存映射mmap参考文章:https://zhuanlan.zhihu.com/p/507907660

mmap同样是一种零拷贝的技术,常规的文件操作需要从磁盘到页缓存再到用户主存的两次数据拷贝。而mmap操控文件,只需要从磁盘到用户主存的一次数据拷贝过程,其实也是一种通过磁盘空间代替内存的操作,提供进程间共享内存及相互通信的方式。

kafka索引文件,kafka,java

二分查找与页缓存

Kafka根据索引文件查找offset

private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
  // 判断index文件是否为空
  if(_entries == 0)
    return (-1, -1)

  def binarySearch(begin: Int, end: Int) : (Int, Int) = {
    // 二分查找开始
    var lo = begin
    var hi = end
    while(lo < hi) {
      val mid = (lo + hi + 1) >>> 1
      //parseEntry方法在timeindex与index里有不同实现方式
      val found = parseEntry(idx, mid)
      val compareResult = compareIndexEntry(found, target, searchEntity)
      if(compareResult > 0)
        hi = mid - 1
      else if(compareResult < 0)
        lo = mid
      else
        return (mid, mid)
    }
     //如果lo等于最后一条,那么就返回-1
    (lo, if (lo == _entries - 1) -1 else lo + 1)
  }

  val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
  // 冷热区判断
  if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
    return binarySearch(firstHotEntry, _entries - 1)
  }

  // check if the target offset is smaller than the least offset
  if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
    return (-1, 0)

  binarySearch(0, firstHotEntry)
}

Kafka对二分搜索的优化

kafka索引文件,kafka,java

对正常的二分查找来说,假设索引的大小有13个页,我们需要查找的偏移量在页12上,那么我们会依次访问0->12->6->9->11->12这六个页

当生产者继续往分区中生产消息,超过4kb后,又写了一个新的索引项,这个时候索引访问的顺序是:0->13->7->10->11>12->13

kafka索引文件,kafka,java

通过对上面mmap的研究可以知道,磁盘到用户主存的映射实际上依赖于页表,只是用户进程可以通过指针操作直接读写page cache,不再需要系统调用和内存拷贝。常用的页表置换算法基本是基于LRU的,当读取页7/10的时候,这两个页可能已经很长时间没有被访问到了,已经从LRU中移除了,这个时候再访问这两个页的时候就可能导致操作系统陷入缺页中断。

Here, we use a more cache-friendly lookup algorithm:
if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
   binarySearch(end - N, end)
else
   binarySearch(begin, end - N)

这里Kafka做了一个优化,保证index文件的最后N个项分为热区,而剩余项则是冷区。这是因为在热区中的索引项可能因为更为频繁的访问,更有可能存在于页表中,可以增加搜索的效率。这里N的值是8192,官方给的解释是:

We set N (_warmEntries) to 8192, because
1. This number is small enough to guarantee all the pages of the "warm" section is touched in every warm-section
   lookup. So that, the entire warm section is really "warm".
   When doing warm-section lookup, following 3 entries are always touched: indexEntry(end), indexEntry(end-N),
   and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we
   touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS,
   SPARC, Power, ARM etc.).
2. This number is large enough to guarantee most of the in-sync lookups are in the warm-section. With default Kafka
   settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log messages.

warmEntries的个数:

protected def _warmEntries: Int = 8192 / entrySize
class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
    extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) {
  import OffsetIndex._

  override def entrySize = 8

这样的设计保证了几点:

1、8192的大小保证了三个索引项是在页表中的indexEntry(end), indexEntry(end-N), indexEntry((end*2 -N)/2)

//待续文章来源地址https://www.toymoban.com/news/detail-580707.html

到了这里,关于Kafka源码解析之索引的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据课程I3——Kafka的消息流与索引机制

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 掌握Kafka的消息流处理; ⚪ 掌握Kafka的索引机制; ⚪ 掌握Kafka的消息系统语义;  流程说明: 1. producer 要向Kafka生产消息,需要先通过 zookeeper 的 \\\"/brokers/.../state\\\" 节点找到该 partition 的 副本leader的位置信息

    2024年02月13日
    浏览(31)
  • (Kafka源码三)Kafka的缓存机制

    在Kafka的架构中,若没有缓存机制,在RecordAccumulator中对于不再使用的批次对象,需要进行回收,释放这些对象所占用的内存,为了降低GC的压力,Kafka作者设计了一个缓存池的机制,从而实现了实现消息批次的内存复用。本文主要讲解缓冲池的设计。 首先来看下kafka生产者到

    2024年04月15日
    浏览(21)
  • kafka设计理念解析

    一.引言 kafka是广泛使用的流处理组件,我们知道怎么使用它,也知道它的实现原理。但是更重要的部分是它的设计理念,即kafka设计者当时是如何考量各种方案的,了解这些,对提升我们的设计能力非常有帮助。 二.动机 我们将 Kafka 设计为一个统一平台,来处理大型公司可能

    2023年04月23日
    浏览(40)
  • 深入学习Kafka之概念解析

    Kafka是一个开源的分布式流处理平台和消息队列系统。 Kafka是一个值得深入学习的开源中间件,其中涉及的概念有很多,今天就来梳理一下,这些概念到底都是什么。 Producers are those client applications that publish (write) events to Kafka 生产者就是那些发布/写事件到Kafka的客户端应用。

    2024年02月22日
    浏览(38)
  • Kafka简介:深入解析ApacheKafka

    作者:禅与计算机程序设计艺术 在当今高速发展的数据时代,分布式消息队列系统作为数据流通的中转站和分发中心,得到了越来越广泛的应用。Kafka是一款非常流行的开源分布式消息队列系统,以其高性能、可靠性、高可用性和可扩展性,成为了许多场景下的最佳选择。本

    2024年02月09日
    浏览(38)
  • kafka的log存储解析

    kafka 的 log 存储解析 ——topic 的分区 partition 分段 segment 以及索引等 引言 Kafka 中的 Message 是以 topic 为基本单位组织的,不同的 topic 之间是相互独立的。每个 topic 又可以分成几个不同的 partition( 每个 topic 有几个 partition 是在创建 topic 时指定 的 ) ,每个 partition 存储一部分

    2024年02月08日
    浏览(25)
  • 读kafka生产端源码,窥kafka设计之道(上)

    1. kafka 高吞吐之道-------异步提交批量发送 简约的发送接口----后面隐藏着并不简单的设计 kafka发送消息的接口非常简约,在简约的表面上,其背后却并不简单。先看下发送接口 正常情况下,调用kafkaProducer.send方法,由 业务线程执行 (比如tomcat的业务线程);业务线程在执行

    2024年02月16日
    浏览(27)
  • 一文详细解析kafka:消费者

    目前主流的MQ中间件都是基于 发布/订阅模式 实现,生产者生产消息到某个主题topic,消费者订阅了该topic后,当有消费写入该主题就可以进行消费。本篇主要介绍Kafka消费者,包括消费者群组以及遇到再均衡的情况及处理措施。 消费者通过检查消息的 偏移量 来区分已经读取

    2024年02月01日
    浏览(36)
  • flink写入到kafka 大坑解析。

    1.kafka能不能发送null消息?    能! 2 flink能不能发送null消息到kafka? 不能!     这里就报了java的最常见错误 空指针,原因就是flink要把kafka的消息getbytes。所以flink不能发送null到kafka。 这种问题会造成什么后果? flink直接挂掉。 如果我们采取了失败重试机制会怎样? 数据重

    2024年02月15日
    浏览(38)
  • kafka 3.5 kafka服务端接收生产者发送的数据源码

    kafka服务端接收生产者数据的API在 KafkaApis.scala 类中, handleProduceRequest 方法 通过调用 replicaManager.appendRecords 把数据存入副本中(这里的副本指的是Topic分区Leader副本) 上面写入本地日志的方法是 appendToLocalLog 方法 appendToLocalLog 方法中比遍历Topic分区集合,针对Topic分区得到分区对

    2024年02月09日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包