PySpark大数据教程:深入学习SparkCore的RDD持久化和Checkpoint

这篇具有很好参考价值的文章主要介绍了PySpark大数据教程:深入学习SparkCore的RDD持久化和Checkpoint。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SparkCore加强

  • 重点:RDD的持久化和Checkpoint

  • 提高拓展知识:Spark内核调度全流程,Spark的Shuffle

  • 练习:热力图统计及电商基础指标统计

  • combineByKey作为面试部分重点,可以作为扩展知识点

Spark算子补充

  • 关联函数补充

  • join为主基础算子

  • # -*- coding: utf-8 -*-# Program function:演示join操作from pyspark import SparkConf, SparkContextif __name__ == '__main__':
        print('PySpark join Function Program')
        # TODO:1、创建应用程序入口SparkContext实例对象
        conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
        sc = SparkContext.getOrCreate(conf)
        # TODO: 2、从本地文件系统创建RDD数据集
        x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
        y = sc.parallelize([(1001, "sales"), (1002, "tech")])
        # TODO:3、使用join完成联合操作
        print(x.join(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        print(x.leftOuterJoin(y).collect())
        print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
    
        sc.stop()


[掌握]RDD 持久化

为什么使用缓存

  • 缓存可以加速计算,比如在wordcount操作的时候对reduceByKey算子进行cache的缓存操作,这时候后续的操作直接基于缓存后续的计算

  • 缓存可以解决容错问题,因为RDD是基于依赖链的Dependency

  • 使用经验:一次缓存可以多次使用

如何进行缓存?

  • spark中提供cache方法

  • spark中提供persist方法

  • # -*- coding: utf-8 -*-# Program function:演示join操作from pyspark import SparkConf, SparkContextfrom pyspark.storagelevel import StorageLevelimport timeif __name__ == '__main__':
        print('PySpark join Function Program')
        # TODO:1、创建应用程序入口SparkContext实例对象
        conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
        sc = SparkContext.getOrCreate(conf)
        # TODO: 2、从本地文件系统创建RDD数据集
        x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
        y = sc.parallelize([(1001, "sales"), (1002, "tech")])
        # TODO:3、使用join完成联合操作
        join_result_rdd = x.join(y)
        print(join_result_rdd.collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        print(x.leftOuterJoin(y).collect())
        print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
        # 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY)
        join_result_rdd.cache()
        # join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
        # 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识
        join_result_rdd.collect()
        # 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count
        print(join_result_rdd.count())
        time.sleep(600)
        sc.stop()


缓存级别

  • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

  • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

  • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

  • 如何选:

  • 1-首选内存

  • 2-内存放不下,尝试序列化

  • 3-如果算子比较昂贵可以缓存在磁盘中,否则不要直接放入磁盘

  • 4-使用副本机制完成容错性质

释放缓存

  • 后续讲到Spark内存模型中,缓存放在Execution内存模块

  • 如果不在需要缓存的数据,可以释放

  • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

  • 最近最少使用(LRU)



print(“释放缓存之后,直接从rdd的依赖链重新读取”)
print(join_result_rdd.count())

* <img src="https://maynor.oss-cn-shenzhen.aliyuncs.com/img/20231009192818.png" alt="image-20210913104616717" style="zoom:150%;" />

何时缓存数据

  • rdd来之不易

  • 经过很长依赖链计算

  • 经过shuffle

  • rdd被使用多次

  • 缓存cache或persist

  • 问题:缓存将数据保存在内存或磁盘中,内存或磁盘都属于易失介质

  • 内存在重启之后没有数据了,磁盘也会数据丢失

  • 注意:缓存会将依赖链进行保存的

  • 如何解决基于cache或persist的存储在易失介质的问题?

  • 引入checkpoint检查点机制

  • 将元数据和数据统统存储在HDFS的非易失介质,HDFS有副本机制

  • checkpoint切断依赖链,直接基于保存在hdfs的中元数据和数据进行后续计算

  • 什么是元数据?

    • 管理数据的数据

    • 比如,数据大小,位置等都是元数据

[掌握]RDD Checkpoint

  • 为什么有检查点机制?

    • 有一些rdd出错怎么办?可以借助于cache或Persist,或checkpoint

    • 因为cache或perisist将数据缓存在内存或磁盘中,会有丢失数据情况,引入检查点机制,可以将数据斩断依赖之后存储到HDFS的非易失介质中,解决Spark的容错问题

    • Spark的容错问题?

  • 如何使用检查点机制?

    • 指定数据保存在哪里?

    • sc.setCheckpointDir(“hdfs://node1:9820/chehckpoint/”)

    • 对谁缓存?答案算子

    • rdd1.checkpoint() 斩断依赖关系进行检查点

    • 检查点机制触发方式

    • action算子可以触发

    • 后续的计算过程

    • Spark机制直接从checkpoint中读取数据

    • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

    • 实验过程还原:

    • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

    • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

    • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

  • 检查点机制那些作用?

    • 将数据和元数据保存在HDFS中

    • 后续执行rdd的计算直接基于checkpoint的rdd

    • 起到了容错的作用

  • 面试题:如何实现Spark的容错?

    • 1-首先会查看Spark是否对数据缓存,cache或perisist,直接从缓存中提取数据

    • 2-否则查看checkpoint是否保存数据

    • 3-否则根据依赖关系重建RDD

  • 检查点机制案例

持久化和Checkpoint的区别

  • 存储位置:缓存放在内存或本地磁盘,检查点机制在hdfs

  • 生命周期:缓存通过LRU或unpersist释放,检查点机制会根据文件一直存在

  • 依赖关系:缓存保存依赖关系,检查点斩断依赖关系链

案例测试:

先cache在checkpoint测试

  • 1-读取数据文件

  • 2-设置检查点目录

  • 3-rdd.checkpoint() 和rdd.cache()

  • 4-执行action操作,根据spark容错选择首先从cache中读取数据,时间更少,速度更快

  • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

  • 5-如果对rdd实现unpersist

  • 6-从checkpoint中读取rdd的数据

  • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

  • 7-通过action可以查看时间

  • PySpark,SparkCore,RDD持久化,Checkpoint,缓存,join操作,Spark算子

AI副业实战手册:http://www.yibencezi.com/notes/253200?affiliate_id=1317(目前40+工具及实战案例,持续更新,实战类小册排名第一,做三个月挣不到钱找我退款,交个朋友的产品)

后记

📢博客主页:https://manor.blog.csdn.net

📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 Maynor 原创,首发于 CSDN博客🙉
📢感觉这辈子,最深情绵长的注视,都给了手机⭐
📢专栏持续更新,欢迎订阅:https://blog.csdn.net/xianyu120/category_12453356.html文章来源地址https://www.toymoban.com/news/detail-720069.html

到了这里,关于PySpark大数据教程:深入学习SparkCore的RDD持久化和Checkpoint的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包