PySpark RDD的缓存和Checkpoint

这篇具有很好参考价值的文章主要介绍了PySpark RDD的缓存和Checkpoint。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 前言

RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消息,RDD的数据只在处理的过程中存在,一旦处理完成,就不见了,所以RDD的数据是过程数据。

RDD数据是过程数据的这个特性可以最大化的利用资源,老旧的RDD没用了就会从内存中清理,给后续的计算腾出内存空间。

PySpark RDD的缓存和Checkpoint
如上图,rdd3被2次使用,第一次使用之后,其实rdd3就不存在了,在第二次使用的时候,只能基于RDD的血缘关系,从RDD1重新执行,构建出来RDD3,供RDD5使用。

2. RDD的缓存

Spark中提供了缓存API,可以让我们通过调用API,将指定的RDD数据保留在内存或者硬盘上。上述场景如果使用缓存API,RDD3就不会消失,第二次使用RDD3的时候就不会在通过血缘关系重新开始构建出RDD3

# RDD3 被2次使用,可以加入缓存进行优化
rdd3.cache() 									# 缓存到内存中.
rdd3.persist(StorageLevel.MEMORY_ONLY) 			# 仅内存缓存
rdd3.persist(StorageLevel.MEMORY_ONLY_2)		# 仅内存缓存,2个副本
rdd3.persist(StorageLevel.DISK_ONLY)			# 仅缓存硬盘上
rdd3.persist(StorageLevel.DISK_ONLY_2)			# 仅缓存硬盘上,2个副本
rdd3.persist(StorageLevel.DISK_ONLY_3)			# 仅缓存硬盘上,3个副本
rdd3.persist(StorageLevel.MEMORY_AND_DISK)		# 先放内存,不够放硬盘
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)	# 先放内存,不够放硬盘,2个副本
rdd3.persist(StorageLevel.OFF_HEAP)				# 堆放内存(系统内存)
# 如上API,自行选择使用即可
# 一般建议使用rdd3.persist(StorageLevel.MEMORY_AND_DISK)
# 如果内存比较小的集群,建议使用rdd3.persist(StorageLevel.DISK_ONLY) 或者别用缓存了 用CheckPoint

# 主动清理缓存的API
rdd.unpersist() 

PySpark RDD的缓存和Checkpoint
如上图,RDD是将自己分区的数据,每个分区自行将其数据保存在其所在的Executor内存和硬盘上,这就是分散存储

缓存技术可以将过程RDD数据,持久化保存到内存或者硬盘上,但是这个保存在设定上是认为不安全的,存在丢失的风险,所以缓存有一个特点就是保存RDD之间的血缘关系
一旦缓存丢失,可以基于血缘关系的记录,重新计算这个RDD的数据。

缓存一般是如果丢失的?

  • 在内存中的存储是不安全的,比如断电\计算任务内存不足,把缓存清理给计算让路
  • 硬盘中因为硬盘的损坏也是可能丢失的

3. RDD的CheckPoint

Spark中Checkpoint技术,也是将RDD的数据保存起来,但是它只支持硬盘存储,并且它被设计认为是安全的,不保留血缘关系。

PySpark RDD的缓存和Checkpoint

如上图,Checkpoint存储的RDD数据是集中收集各个分区的数据进行存储,而缓存是分散存储

缓存和Checkpoint的对比:

  1. CheckPoint不管分区数量多少,风险都一样。 缓存:分区越多,风险越多
  2. CheckPoint支持写入HDFS,缓存不行。HDFS是高可靠存储,CheckPoint被认为是安全的
  3. CheckPoint不支持内存,缓存可以。缓存如果写内存 性能比 CheckPoint 要好一些
  4. CheckPoint因为设计是安全的,所以不保留血缘关系,而缓存则相反。

实现:

# 设置CheckPoint第一件事情,选择Checkpoint的保存路径
# 如果Local模式,可以支持本地文件系统,如果在集群运行,千万要用HDFS
sc.setCheckpointDir("hdfs://master:8020/output/11111")
# 用的时候,直接调用checkpoint算子即可,但是需要有action算子触发
rdd.checkpoint()
rdd.count()

# TODO: 再次执行count函数, 此时从checkpoint读取数据
rdd.count()

Checkpoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用Checkpoint比较合适,或者数据量很大的时候,采用Checkpoint比较合适。如果数据量小,或者RDD重新计算也是非常快的,直接使用缓存即可。

**注意:**Spark中缓存和Checkpoint两个API都不是action算子,所以需要后面跟action算子才能触发。文章来源地址https://www.toymoban.com/news/detail-406419.html

到了这里,关于PySpark RDD的缓存和Checkpoint的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 )

    RDD#sortBy 方法 用于 按照 指定的 键 对 RDD 中的元素进行排序 , 该方法 接受一个 函数 作为 参数 , 该函数从 RDD 中的每个元素提取 排序键 ; 根据 传入 sortBy 方法 的 函数参数 和 其它参数 , 将 RDD 中的元素按 升序 或 降序 进行排序 , 同时还可以指定 新的 RDD 对象的 分区数 ; RDD

    2024年02月14日
    浏览(40)
  • 【Python】PySpark 数据计算 ② ( RDD#flatMap 方法 | RDD#flatMap 语法 | 代码示例 )

    RDD#map 方法 可以 将 RDD 中的数据元素 逐个进行处理 , 处理的逻辑 需要用外部 通过 参数传入 map 函数 ; RDD#flatMap 方法 是 在 RDD#map 方法 的基础上 , 增加了 \\\" 解除嵌套 \\\" 的作用 ; RDD#flatMap 方法 也是 接收一个 函数 作为参数 , 该函数被应用于 RDD 中的每个元素及元素嵌套的子元素

    2024年02月14日
    浏览(36)
  • 10-用PySpark建立第一个Spark RDD

    PySpark实战笔记系列第一篇 Apache Spark的核心组件的基础是RDD。所谓的RDD,即 弹性分布式数据集(Resiliennt Distributed Datasets) ,基于RDD可以实现Apache Spark各个组件在多个计算机组成的集群中进行无缝集成,从而能够在一个应用程序中完成海量数据处理。 只读不能修改 :只能通过

    2024年04月08日
    浏览(42)
  • Python大数据之PySpark(五)RDD详解

    为什么需要RDD? 首先Spark的提出为了解决MR的计算问题,诸如说迭代式计算,比如:机器学习或图计算 希望能够提出一套基于内存的迭代式数据结构,引入RDD弹性分布式数据集,如下图 为什么RDD是可以容错? RDD依靠于依赖关系dependency relationship reduceByKeyRDD-----mapRDD-----flatMapRD

    2024年02月06日
    浏览(41)
  • Python大数据之PySpark(六)RDD的操作

    函数分类 *Transformation操作只是建立计算关系,而Action 操作才是实际的执行者* 。 Transformation算子 转换算子 操作之间不算的转换,如果想看到结果通过action算子触发 Action算子 行动算子 触发Job的执行,能够看到结果信息 Transformation函数 值类型valueType map flatMap filter mapValue 双值

    2024年02月04日
    浏览(42)
  • 【Python】PySpark 数据计算 ③ ( RDD#reduceByKey 函数概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 语法 | 代码示例 )

    RDD#reduceByKey 方法 是 PySpark 中 提供的计算方法 , 首先 , 对 键值对 KV 类型 RDD 对象 数据 中 相同 键 key 对应的 值 value 进行分组 , 然后 , 按照 开发者 提供的 算子 ( 逻辑 / 函数 ) 进行 聚合操作 ; 上面提到的 键值对 KV 型 的数据 , 指的是 二元元组 , 也就是 RDD 对象中存储的数据是

    2024年02月14日
    浏览(50)
  • 【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 )

    RDD 英文全称为 \\\" Resilient Distributed Datasets \\\" , 对应中文名称 是 \\\" 弹性分布式数据集 \\\" ; Spark 是用于 处理大规模数据 的 分布式计算引擎 ; RDD 是 Spark 的基本数据单元 , 该 数据结构 是 只读的 , 不可写入更改 ; RDD 对象 是 通过 SparkContext 执行环境入口对象 创建的 ; SparkContext 读取数

    2024年02月14日
    浏览(42)
  • 【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )

    在 PySpark 中 RDD 对象 提供了一种 数据计算方法 RDD#map 方法 ; 该 RDD#map 函数 可以对 RDD 数据中的每个元素应用一个函数 , 该 被应用的函数 , 可以将每个元素转换为另一种类型 , 也可以针对 RDD 数据的 原始元素进行 指定操作 ; 计算完毕后 , 会返回一个新的 RDD 对象 ; map 方法 , 又

    2024年02月14日
    浏览(54)
  • Spark RDD 缓存机制

    Spark RDD 缓存是在内存存储RDD计算结果的一种优化技术。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能。 当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接

    2024年03月25日
    浏览(53)
  • SPARK--cache(缓存)和checkpoint检查点机制

    rdd的特性 缓存和checkpoint 作用都是进行容错 rdd在计算是会有多个依赖,为了避免计算错误是从头开始计算,可以将中间* 依赖rdd进行缓存或checkpoint 缓存或checkpoint也叫作rdd的持久化 一般对某个计算特别复杂的rdd进行持久化 缓存使用 缓存是将数据存储在内存或者磁盘上,缓存

    2024年01月16日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包