详解RDD基本概念、RDD五大属性

这篇具有很好参考价值的文章主要介绍了详解RDD基本概念、RDD五大属性。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、RDD是什么

        RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD是spark core的底层核心

Dataset:

  • RDD 可以不保存具体数据, 只保留创建自己的必备信息, 例如依赖和计算函数;

  • RDD 也可以缓存起来, 相当于存储具体数据。

Distributed

        RDD 支持分区, 可以运行在集群中。

Resilient

  • RDD 支持高效的容错;

  • RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中。

1.RDD的特点:

  • 弹性
    • 容错的弹性:数据丢失可以自动恢复;
    • 存储的弹性:内存与磁盘的自动切换;
    • 计算的弹性:计算出错重试机制;
    • 分片的弹性:可根据需要重新分片。
  • 分布式:数据存储在集群不同节点上/计算分布式。
  • 数据集: RDD封装了计算逻辑,并不保存数据。
  • 数据抽象: RDD是一个抽象类,需要子类具体实现。
  • 不可变: RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑。
  • 可分区、并行计算。

二、RDD 为什么会出现

在 RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行迭代计算的任务呢?

详解RDD基本概念、RDD五大属性

多个 MapReduce 任务之间没有基于内存的数据共享方式, 只能通过磁盘来进行共享,这种方式明显比较低效。

RDD 如何解决迭代计算非常低效的问题呢?

详解RDD基本概念、RDD五大属性

在 Spark 中, 其实最终 Job3 从逻辑上的计算过程是: Job3 = (Job1.map).filter, 整个过程是共享内存的, 而不需要将中间结果存放在可靠的分布式文件系统中。 

这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度, RDD 在执行迭代型任务时候的表现可以通过下面代码体现:

// 线性回归
val points = sc.textFile(...)
	.map(...)
	.persist(...)
val w = randomValue
for (i <- 1 to 10000) {
    val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
    	.reduce(_ + _)
    w -= gradient
}

在这个例子中, 进行了大致 10000 次迭代, 如果在 MapReduce 中实现, 可能需要运行很多 Job, 每个 Job 之间都要通过 HDFS 共享结果, 谁快谁慢一窥便知。

三、结合案例深入了解RDD

需求:

  • 给定一个网站的访问记录, 俗称 Access log

  • 计算其中出现的独立 IP, 以及其访问的次数

val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)

val result = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg)
  .sortBy(item => item._2, false)
  .take(10)

result.foreach(item => println(item))

针对这个小案例, 我们问出互相关联但是又方向不同的六个问题:

1.假设要针对整个网站的历史数据进行处理, 量有 1T, 如何处理?

放在集群中, 利用集群多台计算机来并行处理。

2.如何放在集群中运行?

详解RDD基本概念、RDD五大属性

简单来讲, 并行计算就是同时使用多个计算资源解决一个问题, 有如下四个要点

  • 要解决的问题必须可以分解为多个可以并发计算的部分;

  • 每个部分要可以在不同处理器上被同时执行;

  • 需要一个共享内存的机制;

  • 需要一个总体上的协作机制来进行调度。

 3.如果放在集群中的话, 可能要对整个计算任务进行分解, 如何分解?

详解RDD基本概念、RDD五大属性

概述:

  • 对于 HDFS 中的文件, 是分为不同的 Block 的;

  • 在进行计算的时候, 就可以按照 Block 来划分, 每一个 Block 对应一个不同的计算单元。

扩展:

  • RDD 并没有真实的存放数据, 数据是从 HDFS 中读取的, 在计算的过程中读取即可;

  • RDD 至少是需要可以 分片 的, 因为HDFS中的文件就是分片的, RDD 分片的意义在于表示对源数据集每个分片的计算, RDD 可以分片也意味着 可以并行计算。

4.移动数据不如移动计算是一个基础的优化, 如何做到?

详解RDD基本概念、RDD五大属性

每一个计算单元需要记录其存储单元的位置, 尽量调度过去。

5.在集群中运行, 需要很多节点之间配合, 出错的概率也更高, 出错了怎么办?

详解RDD基本概念、RDD五大属性

 

RDD1 → RDD2 → RDD3 这个过程中, RDD2 出错了, 有两种办法可以解决:

  1. 缓存 RDD2 的数据, 直接恢复 RDD2, 类似 HDFS 的备份机制;

  2. 记录 RDD2 的依赖关系, 通过其父级的 RDD 来恢复 RDD2, 这种方式会少很多数据的交互和保存。

如何通过父级 RDD 来恢复?

  1. 记录 RDD2 的父亲是 RDD1;

  2. 记录 RDD2 的计算函数, 例如记录 RDD2 = RDD1.map(…​)map(…​) 就是计算函数;

  3. 当 RDD2 计算出错的时候, 可以通过父级 RDD 和计算函数来恢复 RDD2。

6.假如任务特别复杂, 流程特别长, 有很多 RDD 之间有依赖关系, 如何优化?

详解RDD基本概念、RDD五大属性

上面提到了可以使用依赖关系来进行容错, 但是如果依赖关系特别长的时候, 这种方式其实也比较低效, 这个时候就应该使用另外一种方式, 也就是记录数据集的状态。

在 Spark 中有两个手段可以做到:

  1. 缓存

  2. Checkpoint

 

四、RDD的五大属性

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
 * pairs, such as `groupByKey` and `join`;
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
 * Doubles; and
 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
 * through implicit.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
 * for more details on RDD internals.
 */

从上面源码中,可以得到RDD的五大属性:

1.分区列表( a list of partitions)

        Spark RDD是被分区的,每一个分区都会被一个计算任务(Task)处理,分区数决定了并行计算的数量,RDD的并行度默认从父RDD传给子RDD。默认情况下,一个HDFS上的数据分片就是一个 partiton,RDD分片数决定了并行计算的力度,可以在创建RDD时指定RDD分片个数(分区)。

        如果不指定分区数量,当RDD从集合创建时,则默认分区数量为该程序所分配到的资源的CPU核数(每个Core可以承载2~4个 partition),如果是从HDFS文件创建,默认为文件的 Block数。

/**
  * Implemented by subclasses to return the set of partitions in this RDD. This method will only
  * be called once, so it is safe to implement a time-consuming computation in it.
  *
  * The partitions in this array must satisfy the following property:
  *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
  */
 protected def getPartitions: Array[Partition]

2.每一个分区都有一个计算函数( a function for computing each split)

        每个分区都会有计算函数, Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现 compute函数,对具体的分片进行计算,不需要保存每次计算的结果。RDD中的分片是并行的,所以是分布式并行计算。

        有一点非常重要,就是由于RDD有前后依赖关系,遇到宽依赖关系,如reduceByKey等这些操作时划分成 Stage, Stage内部的操作都是通过 Pipeline进行的,在具体处理数据时它会通过 Blockmanager来获取相关的数据,因为具体的 split要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的 split都会映射成 BlockManager的Block,而具体的split会被函数处理,函数处理的具体形式是以任务的形式进行的。

 /**
  * :: DeveloperApi ::
  * Implemented by subclasses to compute a given partition.
  */
 @DeveloperApi
 def compute(split: Partition, context: TaskContext): Iterator[T]

3.依赖关系( a list of dependencies on other RDDS)

由于RDD每次转换都会生成新的RDD,所以RDD会形成类似流水线一样的前后依赖关系,当然宽依赖就不类似于流水线了,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有数据分片,这个时候数据分片就不进行内存中的 Pipeline,一般都是跨机器的,因为有前后的依赖关系,所以当有分区的数据丢失时, Spark会通过依赖关系进行重新计算,从而计算出丢失的数据,而不是对RDD所有的分区进行重新计算。

RDD之间的依赖有两种:窄依赖( Narrow Dependency)和宽依赖( Wide Dependency)。

详解RDD基本概念、RDD五大属性

窄依赖(Narrow Dependency)

窄依赖是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;

对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,这种转换不会引起shuffle操作,速度快!

宽依赖(Wide Dependency)

宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;

这种转换会引起shuffle操作,速度慢!

Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。

 /**
  * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
  * be called once, so it is safe to implement a time-consuming computation in it.
  */
 protected def getDependencies: Seq[Dependency[_]] = deps

4.key- value数据类型的RDD分区器( a Partitioner for key- alue RDDS)

一个Partitioner,即RDD的分区函数(可选项),Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

当前Spark中实现了两种类型的分区函数,

  1. 基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号)。它是默认值
  2. 基于范围的RangePartitioner。

什么会有Partitioner?

  1. 只有对于key-value的RDD(RDD[(String, Int)]),并且产生shuffle,才会有Partitioner;
  2. 非key-value的RDD(RDD[String])的Parititioner的值是None。

Option类型:可以表示有值或者没有值,它有2个子类:

  1. Some:表示封装了值
  2. None:表示没有值
/** Optionally overridden by subclasses to specify how they are partitioned. */
 @transient val partitioner: Option[Partitioner] = None

5每个分区都有一个优先位置列表,即首选位置( a list of preferred locations to compute each split on)

存储每个切片优先(preferred location)位置的列表。 比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置.。按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置。文章来源地址https://www.toymoban.com/news/detail-401701.html

/**
  * Optionally overridden by subclasses to specify placement preferences.
  */
 protected def getPreferredLocations(split: Partition): Seq[String] = Nil

到了这里,关于详解RDD基本概念、RDD五大属性的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • HTTP基本概念-HTTP 是什么?

    资料来源 : 小林coding 小林官方网站  : 小林coding (xiaolincoding.com) HTTP 是超文本传输协议,也就是HyperText Transfer Protocol。 能否详细解释「超文本传输协议」? HTTP 的名字「超文本协议传输」,它可以拆成三个部分: 超文本 传输 协议 1.「协议」 在生活中,我们也能随处可见「协议

    2024年02月20日
    浏览(38)
  • (Animator详解二)Unity Animator的基本属性

    在Inspector下 Animator的第一项为状态机的名称(注意:这里的名称不是动画名称) Tag 当前动画的Tag标签,可以通过Tag值来处理一些逻辑 Motion :动画片段的名称 Speed :动画的播放速度:1表示正常播放,                                            speed 1 表示加

    2023年04月09日
    浏览(48)
  • 什么是域名转移和域名转移密码基本概念理解

    “域名转移”也叫“变更域名注册服务机构”、“域名转移注册商”或“域名转入”,是指将域名从现在的注册商转移到另外一家域名注册商,由其他的域名注册商为您的域名提供相关的服务。国际域名注册商是指由ICANN授权认证的国际域名顶级注册商;.CN域名注册商是指由

    2024年02月06日
    浏览(78)
  • TCP/IP详解——网络基本概念

    网络最开始是为了数据通信。 以前通过ARPA网络,卫星来实现几个计算机的互相通信。 IBM推出自己的网络协议,这时网络没有标准。 1977年:TCP/IP标准。 1980年:ARPAnet全面向TCP/IP迁移。 1984年:ISO-网络标准,国籍标准化组织机构-定制各行各业的标准。 OSI开放式系统互联,同时

    2024年02月05日
    浏览(41)
  • 【Python】PySpark 数据计算 ③ ( RDD#reduceByKey 函数概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 语法 | 代码示例 )

    RDD#reduceByKey 方法 是 PySpark 中 提供的计算方法 , 首先 , 对 键值对 KV 类型 RDD 对象 数据 中 相同 键 key 对应的 值 value 进行分组 , 然后 , 按照 开发者 提供的 算子 ( 逻辑 / 函数 ) 进行 聚合操作 ; 上面提到的 键值对 KV 型 的数据 , 指的是 二元元组 , 也就是 RDD 对象中存储的数据是

    2024年02月14日
    浏览(50)
  • HTTPS协议详解:基本概念与工作原理

    个人主页: insist--个人主页​​​​​​ 本文专栏 :网络基础——带你走进网络世界 本专栏会持续更新网络基础知识,希望大家多多支持,让我们一起探索这个神奇而广阔的网络世界。 目录 一、HTTPS协议的基本概念

    2024年02月10日
    浏览(41)
  • 【Unity Shader】从入门到领悟(1)基本概念:什么是网格?什么是材质和Shader?

    如上图,模型的三角形面就叫做网格(Mesh),网格的本质是一堆顶点数据的规则排序,在Unity和UE中由三角形表示,Maya等DCC软件(Digital Content Creation)中则通常由四边形表示(俩个三角形刚好组成一个四边形)。 在Unity中我们新建一个Cube,

    2024年02月06日
    浏览(44)
  • 【Unity Shader】从入门到感慨(1)基本概念:什么是网格?什么是材质和Shader?

    如上图,模型的三角形面就叫做网格(Mesh),网格的本质是一堆顶点数据的规则排序,在Unity和UE中由三角形表示,Maya等DCC软件(Digital Content Creation)中则通常由四边形表示(俩个三角形刚好组成一个四边形)。 在Unity中我们新建一个Cube,

    2024年02月11日
    浏览(34)
  • 数据结构之数据结构要学什么,基本概念,三要素

          我从大二上学期的时候学了数据结构,但是当时对数据结构的重要性并不太重视,直到在升大三的暑假,才意识到数据结构对以后学语言和找工作方面的重要性,所以亡羊补牢,为时未晚,尝试着结合b站上王道考研数据结构课,来记录自己对知识和代码的理解。    

    2024年02月15日
    浏览(41)
  • Spark的核心概念:RDD、DataFrame和Dataset

    Apache Spark,其核心概念包括RDD(Resilient Distributed Dataset)、DataFrame和Dataset。这些概念构成了Spark的基础,可以以不同的方式操作和处理数据,根据需求选择适当的抽象。 RDD是Spark的基本数据抽象,它代表一个不可变、分布式的数据集合。下面我们将更详细地探讨RDD: RDD的特性

    2024年02月04日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包