3.5 RDD持久化机制

这篇具有很好参考价值的文章主要介绍了3.5 RDD持久化机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、RDD持久化
(一)引入持久化的必要性
Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。
Spark中重要的功能之一是可以将某个RDD中的数据保存到内存或者磁盘中,每次需要对这个RDD进行算子操作时,可以直接从内存或磁盘中取出该RDD的持久化数据,而不需要从头计算才能得到这个RDD。
(二)案例演示持久化操作
1、RDD的依赖关系图
读取文件,进行一系列操作,有多个RDD,如下图所示。

2、不采用持久化操作
在上图中,对RDD3进行了两次算子操作,分别生成了RDD4和RDD5。若RDD3没有持久化保存,则每次对RDD3进行操作时都需要从textFile()开始计算,将文件数据转化为RDD1,再转化为RDD2,最终才得到RDD3。

查看要操作的HDFS文件

以集群模式启动Spark Shell

按照图示进行操作,得RDD4和RDD5

查看RDD4内容,会从RDD1到RDD2到RDD3到RDD4跑一趟

显示RDD5内容,也会从RDD1到RDD2到RDD3到RDD5跑一趟

3、采用持久化操作
可以在RDD上使用persist()或cache()方法来标记要持久化的RDD(cache()方法实际上底层调用的是persist()方法)。在第一次行动操作时将对数据进行计算,并缓存在节点的内存中。Spark的缓存是容错的:如果缓存的RDD的任何分区丢失,Spark就会按照该RDD原来的转换过程自动重新计算并缓存。

计算到RDD3时,标记持久化

计算RDD4,就是基于RDD3缓存的数据开始计算,不用从头到尾跑一趟

计算RDD5,就是基于RDD3缓存的数据开始计算,不用从头到尾跑一趟

二、存储级别
(一)持久化方法的参数
利用RDD的persist()方法实现持久化,向persist()方法中传入一个StorageLevel对象指定存储级别。每个持久化的RDD都可以使用不同的存储级别存储,默认的存储级别是StorageLevel.MEMORY_ONLY。
(二)Spark RDD存储级别表
Spark RDD有七种存储级别
存储级别 说明
MEMORY_ONLY 将RDD存储为JVM中的反序列化Java对象。如果内存不够,部分分区就不会被缓存,并且在每次需要这些分区的时候都会被动态地重新计算。此为默认级别。
MEMORY_AND_DISK 将RDD存储为JVM中的反序列化Java对象。如果内存不够,就将未缓存的分区存储在磁盘上,并在需要这些分区时从磁盘读取。
MEMORY_ONLY_SER 将RDD存储为序列化的Java对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,特别是在使用快速序列化时,但读取时会增加 CPU负担。
MEMORY_AND_DISK_SER 类似于MEMORY_ONLY_SER,但是溢出的分区将写到磁盘,而不是每次需要对其动态地重新计算。
DISK_ONLY 只在磁盘上存储RDD分区。
MEMORY_ONLY_2 与MEMORY_ONLY 相同,只是每个持久化的分区都会复制一份副本,存储在其他节点上。这种机制主要用于容错,一旦持久化数据丢失,可以使用副本数据,而不需要重新计算。
MEMORY_AND_DISK_2 与MEMORY_AND_DISK相同,只是每个持久化的分区都会复制一份副本,存储在其他节点上。这种机制主要用于容错,一旦持久化数据丢失,可以使用副本数据,而不需要重新计算。
在Spark的Shuffle操作(例如reduceByKey()中,即使用户没有使用persist()方法,也会自动保存一些中间数据。这样做是为了避免在节点洗牌的过程中失败时重新计算整个输入。如果想多次使用某个RDD,那么强烈建议在该RDD上调用persist()方法。
(三)如何选择存储级别
选择原则:权衡内存使用率和CPU效率
如果RDD存储在内存中不会发生溢出,那么优先使用默认存储级别(MEMORY_ONLY),该级别会最大程度发挥CPU的性能,使在RDD上的操作以最快的速度运行。
如果RDD存储在内存中会发生溢出,那么使用MEMORY_ONLY_SER并选择一个快速序列化库将对象序列化,以节省空间,访问速度仍然相当快。
除非计算RDD的代价非常大,或者该RDD过滤了大量数据,否则不要将溢出的数据写入磁盘,因为重新计算分区的速度可能与从磁盘读取分区一样快。
如果希望在服务器出故障时能够快速恢复,那么可以使用多副本存储级别MEMORY_ONLY_2或MEMORY_AND_DISK_2。该存储级别在数据丢失后允许在RDD上继续运行任务,而不必等待重新计算丢失的分区。其他存储级别在发生数据丢失后,需要重新计算丢失的分区。
(四)persist()与cache()的关系
查看两个方法的源码
/**

在第一次行动操作时持久化RDD,并设置存储级别,当RDD从来没有设置过存储级别时才能使用该方法
/
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// 如果之前已将该RDD设置为localCheckpoint,就覆盖之前的存储级别
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
/*
持久化RDD,使用默认存储级别(MEMORY_ONLY)
/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/
*

持久化RDD,使用默认存储级别(MEMORY_ONLY)
*/
def cache(): this.type = persist()

从上述代码可以看出,cache()方法调用了无参的persist()方法,两者的默认存储级别都为MEMORY_ONLY,但cache()方法不可更改存储级别,而persist()方法可以通过参数自定义存储级别。
(五)案例演示设置存储级别
在net.huawei.rdd根包里创建day05子包,然后在子包里创建SetStorageLevel对象

package net.huawei.rdd.day05

import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**

功能:设置存储级别

作者:华卫

日期:2023年05月06日
/
object SetStorageLevel {
def main(args: Array[String]): Unit = {
// 创建Spark配置对象
val conf = new SparkConf()
.setAppName(“SetStorageLevel”) // 设置应用名称
.setMaster(“local[]”) // 设置主节点位置(本地调试)
// 基于Spark配置对象创建Spark容器
val sc = new SparkContext(conf)

// 去除Spark运行信息
Logger.getLogger(“org”).setLevel(Level.OFF)
Logger.getLogger(“com”).setLevel(Level.OFF)
System.setProperty(“spark.ui.showConsoleProgress”, “false”)
Logger.getRootLogger().setLevel(Level.OFF)

// 读取HDFS文件,得到rdd
val rdd = sc.textFile(“hdfs://master:9000/park/words.txt”)
// 将rdd标记为持久化,采用默认存储级别 - StorageLevel.MEMORY_ONLY
rdd.persist() // 无参持久化方法

// 对rdd做扁平化映射,得到rdd1
val rdd1 = rdd.flatMap(_.split(" "))
// 将rdd1持久化到磁盘
rdd1.persist(StorageLevel.DISK_ONLY)

// 将rdd1映射成二元组,得到rdd2
val rdd2 = rdd1.map((_, 1))
// 将rdd2持久化到内存,溢出的数据持久化到磁盘
rdd2.persist(StorageLevel.MEMORY_AND_DISK)

// 第一次行动算子,对标记为持久化的RDD进行不同级别的持久化操作
println(“元素个数:” + rdd2.count)

// 第二次行动算子,直接利用rdd2的持久化数据进行操作,无须从头进行计算
rdd2.collect.foreach(println)
}
}

运行程序,查看结果

三、利用Spark WebUI查看缓存
最好重启Spark Shell

(一)创建RDD并标记为持久化
执行命令:val rdd = sc.parallelize(List(56, 67, 32, 89, 90, 66, 100))

(二)Spark WebUI查看RDD存储信息
浏览器中访问Spark Shell的WebUI http://master:4040/storage/查看RDD存储信息,可以看到存储信息为空

执行命令:rdd.collect,收集RDD数据

刷新WebUI,发现出现了一个ParallelCollectionRDD的存储信息,该RDD的存储级别为MEMORY,持久化的分区为8,完全存储于内存中。

单击ParallelCollectionRDD超链接,可以查看该RDD的详细存储信息

上述操作说明,调用RDD的persist()方法只是将该RDD标记为持久化,当执行行动操作时才会对标记为持久化的RDD进行持久化操作。

执行以下命令,创建rdd2,并将rdd2持久化到磁盘

刷新上述WebUI,发现多了一个MapPartitionsRDD的存储信息,该RDD的存储级别为DISK,持久化的分区为8,完全存储于磁盘中。

(三)将RDD从缓存中删除
Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用的方式从缓存中删除旧的分区数据。如果希望手动删除RDD,而不是等待该RDD被Spark自动从缓存中删除,那么可以使用RDD的unpersist()方法。

执行命令:rdd.unpersist(),将rdd(ParallelCollectionRDD)从缓存中删除文章来源地址https://www.toymoban.com/news/detail-487187.html

到了这里,关于3.5 RDD持久化机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • PySpark大数据教程:深入学习SparkCore的RDD持久化和Checkpoint

    本教程详细介绍了PySpark中SparkCore的RDD持久化和Checkpoint功能,重点讲解了缓存和检查点的作用、如何进行缓存、如何设置检查点目录以及它们之间的区别。还提供了join操作的示例和Spark算子补充知识。

    2024年02月08日
    浏览(42)
  • Spark避坑系列(三)(Spark Core-RDD 依赖关系&持久化&共享变量)

    大家想了解更多大数据相关内容请移驾我的课堂: 大数据相关课程 剖析及实践企业级大数据 数据架构规划设计 大厂架构师知识梳理:剖析及实践数据建模 PySpark入坑系列第三篇,该篇章主要介绍spark的编程核心RDD的其他概念,依赖关系,持久化,广播变量,累加器等 在spa

    2024年02月01日
    浏览(45)
  • redis 持久化机制

    client redis[内存] ----- 内存数据- 数据持久化--磁盘 Redis官方提供了两种不同的持久化方法来将数据存储到硬盘里面分别是: RDB 快照(Snapshot) AOF (Append Only File) 只追加日志文件 1 快照(Snapshot) 1. 特点 这种方式可以将某一时刻的所有数据都写入硬盘中,当然这也是 redis的默认开启持久

    2024年01月22日
    浏览(41)
  • 【RabbitMQ】之持久化机制

    一、RabbitMQ 持久化机制 1、RabbitMQ 持久化概述 2、队列持久化 3、消息持久化 4、交换器持久化 二、RabbitMQ 知识扩展 1、内存告警与内存换页 2、磁盘告警与配置 3、数据写入磁盘时机 4、磁盘消息格式 5、磁盘文件删除机制 持久化 ,即将原本存在于内存中的数据写入到磁盘上永

    2024年02月14日
    浏览(42)
  • redis的持久化机制

    简单来说就是将内存数据保存到硬盘,防止机器重启后数据丢失。 1)拍快照(RDB,Redis DataBase):将某一个时刻Redis的内存数据,以二进制的方案写入磁盘,速度快,会数据丢失。 2)文件追加方案(AOF,Append Only File):记录所有的操作命令,并以日志的形式追加到文件中,速度慢

    2024年02月16日
    浏览(43)
  • 搞明白Redis持久化机制

    Redis是一种内存数据库,其内存中的数据存储在计算机的内存中,如果服务器发生崩溃或者重启,内存中的数据将会丢失。为了避免这种情况发生,Redis提供了两种持久化机制:RDB和AOF。 Redis支持将当前数据状态快照持久化到硬盘上,这种快照是一个二进制文件,包含了Redis在

    2023年04月25日
    浏览(43)
  • 深入解析 Redis 持久化机制

    我们都知道,Redis 的数据存储在内存中, 一旦服务器宕机,内存中的数据将全部丢失。因此,对 Redis 来说,实现数据的持久化,避免从后端数据库中进行恢复,是至关重要的。本篇我们详细讲解下 Redis 的三种持久化机制,分别是  AOF(Append Only File)  日志和  RDB 快照  以及

    2024年02月12日
    浏览(48)
  • redis持久化机制 & 事务详解

    目录 前言: 持久化机制 RDB(Redis DataBase) 手动触发  save bgsave 自动触发 RDB特点 AOF(append only file) 缓冲区刷新策略 重写机制 aof重写流程 混合持久化 事务 事务操作命令 WATCH WATCH实现原理     redis为了保证高可用引入了持久化机制,目的就是为了redis服务器重启时可以恢复

    2024年02月11日
    浏览(43)
  • 【Redis】Redis持久化机制

    Redis是基于内存存储的数据库,如果遇到服务重启或者崩溃,内存中的数据将会被清空。所以为了确保数据安全性和可靠性,我们需要将内存中的数据持久化到磁盘上。 持久化不仅可以防止由于系统故障、重启或者其他原因导致的数据丢失。还可以用于备份、数据恢复和迁移

    2023年04月20日
    浏览(65)
  • 深入了解Kafka的数据持久化机制

    欢迎来到我的博客,代码的世界里,每一行都是一个故事 在消息传递的舞台上,数据就像是时间的旅行者,承载着信息的流动。然而,时间不停歇。本文将带你进入数据的永恒之路,探寻在Kafka中,数据如何通过持久化机制守护信息的不朽之旅。 持久化的基本概念: 在 Kaf

    2024年04月28日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包