八、Kafka时间轮与常见问题

这篇具有很好参考价值的文章主要介绍了八、Kafka时间轮与常见问题。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Kafka与时间轮

Kafka中存在大量的延时操作。
1、发送消息-超时+重试机制

2、ACKS 用于指定分区中必须要有多少副本收到这条消息,生产者才认为写入成功(延时 等)

Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)

JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(log(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。

时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。

时间轮

Java中任务调度

要回答这个问题,我们先从Java中最原始的任务调度的方法说起。

给你一批任务(假设有1000个任务),都是不同的时间执行的,时间精确到秒,你怎么实现对所有的任务的调度?

第一种思路是启动一个线程,每秒钟对所有的任务进行遍历,找出执行时间跟当前时间匹配的,执行它。如果任务数量太大,遍历和比较所有任务会比较浪费时间。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Cu92Jn2L-1690353345189)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps14.jpg)]八、Kafka时间轮与常见问题,kafka,kafka,分布式

第二个思路,把这些任务进行排序,执行时间近(先触发)的放在前面。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GfOAIem8-1690353345192)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps15.jpg)]八、Kafka时间轮与常见问题,kafka,kafka,分布式

如果是数组的时间的话,这里会涉及到大量的元素移动(新加入任务,任务执行–删除任务之类,都需要重新排序)

那么在Java代码怎么实现呢?

JDK包里面自带了一个Timer工具类(java.util包下),可以实现延时任务(例如30分钟以后触发),也可以实现周期性任务(例如每1小时触发一次)。

它的本质是一个优先队列(TaskQueue),和一个执行任务的线程(TimerThread)。

(普通的队列是一种先进先出的数据结构,元素在队列尾追加,而从队列头删除。在优先队列中,元素被赋予优先级。当访问元素时,具有最高优先级的元素最先删除。优先队列具有最高级先出 (first in, largest out)的行为特征。通常采用堆数据结构来实现。)

八、Kafka时间轮与常见问题,kafka,kafka,分布式

八、Kafka时间轮与常见问题,kafka,kafka,分布式

在这个优先队列中,最先需要执行的任务排在优先队列的第一个。然后 TimerThread 不断地拿第一个任务的执行时间和当前时间做对比。如果时间到了先看看这个任务是不是周期性执行的任务,如果是则修改当前任务时间为下次执行的时间,如果不是周期性任务则将任务从优先队列中移除。最后执行任务。

但是Timer是单线程的,在很多场景下不能满足业务需求。

在JDK1.5之后,引入了一个支持多线程的任务调度工具ScheduledThreadPoolExecutor用来替代TImer,它是几种常用的线程池之一。看看构造函数,里面是一个延迟队列DelayedWorkQueue,也是一个优先队列。

八、Kafka时间轮与常见问题,kafka,kafka,分布式

DelayedWorkQueue的最小堆实现

优先队列的使用的是最小堆实现。

最小堆的含义: 一种完全二叉树, 父结点的值小于或等于它的左子节点和右子节点

比如插入以下的数据 [1,2,3,7,17,19,25,36,100]

最小堆就长成这个样子。

八、Kafka时间轮与常见问题,kafka,kafka,分布式

优先队列的插入和删除的时间复杂度是O(logn),当数据量大的时候,频繁的入堆出堆性能不是很好。

比如要插入0,过程如下:

1、插入末尾元素

八、Kafka时间轮与常见问题,kafka,kafka,分布式

2、0比19小,所以要向上移动且互换。

八、Kafka时间轮与常见问题,kafka,kafka,分布式

3、0比2小,所以要向上移动且互换。

八、Kafka时间轮与常见问题,kafka,kafka,分布式

4、0比2小,所以要向上移动且互换。

八、Kafka时间轮与常见问题,kafka,kafka,分布式

算法复杂度

N个数据的最小堆, 共有logN层, 最坏的情况下, 需要移动logN次

时间轮

这里我们先考虑对所有的任务进行分组,把相同执行时刻的任务放在一起。比如这里,数组里面的一个下标就代表1秒钟。它就会变成一个数组加链表的数据结构。分组以后遍历和比较的时间会减少一些。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dpk4Jicj-1690353345195)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps16.jpg)]八、Kafka时间轮与常见问题,kafka,kafka,分布式

但是还是有问题,如果任务数量非常大,而且时间都不一样,或者有执行时间非常遥远的任务,那这个数组长度是不是要非常地长?比如有个任务2个月之后执行,从现在开始计算,它的下标是5253120。

所以长度肯定不能是无限的,只能是固定长度的。比如固定长度是8,一个格子代表1秒(现在叫做一个bucket槽),一圈可以表示8秒。遍历的线程只要一个格子一个格子的获取任务,并且执行就OK了。

固定长度的数组怎么用来表示超出最大长度的时间呢?可以用循环数组。

比如一个循环数组长度8,可以表示8秒。8秒以后执行的任务怎么放进去?只要除以8,用得到的余数,放到对应的格子就OK了。比如10%8=2,它放在第2个格子。这里就有了轮次的概念,第10秒的任务是第二轮的时候才执行。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qCMzQcjf-1690353345196)(file:///C:/Users/root/AppData/Local/Temp/ksohtml10964/wps17.jpg)]八、Kafka时间轮与常见问题,kafka,kafka,分布式

这时候,时间轮的概念已经出来了。

如果任务数量太多,相同时刻执行的任务很多,会导致链表变得非常长。这里我们可以进一步对这个时间轮做一个改造,做一个多层的时间轮。

比如:最内层8个格子,每个格子1秒;外层8个格子,每个格子8*8=64秒;最内层走一圈,外层走一格。这时候时间轮就跟时钟更像了。随着时间流动,任务会降级,外层的任务会慢慢地向内层移动。

八、Kafka时间轮与常见问题,kafka,kafka,分布式

时间轮任务插入和删除时间复杂度都为O(1),应用范围非常广泛,更适合任务数很大的延时场景。Dubbo、Netty、Kafka中都有实现。

Kafka中时间轮实现

Kafka里面TimingWheel的数据结构

八、Kafka时间轮与常见问题,kafka,kafka,分布式

kafka会启动一个线程,去推动时间轮的指针转动。其实现原理其实就是通过queue.poll()取出放在最前面的槽的TimerTaskList

八、Kafka时间轮与常见问题,kafka,kafka,分布式

八、Kafka时间轮与常见问题,kafka,kafka,分布式

添加新的延迟任务

八、Kafka时间轮与常见问题,kafka,kafka,分布式

往时间轮添加新的任务

八、Kafka时间轮与常见问题,kafka,kafka,分布式

时间轮指针的推进

八、Kafka时间轮与常见问题,kafka,kafka,分布式

第二层时间轮的创建代码如下

八、Kafka时间轮与常见问题,kafka,kafka,分布式

Kafka性能问题

1、kafka如何确保消息的可靠性传输

这个问题需要从以下3个方面分析和解决

(1)消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况,就是说,你那个消费到了这个消息,然后消费者那边自动提交了offset,让kafka以为你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

大家都知道kafka会自动提交offset,那么只要关闭自动提交offset,在处理完之后自己手动提交offset,就可以保证数据不会丢。但是此时确实还是会重复消费,比如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

生产环境碰到的一个问题,就是说我们的kafka消费者消费到了数据之后是写到一个内存的queue里先缓冲一下,结果有的时候,你刚把消息写入内存queue,然后消费者会自动提交offset。

然后此时我们重启了系统,就会导致内存queue里还没来得及处理的数据就丢失了

(2)kafka弄丢了数据

这块比较常见的一个场景,就是kafka某个broker宕机,然后重新选举partiton的leader时。大家想想,要是此时其他的follower刚好还有些数据没有同步,结果此时leader挂了,然后选举某个follower成leader之后,他不就少了一些数据?这就丢了一些数据啊。

所以此时一般是要求起码设置如下4个参数:

给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本。

在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧。

在producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了。

在producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

(3)生产者会不会弄丢数据

如果按照上述的思路设置了ack=all,一定不会丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

2、如何实现Kafka的高性能?

1、宏观架构层面利用Partition实现并行处理

Kafka中每个Topic都包含一个或多个Partition,不同Partition可位于不同节点。同时Partition在物理上对应一个本地文件夹,每个Partition包含一个或多个Segment,每个Segment包含一个数据文件和一个与之对应的索引文件。在逻辑上,可以把一个Partition当作一个非常长的数组,可通过这个“数组”的索引(offset)去访问其数据。

一方面,由于不同Partition可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于Partition在物理上对应一个文件夹,即使多个Partition位于同一个节点,也可通过配置让同一节点上的不同Partition置于不同的disk drive上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。

利用多磁盘的具体方法是,将不同磁盘mount到不同目录,然后在server.properties中,将log.dirs设置为多目录(用逗号分隔)。Kafka会自动将所有Partition尽可能均匀分配到不同目录也即不同目录(也即不同disk)上。

Partition是最小并发粒度,Partition个数决定了可能的最大并行度。

2、充分利用PageCache

Page Cache,又称pcache,其中文名称为页高速缓冲存储器,简称页高缓。page cache的大小为一页,通常为4K。在linux读写文件时,它用于缓存文件的逻辑内容,从而加快对磁盘上映像和数据的访问。 是Linux操作系统的一个特色。

八、Kafka时间轮与常见问题,kafka,kafka,分布式

1、读Cache

当内核发起一个读请求时(例如进程发起read()请求),首先会检查请求的数据是否缓存到了Page Cache中。

如果有,那么直接从内存中读取,不需要访问磁盘,这被称为cache命中(cache hit);

如果cache中没有请求的数据,即cache未命中(cache miss),就必须从磁盘中读取数据。然后内核将读取的数据缓存到cache中,这样后续的读请求就可以命中cache了。

page可以只缓存一个文件部分的内容,不需要把整个文件都缓存进来。

2、写Cache

当内核发起一个写请求时(例如进程发起write()请求),同样是直接往cache中写入,后备存储中的内容不会直接更新(当服务器出现断电关机时,存在数据丢失风险)。

内核会将被写入的page标记为dirty,并将其加入dirty list中。内核会周期性地将dirty list中的page写回到磁盘上,从而使磁盘上的数据和内存中缓存的数据一致。

当满足以下两个条件之一将触发脏数据刷新到磁盘操作:

数据存在的时间超过了dirty_expire_centisecs(默认300厘秒,即30秒)时间;

脏数据所占内存 > dirty_background_ratio,也就是说当脏数据所占用的内存占总内存的比例超过dirty_background_ratio(默认10,即系统内存的10%)的时候会触发pdflush刷新脏数据。

如何查看Page Cache参数

执行命令 sysctl -a|grep dirty

如何调整内核参数来优化IO性能?

(1)vm.dirty_background_ratio参数优化

这个参数指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如5%)就会触发后台回写进程运行,将一定缓存的脏页异步地刷入磁盘;

当cached中缓存当数据占总内存的比例达到这个参数设定的值时将触发刷磁盘操作。

把这个参数适当调小,这样可以把原来一个大的IO刷盘操作变为多个小的IO刷盘操作,从而把IO写峰值削平。

对于内存很大和磁盘性能比较差的服务器,应该把这个值设置的小一点。

(2)vm.dirty_ratio参数优化

这个参数则指定了当文件系统缓存脏页数量达到系统内存百分之多少时(如10%),系统不得不开始处理缓存脏页(因为此时脏页数量已经比较多,为了避免数据丢失需要将一定脏页刷入外存);在此过程中很多应用进程可能会因为系统转而处理文件IO而阻塞。

对于写压力特别大的,建议把这个参数适当调大;对于写压力小的可以适当调小;如果cached的数据所占比例(这里是占总内存的比例)超过这个设置,

系统会停止所有的应用层的IO写操作,等待刷完数据后恢复IO。所以万一触发了系统的这个操作,对于用户来说影响非常大的。

(3)vm.dirty_expire_centisecs参数优化

这个参数会和参数vm.dirty_background_ratio一起来作用,一个表示大小比例,一个表示时间;即满足其中任何一个的条件都达到刷盘的条件。

为什么要这么设计呢?我们来试想一下以下场景:

如果只有参数 vm.dirty_background_ratio ,也就是说cache中的数据需要超过这个阀值才会满足刷磁盘的条件;

如果数据一直没有达到这个阀值,那相当于cache中的数据就永远无法持久化到磁盘,这种情况下,一旦服务器重启,那么cache中的数据必然丢失。

结合以上情况,所以添加了一个数据过期时间参数。当数据量没有达到阀值,但是达到了我们设定的过期时间,同样可以实现数据刷盘。

这样可以有效的解决上述存在的问题,其实这种设计在绝大部分框架中都有。

(4)vm.dirty_writeback_centisecs参数优化

理论上调小这个参数,可以提高刷磁盘的频率,从而尽快把脏数据刷新到磁盘上。但一定要保证间隔时间内一定可以让数据刷盘完成。

(5)vm.swappiness参数优化

禁用swap空间,设置vm.swappiness=0

3、减少网络开销批处理

批处理是一种常用的用于提高I/O性能的方式。对Kafka而言,批处理既减少了网络传输的Overhead,又提高了写磁盘的效率。

Kafka 的send方法并非立即将消息发送出去,而是通过batch.size和linger.ms控制实际发送频率,从而实现批量发送。

由于每次网络传输,除了传输消息本身以外,还要传输非常多的网络协议本身的一些内容(称为Overhead),所以将多条消息合并到一起传输,可有效减少网络传输的Overhead,进而提高了传输效率。

4、数据压缩降低网络负载

Kafka支持将数据压缩后再传输给Broker。除了可以将每条消息单独压缩然后传输外,Kafka还支持在批量发送时,将整个Batch的消息一起压缩后传输。数据压缩的一个基本原理是,重复数据越多压缩效果越好。因此将整个Batch的数据一起压缩能更大幅度减小数据量,从而更大程度提高网络传输效率。

Broker接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘。Consumer Fetch到数据后再解压缩。因此Kafka的压缩不仅减少了Producer到Broker的网络传输负载,同时也降低了Broker磁盘操作的负载,也降低了Consumer与Broker间的网络传输量,从而极大得提高了传输效率,提高了吞吐量。

5、高效的序列化方式

Kafka消息的Key和Value的类型可自定义,只需同时提供相应的序列化器和反序列化器即可。

因此用户可以通过使用快速且紧凑的序列化-反序列化方式(如Avro,Protocal Buffer)来减少实际网络传输和磁盘存储的数据规模,从而提高吞吐率。这里要注意,如果使用的序列化方法太慢,即使压缩比非常高,最终的效率也不一定高。文章来源地址https://www.toymoban.com/news/detail-610180.html

到了这里,关于八、Kafka时间轮与常见问题的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink开发常见问题 —— flink-kafka 依赖版本冲突问题

    由于 flink / kafka 的版本不断更新,创建项目的时候就应当考虑清楚这几个依赖库的版本问题,尽可能地与实际场景保持一致,比如服务器上部署的 kafka 是哪个版本,flink 是哪个版本,从而确定我们需要开发的是哪个版本,并且在真正的开发工作开始之前,应当先测试一下保证

    2024年02月07日
    浏览(59)
  • Kafka如何保证消息的消费顺序【全局有序、局部有序】、Kafka如何保证消息不被重复消费、Kafka为什么这么快?【重点】、Kafka常见问题汇总【史上最全】

    目录 Kafka消息生产 一个Topic对应一个Partition 一个Topic对应多个Partition Kafka消息的顺序性保证(Producer、Consumer) 全局有序 局部有序  max.in.flight.requests.per.connection参数详解 Kafka的多副本机制 Kafka的follower从leader同步数据的流程 Kafka的follower为什么不能用于消息消费 Kafka的多分区

    2024年04月11日
    浏览(54)
  • 分布式系统常见问题

    分布式系统存在网络,时钟,以及许多不可预测的故障。分布式事务,一致性与共识问题,迄今为止仍没有得到很好的解决方案。要想完美地解决分布式系统中的问题不太可能,但是实践中应对特定问题仍有许多可靠的解决方案。本文不会谈及诸如BASE, CAP, ACID 等空泛的理论,

    2024年02月04日
    浏览(86)
  • 【技术驿站】分布式基础与常见面试问题

    💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老 导航 檀越剑指大厂系列:全面总

    2024年02月04日
    浏览(48)
  • Jmeter分布式测试的注意事项和常见问题

    Jmeter是一款开源的性能测试工具,使用Jmeter进行分布式测试时,也需要注意一些细节和问题,否则可能会影响测试结果的准确性和可靠性。 如果使用csv文件进行参数化,即通过读取csv文件中的数据来为测试脚本提供不同的输入值,那么需要注意以下两点: 需要把参数文件在

    2024年02月06日
    浏览(47)
  • 性能测试 —— Jmeter分布式测试的注意事项和常见问题

    Jmeter是一款开源的性能测试工具,使用Jmeter进行分布式测试时,也需要注意一些细节和问题,否则可能会影响测试结果的准确性和可靠性。 Jmeter分布式测试时需要特别注意的几个方面 1. 参数化文件的位置和内容 如果使用csv文件进行参数化,即通过读取csv文件中的数据来为测

    2024年02月05日
    浏览(44)
  • 【Redis 开发】分布式锁中的常见问题和Lua脚本

    分布式锁中我们设置的过期时间: 如果有一个线程获取锁之后在进行操作时,到达了锁的过期时间,之后就会有别的线程获得锁,如果这时,第一个线程执行完成后释放锁,就会将第二个锁的线程删除 针对这个情况如何改进: 在获取锁时存入线程标示(可以用UUID) 在释放

    2024年04月28日
    浏览(46)
  • Hive常见时间日期函数的使用与问题整理

    这里整理一下Hive常见的时间函数和日期函数和用法,作为平时数据处理过程的一个检索和记录。 平时在数据处理过程中,如果不经常使用时间函数,一时间遇到一些时间上的处理,难免会想不起来。 hive本身提供的时间函数已经很丰富了,基本上能满足我们所有的需求,一些

    2024年02月08日
    浏览(47)
  • kafka自动退出问题:开启kafka一段时间后,jps发现kafka自动退出。

    解决方法: 开启zk,来到zk路径下开启一个zk bin/zkServer.sh start . 查看zk的状态 bin/zkServer.sh status 发现zk并没有运行,这是正常现象,因为一台机器开启zk,没有达到集群的半数,不工作。 接着在第二台机器上开启zk 并查看zk状态 bin/zkServer.sh start bin/zkServer.sh status 如果zk状态出现

    2024年02月13日
    浏览(37)
  • 【kafka】JDBC connector进行表数据增量同步过程中的源表与目标表时间不一致问题解决...

    〇、参考资料 时间不一致,差了8个小时 (1)source (2)sink 即sink和source都加  \\\"db.timezone\\\": \\\"Asia/Shanghai\\\", 并需要保持一直

    2024年02月11日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包