Python大数据之PySpark(五)RDD详解

这篇具有很好参考价值的文章主要介绍了Python大数据之PySpark(五)RDD详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RDD详解

为什么需要RDD?

  • 首先Spark的提出为了解决MR的计算问题,诸如说迭代式计算,比如:机器学习或图计算
  • 希望能够提出一套基于内存的迭代式数据结构,引入RDD弹性分布式数据集,如下图
    Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • 为什么RDD是可以容错?
  • RDD依靠于依赖关系dependency relationship
  • reduceByKeyRDD-----mapRDD-----flatMapRDD
  • 另外缓存,广播变量,检查点机制等很多机制解决容错问题
  • 为什么RDD可以执行内存中计算?
  • RDD本身设计就是基于内存中迭代式计算
  • RDD是抽象的数据结构
    Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划

Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划

什么是RDD?

  • RDD弹性分布式数据集
  • 弹性:可以基于内存存储也可以在磁盘中存储
  • 分布式:分布式存储(分区)和分布式计算
  • 数据集:数据的集合

RDD 定义

  • RDD是不可变,可分区,可并行计算的集合
  • 在pycharm中按两次shift可以查看源码,rdd.py
  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • RDD提供了五大属性
  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划

RDD的5大特性

  • RDD五大特性:
  • 1-RDD是有一些列分区构成的,a list of partitions
  • 2-计算函数
  • 3-依赖关系,reduceByKey依赖于map依赖于flatMap
  • 4-(可选项)key-value的分区,对于key-value类型的数据默认分区是Hash分区,可以变更range分区等
  • 5-(可选项)位置优先性,移动计算不要移动存储
  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • 1-Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • 2-
  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • 3-
  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • 4-
  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • 5-最终图解
  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • RDD五大属性总结
  • 1-分区列表
  • 2-计算函数
  • 3-依赖关系
  • 4-key-value的分区器
  • 5-位置优先性

RDD特点—不需要记忆

  • 分区
  • 只读
  • 依赖
  • 缓存
  • checkpoint

WordCount中RDD

Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划

RDD的创建

PySpark中RDD的创建两种方式

并行化方式创建RDD

rdd1=sc.paralleise([1,2,3,4,5])

通过文件创建RDD

rdd2=sc.textFile(“hdfs://node1:9820/pydata”)

代码:

# -*- coding: utf-8 -*-
# Program function:创建RDD的两种方式
'''
第一种方式:使用并行化集合,本质上就是将本地集合作为参数传递到sc.pa
第二种方式:使用sc.textFile方式读取外部文件系统,包括hdfs和本地文件系统
1-准备SparkContext的入口,申请资源
2-使用rdd创建的第一种方法
3-使用rdd创建的第二种方法
4-关闭SparkContext
'''
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
print("=========createRDD==============")
# 1 - 准备SparkContext的入口,申请资源
conf = SparkConf().setAppName("createRDD").setMaster("local[5]")
sc = SparkContext(conf=conf)
# 2 - 使用rdd创建的第一种方法
collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
print(collection_rdd.collect())  # [1, 2, 3, 4, 5, 6]
# 2-1 如何使用api获取rdd的分区个数
print("rdd numpartitions:{}".format(collection_rdd.getNumPartitions()))  # 5
# 3 - 使用rdd创建的第二种方法
file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/words.txt")
print(file_rdd.collect())
print("rdd numpartitions:{}".format(file_rdd.getNumPartitions()))  # 2
# 4 - 关闭SparkContext
sc.stop()

小文件读取

通过外部数据创建RDD

  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划

  • http://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis

  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划

# -*- coding: utf-8 -*-

# Program function:创建RDD的两种方式

'''
1-准备SparkContext的入口,申请资源
2-读取外部的文件使用sc.textFile和sc.wholeTextFile方式
3-关闭SparkContext
'''
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
 print("=========createRDD==============")

 # 1 - 准备SparkContext的入口,申请资源

 conf = SparkConf().setAppName("createRDD").setMaster("local[5]")
 sc = SparkContext(conf=conf)

 # 2 - 读取外部的文件使用sc.textFile和sc.wholeTextFile方式\

 file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/ratings100")
 wholefile_rdd = sc.wholeTextFiles("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/ratings100")
 print("file_rdd numpartitions:{}".format(file_rdd.getNumPartitions()))#file_rdd numpartitions:100
 print("wholefile_rdd numpartitions:{}".format(wholefile_rdd.getNumPartitions()))#wholefile_rdd numpartitions:2
 print(wholefile_rdd.take(1))# 路径,具体的值

 # 如何获取wholefile_rdd得到具体的值

 print(type(wholefile_rdd))#<class 'pyspark.rdd.RDD'>
 print(wholefile_rdd.map(lambda x: x[1]).take(1))

 # 3 - 关闭SparkContext

 sc.stop()
* 如何查看rdd的分区?getNumPartitions()

扩展阅读:RDD分区数如何确定

  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划

  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划

# -*- coding: utf-8 -*-

# Program function:创建RDD的两种方式

'''
第一种方式:使用并行化集合,本质上就是将本地集合作为参数传递到sc.pa
第二种方式:使用sc.textFile方式读取外部文件系统,包括hdfs和本地文件系统
1-准备SparkContext的入口,申请资源
2-使用rdd创建的第一种方法
3-使用rdd创建的第二种方法
4-关闭SparkContext
'''
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
print("=========createRDD==============")

# 1 - 准备SparkContext的入口,申请资源

conf = SparkConf().setAppName("createRDD").setMaster("local[*]")

# conf.set("spark.default.parallelism",10)#重写默认的并行度,10

sc = SparkContext(conf=conf)

# 2 - 使用rdd创建的第一种方法,

collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6],5)

# 2-1 如何使用api获取rdd的分区个数

print("rdd numpartitions:{}".format(collection_rdd.getNumPartitions()))  #2

# 总结:sparkconf设置的local[5](默认的并行度),sc.parallesise直接使用分区个数是5

# 如果设置spark.default.parallelism,默认并行度,sc.parallesise直接使用分区个数是10

# 优先级最高的是函数内部的第二个参数 3

# 2-2 如何打印每个分区的内容

print("per partition content:",collection_rdd.glom().collect())

# 3 - 使用rdd创建的第二种方法

# minPartitions最小的分区个数,最终有多少的分区个数,以实际打印为主

file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/words.txt",10)
print("rdd numpartitions:{}".format(file_rdd.getNumPartitions()))
print(" file_rdd per partition content:",file_rdd.glom().collect())

# 如果sc.textFile读取的是文件夹中多个文件,这里的分区个数是以文件个数为主的,自己写的分区不起作用

# file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/ratings100", 3)

# 4 - 关闭SparkContext

sc.stop()
* 首先明确,分区的个数,这里一切以看到的为主,特别在sc.textFile
  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • Python大数据之PySpark(五)RDD详解,# PySpark,python,大数据,wpf,原力计划
  • 重要两个API

  • 分区个数getNumberPartitions

  • 分区内元素glom().collect()

后记

📢博客主页:https://manor.blog.csdn.net

📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 Maynor 原创,首发于 CSDN博客🙉
📢感觉这辈子,最深情绵长的注视,都给了手机⭐
📢专栏持续更新,欢迎订阅:https://blog.csdn.net/xianyu120/category_12453356.html文章来源地址https://www.toymoban.com/news/detail-735076.html

到了这里,关于Python大数据之PySpark(五)RDD详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Python】PySpark 数据计算 ③ ( RDD#reduceByKey 函数概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 语法 | 代码示例 )

    RDD#reduceByKey 方法 是 PySpark 中 提供的计算方法 , 首先 , 对 键值对 KV 类型 RDD 对象 数据 中 相同 键 key 对应的 值 value 进行分组 , 然后 , 按照 开发者 提供的 算子 ( 逻辑 / 函数 ) 进行 聚合操作 ; 上面提到的 键值对 KV 型 的数据 , 指的是 二元元组 , 也就是 RDD 对象中存储的数据是

    2024年02月14日
    浏览(54)
  • 【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )

    在 PySpark 中 RDD 对象 提供了一种 数据计算方法 RDD#map 方法 ; 该 RDD#map 函数 可以对 RDD 数据中的每个元素应用一个函数 , 该 被应用的函数 , 可以将每个元素转换为另一种类型 , 也可以针对 RDD 数据的 原始元素进行 指定操作 ; 计算完毕后 , 会返回一个新的 RDD 对象 ; map 方法 , 又

    2024年02月14日
    浏览(55)
  • Python大数据处理利器之Pyspark详解

    在现代信息时代,数据是最宝贵的财富之一,如何处理和分析这些数据成为了关键。Python在数据处理方面表现得尤为突出。而 pyspark 作为一个强大的分布式计算框架,为大数据处理提供了一种高效的解决方案。本文将详细介绍pyspark的基本概念和使用方法,并给出实际案例。

    2024年02月10日
    浏览(48)
  • 【分享】原力计划的初衷 【探讨】新的一年,你对原力计划有哪些期待?

    哈喽,大家好,我是几何心凉,这是一份全新的专栏,唯一得倒CSDN王总的授权,来对于我们每周四的绿萝时间 ——【直达CSDN】直播内容进行总结概括,让大家能够省去看直播回放的时间也能够了解直播内容和官方的最新动态,希望大家给予凉哥最大的支持,如有未授权用户

    2024年02月16日
    浏览(49)
  • 大数据之PySpark的RDD介绍

    之前的文章主要介绍Spark基础知识,例如集群角色、Spark集群运行流程等,接下来会进一步讨论Spark相对核心的知识,让我们拭目以待,同时也期待各位的精彩留言! RDD称为弹性分布式数据集,是Spark中最基本的数据抽象,其为一个不可变、可分区、元素可并行计算的集合;

    2024年02月03日
    浏览(33)
  • PySpark大数据教程:深入学习SparkCore的RDD持久化和Checkpoint

    本教程详细介绍了PySpark中SparkCore的RDD持久化和Checkpoint功能,重点讲解了缓存和检查点的作用、如何进行缓存、如何设置检查点目录以及它们之间的区别。还提供了join操作的示例和Spark算子补充知识。

    2024年02月08日
    浏览(42)
  • Python大数据之PySpark(二)PySpark安装

    1-明确PyPi库,Python Package Index 所有的Python包都从这里下载,包括pyspark 2-为什么PySpark逐渐成为主流? http://spark.apache.org/releases/spark-release-3-0-0.html Python is now the most widely used language on Spark. PySpark has more than 5 million monthly downloads on PyPI, the Python Package Index. 记住如果安装特定的版本

    2024年02月04日
    浏览(42)
  • 【Python】PySpark 数据处理 ② ( 安装 PySpark | PySpark 数据处理步骤 | 构建 PySpark 执行环境入口对象 )

    执行 Windows + R , 运行 cmd 命令行提示符 , 在命令行提示符终端中 , 执行 命令 , 安装 PySpark , 安装过程中 , 需要下载 310 M 的安装包 , 耐心等待 ; 安装完毕 : 命令行输出 : 如果使用 官方的源 下载安装 PySpark 的速度太慢 , 可以使用 国内的 镜像网站 https://pypi.tuna.tsinghua.edu.cn/simple

    2024年02月06日
    浏览(43)
  • PySpark基础 —— RDD

    1.查看Spark环境信息 2.创建RDD 创建RDD主要有两种方式 第一种:textFile方法 第二种:parallelize方法  2.1.textFile方法 本地文件系统加载数据  2.2.parallelize方法  2.3.wholeTextFiles方法 Action动作算子/行动操作 1.collect 2.take  3.first 4.top 5.takeOrdered 6.takeSample 7.count 8.sum 9.histogram 10.fold 11.re

    2024年02月07日
    浏览(39)
  • Python大数据之PySpark

    Apache Spark是一种用于大规模数据处理的多语言分布式引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习 Spark官网:https://spark.apache.org/ 按照官网描述,Spark关键特征包括: 批/流处理 Spark支持您使用喜欢的语言:Python、SQL、Scala、Java或R,统一批量和实时流处

    2024年02月08日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包