spark优化总结:
一、spark 代码优
六大代码优化: 避免创建重复的RDD 尽可能复用同一个RDD 对多次使用的RDD进行持久化 尽量避免使用shuffle类算子 使用map-side预聚合的shuffle操作 使用高性能的算子 广播大变量 使用Kryo优化序列化性能 优化数据结构 使用高性能的库fastutil
1. 对多次使用的RDD进行持久化
同常内存够的时候建议使用:MEMORY_ONLY
如果内存不够的时候使用通常建议使用:MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。
如何选择一种最合适的持久化策略 1 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大, 可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避 免了这部分的性能开销;对这个RDD的后续算子操作, 都是基于纯内存中的数据的操作 ,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传 送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种 策略的场景还是有限的, 如果RDD中数据比较多时(比如几十亿),直接用这种持久化 级别,会导致JVM的OOM内存溢出异常。 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个 partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。 这种级别 比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算 子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上, 如果RDD中的数据量过多的话, 还是可能会导致OOM内存溢出的异常。
如何选择一种最合适的持久化策略 2 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无 法完全放下。序列化后的数据比较少, 可以节省内存和磁盘的空间开销。同时该策略会优 先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写 ,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将 所有数据都复制一份副本,并发送到其他节点上, 数据复制以及网络传输会导致较大的性 能开销,除非是要求作业的高可用性,否则不建议使用。
2. 使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey 使用mapPartitions替代普通map Transformation算子 使用foreachPartitions替代foreach Action算子 使用filter之后进行coalesce操作 使用repartitionAndSortWithinPartitions替代repartition与sort类操作代码 repartition:coalesce(numPartitions,true) 增多分区使用这个 coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition
3. 广播大变量
1. 开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提 升性能 2. 函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络 传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如 100M,甚至1G), 那么大量的变量副本在网络中传输的性能开销,以及在各个节 点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能 3. 如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播 后的变量,会保证每个Executor的内存中,只驻留一份变量副本, 而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本 的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率 4. 广播大变量发送方式:Executor一开始并没有广播变量,而是task运行需要用到广 播变量,会找executor的blockManager要,bloackManager找Driver里面的 blockManagerMaster要。
4. 使用Kryo优化序列化性能
在Spark中,主要有三个地方涉及到了序列化: 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,SXT是自定义类型),所有自 定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable接口。 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个 partition都序列化成一个大的字节数组。
Kryo序列化器介绍: Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快 ,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可 以让网络传输的数据变少;在集群中耗费的内存资源大大减少。 对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和 反序列化的性能。Spark默认使用的是Java的序列化机制,也就是 ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同 时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。 官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有 使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类 型,因此对于开发者来说,这种方式比较麻烦
5. 优化数据结构
Java中,有三种类型比较耗费内存: 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来 封装集合元素,比如Map.Entry。 因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽 量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串, 使用数组替代集合类型,这样尽可能地减少内存占用 ,从而降低GC频率,提升性能。
6. 使用高性能的库fastutil
fastutil介绍: fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、 HashSet)的类库,提供了特殊类型的map、set、list和queue; fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来 替代自己平时使用的JDK的原生的Map、List、Set,好处在于, fastutil集合类,可以减 小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素 的值的时候,提供更快的存取速度; fastutil最新版本要求Java 7以及以上版本; fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实 现了Java的Map接口),因此可以直接放入已有系统的任何代码中。 fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的 map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。 使用? IDEA中导入依赖
<!-- https://mvnrepository.com/artifact/fastutil/fastutil -->
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
二、spark 参数调优
--num-executors executor的数量
--executor-memory 每一个executor的内存
--executor-cores 每一个executor的核心数
--driver-memory Driver的内存1G-2G(保存广播变量)
--spark.storage.memoryFraction 用于缓存的内存占比默认时0.6,如果代码中没有用到缓存 可以将内存分配给shuffle
--spark.shuffle.memoryFraction 用户shuffle的内存占比默认0.2
总的内存=num-executors*executor-memory
总的核数=num-executors*executor-cores
spark on yarn 资源设置标准
1、单个任务总的内存和总的核数一般做多在yarn总资源的1/3到1/2之间
比如公司集群有10太服务器
单台服务器内存是128G,核数是40
yarn总的内存=10*128G=1280G*0.8=960G 需要预留一般分内存给系统进程
yarn总的核数=40*10=400
提交单个spark任务资源上线
总的内存=960G *(1/3| 1/2) = 300G-500G
总的核数=400 * (1/3| 1/2) = 120 - 200
2、在上线内再按照需要处理的数据量来合理指定资源 -- 最理想的情况是一个task对应一个core
2.1、数据量比较小 - 10G
10G = 80个block = rdd80分区 = 80个task
- 最理想资源指定 -- 剩余资源充足
--num-executors=40
--executor-memory=4G
--executor-cores=2
- 资源里面最优的方式 -- 剩余资源不是很充足时
--num-executors=20
--executor-memory=4G
--executor-cores=2
2.2、数据量比较大时 - 80G
80G = 640block = 640分区 = 640task
- 最理想资源指定 -- 剩余资源充足, 如果剩余资源不够,还需要减少指定的资源
--num-executors=100
--executor-memory=4G
--executor-cores=2
-- spark.locality.wait: spark task 再executor中执行前的等待时间 默认3秒
spark.yarn.executor.memoryOverhead : 堆外内存 默认等于堆内存的10%
spark.network.timeout spark网络链接的超时时间 默认120s
提高数据本地化优先级别
附录:参数调优详解
1参数调优 1.1num-executors 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。 这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。 1.2executor-memory 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少, num-executors乘以executor-memory,就代表了你的Spark作业申请到的总内存量(也就是所有Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列, 那么申请的总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。 1.3executor-cores 可以用total-executor-cores总的核数 executor-cores = total-executor-cores / num-executors 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多, 越能够快速地执行完分配给自己的所有task线程。 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量, 来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。 1.4driver-memory 参数说明:该参数用于设置Driver进程的内存。 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大, 否则会出现OOM内存溢出的问题。 1.5spark.default.parallelism 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量, 默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个, 内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适, 比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。 1.6spark.storage.memoryFraction 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时, 可能数据就不会持久化,或者数据会写入磁盘。 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。 但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时), 意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。 1.7spark.shuffle.memoryFraction 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。 shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上, 降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。 资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议, 合理地设置上述参数。
spark任务提交参数设置模板(企业中)
spark-submit
--master yarn-cluster
--num-executors = 50
--executor-memory = 4G
--executor-cores = 2
--driver-memory = 2G
--conf spark.storage.memoryFraction=0.4
--conf spark.shuffle.memoryFraction=0.4
--conf spark.locality.wait=10s
--conf spark.shuffle.file.buffer=64kb
--conf spark.yarn.executor.memoryOverhead=1024
--conf spark.network.timeout=200s
三、spark 数据倾斜
1、使用Hive ETL预处理数据
方案适用场景:如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个 key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表 执行某个分析操作, 那么比较适合使用这种技术方案。 方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对 数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是 原来的Hive表了, 而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么 在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。 方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子 ,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。 因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作 时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中, 避免Spark程序发生数据倾斜而已。
2、过滤少数导致倾斜的key
方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很 适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导 致了数据倾斜。 方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别 重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中 对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时, 动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后 计算出每个key的数量,取数据量最多的key过滤掉即可。 方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生 数据倾斜。
3、提高shuffle操作的并行度
方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如 reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于 Spark SQL中的shuffle类语句, 比如group by、join等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很 多场景来说都有点过小。 方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个 task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条 数据, 这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时 间都会变短了。具体原理如下图所示。
4、双重聚合join
方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by 语句进行分组聚合时,比较适用这种方案。 方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key 都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了, 比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着 对打上随机数后的数据,执行reduceByKey等聚合操作, 进行局部聚合,那么局部聚合结果,就会 变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次 进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。 方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被 一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。 接着去除掉随机前缀, 再次进行全局聚合,就可以得到最终的结果
5、将reduce join转为map join
方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中 的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。 方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作, 进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。 将较小RDD中的数据直接通过 collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD 执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据, 与当前RDD的每 一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式 连接起来。 方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉 取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的, 则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不 会发生shuffle操作,也就不会发生数据倾斜
6、采样倾斜key并分拆join操作
方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五 ”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜, 是因为其中某一 个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均 匀,那么采用这个解决方案是比较合适的。 方案实现思路: 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个 key的数量,计算出来数据量最大的是哪几个key。 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以 内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数 据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个 RDD。 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打 散成n份,分散到多个task中去进行join了。 而另外两个普通的RDD就照常join即可。 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。
7、使用随机前缀和扩容RDD进行join
方案适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没 什么意义,此时就只能使用最后一种方案来解决问题了。 方案实现思路: 该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成 数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。 然后将该RDD的每条数据都打上一个n以内的随机前缀。 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一 个0~n的前缀。 最后将两个处理后的RDD进行join即可。 方案实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的 “不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方 案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处 理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大 量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对 内存资源要求很高。
一、常规性能调优
1. 最优资源配置
Spark性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。资源的分配在使用脚本提交Spark任务时进行指定,标准的Spark任务提交脚本如下所示:
bin/spark-submit \
--class com.fancyry.spark.Analysis \
--master yarn--deploy-mode cluster
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
/usr/opt/modules/spark/jar/spark.jar \
可以进行分配的资源如表所示:
名称 说明
–num-executors 配置Executor的数量
–driver-memory 配置Driver内存(影响不大)
–executor-memory 配置每个Executor的内存大小
–executor-cores 配置每个Executor的CPU core数量
调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。对于具体资源的分配,我们分别讨论Spark的两种Cluster运行模式:
➢第一种是SparkStandalone模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写submit脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有15台机器,每台机器为8G内存,2个CPU core,那么就指定15个Executor,每个Executor分配8G内存,2个CPUcore。
➢第二种是SparkYarn模式,由于Yarn使用资源队列进行资源的分配和调度,在编写submit脚本的时候,就根据Spark作业要提交到的资源队列,进行资源的分配,比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。
对各项资源进行了调节后,得到的性能提升会有如下表现:
名称 解析
增加Executor·个数 在资源允许的情况下,增加Executor的个数可以提高执行task的并行度。比如有4个Executor,每个Executor有2个CPU core,那么可以并行执行8个task,如果将Executor的个数增加到8个(资源允许的情况下),那么可以并行执行16个task,此时的并行能力提升了一倍。
增加每个Executor的CPU core个数 在资源允许的情况下,增加每个Executor的Cpu core个数,可以提高执行task的并行度。比如有4个Executor,每个Executor有2个CPU core,那么可以并行执行8个task,如果将每个Executor的CPU core个数增加到4个 (资源允许的情况下),那么可以并行执行16个task,此时的并行能力提升了一倍。
增加每个Executor的内存量 在资源允许的情况下,增加每个Executor的内存量以后,对性能的提升有三点:1.可以缓存更多的数据(即对RDD进行cache),写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;2.可以为shuffle操作提供更多内存,即有更多空间来存放reduce端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;3.可以为task的执行提供更多内存,在task的执行过程中可能创建很多对象,内存较小时会引发频繁的GC,增加内存后,可以避免频繁的GC,提升整体性能。
补充:生产环境 Spark submit 脚本配置
bin/spark-submit \
--class com.fancy.spark.WordCount \
--master yarn \
--deploy-modecluster \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \/usr/local/spark/spark.jar
参数配置参考值:
2. RDD优化
A、RDD复用
在对 RDD 进行算子时,要避免相同的算子和计算逻辑之下对 RDD 进行重复的计算
对上图中的RDD计算架构进行修改,得到如下图所示的优化结果:
B、RDD持久化
在Spark中,当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费,因此,必须对多次使用的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据。对于RDD的持久化,有两点需要说明:
➢RDD的持久化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中。
➢如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对RDD数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。
C、RDD尽可能早的filter操作
获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。
3. 并行度调节
Spark作业中的并行度指各个stage的task的数量。如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20个Executor,每个Executor分配3个CPU core,而Spark作业有40个task,这样每个Executor分配到的task个数是2个,这就使得每个Executor有一个CPU core空闲,导致资源的浪费。
理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度。
Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。之所以没有推荐task数量与CPU core总数相等,是因为task的执行时间不同,有的task执行速度快而有的task执行速度慢,如果task数量与CPU core总数相等,那么执行快的task执行完成后,会出现CPU core空闲的情况。如果task数量设置为CPU core总数的2~3倍,那么一个task执行完毕后,CPU core会立刻执行下一个task,降低了资源的浪费,同时提升了Spark作业运行的效率。
Spark作业并行度的设置如下所示:
valconf = new SparkConf().set("spark.default.parallelism", "500")
1
4. 广播大变量
默认情况下,task中的算子中如果使用了外部的变量,每个task都会获取一份变量的复本,这就造成了内存的极大消耗。
一方面,如果后续对RDD进行持久化,可能就无法将RDD数据存入内存,只能写入磁盘,磁盘IO将会严重消耗性能;另一方面,task在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的GC,GC会导致工作线程停止,进而导致Spark暂停工作一段时间,严重影响Spark性能。
假设当前任务配置了20个Executor,指定500个task,有一个20M的变量被所有task共用,此时会在500个task中产生500个副本,耗费集群10G的内存,如果使用了广播变量,那么每个Executor保存一个副本,一共消耗400M内存,内存消耗减少了5倍。广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少。
在初始阶段,广播变量只在Driver中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其他节点的BlockManager上远程拉取变量的复本,并由本地的BlockManager进行管理;之后此Executor的所有task都会直接从本地的BlockManager中获取变量。
5. Kryo序列化
默认情况下,Spark使用Java的序列化机制。Java的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现Serializable接口即可,但是,Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。
Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用 Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册需要序列化的类型,不够方便,但从 Spark 2.0.0 版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。
public class MyKryoRegistrator implements KryoRegistrator {
@Overridepublic void registerClasses(Kryo kryo) {
kryo.register(StartupReportLogs.class);
}
}
配置Kryo序列化方式的实例代码:
//创建SparkConf对象
val conf = new SparkConf().setMaster(...).setAppName(...)
//使用Kryo序列化库,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在Kryo序列化库中注册自定义的类集合,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "fancy.com.MyKryoRegistrator");
6. 调节本地化等待时长
Spark 作业运行过程中,Driver 会对每一个 stage 的 task 进行分配。根据 Spark 的 task 分配算法,Spark 希望 task 能够运行在它要计算的数据算在的节点 (数据本地化思想),这样就可以避免数据的网络传输。通常来说,task可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时,Spark会等待一段时间,默认3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将task分配到比较差的本地化级别所对应的节点上,比如将task分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。
当 task 要处理的数据不在 task 所在节点上时,会发生数据的传输。task 会通过所在节点的 BlockManager 获取数据,BlockManager 发现数据不在本地时,户通过网络传输组件从数据所在节点的BlockManager 处获取数据。网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分 task,那么当前的 task 将有机会得到执行,这样就能够改善 Spark 作业的整体性能。Spark 的本地化等级如表所示
名称 解析
PROCESS_LOCAL 进程本地化,task和数据在同一个Executor中,性能最好。
NODE_LOCAL 节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输。
RACK_LOCAL 机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。
NO_PREF 对于task来说,从哪里获取都一样,没有好坏之分。
ANY task和数据可以在集群的任何地方,而且不在一个机架中,性能最差。
在Spark项目开发阶段,可以使用client模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的task数据本地化的级别,如果大部分都是 PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看 task 的本地化级别有没有提升,并观察 Spark 作业的运行时间有没有缩短。注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得 Spark作业的运行时间反而增加了。Spark本地化等待时长的设置如代码所示:
val conf = new SparkConf().set("spark.locality.wait", "6")
1
二、算子调优
1. mapPartitions
普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。如果是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每个元素进行操作。
如果是 mapPartition 算子,由于一个 task 处理一个 RDD 的 partition,那么一个task只会执行一次function,function 一次接收所有的 partition 数据,效率比较高。
比如,当要把RDD中的所有数据通过JDBC写入数据,如果使用map算子,那么需要对RDD中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用mapPartitions算子,那么针对一个分区的数据,只需要建立一个数据库连接。
mapPartitions算子也存在一些缺点:对于普通的map操作,一次处理一条数据,如果在处理了2000条数据后内存不足,那么可以将已经处理完的2000条数据从内存中垃圾回收掉;但是如果使用mapPartitions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出。
因此,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM)在项目中,应该首先估算一下RDD的数据量、每个partition的数据量,以及分配给每个Executor的内存资源,如果资源允许,可以考虑使用mapPartitions算子代替map。
2. foreachPartition 优化数据库操作
在生产环境中,通常使用 foreachPartition 算子来完成数据库的写入,通过 foreachPartition 算子的特性,可以优化写数据库的性能。
如果使用 foreach 算子完成数据库的操作,由于 foreach 算子是遍历RDD的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用 foreachPartition 算子。
与 mapPartitions 算子非常相似,foreachPartition 是将RDD 的每个分区作为遍历对象,一次处理一个分区的数据,也就是说,如果涉及数据库的相关操作,一个分区的数据只需要创建一次数据库连接,如图所示:
使用了foreachPartition算子后,可以获得以下的性能提升:
➢对于我们写的function函数,一次处理一整个分区的数据;
➢对于一个分区内的数据,创建唯一的数据库连接;
➢只需要向数据库发送一次SQL语句和多组参数;
在生产环境中,全部都会使用foreachPartition算子完成数据库操作。foreachPartition算子存在一个问题,与mapPartitions算子类似,如果一个分区的数据量特别大,可能会造成OOM,即内存溢出。
3. filter 与 coalesce 的配合使用
在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter过滤后,每个分区的数据量有可能会存在较大差异,如图所示:
根据图中信息我们可以发现两个问题:
➢ 每个partition的数据量变小了,如果还按照之前与partition相等的task个数去处理当前数据,有点浪费task的计算资源;
➢ 每个partition的数据量不一样,会导致后面的每个task处理每个partition数据的时候,每个task要处理的数据量不同,这很有可能导致数据倾斜问题。
如上图所示,第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在相同的处理逻辑下,第二个分区对应的task处理的数据量与第三个分区对应的task处理的数据量差距达到了8倍,这也会导致运行速度可能存在数倍的差距,这也就是数据倾斜问题。
针对上述的两个问题,我们分别进行分析:
➢针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,比如将原来4个分区的数据转化到2个分区中,这样只需要用后面的两个task进行处理即可,避免了资源的浪费。
➢针对第二个问题,解决方法和第一个问题的解决方法非常相似,对分区数据重新分配,让每个partition中的数据量差不多,这就避免了数据倾斜问题。
那么具体应该如何实现上面的解决思路?我们需要 coalesce 算子。repartition 与 coalesce 都可以用来进行重分区,其中 repartition 只是 coalesce 接口中 shuffle为 true 的简易实现,coalesce 默认情况下不进行 shuffle,但是可以通过参数进行设置。假设我们希望将原本的分区个数 A 通过重新分区变为 B,那么有以下几种情况:
➢A > B(多数分区合并为少数分区)
A、A与B相差值不大此时使用coalesce即可,无需shuffle过程。
B、A与B相差值很大
此时可以使用coalesce并且不启用shuffle过程,但是会导致合并过程性能低下,所以推荐设置coalesce的第二个参数为true,即启动shuffle过程。
➢A < B(少数分区分解为多数分区)
此时使用 repartition 即可,如果使用 coalesce 需要将 shuffle 设置为 true,否则 coalesce 无效。我们可以在filter 操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能。
注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。
4. repartition解决SparkSQL低并行度问题
在第一节的常规性能调优中我们讲解了并行度的调节策略,但是,并行度的设置对于 Spark SQL 是不生效的,用户设置的并行度只对于 Spark SQL 以外的所有 Spark 的 stage 生效。
Spark SQL 的并行度不允许用户自己指定,Spark SQL自己会默认根据 hive 表对应的 HDFS 文件的 split 个数自动设置 Spark SQL 所在的那个 stage 的并行度,用户自己通 spark.default.parallelism 参数指定的并行度,只会在没 Spark SQL的stage中生效。
由于Spark SQL所在stage的并行度无法手动设置,如果数据量较大,并且此stage中后续的transformation操作有着复杂的业务逻辑,而Spark SQL自动设置的task数量很少,这就意味着每个task要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有Spark SQL的 stage 速度很慢,而后续的没有 Spark SQL 的 stage 运行速度非常快。
为了解决Spark SQL无法设置并行度和task数量的问题,我们可以使用repartition算子。
Spark SQL这一步的并行度和task数量肯定是没有办法去改变了,但是,对于Spark SQL查询出来的RDD,立即使用repartition算子,去重新进行分区,这样可以重新分区为多个partition,从repartition之后的RDD操作,由于不再设计Spark SQL,因此stage的并行度就会等于你手动设置的值,这样就避免了Spark SQL所在的stage只能用少量的task去处理大量数据并执行复杂的算法逻辑。
5. reduceByKey 预聚合
reduceByKey相较于普通的shuffle操作一个显著的特点就是会进行map端的本地聚合,map端会先对本地的数据进行combine操作,然后将数据写入给下个stage的每个task创建的文件中,也就是在map端,对每一个key对应的value,执行reduceByKey算子函数。reduceByKey算子的执行过程如图所示:
使用reduceByKey对性能的提升如下:
➢本地聚合后,在map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;
➢本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量;
➢本地聚合后,在reduce端进行数据缓存的内存占用减少;
➢本地聚合后,在reduce端进行聚合的数据量减少。
基于reduceByKey的本地聚合特征,我们应该考虑使用reduceByKey代替其他的shuffle算子,例如groupByKey。reduceByKey与groupByKey的运行原理如图所示:
根据上图可知,groupByKey 不会进行 map 端的聚合,而是将所有 map 端的数据 shuffle 到 reduce 端,然后在 reduce 端进行数据的聚合操作。由于 reduceByKey 有 map 端聚合的特性,使得网络传输的数据量减小,因此效率要明显高于 groupByKey。
三、Shuffle调优
1. 调节map端缓冲区大小
在 Spark 任务运行过程中,如果 shuffle 的 map 端处理的数据量比较大,但是 map 端缓冲的大小是固定的,可能会出现 map 端缓冲数据频繁 spill 溢写到磁盘文件中的情况,使得性能非常低下,通过调节map 端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能。
map 端缓冲的默认配置是 32KB,如果每个 task 处理 640KB 的数据,那么会发生 640/32 = 20 次溢写,如果每个 task 处理 64000KB的数据,机会发生64000/32=2000此溢写,这对于性能的影响是非常严重的。
map端缓冲的配置方法如代码清单所示:
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
1
2. 调节reduce端拉取数据缓冲区大小
Spark Shuffle过程中,shuffle reduce task的 buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
reduce 端数据拉取缓冲区的大小可以通过 spark.reducer.maxSizeInFlight 参数进行设置,默认为48MB,该参数的设置方法如代码清单所示:
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")
1
3. 调节reduce端拉取数据重试次数
Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最大次数 (比如60次),以避免由于 JVM 的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3,该参数的设置方法如代码清单所示:
val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6")
1
4. 调节reduce端拉取数据等待间隔
Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长 (比如60s),以增加shuffle 操作的稳定性。reduce端拉取数据等待间隔可以通过 spark.shuffle.io.retryWait 参数进行设置,默认值为 5s,该参数的设置方法如代码清单所示:
val conf = new SparkConf().set("spark.shuffle.io.retryWait", "60s")
1
5. 调节SortShuffle排序操作阈值
对于SortShuffleManager,如果shuffle reduce task的数量小于某一阈值则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量,那么此时map-side就不会进行排序了,减少了排序的性能开销,但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。SortShuffleManager排序操作阈值的设置可以通过spark.shuffle.sort. bypassMergeThreshold这一参数进行设置,默认值为200,该参数的设置方法如代码清单所示:
val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "400")
1
四、JVM调优
对于JVM调优,首先应该明确,full gc/minor gc,都会导致JVM的工作线程停止工作,即stop the world。
1. 降低cache操作的内存占比
A、静态内存管理机制根据Spark静态内存管理机制,堆内存被划分为了两块,Storage和Execution。
Storage 主要用于缓存 RDD 数据和 broadcast 数据,Execution 主要用于缓存在shuffle过程中产生的中间数据,Storage占系统内存的60%,Execution占系统内存的20%,并且两者完全独立。在一般情况下,Storage的内存都提供给了cache操作,但是如果在某些情况下cache操作内存不是很紧张,而task的算子中创建的对象很多,Execution内存又相对较小,这回导致频繁的minor gc,甚至于频繁的full gc,进而导致Spark频繁的停止工作,性能影响会很大。
在Spark UI中可以查看每个stage的运行情况,包括每个task的运行时间、gc时间等等,如果发现gc太频繁,时间太长,就可以考虑调节Storage的内存占比,让task执行算子函数式,有更多的内存可以使用。Storage内存区域可以通过spark.storage.memoryFraction参数进行指定,默认为0.6,即60%,可以逐级向下递减,如代码清单所示:
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.4")
1
B、统一内存管理机制根据Spark统一内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存数据,Execution主要用于缓存在shuffle过程中产生的中间数据,两者所组成的内存部分称为统一内存,Storage和Execution各占统一内存的50%,由于动态占用机制的实现,shuffle过程需要的内存过大时,会自动占用Storage的内存区域,因此无需手动进行调节。
2. 调节Executor堆外内存
Executor的堆外内存主要用于程序的共享库、PermSpace、线程Stack和一些Memory mapping等, 或者类C方式allocate object。有时,如果你的Spark作业处理的数据量非常大,达到几亿的数据量,此时运行Spark作业会时不时地报错,例如shuffle output file cannot find,executor lost,task lost,out of memory等,这可能是Executor的堆外内存不太够用,导致Executor在运行的过程中内存溢出。
stage的task在运行的时候,可能要从一些Executor中去拉取shuffle map output文件,但是Executor可能已经由于内存溢出挂掉了,其关联的BlockManager也没有了,这就可能会报出shuffle output file cannot find,executor lost,task lost,out of memory等错误,此时,就可以考虑调节一下Executor的堆外内存,也就可以避免报错,与此同时,堆外内存调节的比较大的时候,对于性能来讲,也会带来一定的提升。
默认情况下,Executor堆外内存上限大概为300多MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致Spark作业反复崩溃,无法运行,此时就会去调节这个参数,到至少1G,甚至于2G、4G。
Executor堆外内存的配置需要在spark-submit脚本里配置,如代码清单所示:
--conf spark.yarn.executor.memoryOverhead=2048
1
以上参数配置完成后,会避免掉某些JVM OOM的异常问题,同时,可以提升整体 Spark 作业的性能。
3. 调节连接等待时长
在Spark作业运行过程中,Executor优先从自己本地关联的BlockManager中获取某份数据,如果本地BlockManager没有的话,会通过TransferService远程连接其他节点上Executor的BlockManager来获取数据。
如果task在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。
在生产环境下,有时会遇到file not found、file lost这类错误,在这种情况下,很有可能是 Executor 的BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长60s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃。这种情况也可能会导致DAGScheduler反复提交几次stage,TaskScheduler返回提交几次task,大大延长了我们的Spark作业的运行时间。
优化目的
Spark调优的目标是在不影响其他业务正常运行的前提下,高效的完成业务目标,通常为了达成该目标,一般需要最大限度利用集群的物理资源,如CPU、内存、磁盘IO,使其某一项达到瓶颈。
Spark-core的优化
Yarn 模式下动态资源调度
原理:
动态资源调度就是为了解决的资源浪费和资源不合理,根据当前应用任务的负载情况,实时的增减Executor个数,从而实现动态分配资源,使整个Spark系统更加健康。
适合场景:批任务。特别在使用Spark作为一个常驻的服务时候,动态资源调度将大大的提高资源的利用率。例如JDBCServer服务,大多数时间该进程并不接受JDBC请求,因此将这段空闲时间的资源释放出来,将极大的节约集群的资源
条件:必须开启Yarn External Shuffle才能使用这个功能
spark.shuffle.service.enabled=true
spark.dynamicAllocation.enabled=true //开启动态资源调度
spark.dynamicAllocation.minExecutors //最小Executor个数。
spark.dynamicAllocation.initialExecutors //初始Executor个数。
spark.dynamicAllocation.maxExecutors //最大executor个
spark.dynamicAllocation.executorIdleTimeout //普通Executor空闲超时时间。
Shuffle阶段调优
1)使用序列化KryoSerializer方式
spark支持使用kryo序列化机制。kryo序列化机制,比默认的java序列化机制,速度要快,序列化后的数据要更小,大概是java序列化机制的1/10,所以kryo序列化优化后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。
第一步,在sparkconf中设置:SparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
第二步,注册你使用到的,需要通过kryo序列化的一些自定义类: SparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new Class[]{CategorySortKey.class})
2)设置合理并行度
调整并行度让任务的数量和每个任务处理的数据与机器的处理能力达到最优。 查看CPU使用情况和内存占用情况,当任务和数据不是平均分布在各节点,而是集中在个别节点时,可以增大并行度使任务和数据更均匀的分布在各个节点。增加任务的并行度,充分利用集群机器的计算能力,一般并行度设置为集群CPU总和的2-3倍。
设置平行度的方法:
(1)在会产生shuffle的操作函数内设置并行度参数,优先级最高
RDD. groupByKey(10);
(2)在代码配置参数中设置并行度,优先级次之
SparkConf spConf=new SparkConf().setMaster("local[4]")
.set("spark.default.parallelism", "10")
(3)在spark-defaults.conf配置文件中配置,优先级最低
spark.default.parallelism=10
3)使用广播变量
原理:
Broadcast把数据集合分发到每一个节点上,Spark任务在执行过程中要使用这个数据集合时,就会在本地查找Broadcast过来的数据集合。如果不使用Broadcast,每次任务需要数据集合时,都会把数据序列化到任务里面,不但耗时,还使任务变得很大。
使用场景:
每个任务分片在执行中都需要同一份数据集合时,就可以把公共数据集Broadcast到每个节点,让每个节点在本地都保存一份。
大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。
ArrayList list= new ArrayList();
list.add("test");
Broadcast bc= javaSparkContext.broadcast(list);
4)使用缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,也就是把中间计算结果缓存起来,避免每次迭代重计算。
常用的缓存方式:
MEMORY_ONLY_SER
MEMORY_ONLY
MEMORY_AND_DISK
DISK_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY);
rdd.unpersist;
5)使用Checkpoint
checkpoint在spark中主要有两块应用:一块是在spark core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;另外一块是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息,以便在Driver崩溃重启的时候能够接着之前进度继续进行处理(如之前waiting batch的job会在重启后继续处理)。
rdd.checkpoint() 并不会触发计算,只有在遇到action方法后,才会触发计算,在job执行完毕后,会启动checkpoint计算,对这个rdd再次触发一个job执行checkpoint计算。所以在checkpoint前,对rdd做cache,可以避免checkpoint计算过程中重新根据rdd依赖链计算。
javaSparkContext.setCheckpointDir(pathName);
rdd.cache();
rdd.checkpoint();
6)设置spark.shuffle.memoryFraction
该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
7)开启consolidateFiles优化
shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲(这个缓存大小可以通过上面的参数来设定)相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。一直循环,直到最后将所有数据到拉取完,并得到最终的结果。
开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。
new SparkConf().set("spark.shuffle.consolidateFiles", "true") 默认为false。
MapPartitions分区替换map计算结果
使用mapPartitions,按每个分区计算结果
使用foreachPartitions替代foreach
原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写Oracle,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
设置num-executors参数
该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
该参数设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。建议该参数设置1-5。
设置executor-memory参数
该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常也有直接的关联。
针对数据交换的业务场景,建议本参数设置在512M及以下。
设置executor-cores
该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
注意Collect的使用
collect操作会将Executor的数据发送到Driver端,因此使用collect前需要确保Driver端内存足够,以免Driver进程发生OutOfMemory异常。当不确定数据量大小时,可使用saveAsTextFile等操作把数据写入HDFS中。只有在能够大致确定数据大小且driver内存充足的时候,才能使用collect。
使用reduceByKey替换groupByKey
reduceByKey会在Map端做本地聚合,使得Shuffle过程更加平缓,而groupByKey等Shuffle操作不会在Map端做聚合。因此能使用reduceByKey的地方尽量使用该算子,避免出现groupByKey。
数据倾斜
当数据发生倾斜,虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。
需要重新设计key,以更小粒度的key使得task大小合理化。
修改并行度。
将HDFS上的文本格式数据转换为Parquet格式数据
列式存储布局查询中只涉及到部分列,所以只需读取这些列对应的数据块,而不需要读取整个表的数据,从而减少I/O开销。Parquet还支持灵活的压缩选项,可以显著减少磁盘上的存储。
Spark-sql的优化
使用分区表
数据量在1GB以上的大表之间相互关联,或者对大表进行聚合操作前,可以在建表时先对大表根据关联字段或聚合字段进行分区。这样可以避免Shuffle操作,提高性能。
使用广播
将小表BroadCast到各个节点上,从而转变成非shuffle操作,提高任务执行性能。
spark.sql.autoBroadcastJoin.Threshold= 10485760 //-1表示不广播
使用重分区优化小文件
df.repartition(5).write.mode(SaveMode.Append).saveAsTable("t1");
宽依赖和窄依赖
1)宽依赖:是指1个父RDD分区对应多个子RDD的分区
如:groupByKey,reduceByKey,sortByKey,join 即使shuffle操作算子一般属于宽依赖。
2)窄依赖:是指一个或多个父RDD分区对应一个子RDD分区
如:map,filter,union,co-partioned join
即使:宽依赖就是1对多,窄依赖就是一对一或者多对一
Spark Shuffle
在Spark的中,两个Stage之间就是shuffle,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种。
在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager,Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。HashShuffleManager会产生很多中间小文件,SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
Spark Stage的划分
对RDD的操作分为transformation和action两类,真正的作业提交运行发生在action之后,调用action之后会将对原始输入数据的所有transformation操作封装成作业并向集群提交运行。这个过程大致可以如下描述:
由DAGScheduler对RDD之间的依赖性进行分析,通过DAG来分析各个RDD之间的转换依赖关系
根据DAGScheduler分析得到的RDD依赖关系将Job划分成多个stage
每个stage会生成一个TaskSet并提交给TaskScheduler,调度权转交给TaskScheduler,由它来负责分发task到worker执行
stage的划分:
stage的划分基于DAG确定依赖关系,将依赖链断开,每个stage内部可以并行运行,整个作业按照stage顺序依次执行,最终完成整个Job。Spark利用依赖关系,调度器从DAG图末端出发,逆向遍历整个依赖关系链,遇到ShuffleDependency(宽依赖关系的一种叫法)就断开,遇到NarrowDependency(窄依赖)就将其加入到当前stage。stage中task数目由stage末端的RDD分区个数来决定,RDD转换是基于分区的一种粗粒度计算,一个stage执行的结果就是这几个分区构成的RDD。
yarn-cluster和yarn-client模式
yarn-cluster和yarn-client模式的区别其实就是Application Master进程的区别,yarn-cluster模式下,driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行。然而yarn-cluster模式不适合运行交互类型的作业。而yarn-client模式下,Application Master仅仅向YARN请求executor,client会和请求的container通信来调度他们工作,也就是说Client不能离开。
yarn-cluster适用于生产环境;而yarn-client适用于交互和调试。
Spark在Executor上的内存分配
spark.serializer (default org.apache.spark.serializer.JavaSerializer )
建议设置为 org.apache.spark.serializer.KryoSerializer,因为KryoSerializer比JavaSerializer快,但是有可能会有些Object会序列化失败,这个时候就需要显示的对序列化失败的类进行KryoSerializer的注册,这个时候要配置spark.kryo.registrator参数
Spark的Executor内存分配
Spark Executor有两种内存:
堆内内存:受JVM管理
堆外内存:不受jvm管理
Executo堆内内存:
Spark在一个Executor中的内存分为三块,一块是execution内存,一块是storage内存,一块是other内存。
execution和storage是Spark Executor中内存的大户,other占用内存相对少很多。在spark-1.6.0以前的版本,execution和storage的内存分配是固定的,使用的参数配置分别是spark.shuffle.memoryFraction(execution内存占Executor总内存大小,default 0.2)和spark.storage.memoryFraction(storage内存占Executor内存大小,default 0.6),因为是1.6.0以前这两块内存是互相隔离的,这就导致了Executor的内存利用率不高,而且需要根据Application的具体情况,使用者自己来调节这两个参数才能优化Spark的内存使用。在spark-1.6.0以上的版本,execution内存和storage内存可以相互借用,提高了内存的Spark中内存的使用率,同时也减少了OOM的情况。
execution内存是执行内存,文档中说join,aggregate都在这部分内存中执行,shuffle的数据也会先缓存在这个内存中,满了再写入磁盘,能够减少IO。其实map过程也是在这个内存中执行的。默认总内存的0.2,由Spark应用程序启动时的–executor-memory或spark.executor.memory参数配置。
storage内存是存储broadcast,cache,persist数据的地方。默认总内存的0.6,通过spark.storage.storageFraction参数设置。
other内存是程序执行时预留给自己的内存。默认总内存的0.2。
Executo堆外内存:
在默认情况下,堆外内存并不启用,可通过配置spark.memory.offHeap.enabled参数启用,并由spark.memory.offHeap.size参数设定堆外空间的大小。堆外内存主要存储经过序列化的二进制数据。
堆外的空间分配较为简单,除了没有 other空间,存储内存、执行内存的大小同样是固定的,所有运行中的并发任务共享存储内存和执行内存。
Job的划分
1、Application :
应用,创建一个SparkContext可以认为创建了一个Application
2、Job;
在一个app中每执行一次行动算子就会创建一个Job,一个application会有多个job
3、stage;
阶段,每碰到一个shuffle算子,会产生一个新的stage,一个Job中可以包含多个stage;
4、task
任务,表示阶段执行的时候的并行度,一个stage会有多个task;
spark性能优化
一:Spark的性能优化,主要手段包括:
1、使用高性能序列化类库
2、优化数据结构
3、对多次使用的RDD进行持久化 / Checkpoint
4、使用序列化的持久化级别
5、Java虚拟机垃圾回收调优
6、提高并行度
7、广播共享数据
8、数据本地化
9、reduceByKey和groupByKey的合理使用
10、Shuffle调优(核心中的核心,重中之重)
二:spark诊断内存消耗
java主要的内存消耗
1、每个Java对象,都有一个对象头,会占用16个字节,主要是包括了一些对象的元信息,比如指向它的类的指针。如果一个对象本身很小,比如就包括了一个int类型的field,那么它的对象头实际上比对象自己还要大。 2、Java的String对象,会比它内部的原始数据,要多出40个字节。因为它内部使用char数组来保存内部的字符序列的,并且还得保存诸如数组长度之类的信息。而且因为String使用的是UTF-16编码,所以每个字符会占用2个字节。比如,包含10个字符的String,会占用60个字节。 3、Java中的集合类型,比如HashMap和LinkedList,内部使用的是链表数据结构,所以对链表中的每一个数据,都使用了Entry对象来包装。Entry对象不光有对象头,还有指向下一个Entry的指针,通常占用8个字节。 4、元素类型为原始数据类型(比如int)的集合,内部通常会使用原始数据类型的包装类型,比如Integer,来存储元素。
怎么判断程序消耗的内存:
1、首先,自己设置RDD的并行度,有两种方式:要不然,在parallelize()、textFile()等方法中,传入第二个参数,设置RDD的task / partition的数量;要不然,用SparkConf.set()方法,设置一个参数,spark.default.parallelism,可以统一设置这个application所有RDD的partition数量。 2、其次,在程序中将RDD cache到内存中,调用RDD.cache()方法即可。 3、最后,观察Driver的log,你会发现类似于:“INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息。这就显示了每个partition占用了多少内存。 4、将这个内存信息乘以partition数量,即可得出RDD的内存占用量。
三:spark高性能序列化库
两种序列化机制
spark默认使用了第一种序列化机制: 1、Java序列化机制:默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的。而且Java序列化机制是提供了自定义序列化支持的,只要你实现Externalizable接口即可实现自己的更高性能的序列化算法。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大。 2、Kryo序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之所以不是默认序列化机制的原因是,有些类型虽然实现了Seriralizable接口,但是它也不一定能够进行序列化;此外,如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。
如何使用Kroyo序列
方式一:SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 方式二:如果要注册自定义的类型,那么就使用如下的代码,即可: Scala版本: val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[Counter] )) val sc = new SparkContext(conf) Java版本: SparkConf conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Counter.class) JavaSparkContext sc = new JavaSparkContext(conf)
使用Kroyo序列的建议:
1、优化缓存大小 如果注册的要序列化的自定义的类型,本身特别大,比如包含了超过100个field。那么就会导致要序列化的对象过大。此时就需要对Kryo本身进行优化。因为Kryo内部的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。 默认情况下它的值是2,就是说最大能缓存2M的对象,然后进行序列化。可以在必要时将其调大。比如设置为10。 2、预先注册自定义类型 虽然不注册自定义类型,Kryo类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存。因此通常都建议预先注册号要序列化的自定义的类。
使用场景:
首先,这里讨论的都是Spark的一些普通的场景,一些特殊的场景,比如RDD的持久化,在后面会讲解。这里先不说。 那么,这里针对的Kryo序列化类库的使用场景,就是算子函数使用到了外部的大数据的情况。比如说吧,我们在外部定义了一个封装了应用所有配置的对象,比如自定义了一个MyConfiguration对象,里面包含了100m的数据。然后,在算子函数里面,使用到了这个外部的大对象。 此时呢,如果默认情况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会导致,序列化速度缓慢,并且序列化以后的数据还是比较大,比较占用内存空间。 因此,在这种情况下,比较适合,切换到Kryo序列化类库,来对外部的大对象进行序列化操作。一是,序列化速度会变快;二是,会减少序列化后的数据占用的内存空间。
四:Spark优化数据结构
目的:使用数据结构是为了减少数据的占用量,从而减少内存的开销。
优化的对象:主要就是优化你的算子函数,内部使用到的局部数据,或者是算子函数外部的数据。都可以进行数据结构的优化。优化之后,都会减少其对内存的消耗和占用。
优化方式:
1、优先使用数组以及字符串,而不是集合类。也就是说,优先用array,而不是ArrayList、LinkedList、HashMap等集合。 比如,有个List<Integer> list = new ArrayList<Integer>(),将其替换为int[] arr = new int[]。这样的话,array既比List少了额外信息的存储开销,还能使用原始数据类型(int)来存储数据,比List中用Integer这种包装类型存储数据,要节省内存的多。 还比如,通常企业级应用中的做法是,对于HashMap、List这种数据,统一用String拼接成特殊格式的字符串,比如Map<Integer, Person> persons = new HashMap<Integer, Person>()。可以优化为,特殊的字符串格式:id:name,address|id:name,address...。 2、避免使用多层嵌套的对象结构。比如说,public class Teacher { private List<Student> students = new ArrayList<Student>() }。就是非常不好的例子。因为Teacher类的内部又嵌套了大量的小Student对象。 比如说,对于上述例子,也完全可以使用特殊的字符串来进行数据的存储。比如,用json字符串来存储数据,就是一个很好的选择。 {"teacherId": 1, "teacherName": "leo", students:[{"studentId": 1, "studentName": "tom"},{"studentId":2, "studentName":"marry"}]} 3、对于有些能够避免的场景,尽量使用int替代String。因为String虽然比ArrayList、HashMap等数据结构高效多了,占用内存量少多了,但是之前分析过,还是有额外信息的消耗。比如之前用String表示id,那么现在完全可以用数字类型的int,来进行替代。 这里提醒,在spark应用中,id就不要用常用的uuid了,因为无法转成int,就用自增的int类型的id即可。(sdfsdfdf-234242342-sdfsfsfdfd)
五:对多次使用的RDD进行持久化操作 或 CheckPoint
对多次运算的RDD进行持久化或放到内存,可以减少对重复计算的代价;
如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行Checkpoint操作。
对数据的持久化有多重级别:
除了对多次使用的RDD进行持久化操作之外,还可以进一步优化其性能。因为很有可能,RDD的数据是持久化到内存,或者磁盘中的。那么,此时,如果内存大小不是特别充足,完全可以使用序列化的持久化级别,比如MEMORY_ONLY_SER、MEMORY_AND_DISK_SER等。使用RDD.persist(StorageLevel.MEMORY_ONLY_SER)这样的语法即可。 这样的话,将数据序列化之后,再持久化,可以大大减小对内存的消耗。此外,数据量小了之后,如果要写入磁盘,那么磁盘io性能消耗也比较小。 对RDD持久化序列化后,RDD的每个partition的数据,都是序列化为一个巨大的字节数组。这样,对于内存的消耗就小的多了。但是唯一的缺点就是,获取RDD数据时,需要对其进行反序列化,会增大其性能开销。 因此,对于序列化的持久化级别,还可以进一步优化,也就是说,使用Kryo序列化类库,这样,可以获得更快的序列化速度,并且占用更小的内存空间。但是要记住,如果RDD的元素(RDD<T>的泛型类型),是自定义类型的话,在Kryo中提前注册自定义类型。
六:JVM虚拟机垃圾回收
主要是创建少量的对象,以及创建对象的大小。编程中避免大对象。
还有一些jvm的通用方法。都是通用的,可以参考一些通用方法。
七:提高并行度
实际上Spark集群的资源并不一定会被充分利用到,所以要尽量设置合理的并行度,来充分地利用集群的资源。才能充分提高Spark应用程序的性能。
Spark会自动设置以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度即可。
可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度;也可以使用spark.default.parallelism参数,来设置统一的并行度。
比如说,spark-submit设置了executor数量是10个,每个executor要求分配2个core,那么application总共会有20个core。此时可以设置new SparkConf().set("spark.default.parallelism", "60")来设置合理的并行度,从而充分利用资源。
官方建议设置的并行数量为2-3倍的cpu cores的数量,这样可以使一些计算能力较弱的cpu少计算一些数据。能力好的cpu计算多一些数据。
八:广播共享文件
如果你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个task上去。而是给每个节点拷贝一份,然后节点上的task共享该数据。
这样的话,就可以减少大数据在节点上的内存消耗。并且可以减少数据到节点的网络传输消耗。
九:数据本地化
基于移动计算的成本要远远小于移动数据的原则。
数据本地化级别:
数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有几种数据本地化级别: 1、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。 2、NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的executor进程中,或者是数据在HDFS文件的block中。 3、NO_PREF:数据从哪里过来,性能都是一样的。 4、RACK_LOCAL:数据和计算它的代码在一个机架上。 5、ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。
优化方案:
Spark倾向于使用最好的本地化级别来调度task,但是这是不可能的。如果没有任何未处理的数据在空闲的executor上,那么Spark就会放低本地化级别。这时有两个选择:第一,等待,直到executor上的cpu释放出来,那么就分配task过去;第二,立即在任意一个executor上启动一个task。 Spark默认会等待一会儿,来期望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去。只要超过了时间,那么Spark就会将task分配到其他任意一个空闲的executor上。 可以设置参数,spark.locality系列参数,来调节Spark等待task可以进行数据本地化的时间。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。
十: groupByKey 和 ReduceByKey
如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减小网络传输的开销。
只有在reduceByKey处理不了时,才用groupByKey().map()来替代。
十一: shuffle优化
了解下shuffle的过程:
优化参数:
new SparkConf().set("spark.shuffle.consolidateFiles", "true") spark.shuffle.consolidateFiles:是否开启shuffle block file的合并,默认为false spark.reducer.maxSizeInFlight:reduce task的拉取缓存,默认48m spark.shuffle.file.buffer:map task的写磁盘缓存,默认32k spark.shuffle.io.maxRetries:拉取失败的最大重试次数,默认3次 spark.shuffle.io.retryWait:拉取失败的重试间隔,默认5s spark.shuffle.memoryFraction:用于reduce端聚合的内存比例,默认0.2,超过比例就会溢出到磁盘上
1- spark.shuffle.consolidateFiles参数优化
没有开启consolidation机制的时候,shuffle write的性能是比较低下的,而且会影响到shuffle read的性能,也会比较低下。
因为在shuffle map端创建的磁盘文件太多了,导致shuffle write要耗费大量的性能到磁盘文件的创建,以及磁盘的io上。对于shuffle read,也是一样的,每个result task可能都需要通过磁盘io读取多个文件的数据,都只shuffle read,性能可能也会受到影响。做主要的还是shuffle write,因为要写的磁盘文件太多。
比如每个节点有100个shuffle map task,10个CPU core是,总共有1000个result task。所以,每个节点上的磁盘文件为100*1000个。
设置为true时,每个cpu为每个result task写一个文件(文件内容是之前的数据进行合并的结果),每个节点上的磁盘文件为10*1000个。
2- spark.reducer.maxSizeInFlight
如果内存足够的话,这个量应该增大,这样,result task拉取的次数会减少(每次拉取数据量增加)。
3- spark.shuffle.file.buffer
可以适量增大,这样每次写入到文件的数据量减少,从而减少写文件的次数。
4- spark.shuffle.io.maxRetries
拉取数据的时候,可能jvm在full GC。
5- spark.shuffle.io.retryWait
可以适当增加时间。为了应对jvm 的full GC。
6- spark.shuffle.memoryFraction
可以适当的调大。
执行reduce task的Excetor中,有一部分内存是用来汇聚各个reduce task拉取的数据,放到map集合中,进行聚合。
当该数据超过总缓存*比例时,会把该内存的数据写入到磁盘上。文章来源:https://www.toymoban.com/news/detail-500445.html
7- 如果jvm GC没有调优好,会导致每次gc都需要1min。那么拉取的最大默认时间为3*5s=15s。就会导致频繁的很多文件拉取失败。会报shuffle output file lost。然后DAGScheduler会重试task和stage。最后甚至导致application挂掉。文章来源地址https://www.toymoban.com/news/detail-500445.html
到了这里,关于spark 数据倾斜处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!