Spark源码解析(一):RDD之Transfrom算子

这篇具有很好参考价值的文章主要介绍了Spark源码解析(一):RDD之Transfrom算子。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、延迟计算

RDD 代表的是分布式数据形态,因此,RDD 到 RDD 之间的转换,本质上是数据形态上的转换(Transformations)

在 RDD 的编程模型中,一共有两种算子,Transformations 类算子和 Actions 类算子。开发者需要使用 Transformations 类算子,定义并描述数据形态的转换过程,然后调用 Actions 类算子,将计算结果收集起来、或是物化到磁盘。

在这样的编程模型下,Spark 在运行时的计算被划分为两个环节。

  1. 基于不同数据形态之间的转换,构建计算流图(DAG,Directed Acyclic Graph)
  2. 通过 Actions 类算子,以回溯的方式去触发执行这个计算流图

换句话说,开发者调用的各类 Transformations 算子,并不立即执行计算,当且仅当开发者调用 Actions 算子时,之前调用的转换算子才会付诸执行。在业内,这样的计算模式有个专门的术语,叫作“延迟计算”(Lazy Evaluation)。

二、Spark算子分类

在 RDD 的开发框架下,哪些算子属于 Transformations 算子,哪些算子是 Actions 算子呢?

这里给出一张自己在极客看的课程中的图

Spark源码解析(一):RDD之Transfrom算子

三、Transform算子执行流程(源码)

Map转换算是 RDD 的经典转换操作之一了.就以它开头.Map的源码如下:

Spark源码解析(一):RDD之Transfrom算子

1. sc.clean(f)

首先掉了一个sc.clean(f) , 我们进到clean函数里看下:

Spark源码解析(一):RDD之Transfrom算子

注释中明确提到了这个函数的功能:clean 整理一个闭包,使其可以序列化并发送到任务.

这里的代码有些多,大概知道这个函数的功能是这样就ok了,闭包的问题会在另一篇文章里仔细介绍

2. MapPartitionsRDD

进入到函数后源码如下:

Spark源码解析(一):RDD之Transfrom算子

这是一个MapPartitionsRDD。我们仔细看它的构成,从而来理解它是如何描述MapPartitionsRDD的.

2.1 var prev:RDD[T]

这里的 prev 就是父RDD,f 则是Map中传入的处理函数,除了这两个就没有了,也就是说明 RDD中没有存储具体的数据本身

这再次印证了转换不会产生任何数据.它只是单纯了记录父RDD以及如何转换的过程就完了,不会在转换阶段产生任何数据集

2.2 preservesPartitioning

preservesPartitioning 表示是否保持父RDD的分区信息.
如果为false(默认为false),则会对结果重新分区.也就是Map系默认都会分区
如果为true,保留分区. 则按照 firstParent 保留分区   

Spark源码解析(一):RDD之Transfrom算子

可以看到根据 dependencies 找到其第一个父 RDD

Spark源码解析(一):RDD之Transfrom算子

2.3 compute 计算逻辑
2.3.1 compute方法

RDD 抽象类要求其所有子类都必须实现 compute 方法,该方法接受的参数之一是一个Partition 对象,目的是计算该分区中的数据。

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

可以看到,compute 方法调用当前 RDD 内的第一个父 RDD 的 iterator 方法,该方的目的是拉取父 RDD 对应分区内的数据。

iterator 方法会返回一个迭代器对象,迭代器内部存储的每个元素即父 RDD 对应分区内已经计算完毕的数据记录。得到的迭代器作为 f 方法的一个参数。fRDD 类的 map 方法中指定,即实际的转换函数。

compute 方法会将迭代器中的记录一一输入 f 方法,得到的新迭代器即为所求分区中的数据。

其他 RDD 子类的 compute 方法与之类似,在需要用到父 RDD 的分区数据时候,就会调用 iterator 方法,然后根据需求在得到的数据之上执行粗粒度的操作。换句话说,compute 函数负责的是父 RDD 分区数据到子 RDD 分区数据的变换逻辑。

2.3.2 iterator方法

此方法的实现在 RDD 这个抽象类中

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementers of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

interator首先检查 存储级别 storageLevel:此处可参考RDD持久化

如果存储级别不是NONE, 说明分区的数据说明分区的数据要么已经存储在文件系统当中,要么当前 RDD 曾经执行过 cachepersise 等持久化操作,此时需要从存储空间读取分区数据,调用 getOrCompute 方法

Spark源码解析(一):RDD之Transfrom算子

getOrCompute 方法会根据 RDD 编号:id分区编号:partition.index 计算得到当前分区在存储层对应的块编号:blockId,通过存储层提供的数据读取接口提取出块的数据。

代码中的这几句注释给的非常到位,大致的判断顺序如下:

  • 块命中的情况:也就是数据之前已经成功存储到介质中,这其中可能是数据本身就在存储介质中(比如通过读取HDFS创建的RDD),也可能是 RDD 在经过持久化操作并且经历了一次计算过程,这个时候我们就能成功读取数据并将其返回
  • 块未命中的情况:可能是数据已经丢失,或者 RDD 经过持久化操作,但是是当前分区数据是第一次被计算,因此会出现拉取得到数据为 None 的情况。这就意味着我们需要计算分区数据,继续调用 RDDcomputeOrReadCheckpoint 方法来计算数据,并将计算得到的数据缓存到存储介质中,下次就无需再重复计算。

如果当前RDD的存储级别为 None,说明为未经持久化的 RDD,需要重新计算 RDD 内的数据,这时候调用 RDD 类的 computeOrReadCheckpoint 方法,该方法也在持久化 RDD 的分区获取数据失败时被调用。

Spark源码解析(一):RDD之Transfrom算子

computeOrReadCheckpoint 方法会检查当前 RDD 是否已经被标记成检查点,如果未被标记成检查点,则执行自身的 compute 方法来计算分区数据,否则就直接拉取父 RDD 分区内的数据。

需要注意的是,对于标记成检查点的情况,当前 RDD 的父 RDD 不再是原先转换操作中提供数据的父 RDD,而是被 Apache Spark 替换成一个 CheckpointRDD 对象,该对象中的数据存放在文件系统中,因此最终该对象会从文件系统中读取数据并返回给 computeOrReadCheckpoint 方法

参考文章:

Cache 和 Checkpoint文章来源地址https://www.toymoban.com/news/detail-807398.html

到了这里,关于Spark源码解析(一):RDD之Transfrom算子的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 2023_Spark_实验十:RDD基础算子操作

    Ø练习 1: Ø 练习 2: Ø 练习 3: Ø 练习 4: Ø 练习 5: groupByKey groupByKey会将RDD[key,value]按照相同的key进行分组,形成RDD[key,iterable[value]]的形式,有点类似于sql中的groupby,例如类似于mysql中的group_contact cogroup groupByKey是对单个RDD的数据进行分组,还可以使用一个叫作cogroup()的函

    2024年02月08日
    浏览(36)
  • 2023_Spark_实验十一:RDD高级算子操作

    coalesce : 总所周知,spark的rdd编程中有两个算子repartition和coalesce。公开的资料上定义为,两者都是对spark分区数进行调整的算子。         repartition会经过shuffle,其实际上就是调用的coalesce(shuffle=true)。         coalesce,默认shuffle=false,不会经过shuffle。         当

    2024年02月08日
    浏览(29)
  • Spark大数据处理学习笔记(3.2.2)掌握RDD算子

    衔接上文:http://t.csdn.cn/Z0Cfj 功能: reduce()算子按照传入的函数进行归约计算 案例: 计算1 + 2 + 3 + …+100的值 计算1 × 2 × 3 × 4 × 5 × 6 的值(阶乘 - 累乘) 计算1 2 + 2 2 + 3 2 + 4 2 + 5**2的值(先映射,后归约) 功能: collect()算子向Driver以数组形式返回数据集的所有元素。通常对

    2024年02月08日
    浏览(38)
  • RDD算子操作(基本算子和常见算子)

    目录 一、基本算子         1.map算子         2.flatMap算子         3.filter算子          4.foreach算子         5.saveAsTextFile算子         6.redueceByKey算子 二、常用Transformation算子                 1.mapValues算子         2.groupBy算子         3.distinct算子        

    2024年02月08日
    浏览(29)
  • 大数据之RDD的算子分类

    上一篇文章主要讲述了两种RDD的创建方式,本篇文章接着讲RDD的算子及其分类。 RDD的算子主要有两种类型,一种是Transformation转换算子,另一种是Action动作算子,Transformation转换算子执行完成后会返回一个新的RDD,所有的Transformation转换算子都是Lazy,不会立即执行,需要Acti

    2024年02月05日
    浏览(17)
  • Spark重温笔记(二):快如闪电的大数据计算框架——你真的了解SparkCore的 RDD 吗?(包含企业级搜狗案例和网站点击案例)

    前言:今天是温习 Spark 的第 2 天啦!主要梳理了 Spark 核心数据结构:RDD(弹性分布式数据集),其中包括基于内存计算的 SparkCore 各类技术知识点希望对大家有帮助! Tips:\\\"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起

    2024年03月25日
    浏览(27)
  • openCV实战-系列教程4:图像梯度计算(Sobel算子/开运算/梯度计算方法/scharr算子/lapkacian算子)、源码解读 ?????OpenCV实战系列总目录

    打印一个图片单独做出一个函数: 先读进来一个原型白色图 打印结果: 如图有两个3*3的卷积核,其中A是原始图片中的一个3*3的区域,这个A和3*3的卷积核所谓对应位置相乘的结果就分别是左右梯度和上下梯度 假如A是这个矩阵:  那么Gx的计算结果就为:-x1+x3-2x4+2x6-x7+x9 代码实

    2024年02月10日
    浏览(32)
  • Spark 【RDD编程(一)RDD编程基础】

            在Spark中,RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写。通俗来讲,RDD是一种抽象的数据结构,用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型,可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可

    2024年02月09日
    浏览(45)
  • Spark【RDD编程(三)键值对RDD】

            键值对 RDD 就是每个RDD的元素都是 (key,value)类型的键值对,是一种常见的 RDD,可以应用于很多场景。                 因为毕竟通过我们之前Hadoop的学习中,我们就可以看到对数据的处理,基本都是以键值对的形式进行统一批处理的,因为MapReduce模型中

    2024年02月09日
    浏览(38)
  • spark源码的scala解析

    一、scala抽象类和java的有何不同? 在org/apache/spark/util/collection/SortDataFormat.scala中有以下抽象类 private[spark] abstract class SortDataFormat[K, Buffer] {...}    然后在org/apache/spark/graphx/Edge.scala中,直接调用了xxx = new SortDataFormat[Edge[ED], Array[Edge[ED]]] {...} 为啥可以直接new一个抽象类呢??sc

    2024年02月12日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包