Spark 参数调优

这篇具有很好参考价值的文章主要介绍了Spark 参数调优。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark 调优

目录

  • Spark 调优
      • 一、代码规范
        • 1.1 避免创建重复 RDD
        • 1.2 尽量复用同一个 RDD
        • 1.3 多次使用的 RDD 要持久化
        • 1.4 使用高性能算子
        • 1.5 好习惯
      • 二、参数调优
        • 资源参数
        • 1.1 --num-executors 100
        • 1.2 --executor-memory 5g
        • 1.3 --executor-cores 4
        • 1.4 --driver-memory
        • 内存参数
        • spark.storage.memoryFraction、spark.shuffle.memoryFraction(spark1.6 之前静态内存管理)
        • spark.memory.fraction(spark1.6 及之后,统一内存管理)
        • spark.memory.storageFraction
        • spark.kryoserializer.buffer.max
        • dfs.client.block.write.locateFollowingBlock.retries
        • spark.driver.maxResultSize
        • shuffle 参数
        • spark.shuffle.file.buffer
        • spark.shuffle.spill.batchSize
        • spark.shuffle.io.maxRetries
        • spark.shuffle.io.retryWait
        • spark.reducer.maxSizeInFlight
        • spark.reducer.maxReqsInFlight
        • spark.reducer.maxReqSizeShuffleToMem
        • spark.reducer.maxBlocksInFlightPerAddress
        • 文件相关
        • spark.sql.files.maxPartitionBytes
        • spark.sql.parquet.compression.codec
        • spark.io.compression.codec
        • spark.serializer
        • spark.sql.hive.convertMetastoreParquet
        • spark.sql.parquet.writeLegacyFormat
      • 参考文章

一、代码规范

  • 调优顺序:spark 任务的调优顺序依次是代码规范、资源参数 (并行度)、数据倾斜、shuffle 调优、业务层面
1.1 避免创建重复 RDD
  • 对于新手,或者一些较为复杂的 spark 任务,可能会忘记之前对于某一份数据已经创建过一个 RDD,而重复创建,造成不必要的计算;
1.2 尽量复用同一个 RDD
  • 下游需要使用 key-value 类型和 key 类型的两个 RDD,这两个 RDD 的数据完全相同,只是格式不同,那么就只需要创建 key-value 这一个 RDD 就行,而使用 key 类型的 RDD 直接复用 key-value 类型的 RDD 就行了;因此,对于需要使用数据相同,格式不同的数据源时,最好复用字段较多的 RDD;
1.3 多次使用的 RDD 要持久化
  • 当一个 RDD 被使用了多次,比如上面的复用同一个 RDD,那么这个 RDD 就要做持久化,否则这个 RDD 就会被计算多次;例如,a = rdd1.map(); b = rdd1.map(); 那么就需要对 rdd1 做持久化 rdd1.persist(),否则 rdd1 就会被计算两次;
1.4 使用高性能算子
  • **用 reduceByKey 替代 groupByKey 求聚合:**前者是 map-side 预聚合算子,会在 map 端预聚合,类似于 Combiner;
  • **用 combineByKey 代替 groupByKey 求 topN:**前者可以自定义分区内合并和分区间合并的计算逻辑,也是预聚合;
  • **mapPartition 替代 map:**一次调用处理一个分区的数据,对于需要在 map 中创建很多重复对象的场景,最好使用 mapPartition,同时注意 OOM 问题;
  • **foreachPartition 替代 foreach:**道理同 mapPartition 一样;在需要将 rdd 的数据写入 MySQL 时,后者是一条一条数据插入,并且每条数据都会创建一次数据库连接;而前者则是一个分区操作一次,性能有很高的提升;
1.5 好习惯
  • **广播大变量:**当需要在算子中使用大变量(1g 以内)时,最好将大变量广播到 Executor 中,例如:rdd1.filter(x=>slant.contains(x)),如果 slant 在 20M~1G 之间,就可以将 slant 广播;

  • **filter 后 coalesce:**由于 filter 后,各个分区中的数据不再均衡,使用 coalesce 再平衡一下分区数据;

  • **优化数据结构:**对于算子中的数据结构,能用数组就不要用集合类型,最好使用字符串代替对象,用基本类型代替字符串;

  • **使用 Kryo 序列化:**spark 中的三个场景会涉及到序列化,算子中使用外部变量、将自定义对象作为 RDD 中的类型、可序列化的持久化策略(如 MEMORY_ONLY_SER),使用 kryo 的性能会高很多;使用 Kryo 序列化时,最好注册所有的自定义类;conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]));

  • **persist 后 unpersist:**unpersist 是立即释放缓存,对复用的 RDD 使用 persist 缓存后,需要使用行动算子提交 job 后,才会真正的缓存,然后再使用 unpersist 释放缓存;所以当 persist 缓存的 RDD 不会再使用时,最好是手动 unpersist 释放缓存;

二、参数调优

资源参数
1.1 --num-executors 100
  • **参数解释:**任务可以申请的 Excutor 最大数量,并不是一次性分配 100 个 Excutor;Excutor 数量会在任务的运行过程中动态调整,有 job 处于 pending 状态则申请 Excutor,一个 Excutor 空闲时间过长则将其移除;Excutor 的数量决定了任务的并行度;

  • **申请 Excutor:**当有任务处于 pending 状态(积压)超过一定时间,就认为资源不足,需要申请 Excutor;

    何时申请:当pending积压的任务超过spark.dynamicAllocation.schedulerBacklogTimeout(1秒)就申请
    申请多少:申请数量 = 正在运行和pending的任务数量 * spark.dynamicAllocation.executorAllocationRatio(1)/ 并行度
    
    
    
  • 移除 Excutor:

    spark.dynamicAllocation.enabled(false)决定是否使用资源动态分配;必须开启外部shuffle;
    spark.dynamicAllocation.executorIdleTimeout (60s)空闲60s就会被回收(并且没有缓存);
    
    
    
  • **决定任务的并行度:**executor 的数量就是工作节点的数量,直接决定了任务的并行度;准确的说是由 executor*core 决定的;这只是物理上提供的最大并行度,而任务实际的并行度还是由程序中设置的并行度决定,也就是 RDD 的分区数;

1.2 --executor-memory 5g
  • **参数解释:**每个 executor 的内存大小;对于 spark 调优和 OOM 异常,通常都是对 executor 的内存做调整,spark 内存模型也是指 executor 的内存分配,所以 executor 的内存管理是非常重要的;
  • **内存分配:**该参数是总的内存分配,而在任务运行中,会根据 spark 内存模型对这个总内存再次细分;在实际生产中,通常需要根据程序中使用的缓存内存和计算内存,来划分不同的比例,从而合理的利用内存,避免 OOM,提高性能;
1.3 --executor-cores 4
  • **参数解释:**每个 executor 的核数;是每个 executor 内部的并行度,即一个 executor 中可同时执行的 task 数量;
  • **并行度:**core 的数量决定了一个 executor 同时执行的 task 数量,如果 task 数量越多,则意味着占用的 executor 内存也越多;所以,在 executor 内存固定的情况下,可以通过增加 executor 数量,减少 core 数量,使任务总并行度不变的前提下,降低 OOM 风险;如果任务需要广播大变量,可以增大 core 数,使更多的 task 共用广播变量;
1.4 --driver-memory
  • **参数解释:**driver 端的内存大小;如果要 collect 大量数据到 driver 端,或者要广播大变量时,就需要调大 driver 端的内存;一般给个 3G、4G 就够了;
内存参数
spark.storage.memoryFraction、spark.shuffle.memoryFraction(spark1.6 之前静态内存管理)
  • **参数解释:**在 spark1.6 之前,使用的是静态内存管理,而这两个参数就是用来决定缓存内存和执行内存大小的;在 spark1.6 及之后,采用的是统一内存管理(也叫动态内存管理),这两个参数就废弃了(但也可以让它生效)
spark.memory.fraction(spark1.6 及之后,统一内存管理)
  • **参数解释:**spark1.6 及之后采用的是统一内存管理,也叫动态内存管理,顾名思义,就是缓存内存和执行内存统一管理,并且是动态的;首先解释 “统一”:spark.memory.fraction 是堆内内存中用于执行、shuffle、缓存的内存比例;这个值越低,则执行时溢出到磁盘更频繁、同时缓存被逐出内存也更频繁;一般使用默认值就好了,spark2.2 默认是 0.6,那么剩下的 0.4 就是用于存储用户的数据结构(比如 map 算子中定义的中间数据)以及 spark 内部的元数据;
spark.memory.storageFraction
  • **参数解释:**存储内存不会被逐出内存的总量,这个是基于 spark.memory.fraction 的占比;这个值越高,则执行、shuffle 的内存就越少,从而溢写到磁盘就越频繁;一般使用默认值就好了,spark2.2 默认是 0.5;
spark.kryoserializer.buffer.max
  • **参数解释:**kryo 序列化时使用的缓存大小;如果 collect 大量数据到 driver 端,可能会抛 buffer limit exceeded 异常,这个时候就要调大该参数;默认是 64m,挂了就设置为 1024m;如果序列化的一个对象很大,那么就需要增大改参数的值 spark.kryoserializer.buffer(默认 64k);
dfs.client.block.write.locateFollowingBlock.retries
  • **参数解释:**写入块后尝试关闭的次数;Unable to close file because the last block does not have enough number of replicas 异常的原因;2.7.4 已修复;默认是 5,挂了就设置为 6;
spark.driver.maxResultSize
  • **参数解释:**一次 collect 到 driver 端的最大内存大小,Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB) 异常时需要调大该值;默认 1g,挂了就设置为 2g,0 表示不限制;
shuffle 参数
spark.shuffle.file.buffer
  • **参数解释:**shuffle write 时,会先写到 BufferedOutputStream 缓冲区中,然后再溢写到磁盘;该参数就是缓存区大小,默认 32k,建议设置为 64k;
spark.shuffle.spill.batchSize
  • **参数解释:**shuffle 在 spill 溢写过程中需要将数据序列化和反序列化,这个是一个批次处理的条数;默认是 10000,可以调大该值,2 万 5 万都可以;
spark.shuffle.io.maxRetries
  • **参数解释:**shuffle read 拉取数据时,由于网络异常或者 gc 导致拉取失败,会自动重试,改参数就是配置重试次数,在数据量达到十亿、百亿级别的时候,最好调大该参数以增加稳定性;默认是 3 次,建议设置为 10 到 20;
spark.shuffle.io.retryWait
  • **参数解释:**该参数是 spark.shuffle.io.maxRetries 的重试间隔,默认是 5s,建议设置为 20s;
spark.reducer.maxSizeInFlight
  • **参数解释:**shuffle read 拉取数据时的缓存区大小,也就是一次拉取的数据大小;注意是从 5 个节点拉取 48M 的数据,而不是从一个节点获取 48M;默认 48m,建议设置为 96m;
  • **原理解释:**从远程节点拉取数据时,是并行的从发送 5 个请求,每个请求拉取的最大长度是 48M / 5,但是拉取时都是以 block 为最小单位的,所以实际获取的有可能会大于这个值;
spark.reducer.maxReqsInFlight
  • **参数解释:**shuffle read 时,一个 task 的一个批次同时发送的请求数量;默认是 Int 的最大值;
  • **原理解释:**构造远程请求时,单个请求大小限制是 48M / 5,而在一次拉取远程 block 数据时,是按批次拉取,一个批次的大小限制是 48M,所以理想情况下一个批次会发送 5 个请求;但如果 block 的分布不均匀,导致一个请求的请求大小远小于 48M / 5 (例如 1M),而一个批次的大小限制是 48M,所以这个批次就会发送 48 个请求;当节点数较多时,一个 task 的一个批次可能会发送非常多的请求,导致某些节点的入站连接数过多,从而导致失败;
spark.reducer.maxReqSizeShuffleToMem
  • **参数解释:**shuffle read 时,从远程拉取 block 如果大于这个值就会强行落盘,默认是 Long 的最大值,建议小于 2G,一般设为 200M,spark2.2 开始生效;(spark2.3 开始换成了这个参数 spark.maxRemoteBlockSizeFetchToMem);shuffle read 这个部分的参数在 spark 的版本更新中变化较大,所以在优化时一定要根据集群的 spark 版本设置对应的参数;
  • **原理解释:**一次拉取请求中,如果要拉取的数据比较大,内存放不下,就直接落盘;对于数据倾斜比较严重的任务,有可能一个 block 非常大,而没有足够的内存存放时就会 OOM,所以最好限制该参数的大小;还有一个原因就是 netty 的最大限制是 2G,所以大于 2G 肯定会报错;spark2.4 该参数的默认值是:Int 的最大值 - 512 (2G,减 512 用来存储元数据);spark3.0 的最大值也是 2G,并且给了默认值 200M;
spark.reducer.maxBlocksInFlightPerAddress
  • **参数解释:**shuffle read 时,一个节点同时被拉取的最大 block 数,如果太多可能会导致 executor 服务或 nodemanager 崩溃;默认 Int 的最大值;(spark2.2.1 开始支持);

  • **原理解释:**shuffle read 时每个 task 都会从 shuffle write 所在的节点拉取自己的 block 数据,如果一个 shuffle write 的 executor 运行了 9 个 task,就会 write9 个 data 文件;如果 shuffle read 有 1000 核,那么同时运行 1000 个 task,每个 task 要到 shuffle write 所在的 executor 获取 9 个 block,极端情况下一个 shuffle write 的 executor 会被请求 9000 次;当节点数非常多时,一个 shuffle write 的 executor 会同时被很多节点拉取 block,从而导致失败;文章来源地址https://www.toymoban.com/news/detail-848022.html

文件相关
spark.sql.files.maxPartitionBytes
  • **参数解释:**sparksql 读取文件时,每个分区的最大文件大小,这个参数决定了读文件时的并行度;默认 128M;例如一个 300M 的 text 文件,按 128M 划分为 3 个切片,所以 SparkSQL 读取时最少有 3 个分区;
  • **原理解释:**sparksql 读取文件的并行度 = max(spark 默认并行度,切片数量 (文件大小 / 该参数));这里要注意压缩文件是否可分割;但是要注意,对于 parquet 格式,一个切片对应一个 row group;
spark.sql.parquet.compression.codec
  • **参数解释:**parquet 格式的压缩方式,默认是 snappy,可以选择 gzip、lzo、或者 uncompressed 不压缩;
spark.io.compression.codec
  • **参数解释:**spark 中 rdd 分区、广播变量、shuffle 输出的压缩格式,spark2.2 默认是 lz4;
spark.serializer
  • **参数解释:**spark 序列化的实现,这里的序列化是针对 shuffle、广播和 rdd cache 的序列化方式;默认使用 java 的序列化方式 org.apache.spark.serializer.JavaSerializer 性能比较低,所以一般都使用 org.apache.spark.serializer.KryoSerializer ,使用 Kryo 序列化时最好注册十分需要空间的类型,可以节省很多空间;spark task 的序列化由参数 spark.closure.serializer 配置,目前只支持 JavaSerializer;
spark.sql.hive.convertMetastoreParquet
  • **参数解释:**是否采用 spark 自己的 Serde 来解析 Parquet 文件;Spark SQL 为了更好的性能,在读取 hive metastore 创建的 parquet 文件时,会采用自己 Parquet Serde,而不是采用 hive 的 Parquet Serde 来序列化和反序列化,这在处理 null 值和 decimal 精度时会有问题;默认为 true,设为 false 即可 (会采用与 hive 相同的 Serde);
spark.sql.parquet.writeLegacyFormat
  • **参数解释:**是否使用遗留的 (hive 的方式)format 来写 Parquet 文件;由于 decimal 精度问题,hive 读取 spark 创建的 Parquet 文件会报错;所以这里的 spark 采用与 hive 相同的 writeFormat 来写 Parquet 文件,这样 hive 在读取时就不会报错;并且上下游表的精度最好一致,例如 a 表的字段精度为 decimal(10,2),b 表也最好是 decimal(10,2);
  • **原理解释:**在 hive 中 decimal 类型是固定的用 int32 来表示,而标准的 parquet 规范约定,根据精度的不同会采用 int32 和 int64 来存储,而 spark 就是采用的标准的 parquet 格式;所以对于精度不同 decimal 的,底层的存储类型有变化;所以使用 spark 存储的 parquet 文件,在使用 hive 读取时报错;将 spark.sql.parquet.writeLegacyFormat(默认 false) 配置设为 true,即采用与 hive 相同的 format 类来读写 parquet 文件;

参考文章

  • https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/more-guide/configuration.html spark 中文官网
  • https://support.huaweicloud.com/trouble-mrs/mrs_03_0081.html spark 写入 hdfs 时 close 失败
  • https://blog.csdn.net/wangpei1949/article/details/83649642 spark 分桶写入 hdfs
  • https://tech.meituan.com/2016/05/12/spark-tuning-pro.html 美团 数据倾斜
  • https://segmentfault.com/a/1190000039706830 shuffle 性能调优
  • https://zhuanlan.zhihu.com/p/108454557 spark 性能优化
  • https://blog.51cto.com/u_14222592/2893326 spark 源码优化
  • https://www.huaweicloud.com/articles/229d728cf7b313656cddc5de2dbd75e9.html spark 内存管理
  • https://www.cnblogs.com/zz-ksw/p/12228608.html spark 动态资源分配

到了这里,关于Spark 参数调优的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(49)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(104)
  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(54)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(64)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(58)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(76)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(65)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(45)
  • Spark分布式内存计算框架

    目录 一、Spark简介 (一)定义 (二)Spark和MapReduce区别 (三)Spark历史 (四)Spark特点 二、Spark生态系统 三、Spark运行架构 (一)基本概念 (二)架构设计 (三)Spark运行基本流程 四、Spark编程模型 (一)核心数据结构RDD (二)RDD上的操作 (三)RDD的特性 (四)RDD 的持

    2024年02月04日
    浏览(61)
  • 分布式计算MapReduce | Spark实验

    题目1 输入文件为学生成绩信息,包含了必修课与选修课成绩,格式如下: 班级1, 姓名1, 科目1, 必修, 成绩1 br (注: br 为换行符) 班级2, 姓名2, 科目1, 必修, 成绩2 br 班级1, 姓名1, 科目2, 选修, 成绩3 br ………., ………, ………, ………, ……… br 编写两个Hadoop平台上的MapRed

    2024年02月08日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包