一、缓存和checkpoint机制
rdd 的优化手段,可以提升计算速度。将计算过程中某个rdd保存在缓存或者hdfs上,在后面计算时,使用该rdd可以直接从缓存或者hdfs上直接读取数据
1-1 缓存使用
1、提升计算速度 2、容错
什么样的rdd需要缓存?
1、rdd的计算时间比较长,获取数据的计算比较复杂
2、rdd被频繁使用
- 缓存级别
- 指定缓存数据存储的位置
StorageLevel.DISK_ONLY # 将数据缓存到磁盘上
StorageLevel.DISK_ONLY_2 # 将数据缓存到磁盘上 保存两份
StorageLevel.DISK_ONLY_3 # 将数据缓存到磁盘上 保存三份
StorageLevel.MEMORY_ONLY # 将数据缓存到内存 默认
StorageLevel.MEMORY_ONLY_2 # 将数据缓存到内存 保存两份
StorageLevel.MEMORY_AND_DISK # 将数据缓存到内存和磁盘 优先将数据缓存到内存上,内存不足可以缓存到磁盘
StorageLevel.MEMORY_AND_DISK_2 = # 将数据缓存到内存和磁盘
StorageLevel.OFF_HEAP # 不使用
StorageLevel.MEMORY_AND_DISK_ESER # 将数据缓存到内存和磁盘 序列化操作,按照二进制存储,节省空间
- 使用
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel
sc = SparkContext()
rdd = sc.parallelize([1,2,3,4,5,6])
# 数据过滤
rdd_filter= rdd.filter(lambda x:x%2==0)
# rdd数据进行缓存
# storageLevel=StorageLevel.MEMORY_ONLY 指定缓存界别 默认MEMORY_ONLY
rdd_filter.persist(storageLevel=StorageLevel.MEMORY_ONLY_2)
# 触发数据进行缓存 使用action算子
rdd_filter.collect() # 缓存所有数据
# 数据都加一
rdd_map1 = rdd_filter.map(lambda x:x+1)
# 数据乘以2
rdd_map2 = rdd_filter.map(lambda x:x*1)
# 释放缓存 手动释放缓存
# 当代码程序执行完成后自动释放缓存
rdd_filter.unpersist()
# 触发执行
rdd_res1 = rdd_map1.collect()
rdd_res2 = rdd_map2.collect()
print(rdd_res1)
print(rdd_res2)
1-2 checkpoint
和缓存作用一样,都是将计算的rdd结果单独保存 ,提升计算速度
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel
sc = SparkContext()
# 设置数据在hdfs上的保存路径,指定后会自动创建对应的目录
sc.setCheckpointDir('hdfs://node1:8020/check_point')
rdd = sc.parallelize([1,2,3,4,5,6])
# 数据过滤
rdd_filter= rdd.filter(lambda x:x%2==0)
# rdd数据进行checkpoint
rdd_filter.checkpoint()
# 触发数据进行checkpoint() 使用action算子
rdd_filter.collect() # 保存所有数据
# 数据都加一
rdd_map1 = rdd_filter.map(lambda x:x+1)
# 数据乘以2
rdd_map2 = rdd_filter.map(lambda x:x*1)
# 触发执行
rdd_res1 = rdd_map1.collect()
rdd_res2 = rdd_map2.collect()
print(rdd_res1)
print(rdd_res2)
1-3 两者区别
- 相同点都是保存数据,提升计算速度
- 不同点
- 存储形式
- 缓存 存储在内存或者磁盘上
- checkpoint 存在hdfs
- 生命周期 存储时间
- 缓存 程序结束就删除
- checkpoint 持久存储
- 依赖关系
- 缓存 rdd之间的依赖关系会保留
- rdd2进行了缓存 rdd1–>rdd2–>rdd3 依赖关系会保留
- 缓存数据丢失,可以根据依赖关系重新计算
- checkpoint 或删除依赖关系
- rdd2 进行了checkpoint rdd3和rdd2的依赖关系会删除(断开)
- rdd3 要获取rdd2的数据就不不能通过依赖获取,只能从hdfs获取
- rdd2 进行了checkpoint rdd3和rdd2的依赖关系会删除(断开)
- 缓存 rdd之间的依赖关系会保留
- 存储形式
- rdd同时进行了缓存和checkpoint,优先从缓存获取数据,缓存获取不到在从checkpoint获取
二、共享变量
2-1 广播变量
如果我们要在分布式计算里面分发大的变量数据,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。
if x in []
当每个task线程在对应每个分区内数据进行判断时,可以将判断条件的数据分发给task使用
from pyspark import SparkContext
sc = SparkContext(master='yarn')
# 定义num变量
num = 10
#将num转化为spark的广播变量
broadcast_obj=sc.broadcast(num)# 将num的数据会广播给executor保存在cache中
broadcast_obj2 = sc.broadcast(['+',',','-'])
# 生成一个rdd数据
rdd = sc.parallelize([1,2,3,4,5,6,7,8],numSlices=4)
# 对rdd中的每个数据都加上num变量值
rdd_map = rdd.map(lambda x:x+broadcast_obj.value) # 该计算过程就是在executor中的task执行,task从cahce中获取num的值
# 触发执行
print(rdd_map.collect())
# 广播变量的使用场景演示
rdd2 = sc.parallelize(['a','b','d',',','f','+'])
rdd_filter = rdd2.filter(lambda x:x not in broadcast_obj2.value)
print(rdd_filter.collect())
2-2 累加器
在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。
让多个task对同一个数据进行累加,修改数据
数量统计
from pyspark import SparkContext
sc = SparkContext(master='yarn')
# 定义num变量
num = 0
#将num转化为spark的累加器的初始值
accumulator_obj=sc.accumulator(num)# 将num的数据会广播给executor的task中进行使用
rdd = sc.parallelize([1,2,3,4,5,6])
print(rdd.reduce(lambda x,y:x+y))
rdd2 = rdd.map(lambda x:accumulator_obj.add(x)) # 在executor的task中执行accumulator_obj.add()累加
# 触发执行
res = rdd2.collect() # 累加后的结构不会存入rdd
print(res)
# 查看累加结果 在dirver中获取的数据
res1 = accumulator_obj.value
print(res1)
缓存和checkpoint: 保存rdd计算结果数据 提升计算效率,容错
缓存:rdd.persist() 保存数据到内存或磁盘 rdd.checkpoint() 保存数据到hdfs
需要调用执行action进行触发
共享变量:driver进程和executor进程 在driver中定义一个变量,需要共享给executor使用。
广播变量 broadcast_obj = sc.broadcast(10) broadcast_obj .value
累加器 accumulator_obj = sc.accumulator(10) accumulator_obj .add(1) accumulator_obj .value
三、RDD的依赖
窄依赖:是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用
宽依赖:是指一个父RDD的Partition会被多个子RDD的Partition所使用
rdd的依赖是划分stage的依据,当发生宽依赖时,就会拆分stage,将计算过程拆分成多个stage。
rdd的依赖关系有DAG维护,DAG全称叫做有向无环图
spark中封装了一个DAGScheduler类,用来管理rdd的依赖关系,并且根据依赖划分stage
四、Spark的运行流程(内核调度)
- DAGScheduler
- 根据rdd间的依赖关系,将提交的job划分成多个stage。
- 对每个stage中的task进行描述(task编号,task执行的rdd算子)
- TaskScheduler
- 获取DAGScheduler提交的task
- 调用SchedulerBackend获取executor的资源信息
- 给task分配资源,维护task和executor对应关系
- 管理task任务队列
- SchedulerBackend
- 向RM申请资源
- 获取executor信息
三个类在sparkcontext进行初始化会生成对应的对象
五、Spark的shuffle过程
rdd在宽依赖就会进行shuffle
5-1 Shuffle介绍
- spark的shuffle的两个部分
- shuffle wirte 写
- shuffle read 读
- 会进行文件的读写,影响spark的计算速度
- spark的shuffle方法类
- 是spark封装好的处理shuffle的方法
- hashshuffle
- spark1.2版本前使用的类
- spark2.0后引入sortshuffle,删除了hashshuffle
- 优化的hashshufulle和未优化
- sortshuffle
- bypass模式版本和普通模式版本
- bypass模式版本不会排序
- 普通模式版本会排序进行shuffle
- 可以通过配置指定按照那种模式执行
- 无论是hash还是排序都是将相同key值放在一起处理
5-2 SparkShuffle配置
- spark.shuffle.file.buffer
参数说明:
该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 2倍 3倍
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升
- spark.reducer.maxSizeInFlight
参数说明:
该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M)
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
- spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait
spark.shuffle.io.maxRetries :
shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次)
spark.shuffle.io.retryWait:
该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔。
- spark.shuffle.memoryFraction =5
参数说明:
该参数代表了Executor 1G内存中,分配给shuffle read task进行聚合操作内存比例。
- spark.shuffle.manager
参数说明:该参数用于设置shufflemanager的类型(默认为sort
Hash:spark1.x版本的默认值,HashShuffleManager
Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等
- spark.shuffle.sort.bypassMergeThreshold=200 task数量少于等于200 就采用bypass task大于200就采用普通模式
参数说明:
当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
- 指令中使用
- pyspark --conf ‘spark.shuffle.sort.bypassMergeThreshold=100’ --conf ‘’
- 代码中配置
from pyspark import SparkContext,SparkConf
conf = SparkConf().set('spark.shuffle.sort.bypassMergeThreshold','100').set()
s
sc = SparkContext(conf=conf)
六、Spark并行度
- 资源并行度
- 通过spark-submit 提交代码计算时设置
- 同时执行多少个task任务
- 和cpu核心数有关以及executor数量有关
- –executor-cores=2 设置cpu核心数
- –num-executors=2 设置executor数量
- 2*2=4 同时执行4task任务
- 数据并行度
- task数量
- 和分区数有关
官方建议 数据并行度等于资源并行度,文章来源:https://www.toymoban.com/news/detail-792369.html
公司实际开发建议数据并行度是资源并行度的2-3倍文章来源地址https://www.toymoban.com/news/detail-792369.html
到了这里,关于Spark核心--checkpoint、 广播变量、累加器介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!