第三部分:Spark调优篇

这篇具有很好参考价值的文章主要介绍了第三部分:Spark调优篇。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

第一部分:Spark基础篇_奔跑者-辉的博客-CSDN博客

第一部分:Spark基础篇_奔跑者-辉的博客-CSDN博客

第三部分:Spark调优篇_奔跑者-辉的博客-CSDN博客


目录

1 常规性能调优

常规性能调优一:  最优资源配置

常规性能调优二:  RDD调优

常规性能调优三:并行度调节

常规性能调优四:  广播变量

常规性能调优五:Kryo序列化

2 算子调优

算子调优一:调节mapPartitions

算子调优二:foreachPartition优化数据库操作

算子调优三:filter与coalesce的配合使用

算子调优四:repartition解决SparkSQL低并行度问题

算子调优五:reduceByKey本地聚合

3 Shuffle调优

Shuffle调优一:调节map端缓冲区大小

Shuffle调优二:调节reduce端拉取数据缓冲区大小

Shuffle调优三:调节reduce端拉取数据重试次数

Shuffle调优四:调节reduce端拉取数据等待间隔

Shuffle调优五:调节SortShuffle排序操作阈值

4 JVM调优

JVM调优一:降低cache操作的内存占比

JVM调优二:调节Executor堆外内存 

JVM调优三:调节连接等待时长

5 Spark数据倾斜

数据倾斜的表现

定位数据倾斜问题

解决方案:


1 常规性能调优

常规性能调优一:  最优资源配置

Spark性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。

资源的分配在使用脚本提交Spark任务时进行指定: 参数值可以如下表

名称

说明

--num-executors

配置Executor的数量                                50~100

--driver-memory

配置Driver内存(影响不大)                   1~5G

--executor-memory

配置每个Executor的内存大小                  6~10G

--executor-cores

配置每个Executor的CPU core数量          3 

常规性能调优二:  RDD调优

①RDD复用

在对RDD进行算子时,要避免相同计算逻辑之下对RDD进行重复的计算;

②RDD持久化
通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据即可;

③RDD尽可能早的filter操作

获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。
 

常规性能调优三:并行度调节

Spark作业中的并行度指各个stage的task的数量。

理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度。

Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍

常规性能调优四:  广播变量

当算子函数中使用到外部变量时,需要用到广播变量; 广播变量会保证每个Executor的内存中,只驻留1份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,减少网络传输性能开销,减少对Executor内存的占用的开销,提高spark的效率;

常规性能调优五:Kryo序列化

默认情况下,Spark使用Java的序列化机制 , Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。

Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。

2 算子调优

算子调优一:调节mapPartitions

普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。但是如果使用mapPartitions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出。

因此,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM);

算子调优二:foreachPartition优化数据库操作

如果使用foreach算子完成数据库的操作,由于foreach算子是遍历RDD的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费; 在生产环境中,通常使用foreachPartition算子来完成对1个数据库的写入,通过foreachPartition算子,可以优化写数据库的性能。

算子调优三:filter与coalesce的配合使用

在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,但是一旦进过filter过滤后,每个分区的数据量有可能会存在较大差异,如图下图所示:

spark调优,spark,spark,jvm,java

为了解决filter过滤后每个分区数据量不均匀的情况,我们可以使用coalesce进行重分区操作.

算子调优四:repartition解决SparkSQL低并行度问题

为了解决Spark SQL无法手动设置并行度和task数量的问题, 我们可以使用repartition算子,以提高Spark SQL并行度和task的数量;

算子调优五:reduceByKey本地聚合

使用reduceByKey对性能的提升如下:

(1) 本地聚合后,在map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;

(2) 本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量;

(3) 本地聚合后,在reduce端进行数据缓存的内存占用减少;

(4) 本地聚合后,在reduce端进行聚合的数据量减少。

3 Shuffle调优

spark调优,spark,spark,jvm,java

Shuffle调优一:调节map端缓冲区大小

在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能

map端缓冲的默认配置是32KB,如果每个task处理640KB的数据,那么会发生640/32 = 20次溢写,如果每个task处理64000KB的数据,机会发生64000/32=2000此溢写,这对于性能的影响是非常严重的。32kb[按2倍左右调]====>64KB;

Shuffle调优二:调节reduce端拉取数据缓冲区大小

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。

reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB,可优化调为96M ;

Shuffle调优三:调节reduce端拉取数据重试次数

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如6次),以避免由于网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3次.

Shuffle调优四:调节reduce端拉取数据等待间隔

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以增加shuffle操作的稳定性。

reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s.

Shuffle调优五:调节SortShuffle排序操作阈值

对于SortShuffleManager,如果shuffle reduce task的数量小于某一阈值则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量,那么此时map-side就不会进行排序了,减少了排序的性能开销,但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

SortShuffleManager排序阈值的设置可以通过spark.shuffle.sort. bypassMergeThreshold这一参数进行设置,默认值为200;

4 JVM调优

JVM调优一:降低cache操作的内存占比

①静态内存管理机制 [老版本--->现在已经不需要手动调节]

根据Spark静态内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存RDD数据和broadcast数据,Execution主要用于缓存在shuffle过程中产生的中间数据,Storage占系统内存的60%,Execution占系统内存的20%,并且两者完全独立。

在Spark UI中可以查看每个stage的运行情况,包括每个task的运行时间、gc时间等等,如果发现gc太频繁,时间太长,就可以考虑调节Storage的内存占比,让task执行算子函数式,有更多的内存可以使用。

②统一内存管理机制 [动态内存管理1.6版本开始引进--->2016.1月推出]

根据Spark统一内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存数据RDD数据和broadcast数据,Execution主要用于缓存在shuffle过程中产生的中间数据,两者所组成的内存部分称为统一内存,Storage和Execution各占统一内存的50%,由于动态占用机制的实现,shuffle过程需要的内存过大时,会自动占用Storage的内存区域,因此无需手动进行调节。

JVM调优二:调节Executor堆外内存 

Executor的堆外内存主要用于程序的共享库、Perm Space、 线程Stack和一些Memory mapping等, 或者类C方式allocate object。
在运行Spark任务的时候,也打开堆外内存,为堆内内存分担些压力;堆外但是堆外内存打开时默认只取内存的10%大小 ,对于任务来说不够用,要把堆外内存提高至少1G以上。

以上参数配置完成后,会避免掉某些JVM OOM的异常问题,同时,可以提升整体Spark作业的性能。

JVM调优三:调节连接等待时长

垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会停止工作。此时,由于没有响应,无法建立网络连接,会导致网络连接超时。

解决: 连接等待时长可以在spark-submit脚本中进行设置。

5 Spark数据倾斜

Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同, 导致的不同task所处理的数据量不同的问题

例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配到了1万条数据,计算5分钟内完成,第三个task分配到了98万数据,此时第三个task可能需要10个小时完成,这使得整个Spark作业需要10个小时才能运行完成,这就是数据倾斜所带来的后果。

注意,要区分开数据倾斜与数据量过量这两种情况,数据倾斜是指少数task被分配了绝大多数的数据,因此少数task运行缓慢;数据过量是指所有task被分配的数据量都很大,相差不多,所有task都运行缓慢。

数据倾斜的表现

① Spark作业的大部分task都执行迅速,只有有限的几个task执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;

② Spark作业的大部分task都执行迅速,但是有的task在运行过程中会突然报出OOM,反复执行几次都在某一个task报出OOM错误,此时可能出现了数据倾斜,作业无法正常运行。

定位数据倾斜问题

① 查阅代码中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根据代码逻辑判断此处是否会出现数据倾斜;

② 查看Spark作业的log文件,log文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage,对应的shuffle算子是哪一个

解决方案:

解决方案一: 避免数据源的数据倾斜 
   通过对Hive表中对倾斜的数据进行预处理,以及在进行kafka数据分发时尽量进行平均分配 , 这种方案从根源上解决了数据倾斜问题,实现起来方便;

解决方案二: 避免shuffle过程
   为了避免数据倾斜,我们可以考虑避免shuffle过程,如果避免了shuffle过程,那么从根本上就消除了发生数据倾斜问题的可能;

解决方案三:过滤导致倾斜的key
   如果在Spark作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的key对应的数据进行过滤,这样,在Spark作业中就不会发生数据倾斜了;

解决方案四:提高shuffle操作中的reduce并行度
   提高shuffle过程中的reduce端并行度, 就增加了reduce端task的数量,那么每个task分配到的数据量就会相应减少,由此缓解数据倾斜问题。文章来源地址https://www.toymoban.com/news/detail-593271.html

到了这里,关于第三部分:Spark调优篇的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark九:Spark调优之Shuffle调优

    map端和reduce端缓存大小设置,reduce端重试次数和等待时间间隔,以及bypass设置 学习资料:https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ 在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中

    2024年01月20日
    浏览(40)
  • Spark(30):Spark性能调优之常规性能调优

    目录 0. 相关文章链接 1. 最优资源配置 2. RDD优化 2.1. RDD复用 2.2. RDD持久化 2.3. RDD尽可能早的 filter 操作 3. 并行度调节 4. 广播大变量 5. Kryo序列化 6. 调节本地化等待时长  Spark文章汇总          Spark 性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增

    2024年02月16日
    浏览(60)
  • Spark on Yarn 最佳运行参数调优-计算方式_spark on yarn 调优 nodemanager

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新软件测试全套学习资料》

    2024年04月26日
    浏览(44)
  • Spark调优解析-GC调优3(七)

    Spark立足内存计算,常常需要在内存中存放大量数据,因此也更依赖JVM的垃圾回收机制。与此同时,它也兼容批处理和流式处理,对于程序吞吐量和延迟都有较高要求,因此GC参数的调优在Spark应用实践中显得尤为重要。 按照经验来说,当我们配置垃圾收集器时,主要有两种策

    2024年02月19日
    浏览(38)
  • Spark参数配置和调优,Spark-SQL、Config

    一、Hive-SQL / Spark-SQL参数配置和调优 二、shell脚本spark-submit参数配置 三、sparkSession中配置参数

    2024年02月13日
    浏览(47)
  • Spark 参数调优

    目录 Spark 调优 一、代码规范 1.1 避免创建重复 RDD 1.2 尽量复用同一个 RDD 1.3 多次使用的 RDD 要持久化 1.4 使用高性能算子 1.5 好习惯 二、参数调优 资源参数 1.1 --num-executors 100 1.2 --executor-memory 5g 1.3 --executor-cores 4 1.4 --driver-memory 内存参数 spark.storage.memoryFraction、spark.shuffle.memor

    2024年04月11日
    浏览(30)
  • Spark读取JDBC调优

    实际问题:工作中需要读取一个存放了三四年历史数据的pg数仓表(缺少主键id),需要将数据同步到阿里云 MC 中,Spark在使用JDBC读取关系型数据库时,默认只开启一个task去执行,性能低下,因此需要通过设置一些参数来提高并发度。一定要充分理解参数的含义,否则可能会

    2023年04月09日
    浏览(34)
  • Spark:性能调优实战

    链接: 文字文档 极客链接 一、资源申请并行度 一个Executor中同时可以执行的task数目(在Executor内存不变的情况下,executor-cores数越大,平均下来一个task可以使用的内存就越少) Executor Java进程的堆内存大小,即Executor Java进程的Xmx值 Executor Java进程的off-heap内存,包括JVM over

    2024年04月16日
    浏览(45)
  • Spark SQL调优实战

    1、 新添参数说明 // D river 和Executor内存和CPU资源相关配置 -- 是否开启 executor 动态分配 , 开启时 spark.executor.instances 不生效 spark.dynamicAllocation.enabled= false --配置Driver内存 spark.dirver.memory=5g --driver最大结果大小,设置为0代表不限制,driver在拉取结果时,如果结果超过阈值会报异

    2024年02月21日
    浏览(31)
  • spark sql 的join调优

    spark sql中join操作是最耗费性能的操作,因为这涉及到数据的shuffle操作,如果由此导致数据倾斜更是会雪上加霜,那么如何优化join操作的性能呢? 方式一 broadcast广播: 如果是大表和小表的join操作,最简单的解决方式就是对小表进行broadcast操作,把小表的数据广播到各个ex

    2024年02月21日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包