Spark
Spark是什么
- 定义:Apache Spark是用于大规模数据处理的统一分析引擎
- 简单来说,spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算PB、TB乃至EB级别的海量数据
PySpark
- Spark对Python语言的支持重点体现在Python第三方库:PySpark上
- PySpark是由Spark官方开发的Python语言第三方库
- Python开发者可以使用pip程序快速的安装PySpark库
基础准备
Spark库安装
- 使用pip命令直接安装
pip install pyspark
构建pyspark执行环境入口对象
- 想要使用pyspark库完成数据处理,首先需要构建一个执行环境入口对象
- pyspark的执行环境入口对象是:类
SparkContext
的类对象 - 代码示例
# *_*coding:utf-8 *_*
# 导包
from pyspark import SparkConf, SparkContext
# 创建SparkConf对象
# setMaster("local[*]")表明spark运行模式是单机,运行在本地
# setAppName("test") 给当前spark程序起一个名字
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于sparkConf构建sparkContext类对象
sc = SparkContext(conf=conf)
# 查看spark运行版本
print(sc.version)
# 停止pyspark程序
sc.stop()
PySpark的编程模型
- SparkContext类对象,是PySpark变成中一切功能的入口
- PySpark的编程,主要分为三大步骤
- 数据输入:通过SparkContext类对象的成员方法,完成数据的读取操作,读取后得到RDD类对象
- 数据处理计算:通过RDD类对象的成员方法完成各种数据计算的需求
- 数据输出:将处理完成后的RDD对象,调用各种成员方法完成,写出文件,转换为list等操作
数据输入
RDD对象
- PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
- RDD全称是:弹性分布式数据集
- PySpark针对数据的处理都是以RDD对象作为载体
- 数据存储在RDD内
- 各类数据的计算方法,也都是RDD的成员方法
- RDD的数据计算方法,返回值依旧是RDD对象
Python数据容器转RDD对象
- PySpark支持通过SparkContext对象的parallelize成员方法将
list/tuple/set/dict/str
转换为PySpark的RDD对象 - 注意
- 字符串会被拆分成一个个的字符存入RDD对象
- 字典仅有key会被存入RDD对象
- 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于sparkConf构建sparkContext类对象
sc = SparkContext(conf=conf)
# rdd = sc.parallelize(数据容器对象)
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize((1, 2, 3, 4))
rdd3 = sc.parallelize({1, 2, 3, 4})
rdd4 = sc.parallelize({"a": 1, "b": 2})
rdd5 = sc.parallelize("erdtfhdsadg")
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
sc.stop()
- 运行结果
[1, 2, 3, 4]
[1, 2, 3, 4]
[1, 2, 3, 4]
['a', 'b']
['e', 'r', 'd', 't', 'f', 'h', 'd', 's', 'a', 'd', 'g']
读取文件转RDD对象
-
通过textFile()方法将文件转成RDD对象
-
代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local[*]').setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.textFile('./读取文件转RDD对象.py')
print(rdd.collect())
sc.stop()
数据计算
map方法
-
PySpark的数据计算,都是基于RDD对象来进行的,RDD对象内置丰富的:成员方法(算子)
-
功能:map算子是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
-
语法:
rdd.map(func)
# func: f:(T)->U
# (T)->U 表示的是方法的定义
# ()表示传入参数,(T)表示传入一个参数,()表示没有传入参数
# T是泛型的代称,在这里表示任意类型
# U也是泛型的代称,在这里表示任意类型
# ->U 表示返回值
# (T)->U 总结起来的意思是:这是一个方法,接收一个参数传入,传入参数类型不限,返回一个返回值,返回值类型不限
# (A)->A 总结起来的意思是:这是一个方法,接收一个参数传入,传入参数类型不限,返回一个返回值,返回值和传入参数类型一致
- 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf
# map方法
sc = SparkContext(conf=SparkConf().setAppName("test_spark_app").setMaster("local[*]"))
# 准备一个rdd
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 调用map方法进行计算
# 对传入的值先进行*10在进行+5
rdd1 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
# 查看计算完毕后rdd中的内容
print(rdd1.collect())
flatMap方法
-
功能:对rdd执行map操作,然后进行解除嵌套操作
-
解除嵌套
# 嵌套的list
lst = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
# 解除嵌套后的list
lst = [1, 2, 3, 4, 5, 6, 7, 8, 9]
- 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf
# flatmap方法
sc = SparkContext(conf=SparkConf().setAppName("test_spark_app").setMaster("local[*]"))
# 准备一个rdd
rdd = sc.parallelize(["fdsf fsf eerer", "fdtfydus adas ouore", "ier wir hdgi ldre"])
# 需求:将RDD数据里面的一个个单词提取出来
rdd1 = rdd.map(lambda x: x.split(" "))
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(f'map方法转化完内容:{rdd1.collect()}')
print(f'flatmap方法转化完内容:{rdd2.collect()}')
sc.stop()
reduceByKey方法
-
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成数据的聚合操作文章来源:https://www.toymoban.com/news/detail-854711.html
-
用法文章来源地址https://www.toymoban.com/news/detail-854711.html
rdd.reduceByKey(func)
# func:(V,V)->V
# 接收两个传入参数(类型要一致),返回一个返回值,类型和传入要求一致
- 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf
# flatmap方法
sc = SparkContext(conf=SparkConf().setAppName("test_spark_app").setMaster("local[*]"))
# 准备一个rdd
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
# reduceByKey 自动按照key分组,然后根据聚合逻辑完成value聚合操作
# rdd中数据有两个key:'a'和'b'
# 聚合逻辑为:不断的将value值相加
rdd1 = rdd.reduceByKey(lambda a, b: a + b)
print(f'map方法转化完内容:{rdd1.collect()}')
sc.stop()
Filter方法
- 功能:过滤想要的数据进行保留
- 语法
rdd.filter(func)
# func: (T)->bool 传入一个随机类型参数,返回值必须是bool类型,返回为True的数据被保留,返回为False的被丢弃
- 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf
sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app"))
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 需求:保留奇数
# rdd1 = rdd.filter(lambda x: x % 2 == 1)
# print(rdd1.collect())
print(rdd.filter(lambda x: x % 2 == 1).collect())
distinct方法
- 功能:对rdd数据进行去重返回新的rdd
- 语法:
rdd.distinct()
- 代码示例:
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf
sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app"))
rdd = sc.parallelize([1, 1, 2, 4, 2, 1])
print(rdd.distinct().collect())
sortby方法
- 功能:基于指定的排序规则对RDD数据进行排序
- 语法:
rdd.sortBy(func, ascending=False, numPartitions=1)
# func: (T)-> U:告知RDD中的那个数据进行排序,比如lambda x: x[1]表示按照RDD中的第二列元素进行排序
# ascending True升序,False降序
# numPartitions:用多少分区排序
- 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf
sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app"))
rdd = sc.parallelize([("dasds", 5), ("ferew", 2), ("dsgyds", 7), ("dsdsfds", 4), ("dsfsfs", 2)])
print(rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1).collect())
数据输出
collect方法
- 功能:将RDD各个分区的数据,统一收集到Driver中,形成一个list对象
- 用法:
rdd.collect()
,返回值是一个list
reduce方法
- 功能:对RDD数据按照传入的逻辑进行聚合
- 语法:
rdd.reduce(func)
# func: (T,T) ->T
# 2个参数 1个返回值,返回值和传入参数要求类型一致
- 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf
sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app"))
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda a, b: a + b))
take方法
- 功能:取RDD的前N个元素,组成list后返回
- 用法:
rdd.take(N)
count方法
- 功能:计算RDD有多少条数据,返回一个数字
- 用法:
rdd.count()
输出到文件
- saveAsTextFile方法
- 功能:将RDD的数据写入到文本文件中
- 支持 本地写出,hdfs等文件系统
- 代码示例
sc.parallelize([1, 2, 3, 4, 5]).saveAsTextFile("输出文件路径")
到了这里,关于【黑马程序员】PySpark学习的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!