【Flink系列二】如何计算Job并行度及slots数量

这篇具有很好参考价值的文章主要介绍了【Flink系列二】如何计算Job并行度及slots数量。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

接上文的问题

  1. 并行的任务,需要占用多少slot ?
  2. 一个流处理程序,需要包含多少个任务

首先明确一下概念

slot:TM上分配资源的最小单元,它代表的是资源(比如1G内存,而非线程的概念,好多人把slot类比成线程,是不恰当的)

任务(task):线程调度的最小单元,和java中的类似。

---------------------------------------------------------------------------

为更好的去理解后面如何计算并行度及需要的slots数量,先介绍一下几个概念

并行度(Parallelism)

flink slot 数量设置,从零开始搞大数据,flink,大数据
图1
  •  一个特定算子的子任务(subtask)的个数被称之为并行度(parallelism)一般情况下,一个stream的并行度,可以认为就是其所有算子中最大的并行度。
  • 图中source算子的并行度=2,map算子的并行度=2,keyby算子的并行度=2,sink算子的并行度=1

ps:并行度的设置有3个地方,1=代码中指定,2=提交Job时指定-p参数,3=Flink配置文件conf中执行,其优先级1>2>3, 不详细展开,有问题可以评论区

由图1,我们可以算出stream的任务数=7(两个source + 两个map + 两个keyby + 一个sink)

TaskManager和Slots

flink slot 数量设置,从零开始搞大数据,flink,大数据
图2
  • Flink中每个TaskManager都是一个JVM进程,它可能会在独立的线程上执行一个或多个任务
  • 为了控制一个TM(TaskManager缩写)能接受多少哥task,TM通过task slot来进行控制(一个TM至少有1个slot)
  • 建议TM中slot数量设置为cpu核心数,因为一个TM中slot内存的独享的,但是cpu是共享的,为避免不同slot执行任务时争抢cpu资源,建议slot数量设置和cpu核心数一致
  • 图中slot数量决定了TM上的最大线程并行能力,一个slot可以执行一个线程,也可以串行执行多个线程。

图2中我们看到

  1. source和map算子合并到一块了,那为什么可以合并呢?
  2. 合并后每个任务都占用一个slot,一共是占用了5个slot,现实真的是这样的吗?

带着问题,再看一个例子

source和map算子及keyby算子的并行都调整为6,sink算子的并行度还是1,排列方式如图

flink slot 数量设置,从零开始搞大数据,flink,大数据
图3

按照我们上面的理解,我们应该需要的slot数量=6+6+1=13,但是这样会造成slot资源的浪费(流处理任务第一个算子处理完了之后需要等后面的算子都执行完,再开始下一批次的任务处理),为此,Flink允许任务共享slot

  • 默认情况下,Flink允许子任务共享slot(必须是前后执行的不同的任务),及时他们是不同任务的子任务。这样的结果是,一个slot可以保存作业的整个管道。
  • Task slot是静态的概念,是指TM具有的并发的并行执行能力

所以,Flink优化后一共占用6个slot。

slot共享组

  • 任务槽共享的好处:

1.Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同并行度)。
2.资源 容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window)
一样多的资源
 

默认情况下会设置一个默认的共享组, slotSharingGroup("default"),这样所有的算子都可以共享slot;如果想让两个算子任务不共享slot,通过调整共享组来实现。 不同的共享组一定在不同的slot上

// 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        //设置并行度,所有算子都默认这个并行度
        env.setParallelism(1);


        DataStreamSource<String> ds = env.socketTextStream("hadoop102", 8888);

        ds.flatmap(new WordCount.MyFlatMapper()).name("f1").setParallelism(2).slotSharingGroup("a")
          .keyBy(0)
          .sum(1).setParallelism(2).slotSharingGroup("c");
          .print().setParallelism(1)

        // 5. 启动执行
        env.execute();

show plan后我们可以看到slot没有共享,执行stream需要4个slot

flink slot 数量设置,从零开始搞大数据,flink,大数据
图4

如果不单独设置slot共享组,那么该任务的slot个数=2,

并行子任务的分配

flink slot 数量设置,从零开始搞大数据,flink,大数据
图5

图5中有两条不同的流,每个字母右下角的下标代表并行度,A并行度=4,B并行度=4,C并行度=2,D并行度=4,E并行度=2;

整个任务开启slot共享后,一个会有4+4+4+2+2=16个任务,一共需要申请4个slot;

C->D过程涉及数据的合并,需要将数据copy到D的每个子任务中。

总结

下图中在flink中配置文件flink-conf设置的并行度是3,flink集群中TM数量=3,每个TM中slot数量=3

Example1中代码中设置的paeallelism=1,并且允许slot共享,所以会占用1个slot,3个算子任务

Example1中代码中设置的paeallelism=2,并且允许slot共享,所以会占用2个slot,6个算子任务

flink slot 数量设置,从零开始搞大数据,flink,大数据文章来源地址https://www.toymoban.com/news/detail-800568.html

到了这里,关于【Flink系列二】如何计算Job并行度及slots数量的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解 Flink(七)Flink Slot 管理详解

    JobMaster 中封装了一个 DefaultScheduler,在 DefaultScheduler.startSchedulingInternal() 方法中生成 ExecutionGraph 以执行调度。 资源调度的大体流程如下: Register:当 TaskExecutor 启动之后,会向 ResourceManager 注册自己(TaskExecutor)和自己内部的 Slot(TaskManagerSlot)。 Status Report:TaskExecutor 启动之

    2024年01月21日
    浏览(42)
  • 实际并行workers数量不等于postgresql.conf中设置的max_parallel_workers_per_gather数量

    本文件的源码来自PostgreSQL 14.5,其它版本略有不同 并行workers并不能显箸提升性能。个人不建议使用并行worker进程,大多数情况下采用postgresql.conf默认配置即可。 PostgreSQL的并行workers是由compute_parallel_worker函数决定的,compute_parallel_worker是估算扫描所需的并行工作线程数,并不

    2024年02月10日
    浏览(35)
  • 如何解决大规模并行计算中的线性代数问题

    作者:禅与计算机程序设计艺术 对大型矩阵运算而言,由于矩阵的元素之间的关系非常复杂,因此当运算过程中涉及到矩阵乘法、行列转置等运算时,通常采用并行化的方法进行加速处理。目前,主要的并行化技术包括基于硬件的多核CPU并行化技术、分布式集群并行化技术、

    2024年02月14日
    浏览(43)
  • 【SQL开发实战技巧】系列(三十四):数仓报表场景☞如何对数据分级并行转为列

    【SQL开发实战技巧】系列(一):关于SQL不得不说的那些事 【SQL开发实战技巧】系列(二):简单单表查询 【SQL开发实战技巧】系列(三):SQL排序的那些事 【SQL开发实战技巧】系列(四):从执行计划讨论UNION ALL与空字符串UNION与OR的使用注意事项 【SQL开发实战技巧】系列

    2023年04月09日
    浏览(51)
  • 解决FLink:Missing required options are: slot.name

    解决 https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/postgres-cdc.html?highlight=slot 原因 其他坑 1、需要jdk8,不然本地./bin/start-cluster.sh虽然可以启动,但是log里面会有错

    2024年02月16日
    浏览(51)
  • Flink:并行度介绍和设置并行度

    一个Flink程序由多个Operator组成(source、transformation和 sink)。 一个Operator由多个并行的Task(线程)来执行, 一个Operator的并行Task(线程)数目就被称为该Operator(任务)的并行度(Parallel) 1.Operator Level(算子级别)(可以使用) 一个算子、数据源和sink的并行度可以通过调用 setParallelism()方法

    2024年02月16日
    浏览(37)
  • (增加细粒度资源管理)深入理解flink的task slot相关概念

    之前对flink的task slot的理解太浅了,重新捋一下相关知识点 我们知道,flink中每个TaskManager都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask(线程)。 但是TaskManager 的计算资源是有限的,并不是所有任务都可以放在同一个 TaskManager 上并行执行。并行的任务越多

    2024年03月11日
    浏览(41)
  • Flink stop 和 cancel停止 job 的区别 Flink 停止 job 的方式(stop 和 cancel)

    后边跟的任务id 是flink的任务ID,不是yarn的 注:stop方式停止任务对 source 有要求,source必须实现了StopableFunction接口,才可以优雅的停止job 取消任务。如果在 conf/flink-conf.yaml 里面配置了 state.savepoints.dir ,会保存savepoint, 否则不会保存 savepoint。 使用 命令方式 也可以在停止的

    2024年02月12日
    浏览(46)
  • Flink Job 执行流程

    ​ 基于 Yarn 层面的架构类似 Spark on Yarn 模式 ,都是由 Client 提交 App 到 RM 上面去运行,然后 RM 分配第一个 container 去运行 AM ,然后由 AM 去负责资源的监督和管理 。需要说明的是, Flink 的 Yarn 模式更加类似 Spark on Yarn 的 cluster 模式,在 cluster 模式中, dirver 将作为 AM 中的一

    2024年02月04日
    浏览(45)
  • 深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月16日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包