Spark核心--checkpoint、 广播变量、累加器介绍

这篇具有很好参考价值的文章主要介绍了Spark核心--checkpoint、 广播变量、累加器介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、缓存和checkpoint机制

rdd 的优化手段,可以提升计算速度。将计算过程中某个rdd保存在缓存或者hdfs上,在后面计算时,使用该rdd可以直接从缓存或者hdfs上直接读取数据

1-1 缓存使用

1、提升计算速度  2、容错

什么样的rdd需要缓存?

1、rdd的计算时间比较长,获取数据的计算比较复杂

2、rdd被频繁使用

  • 缓存级别
    • 指定缓存数据存储的位置
StorageLevel.DISK_ONLY # 将数据缓存到磁盘上
StorageLevel.DISK_ONLY_2 # 将数据缓存到磁盘上 保存两份
StorageLevel.DISK_ONLY_3 # 将数据缓存到磁盘上 保存三份
StorageLevel.MEMORY_ONLY # 将数据缓存到内存  默认
StorageLevel.MEMORY_ONLY_2 # 将数据缓存到内存 保存两份
StorageLevel.MEMORY_AND_DISK # 将数据缓存到内存和磁盘  优先将数据缓存到内存上,内存不足可以缓存到磁盘
StorageLevel.MEMORY_AND_DISK_2 = # 将数据缓存到内存和磁盘
StorageLevel.OFF_HEAP # 不使用
StorageLevel.MEMORY_AND_DISK_ESER # 将数据缓存到内存和磁盘  序列化操作,按照二进制存储,节省空间
  • 使用
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel

sc = SparkContext()

rdd = sc.parallelize([1,2,3,4,5,6])

# 数据过滤
rdd_filter= rdd.filter(lambda x:x%2==0)
# rdd数据进行缓存
# storageLevel=StorageLevel.MEMORY_ONLY  指定缓存界别  默认MEMORY_ONLY
rdd_filter.persist(storageLevel=StorageLevel.MEMORY_ONLY_2)
# 触发数据进行缓存  使用action算子
rdd_filter.collect()  # 缓存所有数据


# 数据都加一
rdd_map1 = rdd_filter.map(lambda x:x+1)

# 数据乘以2
rdd_map2 = rdd_filter.map(lambda x:x*1)

# 释放缓存  手动释放缓存
# 当代码程序执行完成后自动释放缓存
rdd_filter.unpersist()



# 触发执行
rdd_res1 = rdd_map1.collect()
rdd_res2 = rdd_map2.collect()
print(rdd_res1)
print(rdd_res2)

1-2 checkpoint

和缓存作用一样,都是将计算的rdd结果单独保存 ,提升计算速度

from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel

sc = SparkContext()
# 设置数据在hdfs上的保存路径,指定后会自动创建对应的目录
sc.setCheckpointDir('hdfs://node1:8020/check_point')

rdd = sc.parallelize([1,2,3,4,5,6])

# 数据过滤
rdd_filter= rdd.filter(lambda x:x%2==0)

# rdd数据进行checkpoint
rdd_filter.checkpoint()
# 触发数据进行checkpoint()  使用action算子
rdd_filter.collect()  # 保存所有数据


# 数据都加一
rdd_map1 = rdd_filter.map(lambda x:x+1)

# 数据乘以2
rdd_map2 = rdd_filter.map(lambda x:x*1)


# 触发执行
rdd_res1 = rdd_map1.collect()
rdd_res2 = rdd_map2.collect()
print(rdd_res1)
print(rdd_res2)

1-3 两者区别

  • 相同点都是保存数据,提升计算速度
  • 不同点
    • 存储形式
      • 缓存  存储在内存或者磁盘上
      • checkpoint  存在hdfs
    • 生命周期  存储时间
      • 缓存  程序结束就删除
      • checkpoint    持久存储
    • 依赖关系
      • 缓存   rdd之间的依赖关系会保留
        • rdd2进行了缓存    rdd1–>rdd2–>rdd3 依赖关系会保留
        • 缓存数据丢失,可以根据依赖关系重新计算
      • checkpoint    或删除依赖关系
        • rdd2 进行了checkpoint    rdd3和rdd2的依赖关系会删除(断开)
          • rdd3 要获取rdd2的数据就不不能通过依赖获取,只能从hdfs获取
  • rdd同时进行了缓存和checkpoint,优先从缓存获取数据,缓存获取不到在从checkpoint获取

二、共享变量

2-1 广播变量

如果我们要在分布式计算里面分发大的变量数据,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

if  x in []

当每个task线程在对应每个分区内数据进行判断时,可以将判断条件的数据分发给task使用

from pyspark import SparkContext

sc = SparkContext(master='yarn')

# 定义num变量
num = 10

#将num转化为spark的广播变量
broadcast_obj=sc.broadcast(num)# 将num的数据会广播给executor保存在cache中

broadcast_obj2 = sc.broadcast(['+',',','-'])

# 生成一个rdd数据
rdd = sc.parallelize([1,2,3,4,5,6,7,8],numSlices=4)

# 对rdd中的每个数据都加上num变量值
rdd_map = rdd.map(lambda x:x+broadcast_obj.value)  # 该计算过程就是在executor中的task执行,task从cahce中获取num的值

# 触发执行
print(rdd_map.collect())


# 广播变量的使用场景演示
rdd2 = sc.parallelize(['a','b','d',',','f','+'])

rdd_filter  = rdd2.filter(lambda x:x not in broadcast_obj2.value)

print(rdd_filter.collect())

2-2 累加器

在spark应用程序中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

让多个task对同一个数据进行累加,修改数据

数量统计

from pyspark import SparkContext

sc = SparkContext(master='yarn')

# 定义num变量
num = 0

#将num转化为spark的累加器的初始值
accumulator_obj=sc.accumulator(num)# 将num的数据会广播给executor的task中进行使用


rdd = sc.parallelize([1,2,3,4,5,6])
print(rdd.reduce(lambda x,y:x+y))

rdd2 = rdd.map(lambda x:accumulator_obj.add(x)) # 在executor的task中执行accumulator_obj.add()累加

# 触发执行
res = rdd2.collect() # 累加后的结构不会存入rdd
print(res)

# 查看累加结果 在dirver中获取的数据
res1 = accumulator_obj.value
print(res1)

缓存和checkpoint: 保存rdd计算结果数据 提升计算效率,容错

缓存:rdd.persist() 保存数据到内存或磁盘   rdd.checkpoint() 保存数据到hdfs

需要调用执行action进行触发

共享变量:driver进程和executor进程  在driver中定义一个变量,需要共享给executor使用。

广播变量  broadcast_obj = sc.broadcast(10)    broadcast_obj .value

累加器    accumulator_obj =  sc.accumulator(10)    accumulator_obj .add(1)  accumulator_obj .value

三、RDD的依赖

窄依赖:是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用

宽依赖:是指一个父RDD的Partition会被多个子RDD的Partition所使用

rdd的依赖是划分stage的依据,当发生宽依赖时,就会拆分stage,将计算过程拆分成多个stage。

rdd的依赖关系有DAG维护,DAG全称叫做有向无环图

spark中封装了一个DAGScheduler类,用来管理rdd的依赖关系,并且根据依赖划分stage

四、Spark的运行流程(内核调度)

  • DAGScheduler
    • 根据rdd间的依赖关系,将提交的job划分成多个stage。
    • 对每个stage中的task进行描述(task编号,task执行的rdd算子)
  • TaskScheduler
    • 获取DAGScheduler提交的task
    • 调用SchedulerBackend获取executor的资源信息
    • 给task分配资源,维护task和executor对应关系
    • 管理task任务队列
  • SchedulerBackend
    • 向RM申请资源
    • 获取executor信息

三个类在sparkcontext进行初始化会生成对应的对象

五、Spark的shuffle过程

rdd在宽依赖就会进行shuffle

5-1 Shuffle介绍

  • spark的shuffle的两个部分
    • shuffle wirte 写
    • shuffle read 读
    • 会进行文件的读写,影响spark的计算速度
  • spark的shuffle方法类
    • 是spark封装好的处理shuffle的方法
    • hashshuffle
      • spark1.2版本前使用的类
      • spark2.0后引入sortshuffle,删除了hashshuffle
      • 优化的hashshufulle和未优化
    • sortshuffle
      • bypass模式版本和普通模式版本
      • bypass模式版本不会排序
      • 普通模式版本会排序进行shuffle
      • 可以通过配置指定按照那种模式执行
    • 无论是hash还是排序都是将相同key值放在一起处理

5-2 SparkShuffle配置

  • spark.shuffle.file.buffer

参数说明:

该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 2倍  3倍

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升

  • spark.reducer.maxSizeInFlight

参数说明:

该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。(默认48M)

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

  • spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait

spark.shuffle.io.maxRetries :

shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次)

spark.shuffle.io.retryWait:

该参数代表了每次重试拉取数据的等待间隔。(默认为5s)

调优建议:一般的调优都是将重试次数调高,不调整时间间隔。

  • spark.shuffle.memoryFraction =5

参数说明:

该参数代表了Executor 1G内存中,分配给shuffle read task进行聚合操作内存比例。

  • spark.shuffle.manager

参数说明:该参数用于设置shufflemanager的类型(默认为sort
Hash:spark1.x版本的默认值,HashShuffleManager
Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等

  • spark.shuffle.sort.bypassMergeThreshold=200  task数量少于等于200 就采用bypass  task大于200就采用普通模式

参数说明:

当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些

  • 指令中使用
    • pyspark   --conf  ‘spark.shuffle.sort.bypassMergeThreshold=100’   --conf ‘’
  • 代码中配置
from pyspark import SparkContext,SparkConf
conf = SparkConf().set('spark.shuffle.sort.bypassMergeThreshold','100').set()
s
sc = SparkContext(conf=conf)

六、Spark并行度

  • 资源并行度
    • 通过spark-submit 提交代码计算时设置
    • 同时执行多少个task任务
    • 和cpu核心数有关以及executor数量有关
    • –executor-cores=2  设置cpu核心数
    • –num-executors=2   设置executor数量
    • 2*2=4  同时执行4task任务
  • 数据并行度
    • task数量
    • 和分区数有关

官方建议 数据并行度等于资源并行度,

公司实际开发建议数据并行度是资源并行度的2-3倍文章来源地址https://www.toymoban.com/news/detail-792369.html

到了这里,关于Spark核心--checkpoint、 广播变量、累加器介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark---累加器

    累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。 运行结果: 我们预期是想要实现数据的累加,开始数据从Driver被传输到了Execut

    2024年02月02日
    浏览(35)
  • Spark累加器LongAccumulator

    1.Accumulator是由Driver端总体进行维护的,读取当前值也是在Driver端,各个Task在其所在的Executor上也维护了Accumulator变量,但只是局部性累加操作,运行完成后会到Driver端去合并累加结果。Accumulator有两个性质: 1、只会累加,合并即累加; 2、不改变Spark作业懒执行的特点,即没

    2024年01月25日
    浏览(43)
  • 【Spark原理系列】Accumulator累加器原理用法示例源码详解

    源自专栏《SparkML:Spark ML系列专栏目录》 Accumulator是Spark中的一种分布式变量,用于在并行计算中进行累加操作。它是由MapReduce模型中的“全局计数器”概念演化而来的。 Accumulator提供了一个可写的分布式变量,可以在并行计算中进行累加操作。在Spark中,当一个任务对Accum

    2024年03月14日
    浏览(51)
  • Flink 源码剖析|累加器

    累加器是实现了 加法运算 功能和 合并运算 (合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。 Flink 的累加器均实现了 Accumulator 接口,包括如下 2 个方法用于支持加法运算和合

    2024年02月21日
    浏览(39)
  • 计算机组成原理 累加器实验

    计算机组成原理实验环境 理解累加器的概念和作用。 连接运算器、存储器和累加器,熟悉计算机的数据通路。 掌握使用微命令执行各种操作的方法。 做好实验预习,读懂实验电路图,熟悉实验元器件的功能特性和使用方法。在实验之前设计好要使用的微命令,填入表 6-2 、

    2024年02月06日
    浏览(30)
  • Flink 源码剖析|4. 累加器与相关工具方法

    累加器是实现了 加法运算 功能和 合并运算 (合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。 Flink 的累加器均实现了 Accumulator 接口,包括如下 2 个方法用于支持加法运算和合

    2024年03月15日
    浏览(33)
  • 【数字IC/FPGA】百度昆仑芯手撕代码--累加器

    已知一个加法器IP,其功能是计算两个数的和,但这个和延迟两个周期才会输出。现在有一串连续的数据输入,每个周期都不间断,试问最少需要例化几个上述的加法器IP,才可以实现累加的功能。 由于加法器两个周期后才能得到结果(再将该结果作为加法器的输入进行累加

    2024年02月09日
    浏览(29)
  • 《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )

    有如下需求,保证 account.withdraw 取款方法的线程安全 原有实现并不是线程安全的 测试代码 执行测试代码,某次执行结果 5.1.1 为么不安全 withdraw 方法是临界区,会存在线程安全问题 查看下字节码 多线程在执行过程中可能会出现指令的交错,从而结果错误! 5.1.2 解决思路1

    2023年04月12日
    浏览(32)
  • spark广播变量

    2024-1-24 广播变量特点 Broadcast Variable会将使用到的变量,只会为每个节点拷贝一份,不会为每个task进行拷贝,能够优化性能(在task数量比较大体现更明显),减少网络传输及内存消耗 通过SparkContext的broadcast()方法,针对某个变量创建广播变量,可以通过广播变量的value()方法

    2024年01月25日
    浏览(32)
  • Apache Spark中的广播变量分发机制

    Apache Spark中的广播变量提供了一种机制,允许用户在集群中共享只读变量,并且每个任务都可以访问这个变量,而不需要在每次任务之间重新发送该变量。这种机制特别适用于在所有节点上都需要访问同一份只读数据集的情况,因为它可以显著减少网络通信的开销。 以下是广

    2024年01月24日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包