【大数据】Kafka 数据存储

这篇具有很好参考价值的文章主要介绍了【大数据】Kafka 数据存储。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.文件目录

Kafka 中的消息是存储在磁盘上的,一个分区副本对应一个 日志(Log)。为了防止 Log 过大,Kafka 又引入了 日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment ,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以 .txnindex 为后缀的事务索引文件),下图为 Topic、Partition、副本、Log 和 LogSegment 之间的关系。

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步

2.日志分段

虽然一个 Log 被拆为多个分段,但只有最后一个 LogSegment(当前活跃的日志分段)才能执行写入操作,在此之前所有的 LogSegment 都不能写入数据。当满足以下其中任一条件会创建新的 LogSegment。

  • 当前日志分段文件的大小超过了 Broker 端参数 log.segment.bytes 配置的值,默认值为 1073741824 1073741824 1073741824,即 1 G B 1GB 1GB
  • 当前日志中第一条消息的时间戳与当前系统的时间戳的差值大于 log.roll.mslog.roll.hours 参数配置的值。如果同时配置了 log.roll.mslog.roll.hours,那么以 log.roll.ms 为准。默认只配置了 log.roll.hours 参数,其值为 168 168 168,即 7 7 7 天。
  • 偏移量索引文件或时间戳索引文件的大小达到 Broker 端参数 log.index.size.max.bytes 配置的值,默认值为 10485760 10485760 10485760,即 10 M 10M 10M
  • 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,即(offset - baseOffset)> Integer.MAX_VALUE

在索引文件切分的时候,Kafka 会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件,默认大小为 1 G B 1GB 1GB。当下次索引切分时才会设置为实际大小。也就是说,之前的 Segment 都是实际大小,活跃的 Segment 大小为 1 G 1G 1G

3.日志索引

索引的主要目的是提高查找的效率。

Kafka 采用 稀疏索引sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。而是每当写入一定量(由 Broker 端参数 log.index.interval.bytes 指定,默认 4 K B 4KB 4KB)的消息时,索引文件会增加一个索引项。

3.1 偏移量索引

一条偏移量索引包含两部分数据,如图:

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步

  • relativeOffset:相对偏移量,表示消息相对于 baseOffset 的偏移量,当前索引文件的文件名即为 baseOffset
  • position:物理地址,也就是消息在日志分段文件中对应的物理位置。
  • baseOffset:Segment 第一个 Message 的 Offset。

消息查找过程
kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步

如果我们要查找偏移 23 23 23 的消息,那么应该怎么做呢? 首先通过二分法在偏移量索引文件中找到不大于 23 23 23 最大索引项,即 [22,656],然后从日志分段文件中的物理位置 656 656 656 开始顺序查找偏移 23 23 23 的消息。

以上是比较简单的情况,如下图所示,如果要查找要查找偏移 268 268 268 的消息,那么应该怎么办呢?

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步

首先肯定是定位到 baseOffset = 251 的日志分段,然后计算相对偏移量 relativeOffset 268 − 251 = 17 268 - 251=17 268251=17,之后再在对应的索引文件中找到不大于 17 17 17 的索引项,最后根据索引项中的 position 定位到具体的日志分段文件位置开始查找目标消息。

那么如何查找 baseOffset 25 的日志分段的呢?Kafka 使用了跳跃表的结构。Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 Key ,这样可以根据指定偏移量来快速定位到消息所在的日志分段。

3.2 时间戳索引

时间戳索引也是包含两部分数据,如图:

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步

  • timestamp:当前日志分段最大的时间戳。
  • relativeOffset:时间戳所对应的消息的相对偏移量,也就是偏移量索引中偏移量。

时间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的 timestamp 必须大于之前追加的索引项的 timestamp,否则不予追加。

消息查找过程

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步

如果要查找指定时间戳 targetTimeStamp = 1526384718288 开始的消息,首先是找到不小于指定时间戳的日志分段。这里就无法使用跳跃表来快速定位到相应的日志分段 了, 需要分以下几个步骤来完成。

  • targetTimeStamp 和每个日志分段中的最大时间戳对比,直到找到不小于 targetTimeStam 所对应的日志分段。(注:日志分段中的最大时间戳的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于 0 0 0,则取其值,否则取该日志分段的最近修改时间。)
  • 找到相应的日志分段之后,在时间戳索引文件中使用二分查找算法查找到不大于 targetTimeStamp 最大索引项,即 [1526384718283, 28],如此便找到了相对偏移量 28 28 28
  • 在偏移量索引文件中使用二分算法查找到不大于 28 28 28 的最大索引工页,即 [26,838]
  • 从第一步中找到的日志分段文件中的 838 838 838 的物理位置开始查找不小于 targetTimeStamp 的消息。

4.日志清理

Kafka 将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka 提供了两种日志清理策略。

  • 日志删除:按照一定的保留策略直接删除不符合条件的日志分段,默认该策略。
  • 日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版本。

4.1 日志删除

kafka 有专门的任务来周期性删除不符合条件的日志分段文件,删除策略主要以下有 3 3 3 种。

4.1.1 基于时间

Broker 端可通过参数设置日志的最大保留时间,默认 7 7 7 天。定时任务会查看每个分段的最大时间戳(计算逻辑同上),若最大时间戳距当前时间超过 7 7 7 天,则需要删除。

删除日志分段时, 首先会先从跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件添加上 .delete 的后缀。最后由专门的定时任务来删除以 .delete 为后缀的文件。

4.1.2 基于日志大小

日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)。

注意这里的日志的大小是指所有的 Segment 的总和,不是单个 Segment。

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步

首先计算日志文件的总大小和设定阈值的差值,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段,放入集合 deletableSegments 中 。之后进行删除,删除过程同 4.1.1 小节所述。

4.1.3 基于日志起始偏移量

一般情况下,日志文件的起始偏移 logStartOffset 等于第 1 1 1 个日志分段的 baseOffset,但 logStartOffset 是可以被修改的。

该策略会判断某日志分段的下一个日志分段的起始偏移量 baseOffset 是否小于等于 logStartOffset,若是,则将其放入 deletableSegments 中。如下图所示。

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步
之后进行删除,删除过程同 4.1.1 小节所述。

4.2 日志压缩

对于有相同 Key 的不同 Value 值,只保留最后一个版本。如果应用只关心 Key 对应的最新 Value 值,则可以开启 Kafka 的日志压缩功能,Kafka 会定期将相同 Key 的消息进行合井,只保留最新的 Value 值。

kafka数据文件目录,# Kafka,大数据,kafka,消息队列,数据同步文章来源地址https://www.toymoban.com/news/detail-797147.html

到了这里,关于【大数据】Kafka 数据存储的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(40)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的分区策略

    01. Kafka 分区的作用 分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的

    2024年02月13日
    浏览(51)
  • 分布式 - 消息队列Kafka:Kafka 消费者消息消费与参数配置

    01. 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似——把想要传给消费者的属性放在Properties对象里。 为简单起见,这里只提供4个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    浏览(42)
  • 消息队列 Kafka

    Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域 在高并发环境下,同步请求来不及处理会发生堵塞,从而触发too many connection错误,引发雪崩效应。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过

    2024年02月07日
    浏览(38)
  • 消息队列——kafka基础

    首先自然是要列出Kafka官网地址啦:https://kafka.apache.org/ 概述 定义 发布/订阅模式 ​ 原文链接:https://blog.csdn.net/tjvictor/article/details/5223309 ​ 定义了一种 一对多 的依赖关系,让 多个订阅者对象同时监听某一个主题对象 。这个主题对象在自身状态变化时,会通知所有订阅者对

    2024年02月04日
    浏览(40)
  • 消息队列之王——Kafka

        在学习kafka之前,我们需要先学习 Zookeeper ,那Zookeeper是什么呢? Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。         Zookeeper从 设计模式 角度来理解:是一个基于观察者模式设计的 分布式服务管理框架 ,它 负责存储和管理 大家都关心

    2024年01月23日
    浏览(40)
  • 分布式 - 消息队列Kafka:Kafka生产者发送消息的3种方式

    不管是把Kafka作为消息队列、消息总线还是数据存储平台,总是需要一个可以往Kafka写入数据的生产者、一个可以从Kafka读取数据的消费者,或者一个兼具两种角色的应用程序。 Kafka 生产者是指使用 Apache Kafka 消息系统的应用程序,它们负责将消息发送到 Kafka 集群中的一个或多

    2024年02月13日
    浏览(45)
  • Kafka源码解析之SocketServer,kafka消息队列面试题

    ======================================================================= Kafka处理请求不区分优先级,但这种绝对公平的策略有时会发生问题。 比如:创建一个单分区双副本的主题,当时集群中的Broker A机器保存了分区的Leader副本,Broker B保存了Follower副本。突然业务激增,Broker A瞬间积压大量

    2024年04月08日
    浏览(39)
  • Kafka消息队列实现消息的发送和接收

    消息在Kafka消息队列中发送和接收过程如下图所示: 消息生产者Producer产生消息数据,发送到Kafka消息队列中,一台Kafka节点只有一个Broker,消息会存储在Kafka的Topic(主题中),不同类型的消息数据会存储在不同的Topic中,可以利用Topic实现消息的分类,消息消费者Consumer会订阅

    2024年02月11日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包