首先简单来说一下,flink2.0做存算分离,最最主要的一点是解决,大状态的问题,例如一个超过50T的物流数据,大状态恢复可能就要1天,所以才有存算分离这么一个设计初衷。
下面先来看一下 任务是怎么执行提交的,看一下state在整个流程里 处于一个什么位置
)
像为了解决算子checkpoint对齐 推出了unaligned CP + Buffer Debloating,阿里内一般叫online checkpoint ,保存两端channel数据 增速checkpoint的速度,测试后发现 channel里的数据导致的checkpoint增加大小,对备份和恢复几乎是没有影响的,所以很值得(这部分不了解的可以看我的flink其他帖子,有一个checkpoint的,专门讲了unaligned cp)
通用增量快照,是解耦了 flink本身做cp的流程和 state本身做快照的流程。这样的话可以做双写,状态做快照可以在backend后台慢慢做,平时把状态的增量快照持续上传,真正需要做cp的时候,只需要上传很少的数据。
下面右图,尖峰减少了很多,是因为状态后端做快照的时候,要做campation (忘了怎么拼),这个非常消耗cpu,并且状态快照如果都集中在cp时上传的话,网络消耗峰值也会飙升。
1 引言
我们在这个时间点重新聊状态存储这个话题是因为状态存储是流计算的核心。Flink 从 2017 年 VLDB 发表奠基之作介绍状态存储[1] 发展至今,Flink 在状态这部分的架构基本并没有太大的变化。但时代是在不断演进和变化的,无论部署模式、存储模式,还是作业负载,都已经发生了翻天覆地的变化。
从部署模式看,我们经历了 map-reduce 时代 Cluster 上没有资源隔离的部署,到云原生时代 K8S 容器为粒度隔离部署。
与此同时,硬件的提升也带来的存储模式的变化,最显著的是网络带宽从 10 年前几百 Mbps,到现在 25Gbps 起步,基本上可以和本地 SSD 的宽度比肩,且内网传输的话速度快、成本低,这使得存储模式从本地存储,到分布式存储,再向云存储的转变。云存储虽然延迟高,但具有容量大、成本低的优势,非常契合现在降本增效的大背景。
另外作业负载方面也发生了很大变化,Flink 刚诞生时,几百 MB 的状态就算大状态场景了,而现在物流等一些场景下,几百 TB 的状态也是很常见的。技术的进步、用户需求的变化带给我们对状态存储这个部分的一些新的思考,这就有了本文要探讨的话题。
本文内容主要分为三个方面探讨一下 Flink 存算分离架构的演进方向。
第一部分,先讲为什么状态对 Flink 的重要性,因为状态访问在单条数据处理路径上,处理延迟对整体性能影响很大;
第二部分,会讲一讲近期状态存储这个部分社区以及我们在阿里内部所做的一些工作和探索;
第三部分,聊一聊基于这些探索工作,Flink 存算分离架构应该是什么样的。
2 为什么状态对 Flink 如此重要
2.1 状态的角色
在 Flink 流处理中,如果后续计算需要依赖前面输入的时候,需要用状态表(State Table)来保存中间数据。在 Task Manager 中,通过一个称为 StateBackend 的服务组件为算子层提供状态读写服务。如果状态太大,内存无法承载时,StateBackend 会把状态数据以文件的形式组织并存储在本地磁盘上。但是通常本地盘不具备数据可靠性,所以 Flink 需要对状态进行定期的快照,把本地文件上传到分布式文件系统(OSS/HDFS/S3 等)。另外,一个 Flink 作业中往往有多个Task 包含有状态算子,因此 Flink 需要确保这些不同的 Task 上的状态快照是全局一致的,也就是说是对齐输入的,这个过程就是 Flink Checkpointing 过程。
2.2 Flink状态管理的需求以及现存的问题
这一部分我们主要探讨一下目前 Flink 状态存储的需求以及现存的问题,有了这个前提我们才能“对症下药”。
首先,状态处理是流计算的关键路径。目前 Flink State 接口都是单线程、单 KV 访问模式的,因此 State 读写的延迟会极大地影响 Flink 作业的处理性能(TPS)。而大多数 Flink 线上作业都会挂载本地盘,其访问延迟基本在 us 级别。如果将状态本地存储直接替换成 DFS 存储后,访问延迟部分会遇到很大的瓶颈,这也是之前很多人尝试 Flink 远端状态存储遇到性能问题的原因。另外,对状态做快照的过程也需要尽量轻量稳定,不能影响正常的数据处理。
在云原生的大背景下,对状态提出了更高的要求。
首先是本地盘大小的限制。容器化技术可以做到很好的资源隔离,这也就意味着资源需要预分配。在 Flink 场景中,作业在启动时 Task Manager 本地盘大小就已经固定了;在作业运行期间进行磁盘重新挂载(remount)又十分麻烦,一般各大云厂商的全托管服务也不支持这种高级功能。因此,如果在作业运行期间本地磁盘满了,通常只能通过扩并发解决。但对于公有云用户来说,往往是不太愿意因为状态写满磁盘去多付费的。
除了本地盘资源,对于其他资源(比如 CPU 和 Network IO 等)也是一样的,容器化架构希望这些资源使用可以尽量平滑,避免出现尖峰,才能更好地实现资源隔离。最后是高效的弹性伸缩(Elasticity),社区现在结合 K8S Operator 可以做到原地 rescale, 以及利用 1.19 将要做的 Local Rescale 可以部分解决弹性伸缩的问题,但是状态稍微大点之后还是会有状态下载慢的问题。
此外还有一些用户视角来讲的需求,比如说状态可查询和状态可复用。关于状态可查询,社区经常有用户询问 Queryable State, 但 Queryable State 其自身架构存在缺陷,开启时会影响 Task Manager 正常的数据处理,目前在社区已经处于 Deprecate 状态。我们后续在存算分离架构设计的时候也是需要全面考虑这类需求的。
3 状态存储提升 —— 社区和商业版状态存储
基于上述的这些需求和挑战,我们接下来看一下,在社区和我们的商业版本里面,围绕状态存储优化我们都做了哪些尝试以及还存在什么问题。
3.1 分布式快照架构升级
分布式快照这个部分,从 Flink 1.11 版本开始,历经社区 6 个版本,我们在 Checkpoint 的稳定性和完成速度方面做了一些工作,主要包含三个功能:动态调节缓存数据(Buffer Debloating)、非对齐快照(Unaligned Checkpoint)、通用增量 checkpoint。这三个功能在各大公司都有大规模实际落地,发挥其应有的价值。
Unaligned Checkpoint 和 Buffer Debloating 主要是为了防止或者减少反压时 Checkpoint Barrier 被算子间管道中数据阻塞的情况发生。从实验结果来看 Unaligned Checkpoint 在反压的时能把快照完成时间缩短 90%(这个前提是 state size 不能太大,barrier 对齐时间是主要因素)。另外,因为 Unaligned Checkpoint 存储了算子管道中的数据,所以我们也测试了其对全量 Checkpoint 大小和状态恢复时间的影响,测试结果显示全量 Checkpoint 大小的增加和状态恢复的额外时间基本上可以忽略不计。
当作业状态比较大的时候,状态上传时间就会成为 Checkpoint 完成耗时的主导因素,这时我们可以用通用增量快照来加速 Checkpoint。通用增量快照解耦了 Checkpointing 过程和状态快照过程,可以把增量的状态 changelog 持续上传到 DFS,使得在做 Checkpoint 时只需要上传很少的数据量,因此可以做到无论状态多大, Checkpoint 基本都可以在 10s 内完成(Nexmark 上所有状态 Query 的测试结果均符合)。
通用增量 Checkpoint 最开始是为加速 Checkpoint 设计的,后来发现还有一些额外的收益,如上图所示,CPU 和网络流量的尖峰使用减少了 70%,这对于整体 Cluster 的稳定性和资源节省都是非常关键的。该优化的原因是原来 Flink 周期性快照会让作业中所有 Rocksdb 状态后端周期性触发 Compaction 和上传文件,导致 Checkpoint 期间消耗大量 CPU 和 Network IO,容易形成 CPU 和 Network IO 尖峰。通用增量快照把 Checkpointing 和状态快照过程解耦后,状态快照触发的时间可以随机化,避免 CPU 和网络使用尖峰的形成。
另外,因为通用增量快照速度很快,在容错时数据回放量可以大大减少,这对于重复数据比较敏感的作业是非常关键的。事物都有两面性,通用增量快照的额外开销是需要双写数据,我们也利用 Nexmark Stateful Query 测试了其对 TPS 的影响,结果显示在网络带宽不是瓶颈的情况下,通用增量快照对压测 TPS 影响小于 5%。
3.2 面向云原生:高效弹性扩缩容
除了 Checkpoint 这个部分,社区从 1.17 开始也在尝试更高效的弹性扩缩容。
首先 Flink 1.18 中 Adaptive Scheduler 支持了 Rest API,K8S auto scaler 可以调用这个 API 进行原地扩缩容,也就是说在更新作业之后 Task Manager 和 Job Manager 不用被重启,可以最大限度的复用现有资源。
其次,对于文件下载这个部分,Flink-1.18 开始支持并行下载,平均耗时减少 30%;在 1.19 预计会支持 Local Rescale,扩缩容的时候可以免除重复下载,配合上述的 K8S 原地扩缩容,这个部分提升应该是会很大的。
最后在状态重建这个方面,Flink 1.18 中恢复状态时禁用 WAL,可以减少恢复时的额外开销;Flink 1.19 之后,状态重建可以支持文件粒度的合并裁剪,重建状态后端耗时减少 50% - 80%。
3.3 Gemini:面向流计算场景的分层状态存储
这一章节分享我们在阿里内部做的关于状态云原生部分的一些探索:Gemini,阿里自研的一款面向流计算场景的分层状态存储。
Gemini 在阿里内部全量上线,服务于不同状态大小的作业(从几百MB到上百TB),也成功度过了 2021 — 2023 双十一的洗礼,取得了非常好的降本增效效果。另外,我们基于 Flink1.15 利用 Nexmark Benchmark,对 Gemini 和 Rocksdb 做了纯状态后端层面的性能对比,从测试结果可以看出 Gemini 对作业性能(单核吞吐能力)提升效果十分显著,平均 50%+。
Gemini 可以看成是一个分层的状态存储,内存数据存放在 Mem Table(读写) 和 Cache(只读)中,热数据被写在 Local Disk,本地盘写满后将部分冷数据写到 DFS 上。抛开 DB 其他功能,我们重点分享一下 Gemini 存算分离方面的探索。首先,云上全托管服务都是有磁盘容量限制的,大部分用户是不愿意为状态把本地盘写满这件事情额外付费去增加并发的,所以 Gemini 实现了本地和远端统一的文件管理,把本地盘作为主存,DFS 作为再下一级存储,在 DFS 上数据量不大的情况下,性能是可以接受的,但是如果频繁访问 DFS,性能还是会有较大的影响。
在快速扩缩容方面,Gemini 主要有两方面优化:文件粒度合并剪裁以及状态文件懒加载。其中,文件粒度合并剪裁和社区现在的做法稍有不同,Gemini 的文件裁剪是异步的,可以让作业更快地恢复;状态文件懒加载是指远端文件异步下载,按需加载,Flink 作业在恢复元数据后, 就可以开始半速运行。从我们的测试结果中可以看到,热更新+状态懒加载可以让作业停止处理的时间从 200s 下降到 20s。
4 状态管理存算分离架构 —— 架构演进和挑战
在状态管理分层架构下,状态使用的难题目前都有一些解决方案,但是这些方法并不是很完美,我们在实现过程中也遇到了一些困难,在最后一节和大家分享一下我们的经验,以及存算分离架构演进的可能方案。
4.1 云原生架构演进
在 Gemini 的统一文件管理架构中,状态文件会在本地盘满了以后刷到 DFS,但仍然以本地存储作为 Primary Storage。这种形式的弊端是让文件管理比较复杂而且不太好扩展,比如说我们有更多层的存储时需要架构有较大改动。这种形式相对于把本地存储当成 Cache 的情况会更复杂,特别是对于文件写失败等异常情况的处理。其次,以本地 state 为主存的模型中,每个 Task 缺少算子完整状态的 Global View,这会让状态的共享化比较难以实现,对于一些状态特别大且有共享需求的作业来说,实现成本就会成倍增加。更关键的是现在这种模式下,很多 DB 设计都是基于本地存储为主存的前提做的,这让状态 DB Compaction 和冷热分离策略都变得相对复杂,计算节点和存储节点的捆绑实际还是比较紧密的。
为此我们可以考虑换个思路来做这个事情,把 DFS 作为 Primary Storage,而把本地磁盘作为 Cache 并且是不强制需要的(Optional),DFS 的数据是 Source of truth。在这种形式下,状态数据直接持续写入 DFS,内存和本地盘作为 Cache,服务于上层的算子状态访问。这种架构的好处是:
-
Checkpoint 被触发的时候,大部分状态文件已经在 DFS 上了, Checkpoint 可以速度很快;
-
状态文件在算子之间是可以共享的,一些场景下通过共享状态计算,存储资源可以大大减少;
-
状态查询可以基于远端 Checkpoint 文件实现,并提供文件级别 API;
-
Compaction 和状态清理不再和 Task Manager 绑定,我们可以通过 remote compaction、load balancing 等手段来均衡 Cluster 整体资源的使用。
然而仅仅将状态主存从本地磁盘切换到远端文件系统,就可以完美实现存算分离架构了吗?答案是 No。仅仅如此的话,作业性能会特别差,因为相比于本地状态架构,State 写到远端后访问性能会出现很大回退。
4.2 Flink 现有状态访问线程模型
性能回退的原因是受限于目前的 Flink 线程访问模型,数据处理必须以单线程单条 Record 处理的模式进行,对 State 访问的延迟极其敏感。其中,状态访问以单 KV 为粒度;算子数据处理会封装成 Mail,在 task 线程中顺序执行,因此单 record 的延迟对性能影响很明显。从读写链路上具体分析的话,对状态的写路径是 non-blocking 的,writes 追加写到 writeBuffer 中,writeBuffer 写满后异步刷到磁盘上,所以 write 流程对于整体的 TPS 影响不大。但是读路径是非常影响整体性能的,读数据时如果在 Block Cache 中 Cache miss,需要从磁盘上读取数据,读取过程会阻塞数据处理,影响作业总体的性能。
更近一步,我们可以看到 Task 线程执行一次计算时,时间主要消耗在以下几个部分:
-
Operator CPU:用于执行算子逻辑;
-
State CPU:读写 access 以及序列化反序列化等;
-
State IO:状态读盘的时间。正因为读盘操作是 Blocking IO,而 DFS 的访问延迟相比于本地盘很大很多,所以单线程模型直读 DFS 速度会比本地盘慢很多。
为了解决这个问题,我们可以考虑把 Task 线程的 Blocking IO 改造成 Non-Blocking IO,这样才能更高效地充分利用 CPU 和 IO 资源。在 state 文件多线程并发访问的初步实验中,当我们把 thread pool size 调到足够大的时候,性能有明显的提升(约有 6 倍)。实际上,Non-Blocking IO 架构要考虑的问题肯定要比现在的 state 多线程访问实验要复杂很多,比如说同一个 Key 的执行顺序如何保证等等,我们也做了一些 POC 并得到一些初步结论,敬请期待后续文章分享。
5 总结
最后是本次分享的一些总结 。
第一是状态访问在流计算中起到了非常关键的作用。在单条 record 的访问路径上,状态访问的处理速度会严重影响到 TPS ,这也是之前很多尝试 Flink 远端状态存储访问会非常慢的原因。
第二,云原生时代对状态存储提出了更高的要求,比如受限的本地盘、快速扩缩容以及平滑的资源使用等等。基于这个,我们做了大量的优化工作,但是还不太足够,因此我们提出 State 的存储访问架构需要进一步的演化。
第三,社区已经推出的 Unaligned checkpoint 和通用增量 checkpoint 这两个功能 ,逐步在各大公司和厂商里广泛的实践和使用,我们内部也进行了小范围的尝试,效果很不错。
第四,介绍了 Gemini,阿里内部的面向流计算的分层状态存储,我们在这个过程中进行了一些云原生以及存算分离模型的探索。
6 Flink 大状态管理痛点
从这里开始 我们就进入改造实践了。
6.1 Flink 状态管理
状态管理是有状态流计算的核心。目前在 Flink 生产环境中使用的最多的状态后端是基于 RocksDB 的实现,它是一个以本地磁盘为主的状态管理,将状态文件存储于本地,同时在进行检查点的时候将文件周期性写入 DFS 。这是一种存算一体的架构,它足够简单,在小状态作业下能够保证稳定高效,可以满足绝大部分场景的需求。随着 Flink 的发展,业务场景日益复杂,大状态作业屡见不鲜,在存算一体的架构下涌现了很多与状态管理有关的现实问题。
6.2 大状态作业痛点
大状态作业下,基于 RocksDB 本地磁盘存算一体的状态管理主要会遇到以下四方面的问题:
-
本地磁盘有可能会出现空间不足的情况,通常解决这类问题的方法就是扩容。在目前集群部署或是云原生部署的模式下,单独进行本地盘的扩容是不方便的,所以用户一般会增加并发度,也就是涉及到存储和计算绑在一起进行扩容,加剧了计算资源的浪费。
-
作业正常状态访问时,本地磁盘 I/O 也会遇到一些瓶颈。这导致作业整体性能不足,同样需要进行扩并发操作。
-
检查点的开销比较大。由于状态非常大,在检查点期间对于远端存储访问量呈现一个尖峰态势。
-
在作业恢复的时候,需要从远端读取全量文件到本地,这个过程也十分缓慢。
上述前两点是影响用户成本的问题,而检查点的开销与恢复速度是 Flink 中影响易用性的关键问题。
6.3 存算分离的架构
对于以上问题,我们提出了存算分离的架构来解决。存算分离可以摆脱本地磁盘的限制,将远端存储(DFS)作为主存储,同时将空闲的本地磁盘作为一个 Cache 来进行使用。同时用户仍可以选择本地磁盘作为主存储,还用原来的模式来运行。这样做的显著的好处是,一方面由于磁盘空间和 I/O 性能不足的问题不再影响计算资源,另一方面是状态检查点与恢复在远端就可以直接完成,变得更加轻量级。从架构上完美解决了大状态作业面临的问题。
7 阿里云自研状态存储后端 Gemini 的存算分离实践
在进入存算分离架构探讨的最开始,我希望先从阿里云自研的企业级状态存储 Gemini 入手,探寻它在存算分离上的一些实践,主要分为以下三项:
7.1 多种文件系统分层管理
Gemini 能够把远端作为状态主存储的一部分。它首先将状态文件存储于本地磁盘,如果本地磁盘不足,则将文件移动到远端存储。本地磁盘中存留的是访问概率高的文件,远端存储的是不容易访问的文件。两部分共同构成了主存储,并在此基础上进行了冷热划分,保证了在给定资源条件下的高效服务。Gemini 的这种文件分层管理模式摆脱了本地磁盘空间的限制。理论上本地空间可以配置为零,以达到纯远端存储的效果。
7.2 状态懒加载
Gemini 能够支持远端文件存储,在作业恢复的场景之下,无需将数据从远端文件加载回本地就可以开启服务,使用户作业进入运行状态。这一功能称为状态懒加载。在实际恢复过程中,Gemini 仅需将元数据以及少量内存中的数据从远端加载回,就可以重建整个存储并启动。
虽然作业已经从远端文件启动了,但读取远端文件涉及到更长的 I/O 延迟,性能仍旧不理想,此时需要使用内存和本地磁盘进行加速。Gemini 会使用后台线程进行异步下载,将未下载的数据文件逐渐转移至本地磁盘。下载过程分为多种策略,比如按照 LSM-tree 层次的顺序,或者按照实际访问的顺序来下载。这些策略可以在不同场景进一步缩短从懒加载恢复到全速运行性能的时间。
7.3 Gemini 延迟剪裁
在改并发的场景中,比如将两个并发的状态数据合并成一个并发时,目前 RocksDB 是把这两份数据都下载完成之后再做一个合并,涉及到将多余的数据剪裁掉,重建数据文件,其速度是比较慢的。社区针对这个过程进行了很多的针对性优化,但仍然避免不了数据文件的下载。Gemini 只需要把这两部分数据的元数据进行加载,并且把它们合成一个特殊的 LSM-tree 结构,就可以启动服务,这一过程称为延迟剪裁。
重建后 LSM-tree 的层数相比正常情况下会比较多。比如针对图中的例子,有两个 L0 层,两个 L1 层和两个 L2 层。由于 Flink 有 KeyGroup 数据划分的机制存在,层数变多并不会对读链路长度造成影响。由于并未对数据进行实际的裁剪,会存在一些多余的数据,这些数据会在之后的整理 (Compaction) 过程逐步清理掉。延迟剪裁的过程无需对数据本身进行下载和实际合并操作,它可以极大地缩短状态恢复的时间。
7.4 Gemini 恢复效果
有了异步剪裁状态+状态懒加载,对于 Gemini 来说,恢复时间即作业从 INITIALIZING 到 RUNNING 的状态可以变得非常之短,相比于本地状态存储的方案是一个极大的提升。
我们针对 Gemini 与 RocksDB 的改并发时间进行了评测。评测的指标为从作业启动一直到恢复原有性能的时间,这包含了 Gemini 异步下载文件的时间。从上述实验结果中可以看到 Gemini 相比于RocksDB 在缩容、扩容的场景下都有明显的提升。
8 存算分离的进一步探索
Gemini 做存算分离相关的优化部分解决了前述大作业场景的问题。本地空间不足的问题可以通过远端空间来解决。针对检查点开销大的问题,因为已经有一部分文件远端存储上了,无需再次上传,这部分的开销也得以减少。针对作业恢复慢的问题,状态懒加载+延迟剪裁功能,使得作业能够快速的恢复运行状态。
这里还有一个功能是对 Memtable 的快照。Gemini 在做检查点的时候,是将 Memtable 的原样上传到远端存储上,不会影响 Memtable flush 的过程,也不会影响内部的 Compaction。它的效果和通用增量快照的 changelog 的效果是类似的,都会缓解检查点时的 CPU 开销和 DFS I/O 量的尖峰。
8.1 Gemini 存算分离的问题
Gemini 在存算分离方面做了不错的实践,在阿里内部与云上客户的大状态作业场景下均取得了不错的效果。但它仍存在着一些问题:
第一个问题,Gemini 还是把本地磁盘作为主存的一部分,状态文件是优先写到本地磁盘的,这并非最彻底的一个存算分离。这样会导致检查点时上传文件数量还是会比较多,持续时间较长,做不到非常轻量级的检查点。
第二个问题,是所有存算分离方案都会遇到的一个问题,就是与本地方案的性能差距。目前的方案中 Gemini 已经利用了本地磁盘,但本地磁盘的利用效率并不是最高的。如果更多的请求可以落到内存或者本地磁盘,对应的远端 I/O 的请求数降低,作业整体性能会有提升。另外,异步 I/O 是很多存储系统都会采用的优化。它使用提高 I/O 并行度的方式来解决提高作业的吞吐,是值得尝试的下一步优化方向。
针对这几个问题我们进行了简单的探索,首先是做了一个非常彻底的存算分离,直接写入远端存储并且把本地磁盘直接作为 Cache 来使用,在此基础上实践了不同形式的 Cache。第二方面,我们实现了一个简单的异步 I/O PoC,验证其在存算分离场景上的性能提升。
8.2 直接写入远端与本地磁盘Cache的探索
8.2.1 原始方案:基于文件的本地磁盘 Cache
直接使用远端存储作为主存的改动我们不作详述,在这里主要探讨 Cache 的形态与优化问题。最简单的架构是基于文件的 Cache 。如果远端的文件被访问到,它会被加载到本地磁盘 Cache。与此同时内存 Cache 仍然存在,并且仍旧采用 BlockCache 的形式。这个形式是非常简单高效的架构。但是内存 BlockCache 和本地磁盘的文件 Cache 有很大的一个数据重复,这相当于浪费了很多空间。另一方面,由于文件的粒度相对较粗,对于同一个文件的不同 block ,其访问的概率并不一样,所以会有一些冷的 block 维持在磁盘中,降低了本地磁盘的命中率。针对这两个问题,我们设计了全新的本地磁盘 Cache 的形态,对上述问题进行优化。
8.2.2 优化方案:基于 Block 的本地磁盘 Cache
我们提出将本地磁盘与内存结合起来,组成一个以 block 为粒度的混合式 Cache。它使用一个整体的 LRU 进行统一的管理,不同 block 只有介质上的不同。内存上相对冷的 block 会异步地刷到本地磁盘上,磁盘的 block 是按照顺序以追加写的形式来写在底层文件中。如果由于 LRU 策略淘汰了某些磁盘的 block,必然会映射到某个文件上形成空洞。为了维持 Cache 空间有效性,我们采取了空间回收来进行优化。空间回收的过程是一个空间和 CPU 开销的权衡。
不同层的文件如 L0 file 、L1 file 以及 L2 file,它们的生命周期是不一样的。对于 L0 file 来讲,它的生命周期比较短一些,但是热度相对高。对于 L2 file 来讲,文件本身更容易存活,但是热度是相对低的。根据这些不同的特点,我们可以采取不同的策略来进行空间回收。来自不同层文件 block 会被 Cache 在不同的底层文件中。针对不同的底层文件可以执行不同的空间回收阈值与频率,这样可以保证最大的空间回收效率。
另外我们针对 block 淘汰策略也提出了优化方案。最原始的 LRU 是根据命中频率来进行管理的,某个 block 一段时间内不命中则会被淘汰。这种策略并没有考虑到在缓存某一个block 的空间开销。也就是说可能为了缓存某个 block,却有更多的 block 没有办法进行缓存。在这里引入了一个新的评判体系叫做缓存效率,用一段时间内命中次数除以 block 大小,来更好的评判每一个缓存的 block 是否应该被缓存。这种评判方式的缺点是开销会比较大。最基本的 LRU 针对于查询都是 O(1) 的,但缓存效率的评分需要实现一个优先队列,其运行效率会有较大下降。所以在这里的思路还是在保持 LRU 主体管理的情况下,针对 block 的缓存效率异常的情况进行特殊化处理。
目前发现有两部分异常,第一部分是内存中的 data block 。它的命中率是内存中相对低的,但是它的占比能达到 50%。目前对于它的策略就是进行压缩,其代价是每次访问涉及到解压,但这个开销要比进行一个 I/O 的开销要小得多。第二部分是磁盘中的 filter block 。虽然它有命中,但它的大小是比较大的,缓存效率并不高。在这里实现了一个倾向于把磁盘中的 filter block 优先踢出的策略,使得相对上层的数据可以缓存进来。在测试作业场景中,这两条特殊规则与 LRU 相结合,相比于没有这两条规则的时候,整体 TPS 会上升 22%,效果比较显著。
但直接写入远端使系统出现了远端文件冷读问题,即文件第一次生成后的读取仍然需要涉及到远端 I/O。为了解决这个问题,我们在这里也做了一个小的优化,在本地磁盘上提供一个上传远端的队列,并且让其中的文件多缓存一段时间。这个时间并不会很长,大概是二三十秒的一个级别,在此期间队列文件的远端 I/O 会变为本地 I/O。这样的做法能够让远端冷读的问题大大的缓解。
到目前为止我们有两种存算分离的 Cache 方案,第一种是基于文件的本地磁盘 Cache 方案,它的优点是非常简单和有效,在磁盘充足的场景下有与本地方案类似的性能,因为本地磁盘可以缓存所有文件。第二种是混合式 block cache 的优化,在本地磁盘不足的情况下是一个非常好的方案,因为它提升了 Cache 的命中率。但是它也带来了比较大的管理开销。如果我们想要有一个通用的方案来适配所有场景,应该怎么做呢?
8.2.3 混合方案:自适应变化
将上述两种方案结合,我们设计了一个自适应变化的的混合方案。在磁盘充足的情况下使用的是基于文件的 Cache 方案,在磁盘不足的情况下,会把本地磁盘自动的和内存结合在一起组成混合式 block cache 方案。两种方案的结合会让它们两个的优点结合在一起,在所有的场景下都能够最大化的满足性能效率的需求。
8.2.4 混合方案:评测
我们针对上述提出的混合方案使用测试作业进行评测。可以看到在 TPS 上,新方案相比于文件为粒度的原始缓存方案有 80% 的提升。同时它也伴随着一些 CPU 的开销,用 CPU 效率(TPS/CPU)作为评判标准,新方案也有 40% 的提升。Cache 命中率的提升是 TPS 提升的一个主要来源。
8.3 异步I/O的探索
8.3.1同步单条处理模式
第二项探索是对 Flink 进行的异步 I/O 改造与测试。如图展示了目前 Flink 的单线程处理模型,在 Task 线程上面,所有的数据是按顺序来进行处理的。对于每一条数据处理,会分为算子(operator)的 CPU 开销,状态(State)访问的 CPU 开销,以及状态访问所需的 I/O 时间,其中 I/O 是最大的一块开销。由于存算分离需要访问远端存储,其 I/O 延迟会比本地方案大,最终会导致整体 TPS 有明显下降。
8.3.2 批量处理+内部异步模式
我们对这一模式进行更改,使得 State 操作可以同时进行。在 Task 线程的角度来讲,State 被并行化之后整体的时间被缩小,所以 TPS 会有一个提升。同时,Task 线程需要预先攒批,这和 micro-batch 做的事情是非常类似的,同理也可以借用预聚合的功能,降低 state 访问的数目,TPS 得以进一步提升。
8.3.3 算子异步+批量处理模式
更进一步,在加上状态访问异步的基础之上,可以继续探索从算子的角度上进行异步化的过程。这意味着状态访问已经开始了异步执行后,让 Task 线程得以继续进行其他数据的 CPU 操作。但这样做有一个问题:状态访问 I/O 一般都是时间比较长的,虽然在 Task 线程闲的时候可以做一些其他的数据的处理工作,但是最终会一个速率不匹配的问题,瓶颈最终还会落到状态访问上,会退化到没有做此优化的情况。
经过权衡,我们认为仅采用攒批,再加上批内的状态访问使用异步 I/O 这种方式,是一个比较平衡的方案。
8.3.4 存算分离+批量化:评测
我们做了一个简单的支持批量异步的接口的状态后端,并在社区 Microbenchmark 上面做了一个简单的测试,目前仅涉及到 value get 的场景。从对比结果上可以看到,批量执行加上异步 I/O 是对存算分离场景有很大的提升。
9 批量化异步 I/O 存算分离适用场景
上述探索的批量化执行的存算分离状态访问有独特的应用场景。对于大状态作业来讲,存算分离在功能上解决了最开始所述的几个问题,在性能上,用批量接口的方式来弥补它的低的问题。
9.1 性能分析
此种方案的性能来源是 State 访问在批次内并行化,减少了状态访问的时间,提升了计算节点的 CPU 利用率。这种方案对于大状态作业性能提升是很有用的。
9.2 定性性能分析
在小状态作业的场景下,状态访问可以做到非常的快,将状态访问从 Task 线程抽离出来的提升量很小,且引入了线程之间交互的开销。所以在小状态的场景,这种批量异步状态访问的方案或许还不如原始本地状态管理方案。
随着状态大小逐渐增大,状态 I/O 开销逐渐增大并成为了瓶颈,异步 I/O 的执行当于摊薄了每个 I/O 所耗的时间。这导致了图中红色线的下降是较慢的,而本地状态管理(蓝色线)降低会比较快。在达到某个状态大小后,异步 I/O 的方案性能会显著的好。这种方案需要消耗 I/O 带宽,如果状态访问已经达到了 I/O 上限,异步 I/O 不能减少 I/O 的总时间,故此时它的斜率跟本地状态管理差不多。
如果状态很小的时候就达到 I/O 上限,并行化执行并不会产生效果,上图所示的便是这个场景。
总结一下,批量并异步执行状态访问在满足以下条件时会有优势:
-
大状态作业场景且状态访问是作业的瓶颈
-
I/O 并没有达到瓶颈(未打满)
-
业务对于攒批的延迟(亚秒到秒级别)可以接受文章来源:https://www.toymoban.com/news/detail-835721.html
绝大部分存算分离场景下,由于 I/O 性能是存储集群提供,可以支撑比较大的 I/O 量且可以灵活伸缩,一般不会过早达到 I/O 瓶颈状态,异步 I/O 可以很好的优化存算分离场景。文章来源地址https://www.toymoban.com/news/detail-835721.html
到了这里,关于Flink 2.0 状态管理存算分离架构演进与分离改造实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!