【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )

这篇具有很好参考价值的文章主要介绍了【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。





一、RDD#map 方法




1、RDD#map 方法引入


在 PySpark 中 RDD 对象 提供了一种 数据计算方法 RDD#map 方法 ;

该 RDD#map 函数 可以对 RDD 数据中的每个元素应用一个函数 , 该 被应用的函数 ,

  • 可以将每个元素转换为另一种类型 ,
  • 也可以针对 RDD 数据的 原始元素进行 指定操作 ;

计算完毕后 , 会返回一个新的 RDD 对象 ;


2、RDD#map 语法


map 方法 , 又称为 map 算子 , 可以将 RDD 中的数据元素 逐个进行处理 , 处理的逻辑 需要用外部 通过 参数传入 map 函数 ;


RDD#map 语法 :

rdd.map(fun)

传入的 fun 是一个函数 , 其函数类型为 :

(T) -> U

上述 函数 类型 前面的 小括号 及其中的内容 , 表示 函数 的参数类型 ,

  • () 表示不传入参数 ;
  • (T) 表示传入 1 个参数 ;

同时 T 类型是 泛型 , 表示任意类型 , 也就是说 该函数的 参数 可以是任意类型的 ;


上述 函数 类型 右箭头 后面的 U , -> U 表示的是 函数 返回值类型 ,

  • (T) -> U 表示 参数 类型为 T , 返回值类型为 U , T 和 U 类型都是任意类型 , 可以是一个类型 , 也可以是不同的类型 ;
  • (T) -> T 函数类型中 , T 可以是任意类型 , 但是如果确定了参数 , 那么返回值必须也是相同的类型 ;

U 类型也是 泛型 , 表示任意类型 , 也就是说 该函数的 参数 可以是任意类型的 ;


3、RDD#map 用法


RDD#map 方法 , 接收一个 函数 作为参数 , 计算时 , 该 函数参数 会被应用于 RDD 数据中的每个元素 ;

下面的 代码 , 传入一个 lambda 匿名函数 , 将 RDD 对象中的元素都乘以 10 ;

# 将 RDD 对象中的元素都乘以 10
rdd.map(lambda x: x * 10)  

4、代码示例 - RDD#map 数值计算 ( 传入普通函数 )


在下面的代码中 ,

首先 , 创建了一个包含整数的 RDD ,

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

然后 , 使用 map() 方法将每个元素乘以 10 ;

# 为每个元素执行的函数
def func(element):
    return element * 10


# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(func)

最后 , 打印新的 RDD 中的内容 ;

# 打印新的 RDD 中的内容
print(rdd2.collect())

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])


# 为每个元素执行的函数
def func(element):
    return element * 10


# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(func)

# 打印新的 RDD 中的内容
print(rdd2.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 21:39:59 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 21:39:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
[10, 20, 30, 40, 50]

Process finished with exit code 0

【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 ),Python,python,PyCharm,PySpark,Spark,map,原力计划


5、代码示例 - RDD#map 数值计算 ( 传入 lambda 匿名函数 )


在下面的代码中 ,

首先 , 创建了一个包含整数的 RDD ,

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

然后 , 使用 map() 方法将每个元素乘以 10 , 这里传入了 lambda 函数作为参数 , 该函数接受一个整数参数 element , 并返回 element * 10 ;

# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(lambda element: element * 10)

最后 , 打印新的 RDD 中的内容 ;

# 打印新的 RDD 中的内容
print(rdd2.collect())

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(lambda element: element * 10)

# 打印新的 RDD 中的内容
print(rdd2.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 21:46:53 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 21:46:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
[10, 20, 30, 40, 50]

Process finished with exit code 0

【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 ),Python,python,PyCharm,PySpark,Spark,map,原力计划


6、代码示例 - RDD#map 数值计算 ( 链式调用 )


在下面的代码中 , 先对 RDD 对象中的每个元素数据都乘以 10 , 然后再对计算后的数据每个元素加上 5 , 最后对最新的计算数据每个元素除以 2 , 整个过程通过函数式编程 , 链式调用完成 ;

核心代码如下 :

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(lambda element: element * 10)\
    .map(lambda element: element + 5)\
    .map(lambda element: element / 2)

# 打印新的 RDD 中的内容
print(rdd2.collect())

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(lambda element: element * 10)\
    .map(lambda element: element + 5)\
    .map(lambda element: element / 2)

# 打印新的 RDD 中的内容
print(rdd2.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 21:50:29 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 21:50:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
[7.5, 12.5, 17.5, 22.5, 27.5]

Process finished with exit code 0

【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 ),Python,python,PyCharm,PySpark,Spark,map,原力计划文章来源地址https://www.toymoban.com/news/detail-631215.html

到了这里,关于【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python大数据之PySpark(五)RDD详解

    为什么需要RDD? 首先Spark的提出为了解决MR的计算问题,诸如说迭代式计算,比如:机器学习或图计算 希望能够提出一套基于内存的迭代式数据结构,引入RDD弹性分布式数据集,如下图 为什么RDD是可以容错? RDD依靠于依赖关系dependency relationship reduceByKeyRDD-----mapRDD-----flatMapRD

    2024年02月06日
    浏览(41)
  • Python大数据之PySpark(六)RDD的操作

    函数分类 *Transformation操作只是建立计算关系,而Action 操作才是实际的执行者* 。 Transformation算子 转换算子 操作之间不算的转换,如果想看到结果通过action算子触发 Action算子 行动算子 触发Job的执行,能够看到结果信息 Transformation函数 值类型valueType map flatMap filter mapValue 双值

    2024年02月04日
    浏览(42)
  • 大数据之PySpark的RDD介绍

    之前的文章主要介绍Spark基础知识,例如集群角色、Spark集群运行流程等,接下来会进一步讨论Spark相对核心的知识,让我们拭目以待,同时也期待各位的精彩留言! RDD称为弹性分布式数据集,是Spark中最基本的数据抽象,其为一个不可变、可分区、元素可并行计算的集合;

    2024年02月03日
    浏览(29)
  • PySpark大数据教程:深入学习SparkCore的RDD持久化和Checkpoint

    本教程详细介绍了PySpark中SparkCore的RDD持久化和Checkpoint功能,重点讲解了缓存和检查点的作用、如何进行缓存、如何设置检查点目录以及它们之间的区别。还提供了join操作的示例和Spark算子补充知识。

    2024年02月08日
    浏览(41)
  • PySpark基础 —— RDD

    1.查看Spark环境信息 2.创建RDD 创建RDD主要有两种方式 第一种:textFile方法 第二种:parallelize方法  2.1.textFile方法 本地文件系统加载数据  2.2.parallelize方法  2.3.wholeTextFiles方法 Action动作算子/行动操作 1.collect 2.take  3.first 4.top 5.takeOrdered 6.takeSample 7.count 8.sum 9.histogram 10.fold 11.re

    2024年02月07日
    浏览(39)
  • PySpark之RDD的持久化

    当RDD被重复使用,或者计算该RDD比较容易出错,而且需要消耗比较多的资源和时间的时候,我们就可以将该RDD缓存起来。 主要作用: 提升Spark程序的计算效率 注意事项: RDD的缓存可以存储在内存或者是磁盘上,甚至可以存储在Executor进程的堆外内存中。主要是放在内存中,因此

    2024年01月23日
    浏览(41)
  • PySpark RDD的缓存和Checkpoint

    RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消息,RDD的数据只在处理的过程中存在,一旦处理完成,就不见了,所以RDD的数据是过程数据。 RDD数据是过程数据的这个特性可以最大化的利用资源,老旧的RDD没用了就会从内存中清理

    2023年04月09日
    浏览(76)
  • PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

    目录 前言 一、PySpark基础功能  1.Spark SQL 和DataFrame 2.Pandas API on Spark 3.Streaming 4.MLBase/MLlib 5.Spark Core 二、PySpark依赖 Dependencies 三、DataFrame 1.创建 创建不输入schema格式的DataFrame 创建带有schema的DataFrame 从Pandas DataFrame创建 通过由元组列表组成的RDD创建 2.查看 DataFrame.show() spark.sql.

    2024年01月18日
    浏览(49)
  • 10-用PySpark建立第一个Spark RDD

    PySpark实战笔记系列第一篇 Apache Spark的核心组件的基础是RDD。所谓的RDD,即 弹性分布式数据集(Resiliennt Distributed Datasets) ,基于RDD可以实现Apache Spark各个组件在多个计算机组成的集群中进行无缝集成,从而能够在一个应用程序中完成海量数据处理。 只读不能修改 :只能通过

    2024年04月08日
    浏览(42)
  • 【python】python根据传入参数不同,调用不同的方法

    大家好,我是木头左。 今天介绍三种不同方法实现根据传入参数不同,调用不同的方法。 使用条件语句 在Python中,可以使用条件语句(如if-elif-else语句)来根据传入的参数调用不同的方法。以下是一个示例: 在上述示例中,我们定义了三个不同的方法(method1,method2和met

    2024年02月09日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包