PySpark之RDD的持久化

这篇具有很好参考价值的文章主要介绍了PySpark之RDD的持久化。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RDD的持久化

RDD的缓存

当RDD被重复使用,或者计算该RDD比较容易出错,而且需要消耗比较多的资源和时间的时候,我们就可以将该RDD缓存起来。

主要作用: 提升Spark程序的计算效率
注意事项: RDD的缓存可以存储在内存或者是磁盘上,甚至可以存储在Executor进程的堆外内存中。主要是放在内存中,因此缓存的数据是不太稳定可靠。

由于是临时存储,可能会存在丢失,所以缓存操作,并不会将RDD之间的依赖关系给截断掉(丢失掉),因为当缓存
失效后,可以全部重新计算
缓存的API都是Lazy惰性的,如果需要触发缓存操作,推荐调用count算子,因为运行效率高


设置缓存的相关API:
            rdd.cache():将RDD的数据缓存在内存中
            rdd.persist(缓存的级别/位置):将RDD的数据存储在指定位置
手动清理缓存:rdd.unpersits()
默认情况下,当整个Spark应用程序执行完成后,缓存数据会自动失效,会自动删除

缓存的级别/位置:

        DISK_ONLY: 只存储在磁盘
    DISK_ONLY_2: 只存储在磁盘,并且有2个副本
    DISK_ONLY_3: 只存储在磁盘,并且有3个副本
    MEMORY_ONLY: 只存储在内存中
    MEMORY_ONLY_2: 只存储在内存中,并且有2个副本
    MEMORY_AND_DISK: 存储在内存和磁盘中,先放在内存,再放在磁盘
    MEMORY_AND_DISK_2: 存储在内存和磁盘中,先放在内存,再放在磁盘,并且有2个副本
    OFF_HEAP: Executor进程的堆外内存
    
工作中最常用的是: MEMORY_AND_DISK和MEMORY_AND_DISK_2。优先推荐使用MEMORY_AND_DISK

import time

from pyspark import SparkConf, SparkContext, StorageLevel
import os
import jieba

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 需要过滤的关键词黑名单
keyword_black_list = ['+','.','的','com']

# ctrl+alt+M将代码封装成函数/方法
# 3.2- 需求一:统计每个关键词出现了多少次。先提取需要操作的字段并且分词,这一步类似WordCount中的对每行进行切分处理,再仿照WordCount实现。
def top10_keyword():
    keyword_rdd = etl_rdd.flatMap(lambda line_tup: list(jieba.cut(line_tup[2])))
    # print(keyword_rdd.take(10))

    # 数据结构转变。将单词变成元组
    # keyword_map_rdd = keyword_rdd.filter(lambda word:word!='+' or word!='.').map(lambda word:(word,1))
    keyword_map_rdd = keyword_rdd.filter(lambda word: word not in keyword_black_list).map(lambda word: (word, 1))

    # 分组聚合操作
    keyword_result_rdd = keyword_map_rdd.reduceByKey(lambda agg, curr: agg + curr)
    # print(keyword_result_rdd.take(100))

    # 对结果中关键词的次数降序排序,取TOP10
    keyword_result = keyword_result_rdd.top(10, key=lambda tup: tup[1])
    print(keyword_result)


# 3.3- 需求二:统计每个用户每个搜索内容点击的次数
def content():
    """
        hive sql:
            select
                用户,搜索内容,count(1) as cnt
            from table
            group by 用户,搜索内容
    """
    # 从原始的6个字段中,提取出2个字段,得到 (用户,搜索内容)
    new_tup_tmp_rdd = etl_rdd.map(lambda tup: (tup[1], tup[2]))

    # 数据格式转换
    """
            输入:(张三,鸡你太美) -> hello
            输出:((张三,鸡你太美),1) -> (hello,1)
        """
    new_tup_rdd = new_tup_tmp_rdd.map(lambda tup: (tup, 1))
    # new_tup_rdd = new_tup_tmp_rdd.map(lambda tup:(tup[0],tup[1],1))

    # 分组聚合
    content_result = new_tup_rdd.reduceByKey(lambda agg, curr: agg + curr)
    print(content_result.take(10))


if __name__ == '__main__':

    # 1- 创建SparkContext
    conf = SparkConf().setAppName('sogou_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    # 2- 数据输入
    init_rdd = sc.textFile('file:///export/data/gz16_pyspark/01_spark_core/data/SogouQ.sample')

    print("ETL处理前数据条数:",init_rdd.count())

    # 3- 数据处理
    # 3.1- ETL:数据的清洗、转换、加载
    """
        split():默认按照空白字符进行切分。例如:空格、制表符、回车换行符等
        
        map和flatMap的主要区别:flatMap对每一个元素处理以后,会将结果打平/压扁到一个更大的容器当中。
    """
    map_rdd = init_rdd.map(lambda line:line.split())
    # print("调用map算子后的内容:",map_rdd.take(10))

    # flatmap_rdd = init_rdd.flatMap(lambda line: line.split())
    # print("调用flatMap算子后的内容:",flatmap_rdd.take(10))

    # 过滤掉每行中没有6个字段的数据
    filter_rdd = map_rdd.filter(lambda line_list: len(line_list)==6)


    # 数据结构转换(为了演示而演示)
    etl_rdd = filter_rdd.map(lambda line_list:(
        line_list[0],
        line_list[1],
        line_list[2][1:-1], # 省略前后的中括号
        line_list[3],
        line_list[4],
        line_list[5]
    ))

    # 设置缓存。并且调用count算子触发操作
    # etl_rdd.cache().count()
    etl_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()

    print("ETL处理后数据条数:", etl_rdd.count())

    # 3.2- 需求一:统计每个关键词出现了多少次
    # top10_keyword()

    # 3.3- 需求二:统计每个用户每个搜索内容点击的次数
    content()


    time.sleep(20)

    # 手动清理缓存。你对哪个RDD设置了缓存,那么你就对那个RDD清理缓存。也需要调用count算子触发。
    etl_rdd.unpersist().count()

    time.sleep(100)

    # 5- 释放资源
    sc.stop()

 无缓存的DAG流程图显示:

PySpark之RDD的持久化,python,spark 有缓存的DAG流程图显示:

PySpark之RDD的持久化,python,spark

RDD的checkpoint检查点

RDD缓存主要是将数据存储在内存中,是临时存储,不太稳定,它主要是用来提升程序运行效率的。RDD的checkpoint(检查点)主要是将数据存储在HDFS上,是持久化存储。而HDFS存储数据有3副本的机制,让数据更加安全可靠。

checkpoint认为使用磁盘或者HDFS存储数据之后,数据非常的安全可靠,因此checkpoint会将RDD间的依赖关系给删除/丢弃掉。因此如果checkpoint的数据真的出现了问题,是无法在从头开始计算。

checkpoint主要作用: 提高程序的容错性
注意事项: checkpoint可以将数据存储在磁盘或者HDFS上,主要是将数据存储在HDFS上。

相关API:
    sc.setCheckpointDir(存储路径): 设置checkpoint数据存放路径
    rdd.checkpoint(): 对指定RDD启用checkpoint
    rdd.count(): 触发checkpoint

import time

from pyspark import SparkConf, SparkContext, StorageLevel
import os
import jieba

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 需要过滤的关键词黑名单
keyword_black_list = ['+','.','的','com']

# ctrl+alt+M将代码封装成函数/方法
# 3.2- 需求一:统计每个关键词出现了多少次。先提取需要操作的字段并且分词,这一步类似WordCount中的对每行进行切分处理,再仿照WordCount实现。
def top10_keyword():
    keyword_rdd = etl_rdd.flatMap(lambda line_tup: list(jieba.cut(line_tup[2])))
    # print(keyword_rdd.take(10))

    # 数据结构转变。将单词变成元组
    # keyword_map_rdd = keyword_rdd.filter(lambda word:word!='+' or word!='.').map(lambda word:(word,1))
    keyword_map_rdd = keyword_rdd.filter(lambda word: word not in keyword_black_list).map(lambda word: (word, 1))

    # 分组聚合操作
    keyword_result_rdd = keyword_map_rdd.reduceByKey(lambda agg, curr: agg + curr)
    # print(keyword_result_rdd.take(100))

    # 对结果中关键词的次数降序排序,取TOP10
    keyword_result = keyword_result_rdd.top(10, key=lambda tup: tup[1])
    print(keyword_result)


# 3.3- 需求二:统计每个用户每个搜索内容点击的次数
def content():
    """
        hive sql:
            select
                用户,搜索内容,count(1) as cnt
            from table
            group by 用户,搜索内容
    """
    # 从原始的6个字段中,提取出2个字段,得到 (用户,搜索内容)
    new_tup_tmp_rdd = etl_rdd.map(lambda tup: (tup[1], tup[2]))

    # 数据格式转换
    """
            输入:(张三,鸡你太美) -> hello
            输出:((张三,鸡你太美),1) -> (hello,1)
        """
    new_tup_rdd = new_tup_tmp_rdd.map(lambda tup: (tup, 1))
    # new_tup_rdd = new_tup_tmp_rdd.map(lambda tup:(tup[0],tup[1],1))

    # 分组聚合
    content_result = new_tup_rdd.reduceByKey(lambda agg, curr: agg + curr)
    print(content_result.take(10))


if __name__ == '__main__':

    # 1- 创建SparkContext
    conf = SparkConf().setAppName('sogou_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    # 设置checkpoint路径
    sc.setCheckpointDir("hdfs://node1:8020/day04/chk")

    # 2- 数据输入
    init_rdd = sc.textFile('file:///export/data/gz16_pyspark/01_spark_core/data/SogouQ.sample')

    print("ETL处理前数据条数:",init_rdd.count())

    # 3- 数据处理
    # 3.1- ETL:数据的清洗、转换、加载
    """
        split():默认按照空白字符进行切分。例如:空格、制表符、回车换行符等
        
        map和flatMap的主要区别:flatMap对每一个元素处理以后,会将结果打平/压扁到一个更大的容器当中。
    """
    map_rdd = init_rdd.map(lambda line:line.split())
    # print("调用map算子后的内容:",map_rdd.take(10))

    # flatmap_rdd = init_rdd.flatMap(lambda line: line.split())
    # print("调用flatMap算子后的内容:",flatmap_rdd.take(10))

    # 过滤掉每行中没有6个字段的数据
    filter_rdd = map_rdd.filter(lambda line_list: len(line_list)==6)


    # 数据结构转换(为了演示而演示)
    etl_rdd = filter_rdd.map(lambda line_list:(
        line_list[0],
        line_list[1],
        line_list[2][1:-1], # 省略前后的中括号
        line_list[3],
        line_list[4],
        line_list[5]
    ))

    # 对指定RDD启用checkpoint
    etl_rdd.checkpoint()
    # 调用count算子,触发checkpoint操作
    etl_rdd.count()


    print("ETL处理后数据条数:", etl_rdd.count())

    # 3.2- 需求一:统计每个关键词出现了多少次
    # top10_keyword()

    # 3.3- 需求二:统计每个用户每个搜索内容点击的次数
    content()

    time.sleep(1000)

    # 5- 释放资源
    sc.stop()

 PySpark之RDD的持久化,python,spark

 PySpark之RDD的持久化,python,spark

持久化方案对比

Spark的两种持久化方案缓存操作,checkpoint检查点的不同点

1.数据存储位置不同
缓存:rdd存储在内存,磁盘,或者是堆外内存中
checkpoint检查点:rdd存储在磁盘或者HDFS中,集群模式下仅能存储在HDFS中
2.生命周期不同
缓存:可以使用unpersist手动删除,或者程序运行结束后会自动销毁,自动删除
checkpoint检查点:程序运行结束后被保留,需要手动删除
3.血缘关系不同
缓存:RDD之间会保留血缘关系,缓存数据可能会失效,失效后可以重新回溯计算
checkpoint检查点:会丢掉依赖关系,因为checkpoint可以将数据保存到更加安全可靠的位置,当执行失败时也不需要重新回溯执行
4.目的不同
缓存:为了提高Spark程序的运行效率
checkpoint检查点:提高Spark程序的容错性

相同点:缓存的API都是Lazy惰性的,如果需要触发缓存操作,推荐调用count算子,因为运行效率高

实际应用

在同一个项目中,推荐缓存和checkpoint(检查点)同时配合使用。

使用顺序如下: 在代码中先设置缓存,再设置checkpoint检查点,然后再一同使用Action算子触发,推荐使用count算子。因为这个顺序,只会有一次IO写的过程。

实际过程如下: 程序会优先从缓存中读取数据,如果发现缓存中没有数据。再从checkpoint中读取数据,并且接着将读取到的数据重新在内存中放置一份,后续还是优先从缓存中读取文章来源地址https://www.toymoban.com/news/detail-817083.html

import time

from pyspark import SparkConf, SparkContext, StorageLevel
import os
import jieba

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 需要过滤的关键词黑名单
keyword_black_list = ['+','.','的','com']

# ctrl+alt+M将代码封装成函数/方法
# 3.2- 需求一:统计每个关键词出现了多少次。先提取需要操作的字段并且分词,这一步类似WordCount中的对每行进行切分处理,再仿照WordCount实现。
def top10_keyword():
    keyword_rdd = etl_rdd.flatMap(lambda line_tup: list(jieba.cut(line_tup[2])))
    # print(keyword_rdd.take(10))

    # 数据结构转变。将单词变成元组
    # keyword_map_rdd = keyword_rdd.filter(lambda word:word!='+' or word!='.').map(lambda word:(word,1))
    keyword_map_rdd = keyword_rdd.filter(lambda word: word not in keyword_black_list).map(lambda word: (word, 1))

    # 分组聚合操作
    keyword_result_rdd = keyword_map_rdd.reduceByKey(lambda agg, curr: agg + curr)
    # print(keyword_result_rdd.take(100))

    # 对结果中关键词的次数降序排序,取TOP10
    keyword_result = keyword_result_rdd.top(10, key=lambda tup: tup[1])
    print(keyword_result)


# 3.3- 需求二:统计每个用户每个搜索内容点击的次数
def content():
    """
        hive sql:
            select
                用户,搜索内容,count(1) as cnt
            from table
            group by 用户,搜索内容
    """
    # 从原始的6个字段中,提取出2个字段,得到 (用户,搜索内容)
    new_tup_tmp_rdd = etl_rdd.map(lambda tup: (tup[1], tup[2]))

    # 数据格式转换
    """
            输入:(张三,鸡你太美) -> hello
            输出:((张三,鸡你太美),1) -> (hello,1)
        """
    new_tup_rdd = new_tup_tmp_rdd.map(lambda tup: (tup, 1))
    # new_tup_rdd = new_tup_tmp_rdd.map(lambda tup:(tup[0],tup[1],1))

    # 分组聚合
    content_result = new_tup_rdd.reduceByKey(lambda agg, curr: agg + curr)
    print(content_result.take(10))


if __name__ == '__main__':

    # 1- 创建SparkContext
    conf = SparkConf().setAppName('sogou_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    # 设置checkpoint路径
    sc.setCheckpointDir("hdfs://node1:8020/day04/chk")

    # 2- 数据输入
    init_rdd = sc.textFile('file:///export/data/gz16_pyspark/01_spark_core/data/SogouQ.sample')

    print("ETL处理前数据条数:",init_rdd.count())

    # 3- 数据处理
    # 3.1- ETL:数据的清洗、转换、加载
    """
        split():默认按照空白字符进行切分。例如:空格、制表符、回车换行符等
        
        map和flatMap的主要区别:flatMap对每一个元素处理以后,会将结果打平/压扁到一个更大的容器当中。
    """
    map_rdd = init_rdd.map(lambda line:line.split())
    # print("调用map算子后的内容:",map_rdd.take(10))

    # flatmap_rdd = init_rdd.flatMap(lambda line: line.split())
    # print("调用flatMap算子后的内容:",flatmap_rdd.take(10))

    # 过滤掉每行中没有6个字段的数据
    filter_rdd = map_rdd.filter(lambda line_list: len(line_list)==6)


    # 数据结构转换(为了演示而演示)
    etl_rdd = filter_rdd.map(lambda line_list:(
        line_list[0],
        line_list[1],
        line_list[2][1:-1], # 省略前后的中括号
        line_list[3],
        line_list[4],
        line_list[5]
    ))

    # 先缓存
    etl_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

    # 再checkpoint
    etl_rdd.checkpoint()

    # 最后调用count算子,一同触发
    etl_rdd.count()


    print("ETL处理后数据条数:", etl_rdd.count())

    # 3.2- 需求一:统计每个关键词出现了多少次
    # top10_keyword()

    # 3.3- 需求二:统计每个用户每个搜索内容点击的次数
    content()

    time.sleep(1000)

    # 5- 释放资源
    sc.stop()

到了这里,关于PySpark之RDD的持久化的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark核心RDD详解(设计与运行原理,分区,创建,转换,行动与持久化)

    在实际应用中,存在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,这些应用场景的共同之处是,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,目前的MapReduce框架都是把中间结果写入到HDFS中,带来了大量的

    2024年02月04日
    浏览(46)
  • 3.5 RDD持久化机制

    一、RDD持久化 (一)引入持久化的必要性 Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。 Spark中重要的功能之一是可以

    2024年02月09日
    浏览(34)
  • Spark重温笔记(三):Spark在企业中为什么能这么强?——持久化、Checkpoint机制、共享变量与内核调度原理全攻略“

    前言:今天是温习 Spark 的第 3 天啦!主要梳理了 Spark 核心数据结构:RDD(弹性分布式数据集),包括RDD持久化,checkpoint机制,spark两种共享变量以及spark内核调度原理,希望对大家有帮助! Tips:\\\"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量

    2024年04月09日
    浏览(46)
  • 数据持久化的利器,Python中的pickle模块详解

    📚 个人网站:涛哥聊Python Python数据序列化和反序列化时, pickle 模块是一个非常有用的工具。它允许将Python对象转换为字节流,以便存储在文件中或通过网络传输,然后将这些字节流重新转换回Python对象。 pickle 是Python标准库中的一个模块,用于将Python对象序列化(pickling)

    2024年02月19日
    浏览(38)
  • python3使用sqlite3构建本地持久化缓存

    环境:Windows 10_x64 python版本:3.9.2 sqlite3版本:3.34.0 日常python开发中会遇到数据持久化的问题,今天记录下如何使用sqlite3进行数据持久化,并提供示例代码及数据查看工具。 python应用程序在运行过程中被kill掉(比如版本升级等情况),内存中的运行数据将会丢失,如果能够

    2024年02月05日
    浏览(32)
  • 【Python】Locust持续优化:InfluxDB与Grafana实现数据持久化与可视化分析

    目录 前言 influxDB 安装运行InfluxDB 用Python 上报数据到influxdb ocust 数据写入到 influx Locust的生命周期 上报数据 优化升级 配置Grafana 总结  资料获取方法 在进行性能测试时,我们需要对测试结果进行监控和分析,以便于及时发现问题并进行优化。 Locust在内存中维护了一个时间序

    2024年02月14日
    浏览(41)
  • Redis两种持久化方案RDB持久化和AOF持久化

    Redis持久化 Redis有两种持久化方案: RDB持久化 AOF持久化 1.1.RDB持久化 RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为RDB文件

    2024年02月14日
    浏览(49)
  • 【基础】【Python网络爬虫】【6.数据持久化】Excel、Json、Csv 数据保存(附大量案例代码)(建议收藏)

    创建数据表 批量数据写入 读取表格数据 案例 - 豆瓣保存 Excel 案例 - 网易新闻Excel保存 数据序列化和反序列化 中文指定 案例 - 豆瓣保存Json 案例 - Json保存 写入csv列表数据 案例 - 豆瓣列表保存Csv 写入csv字典数据 案例 - 豆瓣字典保存csv 读取csv数据 案例 - 网易新闻csv

    2024年02月03日
    浏览(55)
  • redis持久化【RDB+AOF】持久化双雄

    这是redis系列文章之《redis持久化【RDB+AOF】持久化双雄》,上一篇文章【redis基础】redis的十大数据类型_努力努力再努力mlx的博客-CSDN博客 感谢大家的支持~ 目录 RDB 什么是RDB RDB的作用 配置文件关于RDB部分  6vs7 操作步骤 修改配置文件(本案例设置5s修改2次) 修改dump文件的保

    2024年02月08日
    浏览(73)
  • RabbitMQ系列(8)--实现RabbitMQ队列持久化及消息持久化

    概念:在上一章文章中我们演示了消费者宕机的情况下消息没有被消费成功后会重新入队,然后再被消费,但如何保障RabbitMQ服务停掉的情况下,生产者发过来的消息不会丢失,这时候我们为了消息不会丢失就需要将队列和消息都标记为持久化。 1、实现RabbitMQ队列持久化 只需

    2024年02月09日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包