Flink|《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》学习笔记

这篇具有很好参考价值的文章主要介绍了Flink|《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》学习笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

学习文档:《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》

学习笔记如下:


概述

Flink 中每条消息都会被放到网络缓冲(network buffer) 中,并以此为最小单位发送到下一个 subtask。 Flink 在传输过程的输入端和输出端使用了网络缓冲队列,即每个 subtask 都有一个输入队列来接收数据和一个输出队列来发送数据,拥有更多的中间缓存数据可以使 Flink 提供更高、更富有弹性的吞吐量,但是也会增加快照时间。

  • 对于对齐的 checkpoints,checkpoint barrier 会随着网络缓冲数据在 job graph 中流动;因此缓冲数据越多,checkpoint barrier的流动时间就越长
  • 对于不对齐的 checkpoint,缓冲数据越多,需要持久化到 checkpoint 中的数据越多,checkpoint 也就会越大

缓冲消胀机制(Buffer Debloating)

Flink 1.14 之前配置缓冲数据量的方法:

  • 指定缓冲数量
  • 指定缓冲大小

Flink 1.14 之后引入缓冲消胀功能来解决这个问题。缓冲消胀功能会计算 subtask 可能达到的最大吞吐(始终保持繁忙状态时),并根据计算结果调整缓冲数据量,从而使得数据的消费时间达到配置值。使用缓冲消胀机制的配置:

  • taskmanager.network.memory.buffer-debloat.enabledtrue:开启缓冲消胀机制
  • taskmanager.network.memory.buffer-debloat.targetduration 类型的值:指定消费缓冲数据的目标时间(默认值应该能满足大多数场景)

如果缓冲消胀功能的消费时间预测不准,会导致以下问题:

  • 缓冲数据不足,影响整体吞吐量
  • 缓冲数据太多,影响 checkpoint barrier 推进或非对齐的 checkpoint 大小

如果 Job 的负载经常波动,可以长治调整以下设置:

  • taskmanager.network.memory.buffer-debloat.period:这是缓冲区大小重算的最小时间周期。周期越小,缓冲消胀机制的反应时间就越快,但是必要的计算会消耗更多的CPU。
  • 调整样本数:样本数越少,缓冲消胀机制的反应时间就越快,但是当吞吐量突然飙升或者下降时,缓冲消胀机制计算的最佳缓冲数据量会更容易出错。
    • taskmanager.network.memory.buffer-debloat.samples:计算平均吞吐量的采样数。
    • taskmanager.network.memory.buffer-debloat.period:计算平均吞吐量的采集样本的频率。
  • taskmanager.network.memory.buffer-debloat.threshold-percentages:防止缓冲区大小频繁改变的优化

可以使用以下指标来监控当前的缓冲区大小:

  • estimatedTimeToConsumeBuffersMs:消费所有输入通道(input channel)中数据的总时间。
  • debloatedBufferSize:当前的缓冲区大小。
限制

当前,有一些场景还没有自动地被缓冲消胀机制处理。

  • 有多个输入流,或者有一个合并的输入的场景:因为吞吐计算和缓冲消胀发生在 subtask 层面,所以可能会导致低吞吐的输入有太多缓冲数据,而高吞吐输入的缓冲区数量太少而不能维持当前吞吐。当不同的输入吞吐差别比较大时,这种现象会更加明显。
  • 缓冲区的大小和数量未实际改变:当前,缓冲消胀仅在使用的缓冲区大小中设置上限,实际的缓冲区大小和个数保持不变。因此,当前的缓冲消胀机制不会减少作业的内存使用,需要手动减少缓冲区的大小或者个数。
  • 高并行度:目前,缓冲消胀机制在高并行度(超过 200)时可能无法正常执行。如果出现吞吐量降低或检查点时间高于预期,则建议将浮动缓冲区数量(taskmanager.network.memory.floating-buffers-per-gate)从默认值增加到至少等于并行度的数量。问题发生的并行度的实际值因作业而异,但通常应超过几百个。

网络缓冲生命周期

在 Flink 中,每个输入和输出流对应一个本地缓冲区池,每个缓冲区池的缓冲区数的计算公式如下:
channels × taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate \texttt{channels} \times \texttt{taskmanager.network.memory.buffers-per-channel} + \texttt{taskmanager.network.memory.floating-buffers-per-gate} channels×taskmanager.network.memory.buffers-per-channel+taskmanager.network.memory.floating-buffers-per-gate
每个缓冲区(Buffer)的大小可以通过 taskmanager.memory.segment-size 来设置。

输入网络缓冲

缓冲区池不一定总能达到目标缓冲区数。Flink 有一个阈值 taskmanager.network.memory.read-buffer.required-per-gate.max 用于控制 Flink 在无法获取到缓冲区时是否会失败。在目标缓冲区数中,小于阈值的部分被称为必须(required)缓冲区,剩余的部分是可选(optional)缓冲区。如果无法获得必须缓冲区,会导致任务失败;如果无法获得可选缓冲区,任务不会失败,但可能会降低性能。

对于流作业,这个阈值的默认值是 Integer.MAX_VALUE;对于批作业,这个阈值的默认值是 1000。通常来说,这个阈值越小,出现 “网络缓冲区数量不足” 异常的可能性越小,但导致作业静默地性能下降的可能性越大。

输出网络缓冲

输出缓冲区池只有一种类型的缓冲区被所有的 subpartitions 共享。为了避免过多的数据倾斜,每个 subpartition 的缓冲区数量可以通过 taskmanager.network.memory.max-buffers-per-channel 来限制。

独占缓冲区和流动缓冲区:输出缓冲区池的独占缓冲区和流动缓冲区只被当作推荐值,如果没有足够的缓冲区,每个输出 subpartition 可以只使用一个独占缓冲区而没有流动缓冲区。

透支缓冲区(Overdraft buffers)

每个 subtask 输出数据时可以至多请求 taskmanager.network.memory.max-overdraft-buffers-per-gate (默认 5)个额外的透支缓冲区(overdraft buffers)。只有当前 subtask 被下游 subtasks 反压且当前 subtask 需要请求超过 1 个网络缓冲区(network buffer)才能完成当前的操作时,透支缓冲区才会被使用。

可能使用透支缓冲区的场景包括:

  • 序列化非常大的 records,不能放到单个网络缓冲区中
  • 类似 flatMap 的算子在处理单个 record 时产生了过多的 records
  • 周期性地或某些事件触发产生大量 records 的算子(例如 WindowOperator 的触发)。

在这些情况下,如果没有透支缓冲区,Flink 的 subtask 线程会被阻塞在反压,从而阻止例如 Unaligned Checkpoint 的完成。 为了缓解这种情况,增加了透支缓冲区的概念。这些透支缓冲区是可选的,Flink 可以仅仅使用常规的缓冲区逐渐取得进展,也就是 说 0taskmanager.network.memory.max-overdraft-buffers-per-gate 可以接受的配置值。

设置缓冲区的大小和数量

独占缓冲区和流动缓冲区的默认配置应该足以应对最大吞吐。

如果想要最小化缓冲数据量,那么可以将独占缓冲区设置为 0,同时减小内存段的大小。

选择缓冲区的大小

在往下游 subtask 发送数据部分时,缓冲区通过汇集 record 来优化网络开销。下游 subtask 应该在接收到完整的 record 后才开始处理它。

如果缓冲区太小,或缓冲区刷新得过于频繁(通过 execution.buffer-timeout 参数配置),则可能导致吞吐量降低,因每个缓冲区的开销明显高于每个记录的开销。

根据经验,通常不建议增加缓冲区的大小或缓冲区的超时时间,除非在实际工作中观察到网络瓶颈,即下游 Operator 空闲,上游 Operator 背压,上游的输出缓冲区队列已满,下游的输入队列为空。

如果缓冲区太大,会导致:

  • 内存使用高
  • 大量的 checkpoint 数据量(针对非对齐的 checkpoints)
  • 漫长的 checkpoint 时间(针对对齐的 checkpoints)
  • execution.buffer-timeout 较小时内存分配使用率会比较低,因为缓冲区还没被塞满数据就被发送下去了
选择缓冲区的数量

缓冲区的数量通过 taskmanager.network.memory.buffers-per-channeltaskmanager.network.memory.floating-buffers-per-gate 配置的。

为了最好的吞吐率,建议使用独占缓冲区和流动缓冲区的默认值。如果缓冲数据量存在问题,则建议打开缓冲消胀。

可以人工地调整网络缓冲区的个数,但是需要注意:

  1. 根据期待的吞吐量来调整缓冲区的数量。协调数据传输量(大约两个节点之间的两个往返消息)。延迟也取决于您的网络。
    使用 buffer 往返时间(大概 1ms 在正常的本地网络中),缓冲区大小和期待的吞吐,可以通过下面的公式计算维持吞吐所需要的缓冲区数量:

number_of_buffers = expected_throughput × buffer_roundtrip / buffer_size \texttt{number\_of\_buffers} = \texttt{expected\_throughput} \times \texttt{buffer\_roundtrip} / \texttt{buffer\_size} number_of_buffers=expected_throughput×buffer_roundtrip/buffer_size

比如,期待吞吐为 320MB/s,往返延迟为 1ms,内存段为默认大小,为了维持吞吐需要使用10个活跃的缓冲区:
number_of_buffers = 320 M B / s × 1 m s / 32 K B = 10 \texttt{number\_of\_buffers} = 320MB/s \times 1ms / 32KB = 10 number_of_buffers=320MB/s×1ms/32KB=10

  1. 流动缓冲区的目的是为了处理数据倾斜。理想情况下,流动缓冲区的数量(默认8个)和每个通道独占缓冲区的数量(默认2个)能够使网络吞吐量饱和。但这并不总是可行和必要的,所有 subtask 中只有一个通道被使用也是非常罕见的。
  2. 独占缓冲区的目的是提供一个流畅的吞吐量。当一个缓冲区在传输数据时,另一个缓冲区被填充。当吞吐量比较高时,独占缓冲区的数量是决定 Flink 中缓冲数据的主要因素。

当低吞吐量下出现反压时,应该考虑减少独占缓冲区。

总结

可以通过开启缓冲消胀机制来简化 Flink 网络的内存配置调整。

如果缓冲消胀机制不能起作用,则可以关闭缓冲消胀机制并且人工地配置内存段的大小和缓冲区个数。此时推荐:文章来源地址https://www.toymoban.com/news/detail-811542.html

  • 使用默认值以获得最大吞吐
  • 减少内存段大小、独占缓冲区的数量来加快 checkpoint 并减少网络栈消耗的内存量

到了这里,关于Flink|《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink任务内存调优,TaskManager、JobManager内存配置

            Flink是基于java的JVM运行,拥有高效的数据处理能力,但是考虑到用户在 Flink 上运行的应用的多样性,尽管flink框架已经为所有配置项提供合理的默认值,仍无法满足所有情况下的需求。 为了给用户生产提供最大化的价值, Flink 允许用户在整体上以及细粒度上对集

    2024年02月06日
    浏览(47)
  • Flink|《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》学习笔记

    学习文档:《Flink 官方文档 - 部署 - 命令行界面 - 提交 PyFlink 作业》 学习笔记如下: 当前,用户可以通过 CLI 提交 PyFlink 作业。对于通过 flink run 提交的 Python 作业,Flink 会执行 python 命令。因此,在启动 Python 作业前,需要先确定当前环境中的 python 命令指向 3.7+ 版本的 Pyt

    2024年02月22日
    浏览(65)
  • Flink|《Flink 官方文档 - 内幕 - 文件系统》学习笔记

    学习文档:内幕 - 文件系统 学习笔记如下: Flink 通过 org.apache.flink.core.fs.FileSystem 实现了文件系统的抽象。这种抽象提供了一组通用的操作,以支持使用各类文件系统。 为了支持众多的文件系统, FileSystem 的可用操作集非常有限。例如,不支持对现有文件进行追加或修改。

    2024年02月03日
    浏览(37)
  • Flink|《Flink 官方文档 - 概念透析 - Flink 架构》学习笔记

    学习文档:概念透析 - Flink 架构 学习笔记如下: 客户端(Client):准备数据流程序并发送给 JobManager(不是 Flink 执行程序的进程) JobManager:协调 Flink 应用程序的分布式执行 ResourceManager:负责 Flink 集群中的资源提供、回收、分配 Dispatcher:提供了用来提交 Flink 应用程序执行

    2024年01月19日
    浏览(48)
  • Flink|《Flink 官方文档 - DataStream API - 概览》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 概览 学习笔记如下: Flink 的 DataStream API: 数据里的起始是各种 source,例如消息队列、socket 流、文件等; 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等; 结果通过 sink 返回,例如可以将数据写入文件或标准输出。 Da

    2024年01月23日
    浏览(55)
  • Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记

    学习文档:概念透析 - 及时流处理 学习笔记如下: 及时流处理时有状态流处理的扩展,其中时间在计算中起着一定的作用。 及时流的应用场景: 时间序列分析 基于特定时间段进行聚合 对发生时间很重要的事件进行处理 处理时间(processing time) 处理时间的即数据到达各个

    2024年02月03日
    浏览(52)
  • Flink|《Flink 官方文档 - DataStream API - 算子 - 窗口》学习笔记

    学习文档:《Flink 官方文档 - DataStream API - 算子 - 窗口》 学习笔记如下: 窗口(Window):窗口是处理无界流的关键所在。窗口可以将数据流装入大小有限的 “桶” 中,再对每个 “桶” 加以处理。 Keyed Windows 在 Keyed Windows 上使用窗口时,要调用 keyBy(...) 而后再调用 window(..

    2024年01月18日
    浏览(47)
  • Flink|《Flink 官方文档 - DataStream API - 状态与容错 - 使用状态》学习笔记

    学习文档:Flink 官方文档 - DataStream API - 状态与容错 - 使用状态 相关文档: 有状态流处理背后的概念:Flink|《Flink 官方文档 - 概念透析 - 有状态流处理》学习笔记 Redis 过期 key 的删除机制:Redis|过期 key 的删除机制 学习笔记如下: 如果要使用键控状态,则必须要为 DataS

    2024年02月03日
    浏览(43)
  • Flink|《Flink 官方文档 - DataStream API - 用户自定义 Functions》学习笔记 + 源码分析

    学习文档:Flink 官方文档 - DataStream API - 用户自定义 Functions 学习笔记如下: 用户可以通过实现接口来完成自定义 Functions。 实现接口并使用的样例: 使用匿名类实现的样例: 使用 Lambda 表达式实现(Java 8)样例: 所有的 Flink 函数类都有其 Rich 版本,在 Rick function 中,可以获

    2024年01月18日
    浏览(48)
  • flink sql checkpoint 调优配置

    - `execution.checkpointing.interval`: 检查点之间的时间间隔(以毫秒为单位)。在此间隔内,系统将生成新的检查点 SET execution.checkpointing.interval = 6000; - `execution.checkpointing.tolerable-failed-checkpoints`: 允许的连续失败检查点的最大数量。如果连续失败的检查点数量超过此值,作业将失败

    2024年02月12日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包