【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 )

这篇具有很好参考价值的文章主要介绍了【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 )。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。





一、RDD 简介




1、RDD 概念


RDD 英文全称为 " Resilient Distributed Datasets " , 对应中文名称 是 " 弹性分布式数据集 " ;

Spark 是用于 处理大规模数据 的 分布式计算引擎 ;

RDD 是 Spark 的基本数据单元 , 该 数据结构 是 只读的 , 不可写入更改 ;

RDD 对象 是 通过 SparkContext 执行环境入口对象 创建的 ;

SparkContext 读取数据时 , 通过将数据拆分为多个分区 , 以便在 服务器集群 中进行并行处理 ;

每个 RDD 数据分区 都可以在 服务器集群 中的 不同服务器节点 上 并行执行 计算任务 , 可以提高数据处理速度 ;


2、RDD 中的数据存储与计算


PySpark 中 处理的 所有的数据 ,

  • 数据存储 : PySpark 中的数据都是以 RDD 对象的形式承载的 , 数据都存储在 RDD 对象中 ;
  • 计算方法 : 大数据处理过程中使用的计算方法 , 也都定义在了 RDD 对象中 ;
  • 计算结果 : 使用 RDD 中的计算方法对 RDD 中的数据进行计算处理 , 获得的结果数据也是封装在 RDD 对象中的 ;

PySpark 中 , 通过 SparkContext 执行环境入口对象 读取 基础数据到 RDD 对象中 , 调用 RDD 对象中的计算方法 , 对 RDD 对象中的数据进行处理 , 得到新的 RDD 对象 其中有 上一次的计算结果 , 再次对新的 RDD 对象中的数据进行处理 , 执行上述若干次计算 , 会 得到一个最终的 RDD 对象 , 其中就是数据处理结果 , 将其保存到文件中 , 或者写入到数据库中 ;





二、Python 容器数据转 RDD 对象




1、RDD 转换


在 Python 中 , 使用 PySpark 库中的 SparkContext # parallelize 方法 , 可以将 Python 容器数据 转换为 PySpark 的 RDD 对象 ;


PySpark 支持下面几种 Python 容器变量 转为 RDD 对象 :

  • 列表 list : 可重复 , 有序元素 ;
  • 元组 tuple : 可重复 , 有序元素 , 可读不可写 , 不可更改 ;
  • 集合 set : 不可重复 , 无序元素 ;
  • 字典 dict : 键值对集合 , 键 Key 不可重复 ;
  • 字符串 str : 字符串 ;

2、转换 RDD 对象相关 API


调用 SparkContext # parallelize 方法 可以将 Python 容器数据转为 RDD 对象 ;

# 将数据转换为 RDD 对象
rdd = sparkContext.parallelize(data)

调用 RDD # getNumPartitions 方法 , 可以获取 RDD 的分区数 ;

print("RDD 分区数量: ", rdd.getNumPartitions())

调用 RDD # collect 方法 , 可以查看 RDD 数据 ;

print("RDD 元素: ", rdd.collect())

完整代码示例 :

# 创建一个包含列表的数据
data = [1, 2, 3, 4, 5]

# 将数据转换为 RDD 对象
rdd = sparkContext.parallelize(data)

# 打印 RDD 的分区数和元素
print("RDD 分区数量: ", rdd.getNumPartitions())
print("RDD 元素: ", rdd.collect())

3、代码示例 - Python 容器转 RDD 对象 ( 列表 )


在下面的代码中 ,

首先 , 创建 SparkConf 对象 , 并将 PySpark 任务 命名为 " hello_spark " , 并设置为本地单机运行 ;

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

然后 , 创建了一个 SparkContext 对象 , 传入 SparkConf 实例对象作为参数 ;

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

再后 , 创建一个包含整数的简单列表 ;

# 创建一个包含列表的数据
data = [1, 2, 3, 4, 5]

再后 , 并使用 parallelize() 方法将其转换为 RDD 对象 ;

# 将数据转换为 RDD 对象
rdd = sparkContext.parallelize(data)

最后 , 我们打印出 RDD 的分区数和所有元素 ;

# 打印 RDD 的分区数和元素
print("RDD 分区数量: ", rdd.getNumPartitions())
print("RDD 元素: ", rdd.collect())

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 创建一个包含列表的数据
data = [1, 2, 3, 4, 5]

# 将数据转换为 RDD 对象
rdd = sparkContext.parallelize(data)

# 打印 RDD 的分区数和元素
print("RDD 分区数量: ", rdd.getNumPartitions())
print("RDD 元素: ", rdd.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 20:11:35 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 20:11:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
RDD 分区数量:  12
RDD 元素:  [1, 2, 3, 4, 5]

Process finished with exit code 0

【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 ),Python,python,开发语言,Spark,PySpark,PyCharm,原力计划


4、代码示例 - Python 容器转 RDD 对象 ( 列表 / 元组 / 集合 / 字典 / 字符串 )


除了 列表 list 之外 , 还可以将其他容器数据类型 转换为 RDD 对象 , 如 : 元组 / 集合 / 字典 / 字符串 ;


调用 RDD # collect 方法 , 打印出来的 RDD 数据形式 :

  • 列表 / 元组 / 集合 转换后的 RDD 数据打印出来都是列表 ;
data1 = [1, 2, 3, 4, 5]
data2 = (1, 2, 3, 4, 5)
data3 = {1, 2, 3, 4, 5}

# 输出结果
rdd1 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
rdd2 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
rdd3 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
  • 字典 转换后的 RDD 数据打印出来只有 键 Key , 没有值 ;
data4 = {"Tom": 18, "Jerry": 12}

# 输出结果
rdd4 分区数量和元素:  12  ,  ['Tom', 'Jerry']
  • 字符串 转换后的 RDD 数据打印出来 是 列表 , 元素是单个字符 ;
data5 = "Tom"

# 输出结果
rdd5 分区数量和元素:  12  ,  ['T', 'o', 'm']

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 创建一个包含列表的数据
data1 = [1, 2, 3, 4, 5]
data2 = (1, 2, 3, 4, 5)
data3 = {1, 2, 3, 4, 5}
data4 = {"Tom": 18, "Jerry": 12}
data5 = "Tom"

# 将数据转换为 RDD 对象
rdd1 = sparkContext.parallelize(data1)
rdd2 = sparkContext.parallelize(data2)
rdd3 = sparkContext.parallelize(data3)
rdd4 = sparkContext.parallelize(data4)
rdd5 = sparkContext.parallelize(data5)

# 打印 RDD 的元素
print("rdd1 分区数量和元素: ", rdd1.getNumPartitions(), " , ", rdd1.collect())
print("rdd2 分区数量和元素: ", rdd2.getNumPartitions(), " , ", rdd2.collect())
print("rdd3 分区数量和元素: ", rdd3.getNumPartitions(), " , ", rdd3.collect())
print("rdd4 分区数量和元素: ", rdd4.getNumPartitions(), " , ", rdd4.collect())
print("rdd5 分区数量和元素: ", rdd5.getNumPartitions(), " , ", rdd5.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 20:37:03 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 20:37:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
rdd1 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
rdd2 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
rdd3 分区数量和元素:  12  ,  [1, 2, 3, 4, 5]
rdd4 分区数量和元素:  12  ,  ['Tom', 'Jerry']
rdd5 分区数量和元素:  12  ,  ['T', 'o', 'm']

Process finished with exit code 0

【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 ),Python,python,开发语言,Spark,PySpark,PyCharm,原力计划





三、文件文件转 RDD 对象



调用 SparkContext#textFile 方法 , 传入 文件的 绝对路径 或 相对路径 , 可以将 文本文件 中的数据 读取并转为 RDD 数据 ;


文本文件数据 :

Tom
18
Jerry
12

【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 ),Python,python,开发语言,Spark,PySpark,PyCharm,原力计划

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 读取文件内容到 RDD 中
rdd = sparkContext.textFile("data.txt")

# 打印 RDD 的元素
print("rdd1 分区数量和元素: ", rdd.getNumPartitions(), " , ", rdd.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 20:43:21 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 20:43:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
rdd1 分区数量和元素:  2  ,  ['Tom', '18', 'Jerry', '12']

Process finished with exit code 0

【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 ),Python,python,开发语言,Spark,PySpark,PyCharm,原力计划文章来源地址https://www.toymoban.com/news/detail-621775.html

到了这里,关于【Python】PySpark 数据输入 ① ( RDD 简介 | RDD 中的数据存储与计算 | Python 容器数据转 RDD 对象 | 文件文件转 RDD 对象 )的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python大数据之PySpark(五)RDD详解

    为什么需要RDD? 首先Spark的提出为了解决MR的计算问题,诸如说迭代式计算,比如:机器学习或图计算 希望能够提出一套基于内存的迭代式数据结构,引入RDD弹性分布式数据集,如下图 为什么RDD是可以容错? RDD依靠于依赖关系dependency relationship reduceByKeyRDD-----mapRDD-----flatMapRD

    2024年02月06日
    浏览(36)
  • Python大数据之PySpark(六)RDD的操作

    函数分类 *Transformation操作只是建立计算关系,而Action 操作才是实际的执行者* 。 Transformation算子 转换算子 操作之间不算的转换,如果想看到结果通过action算子触发 Action算子 行动算子 触发Job的执行,能够看到结果信息 Transformation函数 值类型valueType map flatMap filter mapValue 双值

    2024年02月04日
    浏览(32)
  • 大数据之PySpark的RDD介绍

    之前的文章主要介绍Spark基础知识,例如集群角色、Spark集群运行流程等,接下来会进一步讨论Spark相对核心的知识,让我们拭目以待,同时也期待各位的精彩留言! RDD称为弹性分布式数据集,是Spark中最基本的数据抽象,其为一个不可变、可分区、元素可并行计算的集合;

    2024年02月03日
    浏览(24)
  • PySpark大数据教程:深入学习SparkCore的RDD持久化和Checkpoint

    本教程详细介绍了PySpark中SparkCore的RDD持久化和Checkpoint功能,重点讲解了缓存和检查点的作用、如何进行缓存、如何设置检查点目录以及它们之间的区别。还提供了join操作的示例和Spark算子补充知识。

    2024年02月08日
    浏览(33)
  • PySpark基础 —— RDD

    1.查看Spark环境信息 2.创建RDD 创建RDD主要有两种方式 第一种:textFile方法 第二种:parallelize方法  2.1.textFile方法 本地文件系统加载数据  2.2.parallelize方法  2.3.wholeTextFiles方法 Action动作算子/行动操作 1.collect 2.take  3.first 4.top 5.takeOrdered 6.takeSample 7.count 8.sum 9.histogram 10.fold 11.re

    2024年02月07日
    浏览(32)
  • PySpark RDD的缓存和Checkpoint

    RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消息,RDD的数据只在处理的过程中存在,一旦处理完成,就不见了,所以RDD的数据是过程数据。 RDD数据是过程数据的这个特性可以最大化的利用资源,老旧的RDD没用了就会从内存中清理

    2023年04月09日
    浏览(69)
  • PySpark之RDD的持久化

    当RDD被重复使用,或者计算该RDD比较容易出错,而且需要消耗比较多的资源和时间的时候,我们就可以将该RDD缓存起来。 主要作用: 提升Spark程序的计算效率 注意事项: RDD的缓存可以存储在内存或者是磁盘上,甚至可以存储在Executor进程的堆外内存中。主要是放在内存中,因此

    2024年01月23日
    浏览(33)
  • 10-用PySpark建立第一个Spark RDD

    PySpark实战笔记系列第一篇 Apache Spark的核心组件的基础是RDD。所谓的RDD,即 弹性分布式数据集(Resiliennt Distributed Datasets) ,基于RDD可以实现Apache Spark各个组件在多个计算机组成的集群中进行无缝集成,从而能够在一个应用程序中完成海量数据处理。 只读不能修改 :只能通过

    2024年04月08日
    浏览(36)
  • Python:实现文件读取与输入,数据存储与读取的常用命令

    文本文件可用于存储大量的数据,里面的数据对于用户而言十分重要,因此,本文就如何利用Python实现文本内容的读取与输入,数据存储与读取进行介绍。 一、读取文件中的数据: 首先需要找到所需文件的路径:例如我在桌面创建了一个文本文件,它的路径为 利用函数 op

    2023年04月08日
    浏览(39)
  • 【云原生技术】云计算中的数据库数据传输服务简介

    云计算中的数据库数据传输服务是指用于在不同数据库环境之间迁移和同步数据的服务。这些服务通常由云服务提供商提供,用于帮助用户将他们的数据从本地数据库迁移到云数据库,或者在不同的云数据库之间迁移数据。这些服务关键在于确保数据迁移的安全性、高效性和

    2024年01月23日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包