Spark重温笔记(三):Spark在企业中为什么能这么强?——持久化、Checkpoint机制、共享变量与内核调度原理全攻略“

这篇具有很好参考价值的文章主要介绍了Spark重温笔记(三):Spark在企业中为什么能这么强?——持久化、Checkpoint机制、共享变量与内核调度原理全攻略“。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark学习笔记

前言:今天是温习 Spark 的第 3 天啦!主要梳理了 Spark 核心数据结构:RDD(弹性分布式数据集),包括RDD持久化,checkpoint机制,spark两种共享变量以及spark内核调度原理,希望对大家有帮助!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


(本节的所有数据集放在我的资源下载区哦,感兴趣的小伙伴可以自行下载:最全面的SparkCore系列案例数据集

5. RDD持久化[掌握]

(1)为什么使用缓存
  • 缓存可以加速计算,比如在wordcount操作的时候对reduceByKey算子进行cache的缓存操作,这时候后续的操作直接基于缓存后续的计算
  • 缓存可以解决容错问题,因为RDD是基于依赖链的Dependency
  • 使用经验:一次缓存可以多次使用
(2)如何进行缓存
  • spark中提供cache方法
  • spark中提供persist方法
_15_acheOrpersist.py

# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import time
if __name__ == '__main__':
    print('PySpark join Function Program')
    # TODO:1、创建应用程序入口SparkContext实例对象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、从本地文件系统创建RDD数据集
    x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
    y = sc.parallelize([(1001, "sales"), (1002, "tech")])
    # TODO:3、使用join完成联合操作
    join_result_rdd = x.join(y)
    print(join_result_rdd.collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    print(x.leftOuterJoin(y).collect())
    print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    # 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY)
    join_result_rdd.cache()
    # join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
    # 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识
    join_result_rdd.collect()
    # 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count==============================> 4040端口出现绿点
    print(join_result_rdd.count())
    print(join_result_rdd.first())
    time.sleep(600)
    sc.stop()
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech')), (1003, ('wangwu', None)), (1004, ('zhangliu', None))]
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
2
(1001, ('zhangsan', 'sales'))

(3)何时缓存数据
  • rdd来之不易
  • 经过很长依赖链计算
  • 经过shuffle
  • rdd被使用多次
  • 缓存cache或persist

问题1:缓存将数据保存在内存或磁盘中,内存或磁盘都属于易失介质

  • 内存在重启之后没有数据了,磁盘也会数据丢失

  • 注意:缓存会将依赖链进行保存的

  • 问题2:如何解决基于cache或persist的存储在易失介质的问题?

  • 引入checkpoint检查点机制

  • 将元数据和数据统统存储在HDFS的非易失介质,HDFS有副本机制

  • checkpoint切断依赖链,直接基于保存在hdfs的中元数据和数据进行后续计算

  • 什么是元数据?

  • 管理数据的数据

  • 比如,数据大小,位置等都是元数据

6. Checkpoint机制[掌握]

(1) 为什么要检查点

为什么有检查点机制?

  • 1-因为cache或perisist将数据缓存在内存或磁盘中,会有丢失数据情况,引入检查点机制,可以将数据斩断依赖之后存储到HDFS的非易失介质中,解决Spark的容错问题
  • 2-Spark的容错问题?
    • 有一些rdd出错怎么办?可以借助于cache或Persist,或checkpoint
(2)如何进行检查点

如何使用检查点机制?

  • 1-指定数据保存在哪里?:sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)
  • 2-对谁缓存?:算子
  • 3-rdd1.checkpoint() :斩断依赖关系进行检查点
  • 4-检查点机制触发方式:action算子可以触发
  • 5-后续的计算过程:Spark机制直接从checkpoint中读取数据
(3)检查点机制有哪些作用

检查点机制那些作用?

  • 将数据和元数据保存在HDFS中
  • 后续执行rdd的计算直接基于checkpoint的rdd
  • 起到了容错的作用
(4) 如何实现spark的容错

面试题:如何实现Spark的容错?

  • 1-首先会查看Spark是否对数据缓存,cache或perisist,直接从缓存中提取数据
  • 2-否则查看checkpoint是否保存数据
  • 3-否则根据依赖关系重建RDD
(5)持久化和检查点的区别
  • 1-存储位置:缓存放在内存或本地磁盘,检查点机制在hdfs
  • 2-生命周期:缓存通过LRU或unpersist释放,检查点机制会根据文件一直存在
  • 3-依赖关系:缓存保存依赖关系,检查点斩断依赖关系链
_16_checkpoint.py
# -*- coding: utf-8 -*-
# Program function:checkpoint RDD

from pyspark import SparkContext, SparkConf
import os
import time

from pyspark.storagelevel import StorageLevel

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    print('PySpark checkpoint Program')
    # TODO:1、创建应用程序入口SparkContext实例对象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、RDD的checkpoint
    sc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")
    # TODO: 3、调用集合RDD中函数处理分析数据
    fileRDD = sc.textFile("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")
    # TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
    fileRDD.checkpoint()
    print(fileRDD.count())
    # TODO: 再次执行count函数, 此时从checkpoint读取数据
    print(fileRDD.count())

    time.sleep(100)
    print('停止 PySpark SparkSession 对象')
    # 关闭SparkContext
    sc.stop()

2
2
停止 PySpark SparkSession 对象  
(6)持久化和检查点并存

先cache 再 checkpoint测试

  • 1-读取数据文件
  • 2-设置检查点目录
  • 3-rdd.checkpoint() 和rdd.cache()
  • 4-执行action操作,根据spark容错选择首先从cache中读取数据,时间更少,速度更快
  • 5-如果对rdd实现unpersist
  • 6-从checkpoint中读取rdd的数据
  • 7-通过action可以查看时间
_17_acheCheckpoint.py
# -*- coding: utf-8 -*-
# Program function:cache&checkpoint RDD

from pyspark import SparkContext, SparkConf
import os
import time

from pyspark.storagelevel import StorageLevel

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    print('PySpark cache&checkpoint Program')
    # TODO:1、创建应用程序入口SparkContext实例对象
    conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
    sc = SparkContext.getOrCreate(conf)
    # TODO: 2、RDD的checkpoint
    sc.setCheckpointDir("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/checkpoint1")
    # TODO: 3、调用集合RDD中函数处理分析数据
    fileRDD = sc.textFile("/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")
    # TODO: 调用checkpoint和cache函数,将RDD进行容错,需要RDD中Action函数触发
    print("=======1-同时做cache和Perisist========")
    fileRDD.cache()
    fileRDD.checkpoint()
    print("=======2-启动Job1跑正常任务,启动Job2就会先从Cache读取数据,Web页面可以看到ProcessLocal========")
    fileRDD.count()
    # TODO: 再次执行count函数, 此时从checkpoint读取数据
    fileRDD.count()
    print("=======3-启动一个Job发现查询数据从checkpoint的hdfs中查找========")
    # TODO:释放cache之后如果在查询数据从哪里读取? 答案是checkpoint的hdfs的数据中。
    fileRDD.unpersist(True)
    fileRDD.count()

    time.sleep(100)
    print('停止 PySpark SparkSession 对象')
    # 关闭SparkContext
    sc.stop()

7.两种共享变量[掌握]

(1)累加器
  • 1-原理
    • 在Driver端和exeutor端可以共享Executor执行计算的结果
  • 2-不使用累加器
    • python本地集合可以直接得到结果
    • 但是在分布式集合中得不到累加的
  • 3-使用累加器
    • acc=sc.accumulate(10),10是初始值
    • acc.add(num)
    • print(acc.value)通过value获取累加器的值
(2)广播变量
  • 1-广播变量不是在每个Task拥有一份变量,而是每个节点的executor一份副本
  • 2-广播变量通过本地的executor从blockmanager中过去driver上面变量的副本(计算资源+计算程序)

8. Spark的内核调度

(1) RDD依赖
  • RDD依赖
  • 为什么设计依赖?
    • 1-为了实现Spark的容错,rdd1-rdd2-rdd3-rdd4
    • 2-并行计算,划分依赖、
  • 为什么划分宽窄依赖?
    • 为了加速并行计算
    • 窄依赖可以并行计算,如果是宽依赖无法并行计算
  • 依赖的划分
    • 窄依赖:*父 RDD 与子 RDD 间的分区是一对一的*
    • 宽依赖:划分Stage
      • *父 RDD 中的分区可能会被多个子 RDD 分区使用*
    • 如何区分宽窄依赖?
      • 比如map。filter,flatMap 窄依赖,无需进行shuffle
      • 比如reduceByKey(合并多个窄依赖),groupByKey,宽依赖(shuffle)
      • 不能说:一个子RDD依赖于多个父rdd,该种情况无法判断
(2) DAG

什么是DAG?

  • 有向无环图
  • DAG如何划分Stage?
    • 一个Dag就是一个Job,一个Dag是由Action算子进行划分
    • 一个Job下面有很多Stage,根据宽依赖Shuffle依赖划分Stage
  • 一个Spark应用程序包括Job、Stage及Task:
    • 第一:Job是以Action方法为界,遇到一个Action方法则触发一个Job;一个Job就是dag
    • 第二:Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
    • 第三:Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。
(3) Job的调度流程
  • 1-用户代码编写: 用户根据需求编写 Spark 应用程序,包括定义 RDD、转换操作和行动操作等。

  • 2-DAG 构建: Spark 将用户编写的代码进行解析,并构建出一个有向无环图(DAG),该图表示了任务之间的依赖关系。DAG 由一系列的阶段(Stage)组成,每个阶段包含一组可以并行执行的任务。

  • 3-Stage 划分: 根据任务之间的依赖关系,Spark 将 DAG 进一步划分为不同的阶段。一个阶段包含一组可以在无需 shuffle 的情况下并行执行的任务。

  • 4-Task 划分: 对于每个阶段,Spark 将其划分为一系列的任务(Task),每个任务对应于一个 RDD partition 的处理。任务的划分是根据数据的分区方式和计算的转换操作来确定的。

  • 5-资源分配: Spark 根据集群的资源情况,将任务分配给可用的 Executor,以便在集群中并行执行。

  • 6-DAG 调度: Spark 根据阶段之间的依赖关系,按照拓扑顺序调度阶段的执行。每个阶段的任务会在 Executor 上启动,并且会根据需要进行数据的 shuffle 操作。

  • 7-任务执行: Executor 在分配到的资源上并行执行任务。每个任务会根据用户编写的转换操作对 RDD 进行处理,并将结果传递给下一个阶段的任务。

  • 8-结果输出: 最后一个阶段完成后,Spark 将最终的结果返回给用户代码,或者将结果写入外部存储系统,如 HDFS、数据库等。文章来源地址https://www.toymoban.com/news/detail-845165.html

到了这里,关于Spark重温笔记(三):Spark在企业中为什么能这么强?——持久化、Checkpoint机制、共享变量与内核调度原理全攻略“的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 职场工作多年,为什么成长这么慢

    在职场工作多年,却没有成长,是许多人都会遇到的问题。这种情况可能让人感到沮丧和无助,但是它的根本原因是什么呢?在本文中,我们将探讨为什么会出现这种情况,以及如何克服这种困境。 成长需要我们对自己的能力和表现进行评估和反思。如果没有对自己的工作进

    2023年04月16日
    浏览(35)
  • 为什么这么设计—— Go的GC

    Go语言采用了3色标记清理法来对内存进行自动垃圾回收, 过程是这样的: (1)起初所有的对象都是白色的; (2)从根对象出发扫描所有可达对象,标记为灰色,放入待处理队列; (3)从待处理队列中取出灰色对象,将其引用的对象标记为灰色并放入待处理队列中,自身标

    2024年02月12日
    浏览(36)
  • 为什么现在原生家庭的问题这么严重?

    匿名用户 191 人赞同了该回答 换一个玄学的角度来看这个问题,之前看b站,有一个up主说,中国有历史记载的人口数一直都很稳定,7-8千万到1亿左右,明朝2亿,清朝到民国算是增长比较多的,有4亿,但是从开国到现在增长了10亿,从轮回的角度来讲,哪来那么多的人来转世

    2024年02月13日
    浏览(47)
  • 48 | DMA:为什么Kafka这么快?

    过去几年里,整个计算机产业界,都在尝试不停地提升 I/O 设备的速度。把 HDD 硬盘换成 SSD 硬盘,我们仍然觉得不够快;用 PCI Express 接口的 SSD 硬盘替代 SATA 接口的 SSD 硬盘,我们还是觉得不够快,所以,现在就有了傲腾(Optane)这样的技术。 但是,无论 I/O 速度如何提升,

    2024年02月21日
    浏览(33)
  • 玩转Discord:为什么它这么吸引加密社区?

        Twitter、Telegram、Discord,目前加密货币项目和社区必备的三件套,其重要程度堪比国内所说的“两微一抖(微博、微信和抖音)”。 Twitter和Telegram国内的用户还算了解,Discord相对来说就比较陌生了,但是近一年以来,随着国内社交平台的审查收紧,NFT、DAO的盛行,Discor

    2024年02月04日
    浏览(34)
  • 视频中为什么需要这么多的颜色空间?

    作者 | 17哥 导读 :在视频处理中,我们经常会用到不同的色彩空间: 非线性RGB,线性 RGB,YUV,XYZ ……为什么需要这么多的色彩空间呢?为什么在 FFMpeg 中会有 color_space,color_transfer,color_primaries 等一系列的颜色属性呢?这些术语之间究竟隐藏着什么秘密 ? 全文5840字,预计阅

    2023年04月13日
    浏览(31)
  • 电脑为什么这么卡?6个方法处理电脑卡顿

    你是否打开电脑就卡到不行?电脑的开机速度慢,就连打开网页也在转圈圈,一直加载不出来。世界上最痛苦的事莫过于此,想要好好工作,却一直加载不出网页。 你知道电脑为什么这么卡吗? 其实大多数的原因都在这篇文章列出来了,有兴趣的朋友一起来看看,下面还有

    2024年02月11日
    浏览(34)
  • 记录--强制缓存这么暴力,为什么不使用协商缓存

    前段时间在看面经的时候,发现很多份面经中都被问到了 强缓存 和 协商缓存 。因此我觉得有必要写一篇文章来好好聊聊这两者。 浏览器缓存是浏览器在本地磁盘对用户最近请求过的文档进行存储,当访问者再次访问同一页面时,浏览器就可以直接从本地磁盘加载文档,其中浏览

    2024年02月10日
    浏览(34)
  • 为什么C++这么复杂还不被淘汰?

    C++是一门广泛使用的编程语言,主要用于系统和应用程序的开发。尽管C++具有一些复杂的语法和概念,但它仍然是编程界的重量级选手,在编程语言排行榜中一直位居前列。 为什么C++这么复杂还不被淘汰呢? C++有以下优势 1、C++具有高性能 C++是一门编译型语言,可以直接编

    2024年02月05日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包