【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 )

这篇具有很好参考价值的文章主要介绍了【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 )。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。





一、RDD#sortBy 方法




1、RDD#sortBy 语法简介


RDD#sortBy 方法 用于 按照 指定的 键 对 RDD 中的元素进行排序 , 该方法 接受一个 函数 作为 参数 , 该函数从 RDD 中的每个元素提取 排序键 ;

根据 传入 sortBy 方法 的 函数参数 和 其它参数 , 将 RDD 中的元素按 升序 或 降序 进行排序 , 同时还可以指定 新的 RDD 对象的 分区数 ;


RDD#sortBy 语法 :

sortBy(f: (T) ⇒ U, ascending: Boolean, numPartitions: Int): RDD[T]
  • 参数说明 :
    • f: (T) ⇒ U 参数 : 函数 或 lambda 匿名函数 , 用于 指定 RDD 中的每个元素 的 排序键 ;
    • ascending: Boolean 参数 : 排序的升降设置 , True 生序排序 , False 降序排序 ;
    • numPartitions: Int 参数 : 设置 排序结果 ( 新的 RDD 对象 ) 中的 分区数 ;
      • 当前没有接触到分布式 , 将该参数设置为 1 即可 , 排序完毕后是全局有序的 ;
  • 返回值说明 : 返回一个新的 RDD 对象 , 其中的元素是 按照指定的 排序键 进行排序的结果 ;

2、RDD#sortBy 传入的函数参数分析


RDD#sortBy 传入的函数参数 类型为 :

(T) ⇒ U

T 是泛型 , 表示传入的参数类型可以是任意类型 ;

U 也是泛型 , 表示 函数 返回值 的类型 可以是任意类型 ;

T 类型的参数 和 U 类型的返回值 , 可以是相同的类型 , 也可以是不同的类型 ;





二、代码示例 - RDD#sortBy 示例




1、需求分析


统计 文本文件 word.txt 中出现的每个单词的个数 , 并且为每个单词出现的次数进行排序 ;

Tom Jerry
Tom Jerry Tom
Jack Jerry Jack Tom

【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 ),Python,python,开发语言,PySpark,Spark,PyCharm,原力计划

读取文件中的内容 , 统计文件中单词的个数并排序 ;

思路 :

  • 读取数据到 RDD 中 ,
  • 然后 按照空格分割开 再展平 , 获取到每个单词 ,
  • 根据上述单词列表 , 生成一个 二元元组 列表 , 列表中每个元素的 键 Key 为单词 , 值 Value 为 数字 1 ,
  • 对上述 二元元组 列表 进行 聚合操作 , 相同的 键 Key 对应的 值 Value 进行相加 ;
  • 将聚合后的结果的 单词出现次数作为 排序键 进行排序 , 按照升序进行排序 ;

2、代码示例


对 RDD 数据进行排序的核心代码如下 :

# 对 rdd4 中的数据进行排序
rdd5 = rdd4.sortBy(lambda element: element[1], ascending=True, numPartitions=1)

要排序的数据如下 :

[('Tom', 4), ('Jack', 2), ('Jerry', 3)]

按照上述二元元素的 第二个 元素 进行排序 , 对应的 lambda 表达式为 :

lambda element: element[1]

ascending=True 表示升序排序 ,

numPartitions=1 表示分区个数为 1 ;


排序后的结果为 :

[('Jack', 2), ('Jerry', 3), ('Tom', 4)]

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 将 文件 转为 RDD 对象
rdd = sparkContext.textFile("word.txt")
print("查看文件内容 : ", rdd.collect())

# 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表
#   然后展平数据解除嵌套
rdd2 = rdd.flatMap(lambda element: element.split(" "))
print("查看文件内容展平效果 : ", rdd2.collect())

# 将 rdd 数据 的 列表中的元素 转为二元元组, 第二个元素设置为 1
rdd3 = rdd2.map(lambda element: (element, 1))
print("转为二元元组效果 : ", rdd3.collect())

# 应用 reduceByKey 操作,
#   将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
print("统计单词 : ", rdd4.collect())

# 对 rdd4 中的数据进行排序
rdd5 = rdd4.sortBy(lambda element: element[1], ascending=True, numPartitions=1)
print("最终统计单词并排序 : ", rdd4.collect())

# 停止 PySpark 程序
sparkContext.stop()



3、执行结果


执行结果 :

D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
23/08/04 10:49:06 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: Could not locate Hadoop executable: D:\001_Develop\052_Hadoop\hadoop-3.3.4\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
PySpark 版本号 :  3.4.1
查看文件内容 :  ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry Jack Tom']
查看文件内容展平效果 :  ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry', 'Jack', 'Tom']
转为二元元组效果 :  [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1), ('Jack', 1), ('Tom', 1)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
统计单词 :  [('Tom', 4), ('Jack', 2), ('Jerry', 3)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
最终统计单词并排序 :  [('Jack', 2), ('Jerry', 3), ('Tom', 4)]

Process finished with exit code 0

【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 ),Python,python,开发语言,PySpark,Spark,PyCharm,原力计划文章来源地址https://www.toymoban.com/news/detail-628164.html

到了这里,关于【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 )的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python大数据之PySpark(五)RDD详解

    为什么需要RDD? 首先Spark的提出为了解决MR的计算问题,诸如说迭代式计算,比如:机器学习或图计算 希望能够提出一套基于内存的迭代式数据结构,引入RDD弹性分布式数据集,如下图 为什么RDD是可以容错? RDD依靠于依赖关系dependency relationship reduceByKeyRDD-----mapRDD-----flatMapRD

    2024年02月06日
    浏览(45)
  • Python大数据之PySpark(六)RDD的操作

    函数分类 *Transformation操作只是建立计算关系,而Action 操作才是实际的执行者* 。 Transformation算子 转换算子 操作之间不算的转换,如果想看到结果通过action算子触发 Action算子 行动算子 触发Job的执行,能够看到结果信息 Transformation函数 值类型valueType map flatMap filter mapValue 双值

    2024年02月04日
    浏览(43)
  • 大数据之PySpark的RDD介绍

    之前的文章主要介绍Spark基础知识,例如集群角色、Spark集群运行流程等,接下来会进一步讨论Spark相对核心的知识,让我们拭目以待,同时也期待各位的精彩留言! RDD称为弹性分布式数据集,是Spark中最基本的数据抽象,其为一个不可变、可分区、元素可并行计算的集合;

    2024年02月03日
    浏览(33)
  • PySpark大数据教程:深入学习SparkCore的RDD持久化和Checkpoint

    本教程详细介绍了PySpark中SparkCore的RDD持久化和Checkpoint功能,重点讲解了缓存和检查点的作用、如何进行缓存、如何设置检查点目录以及它们之间的区别。还提供了join操作的示例和Spark算子补充知识。

    2024年02月08日
    浏览(42)
  • PySpark基础 —— RDD

    1.查看Spark环境信息 2.创建RDD 创建RDD主要有两种方式 第一种:textFile方法 第二种:parallelize方法  2.1.textFile方法 本地文件系统加载数据  2.2.parallelize方法  2.3.wholeTextFiles方法 Action动作算子/行动操作 1.collect 2.take  3.first 4.top 5.takeOrdered 6.takeSample 7.count 8.sum 9.histogram 10.fold 11.re

    2024年02月07日
    浏览(39)
  • PySpark RDD的缓存和Checkpoint

    RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消息,RDD的数据只在处理的过程中存在,一旦处理完成,就不见了,所以RDD的数据是过程数据。 RDD数据是过程数据的这个特性可以最大化的利用资源,老旧的RDD没用了就会从内存中清理

    2023年04月09日
    浏览(78)
  • PySpark之RDD的持久化

    当RDD被重复使用,或者计算该RDD比较容易出错,而且需要消耗比较多的资源和时间的时候,我们就可以将该RDD缓存起来。 主要作用: 提升Spark程序的计算效率 注意事项: RDD的缓存可以存储在内存或者是磁盘上,甚至可以存储在Executor进程的堆外内存中。主要是放在内存中,因此

    2024年01月23日
    浏览(42)
  • 10-用PySpark建立第一个Spark RDD

    PySpark实战笔记系列第一篇 Apache Spark的核心组件的基础是RDD。所谓的RDD,即 弹性分布式数据集(Resiliennt Distributed Datasets) ,基于RDD可以实现Apache Spark各个组件在多个计算机组成的集群中进行无缝集成,从而能够在一个应用程序中完成海量数据处理。 只读不能修改 :只能通过

    2024年04月08日
    浏览(47)
  • PySpark数据分析基础:PySpark Pandas创建、转换、查询、转置、排序操作详解

    目录 前言 一、Pandas数据结构 1.Series 2.DataFrame  3.Time-Series  4.Panel 5.Panel4D 6.PanelND 二、Pyspark实例创建 1.引入库 2.转换实现 pyspark pandas series创建 pyspark pandas dataframe创建 from_pandas转换  Spark DataFrame转换  三、PySpark Pandas操作 1.读取行列索引 2.内容转换为数组 3.DataFrame统计描述 4.转

    2024年02月02日
    浏览(57)
  • python实现对excel表中的某列数据进行排序

    如下需要对webCms中的B列数据进行升序排序, 且不能影响到其他列、工作表中的数据和格式 。 排序后

    2024年02月10日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包