【Python】PySpark

这篇具有很好参考价值的文章主要介绍了【Python】PySpark。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。

简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

Spark对Python语言的支持,重点体现在Python第三方库:PySpark

PySpark是由Spark官方开发的Python语言第三方库。

Python开发者可以使用pip程序快速的安装PySpark并像其它第三方库那样直接使用。

【Python】PySpark,Python,python,spark,开发语言

基础准备

安装

同其它的Python第三方库一样,PySpark同样可以使用pip程序进行安装。

pip install pyspark

或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

构建PySpark执行环境入口对象

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。

PySpark的执行环境入口对象是:类SparkContext的类对象

# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

# 打印PySpark的运行版本
print(sc.version)

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

运行需要Java环境,推荐jdk8

PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。

PySpark的编程,主要分为如下三大步骤:

【Python】PySpark,Python,python,spark,开发语言

数据输入

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

【Python】PySpark,Python,python,spark,开发语言

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将list/tuple/set/dict/str转换为PySpark的RDD对象

# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd1 = sc.parallelize([1, 2, 3])    
rdd2 = sc.parallelize((1, 2, 3))    
rdd3 = sc.parallelize({1, 2, 3})    
rdd4 = sc.parallelize({'key1': 'value1', 'key2': 'value2'}) 
rdd5 = sc.parallelize('hello')  

# 输出RDD的内容,需要使用collect()
print(rdd1.collect())   # [1, 2, 3]
print(rdd2.collect())   # [1, 2, 3]
print(rdd3.collect())   # [1, 2, 3]
print(rdd4.collect())   # ['key1', 'key2']
print(rdd5.collect())   # ['h', 'e', 'l', 'l', 'o']

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

注意:

  • 字符串会被拆分出一个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象

读取文件转RDD对象

PySpark也支持通过SparkContext入口对象来读取文件,构建出RDD对象。

先提前预备一个txt文件

hello
python
day
# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.textFile('E:\\code\\py-space\\8.27\\hello.txt')

# 输出RDD的内容,需要使用collect()
print(rdd.collect())    # ['hello', 'python', 'day']

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据计算

RDD对象内置丰富的:成员方法(算子)

map算子

将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD

rdd.map(func)
# func: f:(T) -> U
# f: 表示这是一个函数
# (T) -> U 表示的是方法的定义:()表示无需传入参数,(T)表示传入1个参数
# T是泛型的代称,在这里表示 任意类型
# U是泛型的代称,在这里表示 任意类型

# (T) -> U : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型不限
# (A) -> A : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型和传入参数类型一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# 通过map方法将全部数据乘以10,传入参数为函数
rdd2 = rdd.map(lambda x: x * 10)

# 输出RDD的内容,需要使用collect()
print(rdd2.collect())   # [10, 20, 30, 40, 50, 60]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

由于map()的返回值还是RDD对象,可以继续在尾部进行链式调用

rdd3 = rdd.map(lambda x: x * 10).map(lambda x: x + 9)

flatMap算子

对RDD执行map操作,然后进行解除嵌套操作。

【Python】PySpark,Python,python,spark,开发语言

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(['a b c', 'd e f'])

# 输出RDD的内容,需要使用collect()
print(rdd.map(lambda x: x.split(' ')).collect())    # [['a', 'b', 'c'], ['d', 'e', 'f']]
print(rdd.flatMap(lambda x:x.split(' ')).collect())   # ['a', 'b', 'c', 'd', 'e', 'f']

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey算子

针对KV型(二元元组)RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

rdd.reduceByKey(func)
# func: (V, V) -> V
# 接收2个传入参数(类型要一致),返回一个返回值,返回值类型和传入参数类型要求一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])

# 输出RDD的内容,需要使用collect()
print(rdd.reduceByKey(lambda a, b: a+b).collect())  # [('b', 3), ('a', 2)]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey中的聚合逻辑是:比如有[1,2,3,4,5],然后聚合函数是:lambda a,b: a+b

【Python】PySpark,Python,python,spark,开发语言

注意:reduceByKey中接收的函数,只负责聚合,不理会分组;分组是自动by key来分组的

filter算子

过滤想要的数据进行保留。

rdd.filter(func)
# func: (T) -> bool
# 传入一个参数任意类型,返回值必须是True/False,返回是True的数据被保留,False的数据被丢弃

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# 输出RDD的内容,需要使用collect()
print(rdd.filter(lambda x: x % 2 == 0).collect())  # [2, 4, 6]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

distinct算子

对RDD数据进行去重,返回新的RDD

rdd.distinct() # 无需传参

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 3, 2, 6])

# 输出RDD的内容,需要使用collect()
print(rdd.distinct().collect())  # [6, 1, 2, 3]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

sortBy算子

对RDD数据进行排序,基于你指定的排序依据。

rdd.sortKey(func, ascending=False, numPartitions=1)
# func: (T) -> U:告知按照RDD中的哪个数据进行排序,比如lambda x: x[1]表示按照RDD中的第二列元素进行排序
# ascending:True升序,False降序
# numPartitions:用多少分区排序,全局排序需要设置为1

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('Aiw', 9), ('Tom', 6), ('Jack', 8), ('Bolb', 5)])

# 输出RDD的内容,需要使用collect()
print(rdd.sortBy(lambda x: x[1], ascending=False,
      numPartitions=1).collect())  # [('Aiw', 9), ('Jack', 8), ('Tom', 6), ('Bolb', 5)]

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据输出

collect算子

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。

rdd.collect()
# 返回值是一个List

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3])

rdd_list: list = rdd.collect()

print(rdd_list)   # [1, 2, 3]
print(type(rdd_list))   # <class 'list'>

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduce算子

对RDD数据集按照你传入的逻辑进行聚合

rdd.reduce(func)
# func:(T, T) -> T
# 传入2个参数,1个返回值,要求返回值和参数类型一致

【Python】PySpark,Python,python,spark,开发语言

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

print(rdd.reduce(lambda a, b: a+b))   # 45

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

take算子

取RDD的前N个元素,组合成List进行返回。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

rdd_take: list = rdd.take(3)

print(rdd_take)   # [1, 2, 3]
print(type(rdd_take))   # <class 'list'>

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

count算子

计算RDD有多少条数据,返回值是一个数字。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

rdd_count: int = rdd.count()

print(rdd_count)   # 9
print(type(rdd_count))   # <class 'int'>

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

saveAsTextFile算子

将RDD的数据写入文本文件中。支持本地写出、HDFS等文件系统。

注意事项:

【Python】PySpark,Python,python,spark,开发语言

# 导包
from pyspark import SparkConf, SparkContext, sql
import os

# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'
os.environ['HADOOP_HOME'] = 'D:/Hadoop-3.0.0'

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')

# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1, 10))

rdd.saveAsTextFile('./8.27/output') # 运行之前确保输出文件夹不存在,否则报错

# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

上述代码输出结果,输出文件夹内有多个分区文件

修改RDD分区为1个

方式一:SparkConf对象设置属性全局并行度为1:

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')
# 设置属性全局并行度为1
conf.set('spark.default.parallelism','1')
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

方式二:创建RDD的时候设置(parallelize方法传入numSlices参数为1)文章来源地址https://www.toymoban.com/news/detail-685972.html

rdd = sc.parallelize(range(1, 10), numSlices=1)
rdd = sc.parallelize(range(1, 10), 1)

到了这里,关于【Python】PySpark的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python大数据之PySpark(二)PySpark安装

    1-明确PyPi库,Python Package Index 所有的Python包都从这里下载,包括pyspark 2-为什么PySpark逐渐成为主流? http://spark.apache.org/releases/spark-release-3-0-0.html Python is now the most widely used language on Spark. PySpark has more than 5 million monthly downloads on PyPI, the Python Package Index. 记住如果安装特定的版本

    2024年02月04日
    浏览(43)
  • 【Python】PySpark

    前言 Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。 简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。 Spark对Python语言的支持,重点体现在Python第三方库:PySpark PySpark是由Spark官方开发

    2024年02月10日
    浏览(31)
  • Python学习路线 - Python高阶技巧 - PySpark案例实战

    Spark是什么 定义:Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。 简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃致EB级别的海量数据 Python On Spark Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开

    2024年02月21日
    浏览(49)
  • 【Python】PySpark 数据处理 ② ( 安装 PySpark | PySpark 数据处理步骤 | 构建 PySpark 执行环境入口对象 )

    执行 Windows + R , 运行 cmd 命令行提示符 , 在命令行提示符终端中 , 执行 命令 , 安装 PySpark , 安装过程中 , 需要下载 310 M 的安装包 , 耐心等待 ; 安装完毕 : 命令行输出 : 如果使用 官方的源 下载安装 PySpark 的速度太慢 , 可以使用 国内的 镜像网站 https://pypi.tuna.tsinghua.edu.cn/simple

    2024年02月06日
    浏览(43)
  • pyspark 集成指定python版本

    1.制作python环境 1)可以使用anacoda方式创建虚拟环境,或者自己利用自己安装好得python环境进行打包。打包之前使用pip安装好自己需要使用得python 模块。 2)打包 进入到python 得安装目录如下图是到bin 级别目录下,然后使用zip进行打包 zip -r py3.zip ./* 3)打包好后将打好得zip 包上

    2024年02月14日
    浏览(33)
  • Python大数据之PySpark

    Apache Spark是一种用于大规模数据处理的多语言分布式引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习 Spark官网:https://spark.apache.org/ 按照官网描述,Spark关键特征包括: 批/流处理 Spark支持您使用喜欢的语言:Python、SQL、Scala、Java或R,统一批量和实时流处

    2024年02月08日
    浏览(44)
  • Python学习之PySpark案例实战

    Spark是什么 Apache Spark是用于 大规模数据(large-scala data)处理的统一 (unified) 分析引擎 。 简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。 Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发 而Python语

    2024年02月05日
    浏览(43)
  • Python小案例(九)PySpark读写数据

    有些业务场景需要Python直接读写Hive集群,也需要Python对MySQL进行操作。pyspark就是为了方便python读取Hive集群数据,当然环境搭建也免不了数仓的帮忙,常见的如开发企业内部的 Jupyter Lab 。 ⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接公司hive集群的

    2024年02月12日
    浏览(46)
  • Python进阶知识:整理1 -> pySpark入门

    pySpark大数据分析过程分为3步: 数据输入、数据计算、数据输出 ,以下内容将重点介绍这三个过程   在数据输入完成后,都会得到一个 RDD类的对象 (RDD全称为弹性分布式数据集) map算子是将RDD的数据进行一条条处理(处理的逻辑基于map算子接收的处理函数),返回新的R

    2024年01月18日
    浏览(50)
  • Spark与PySpark(1.概述、框架、模块)

    目录 1.Spark 概念 2. Hadoop和Spark的对比 3. Spark特点 3.1 运行速度快 3.2 简单易用 3.3 通用性强 3.4 可以允许运行在很多地方 4. Spark框架模块 4.1 Spark Core 4.2 SparkSQL 4.3 SparkStreaming 4.4 MLlib 4.5 GraphX 5. Spark的运行模式 5.1 本地模式(单机) Local运行模式 5.2 Standalone模式(集群) 5.3 Hado

    2024年02月02日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包