Spark大数据处理讲课笔记--- RDD持久化机制

这篇具有很好参考价值的文章主要介绍了Spark大数据处理讲课笔记--- RDD持久化机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

零、本讲学习目标

  1. 理解RDD持久化的必要性
  2. 了解RDD的存储级别
  3. 学会如何查看RDD缓存

一、RDD持久化

(一)引入持久化的必要性

  • Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。
  • Spark中重要的功能之一是可以将某个RDD中的数据保存到内存或者磁盘中,每次需要对这个RDD进行算子操作时,可以直接从内存或磁盘中取出该RDD的持久化数据,而不需要从头计算才能得到这个RDD。

(二)案例演示持久化操作

1、RDD的依赖关系图

  • 读取文件,进行一系列操作,有多个RDD,如下图所示。

Spark大数据处理讲课笔记--- RDD持久化机制

 文章来源地址https://www.toymoban.com/news/detail-457485.html

2、不采用持久化操作

  • 在上图中,对RDD3进行了两次算子操作,分别生成了RDD4和RDD5。若RDD3没有持久化保存,则每次对RDD3进行操作时都需要从textFile()开始计算,将文件数据转化为RDD1,再转化为RDD2,最终才得到RDD3。

  • 查看要操作的HDFS文件

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 以集群模式启动Spark Shell

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 按照图示进行操作,得RDD4和RDD5

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 查看RDD4内容,会从RDD1到RDD2到RDD3到RDD4跑一趟

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 显示RDD5内容,也会从RDD1到RDD2到RDD3到RDD5跑一趟

Spark大数据处理讲课笔记--- RDD持久化机制

 

3、采用持久化操作

  • 可以在RDD上使用persist()或cache()方法来标记要持久化的RDD(cache()方法实际上底层调用的是persist()方法)。在第一次行动操作时将对数据进行计算,并缓存在节点的内存中。Spark的缓存是容错的:如果缓存的RDD的任何分区丢失,Spark就会按照该RDD原来的转换过程自动重新计算并缓存。
  • 计算到RDD3时,标记持久化

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 计算RDD4,就是基于RDD3缓存的数据开始计算,不用从头到尾跑一趟

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 计算RDD5,就是基于RDD3缓存的数据开始计算,不用从头到尾跑一趟

Spark大数据处理讲课笔记--- RDD持久化机制

 

二、存储级别

(一)持久化方法的参数

  • 利用RDD的persist()方法实现持久化,向persist()方法中传入一个StorageLevel对象指定存储级别。每个持久化的RDD都可以使用不同的存储级别存储,默认的存储级别是StorageLevel.MEMORY_ONLY

(二)Spark RDD存储级别表

  • Spark RDD有七种存储级别

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 在Spark的Shuffle操作(例如reduceByKey()中,即使用户没有使用persist()方法,也会自动保存一些中间数据。这样做是为了避免在节点洗牌的过程中失败时重新计算整个输入。如果想多次使用某个RDD,那么强烈建议在该RDD上调用persist()方法。

(三)如何选择存储级别

  • 选择原则:权衡内存使用率和CPU效率
  • 如果RDD存储在内存中不会发生溢出,那么优先使用默认存储级别(MEMORY_ONLY),该级别会最大程度发挥CPU的性能,使在RDD上的操作以最快的速度运行。
  • 如果RDD存储在内存中会发生溢出,那么使用MEMORY_ONLY_SER并选择一个快速序列化库将对象序列化,以节省空间,访问速度仍然相当快。
  • 除非计算RDD的代价非常大,或者该RDD过滤了大量数据,否则不要将溢出的数据写入磁盘,因为重新计算分区的速度可能与从磁盘读取分区一样快。
  • 如果希望在服务器出故障时能够快速恢复,那么可以使用多副本存储级别MEMORY_ONLY_2或MEMORY_AND_DISK_2。该存储级别在数据丢失后允许在RDD上继续运行任务,而不必等待重新计算丢失的分区。其他存储级别在发生数据丢失后,需要重新计算丢失的分区。

(四)persist()与cache()的查看

  • 查看两个方法的源码
/**                                                                                           
 * 在第一次行动操作时持久化RDD,并设置存储级别,当RDD从来没有设置过存储级别时才能使用该方法                                           
 */                                                                                          
def persist(newLevel: StorageLevel): this.type = {                                            
  if (isLocallyCheckpointed) {                                                                
    // 如果之前已将该RDD设置为localCheckpoint,就覆盖之前的存储级别                                                
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)     
  } else {                                                                                    
    persist(newLevel, allowOverride = false)                                                  
  }                                                                                           
}                                                                                             
/**                                                                                           
  * 持久化RDD,使用默认存储级别(MEMORY_ONLY)                                                              
  */                                                                                          
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)                                  
                                                                                              
/**                                                                                           
  * 持久化RDD,使用默认存储级别(MEMORY_ONLY)                                                              
  */                                                                                          
def cache(): this.type = persist()                                                            
  • 从上述代码可以看出,cache()方法调用了无参的persist()方法,两者的默认存储级别都为MEMORY_ONLY,但cache()方法不可更改存储级别,而persist()方法可以通过参数自定义存储级别

(五)案例演示设置存储级别

  • net.cl.rdd根包里创建day05子包,然后在子包里创建SetStorageLevel对象
package net.cl.rdd.day05

import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object SetStorageLevel {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("SetStorageLevel") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)

    // 去除Spark运行信息
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress", "false")
    Logger.getRootLogger().setLevel(Level.OFF)

    // 读取HDFS文件,得到rdd
    val rdd = sc.textFile("hdfs://master:9000/park/words.txt")
    // 将rdd标记为持久化,采用默认存储级别 - StorageLevel.MEMORY_ONLY
    rdd.persist() // 无参持久化方法

    // 对rdd做扁平化映射,得到rdd1
    val rdd1 = rdd.flatMap(_.split(" "))
    // 将rdd1持久化到磁盘
    rdd1.persist(StorageLevel.DISK_ONLY)

    // 将rdd1映射成二元组,得到rdd2
    val rdd2 = rdd1.map((_, 1))
    // 将rdd2持久化到内存,溢出的数据持久化到磁盘
    rdd2.persist(StorageLevel.MEMORY_AND_DISK)

    // 第一次行动算子,对标记为持久化的RDD进行不同级别的持久化操作
    println("元素个数:" + rdd2.count)

    // 第二次行动算子,直接利用rdd2的持久化数据进行操作,无须从头进行计算
    rdd2.collect.foreach(println)
  }
}
  • 运行程序,查看结果

Spark大数据处理讲课笔记--- RDD持久化机制

 

三、利用Spark WebUI查看缓存

  • 最好重启Spark Shell

Spark大数据处理讲课笔记--- RDD持久化机制

 

(一)创建RDD并标记为持久化

  • 执行命令:val rdd = sc.parallelize(List(56, 67, 32, 89, 90, 66, 100))

Spark大数据处理讲课笔记--- RDD持久化机制

 

(二)Spark WebUI查看RDD存储信息

  • 浏览器中访问Spark Shell的WebUI http://master:4040/storage/查看RDD存储信息,可以看到存储信息为空

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 执行命令:rdd.collect,收集RDD数据

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 刷新WebUI,发现出现了一个ParallelCollectionRDD的存储信息,该RDD的存储级别为MEMORY,持久化的分区为8,完全存储于内存中。

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 单击ParallelCollectionRDD超链接,可以查看该RDD的详细存储信息

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 上述操作说明,调用RDD的persist()方法只是将该RDD标记为持久化,当执行行动操作时才会对标记为持久化的RDD进行持久化操作。

  • 执行以下命令,创建rdd2,并将rdd2持久化到磁盘

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 刷新上述WebUI,发现多了一个MapPartitionsRDD的存储信息,该RDD的存储级别为DISK,持久化的分区为8,完全存储于磁盘中。

Spark大数据处理讲课笔记--- RDD持久化机制

 

(三)将RDD从缓存中删除

  • Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用的方式从缓存中删除旧的分区数据。如果希望手动删除RDD,而不是等待该RDD被Spark自动从缓存中删除,那么可以使用RDD的unpersist()方法。

  • 执行命令:rdd.unpersist(),将rdd(ParallelCollectionRDD)从缓存中删除

Spark大数据处理讲课笔记--- RDD持久化机制

 

  • 刷新上述WebUI,发现只剩下了MapPartitionsRDDParallelCollectionRDD已被移除。

Spark大数据处理讲课笔记--- RDD持久化机制

 

到了这里,关于Spark大数据处理讲课笔记--- RDD持久化机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark大数据处理讲课笔记4.1 Spark SQL概述、数据帧与数据集

      目录 零、本讲学习目标 一、Spark SQL (一)Spark SQL概述 (二)Spark SQL功能 (三)Spark SQL结构 1、Spark SQL架构图 2、Spark SQL三大过程 3、Spark SQL内部五大组件 (四)Spark SQL工作流程 (五)Spark SQL主要特点 1、将SQL查询与Spark应用程序无缝组合 2、Spark SQL以相同方式连接多种数据

    2024年02月09日
    浏览(64)
  • Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

      目录 零、本讲学习目标 一、基本操作 二、默认数据源 (一)默认数据源Parquet (二)案例演示读取Parquet文件 1、在Spark Shell中演示 2、通过Scala程序演示 三、手动指定数据源 (一)format()与option()方法概述 (二)案例演示读取不同数据源 1、读取房源csv文件 2、读取json,保

    2024年02月09日
    浏览(44)
  • Spark大数据处理学习笔记(3.1)掌握RDD的创建

    文章目录 一、准备工作 1.1 准备文件 1.1.1 准备本地系统文件 在/home目录里创建test.txt 单词用空格分隔 1.1.2 启动HDFS服务 执行命令:start-dfs.sh 1.1.3 上传文件到HDFS 将test.txt上传到HDFS的/park目录里 查看文件内容 1.2 启动Spark Shell 1.2.1 启动Spark服务 执行命令:start-all.sh 1.2.2 启动Sp

    2024年02月09日
    浏览(43)
  • Spark大数据处理学习笔记(3.2.2)掌握RDD算子

    衔接上文:http://t.csdn.cn/Z0Cfj 功能: reduce()算子按照传入的函数进行归约计算 案例: 计算1 + 2 + 3 + …+100的值 计算1 × 2 × 3 × 4 × 5 × 6 的值(阶乘 - 累乘) 计算1 2 + 2 2 + 3 2 + 4 2 + 5**2的值(先映射,后归约) 功能: collect()算子向Driver以数组形式返回数据集的所有元素。通常对

    2024年02月08日
    浏览(48)
  • Spark 大数据实战:基于 RDD 的大数据处理分析

    之前笔者参加了公司内部举办的一个 Big Data Workshop,接触了一些 Spark 的皮毛,后来在工作中陆陆续续又学习了一些 Spark 的实战知识。 本文笔者从小白的视角出发,给大家普及 Spark 的应用知识。 Spark 集群是基于 Apache Spark 的分布式计算环境,用于处理 大规模数据集 的计算任

    2024年01月25日
    浏览(48)
  • Spark大数据处理学习笔记(2.2)搭建Spark Standalone集群

    一、在master虚拟机上安装配置Spark 1.1 将spark安装包上传到master虚拟机 下载Spark:pyw2 进入/opt目录,查看上传的spark安装包 1.2 将spark安装包解压到指定目录 执行命令: tar -zxvf spark-3.3.2-bin-hadoop3.tgz 修改文件名:mv spark-3.3.2-bin-hadoop3 spark-3.3.2 1.3 配置spark环境变量 执行命令:vim

    2024年02月09日
    浏览(52)
  • Spark大数据处理学习笔记(2.4)IDEA开发词频统计项目

    该文章主要为完成实训任务,详细实现过程及结果见【http://t.csdn.cn/0qE1L】 从Scala官网下载Scala2.12.15 - https://www.scala-lang.org/download/2.12.15.html 安装在默认位置 安装完毕 在命令行窗口查看Scala版本(必须要配置环境变量) 启动HDFS服务 启动Spark集群 在master虚拟机上创建单词文件

    2024年02月08日
    浏览(57)
  • Spark避坑系列(三)(Spark Core-RDD 依赖关系&持久化&共享变量)

    大家想了解更多大数据相关内容请移驾我的课堂: 大数据相关课程 剖析及实践企业级大数据 数据架构规划设计 大厂架构师知识梳理:剖析及实践数据建模 PySpark入坑系列第三篇,该篇章主要介绍spark的编程核心RDD的其他概念,依赖关系,持久化,广播变量,累加器等 在spa

    2024年02月01日
    浏览(45)
  • Spark重温笔记(四):秒级处理庞大数据量的 SparkSQL 操作大全,能否成为你的工作备忘指南?

    前言:今天是温习 Spark 的第 4 天啦!主要梳理了 SparkSQL 工作中常用的操作大全,以及演示了几个企业级案例,希望对大家有帮助! Tips:\\\"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊! 喜欢我的博

    2024年04月11日
    浏览(45)
  • Spark核心RDD详解(设计与运行原理,分区,创建,转换,行动与持久化)

    在实际应用中,存在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,这些应用场景的共同之处是,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,目前的MapReduce框架都是把中间结果写入到HDFS中,带来了大量的

    2024年02月04日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包