Spark SQL生产优化经验--任务参数配置模版

这篇具有很好参考价值的文章主要介绍了Spark SQL生产优化经验--任务参数配置模版。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大表扫描

特殊case说明:当任务存在扫event_log表时需注意,若对event_log表进行了过滤,且过滤比很高,如下图的case,input为74T,但shuffle write仅为3.5G,那么建议提高单partition的读取数据量,将参数set spark.sql.files.maxPartitionBytes=536870912提高10倍至5368709120;
Spark SQL生产优化经验--任务参数配置模版,spark,sql,大数据

小型任务

目前测试:在不手动添加任何参数、平均时长在30min以内、单个shuffle 量在500G以下的任务可以使用该模版,但实际任务情况还需跟踪观察
开启AQE后,spark.sql.shuffle.partitio参数失效(spark默认200,任务从头到尾的所有shuffle都是这个并行度,无法自由操控)
AQE:实现原理是在每个stage完成后再启动一个子Job来计算shuffle中间结果量,依此来进行调节task个数,让作业根据每次shuffle的数据量自行调节并行度。
spark.sql.adaptive.minNumPostShufflePartitions 控制所有阶段的reduce个数,reduce个数区间最小值,为了防止分区数过小
spark.sql.adaptive.maxNumPostShufflePartitions reduce个数区间最大值,同时也是shuffle分区数的初始值
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 动态调整 reduce 个数的 partition 大小依据。如设置 64MB,则 reduce 阶段每个 task 最少处理 64MB 的数据。默认值为 64MB。

--基础资源
set spark.driver.memory=15g;
set spark.driver.cores=3;
set spark.driver.memoryOverhead=4096;
set spark.executor.memory=5G;
set spark.executor.memoryOverhead=1024;
set spark.executor.cores=2;
set spark.vcore.boost.ratio=2;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=300;
--ae,shuffle partition并行度
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=1000;
--268435456;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=536870912;
--开启parquet切分
set spark.sql.parquet.adaptiveFileSplit=true;
--初始task调节,合并小文件
set spark.sql.files.maxPartitionBytes=536870912;

中型任务

目前测试:在不手动添加任何参数、平均时长在90min以内、单个shuffle 量在2T以下的任务可以使用该模版,但实际任务情况还需跟踪观察。
spark.executor.memoryOverhead 每个executor的堆外内存大小,堆外内存主要用于数据IO,对于报堆外OOM的任务要适当调大,单位Mb,与之配合要调大executor JVM参数,例如:set spark.executor.memoryOverhead=3072
set spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2560m

--基础资源
set spark.driver.memory=25g;
set spark.driver.cores=4;
set spark.driver.memoryOverhead=5120;
set spark.executor.memory=10G;
set spark.executor.memoryOverhead=4096;
set spark.executor.cores=3;
set spark.vcore.boost.ratio=1;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=600;
--AQE
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=1000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize= 536870912;
--开启parquet切分,初始task调节,合并小文件
set spark.sql.parquet.adaptiveFileSplit=true;
set spark.sql.files.maxPartitionBytes=536870912;
--推测
set spark.speculation.multiplier=2.5;
set spark.speculation.quantile=0.8;
--shuffle 落地hdfs
set spark.shuffle.hdfs.enabled=true;
set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;

大型任务

目前测试:在不手动添加任何参数、平均时长在120min以内、单个shuffle 量在10T以下的任务可以使用该模版,但实际任务情况还需跟踪观察。

--基础资源
set spark.driver.memory=25g;
set spark.driver.cores=4;
set spark.driver.memoryOverhead=5120;
set spark.executor.memory=15G;
set spark.executor.memoryOverhead=3072;
set spark.executor.cores=3;
set spark.vcore.boost.ratio=1;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=900;
--ae
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=3000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize= 536870912;
--shuffle 落地hdfs
set spark.shuffle.hdfs.enabled=true;
set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;
--开启parquet切分,合并小文件
set spark.sql.parquet.adaptiveFileSplit=true;
set spark.sql.files.maxPartitionBytes=536870912;
--推测
set spark.speculation.multiplier=2.5;
set spark.speculation.quantile=0.9;

超大型任务

目前测试:在不手动添加任何参数、平均时长大于120min、单个shuffle 量在10T以上的任务可以使用该模版,但实际任务情况还需跟踪观察。

--基础资源
set spark.driver.memory=30g;
set spark.driver.cores=4;
set spark.driver.memoryOverhead=5120;
set spark.executor.memory=20G;
set spark.executor.memoryOverhead= 5120;
set spark.executor.cores=5;
set spark.vcore.boost.ratio=1;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=1500;
--ae
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=7000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize= 536870912;
--开启parquet切分,合并小文件
set spark.sql.parquet.adaptiveFileSplit=true;
set spark.sql.files.maxPartitionBytes=536870912;
-- shuffle 落地 hdfs,shuffle文件上传hdfs
set spark.shuffle.hdfs.enabled=true;
set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;
--推测
set spark.speculation.multiplier=2.5;
set spark.speculation.quantile=0.9;

其他常用参数

--ae hash join
set spark.sql.adaptive.hashJoin.enabled=true;
set spark.sql.adaptiveHashJoinThreshold=52428800;
--输出文件合并 byBytes,该功能会生成两个stage,
--第一个stage shuffle的数据量来预估最后生成到hdfs上的文件数据量大小,
--并通过预估的文件数据量大小计算第二个stage的并行度,即最后生成的文件个数。
--该功能只能控制生成的文件大小尽量接近spark.merge.files.byBytes.fileBytes,且有一定的性能损耗,需根据实测情况选择使用。    
-- 最终文件数量:(totalBytes / fileBytes / compressionRatio).toInt + 1                       
set spark.merge.files.byBytes.enabled=true;
set spark.merge.files.byBytes.repartitionNumber=100;  --第一个stage的并行读
set spark.merge.files.byBytes.fileBytes=134217728;		-- 预期的文件大小
set spark.merge.files.byBytes.compressionRatio=3;		-- 压缩比,shuffle文件和最后生成的文件格式和压缩格式都不相同,因此通过该参数调节
--输出文件合并  该功能会在原来job的最后一个stage后面增加1个stage来控制最后生成的文件数量,
--对于动态分区,每个分区生成spark.merge.files.number个文件。
spark.merge.files.enabled=true            
spark.merge.files.number=512
--skew_join 解析绕过tqs
set tqs.analysis.skip.hint=true;
--初始task上限
set spark.sql.files.openCostInBytes=4194304;
set spark.datasource.splits.max=20000;
--broadcast时间
set spark.sql.broadcastTimeout = 3600;
--(防止get json报错)
set spark.sql.mergeGetMapValue.enabled=true;

--ae 倾斜处理 HandlingSkewedJoin  OptimizeSkewedJoin
set spark.sql.adaptive.allowBroadcastExchange.enabled=true;
set spark.sql.adaptive.hashJoin.enabled=false;
set spark.sql.adaptive.skewedPartitionFactor=3; -- 某partition数据量大于中位数的3倍,判定为倾斜
set spark.sql.adaptive.skewedPartitionMaxSplits=20; -- 限制某一partition最多拆分多少分,spark3已失效
set spark.sql.adaptive.skewedJoin.enabled=true; -- Normal Join Pattern的优化开关
set spark.sql.adaptive.skewedJoinWithAgg.enabled=true; -- JoinWithAgg Pattern的优化开关,非开源版
set spark.sql.adaptive.multipleSkewedJoin.enabled=true;-- MultipleJoin Pattern的优化开关,非开源版
set spark.shuffle.highlyCompressedMapStatusThreshold=20000; -- 分区数大于20000时 使用HighlyCompressedMapStatus统计每个partition数据量,会降低数据统计进度

--并发读文件
set spark.sql.concurrentFileScan.enabled=true;
--filter按比例读取文件
set spark.sql.files.tableSizeFactor={table_name}:{filter 比例};
set spark.sql.files.tableSizeFactor=dm_content.tcs_task_dict:10;
--AM failed 时长
set spark.yarn.am.waitTime=200s;
--shuffle service 超时设置
set spark.shuffle.registration.timeout=12000;
set spark.shuffle.registration.maxAttempts=5;
--parquet index 特性生效,in 条件的个数
set spark.sql.parquet.pushdown.inFilterThreshold=30; 

--设置engine
set tqs.query.engine.type=sparkcli;

--hive metastore 超时
spark.hadoop.hive.metastore.client.socket.timeout=600

--manta备用
spark.sql.adaptive.maxNumPostShufflePartitions 5000
spark.executor.memoryOverhead 8000
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 536870912

数据倾斜优化 AQE Skewed Join文章来源地址https://www.toymoban.com/news/detail-526631.html

到了这里,关于Spark SQL生产优化经验--任务参数配置模版的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark 提交任务参数设置关于(线程,shuffle,序列化)

    是在使用 Apache Spark 时,为了设置 Java 虚拟机(JVM)的堆栈大小而使用命令行选项。 -Xss 是 Java 虚拟机的一个选项,用于设置线程的堆栈大小。在这个命令行选项中, -Xss6m 表示将线程的堆栈大小设为 6MB。这个选项的作用是为了避免在运行 Spark 任务时出现堆栈溢出的错误。

    2024年02月02日
    浏览(570)
  • 记一次生产慢sql索引优化及思考

    夜黑风高的某一晚,突然收到一条运营后台数据库慢sql的报警,耗时竟然达到了60s。 看了一下,还好不是很频繁,内心会更加从容排查问题,应该是特定条件下没有走到索引导致,如果频繁出现慢查询,可能会将数据库连接池打满,导致数据库不可用,从而导致应用不可用。

    2024年02月04日
    浏览(44)
  • spark sql官网优化指南

    缓存数据 调整参数 把数据缓存到内存,spark sql能够只扫描需要列并且会自动压缩数据,占用最小的内存和减小GC压力。这无需多言,内存远远要快于磁盘,spark效率比hive高这个就是一个主要原因。 缓存数据代码 释放缓存 用完后一定要记得释放掉,不要空占的内存浪费资源。

    2024年02月19日
    浏览(35)
  • HadoopYarn常用命令、yarn application查看任务、yarn logs查看日志、yarn applicationattempt查看尝试运行的任务、查看容器、Yarn生产环境核心参数

    Yarn状态的查询,除了可以在hadoop103:8088页面查看外,还可以通过命令操作。常见的命令操作如下所示: 需求:执行WordCount案例,并用Yarn命令查看任务运行情况。 [summer@hadoop102 ~]$ myhadoop.sh start 这个是之前写的脚本,想了解的可用看我之前写的文章 https://blog.csdn.net/Redamancy06/

    2024年01月18日
    浏览(49)
  • Spark SQL优化:NOT IN子查询优化解决

    文章最前 : 我是Octopus,这个名字来源于我的中文名--章鱼;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github ;这博客是记录我学习的点点滴滴,如果您对 Python、Java、AI、算法有兴趣,可以关注我的动态,一起学习,共同进步。  相关文章: LEFT ANTI JOIN的使用

    2024年02月05日
    浏览(53)
  • Kafka重要生产参数配置建议

    单次poll()的调用可返回的最大消息总数, 默认是500条 循环拉取并放入List集合就返回,可以看出这个取值的大小,将会影响一次poll()所需消耗的时间。 这个参数就规定了,当调用poll()之后,如果在 max.poll.interval.ms 指定的时间内未消费完消息,也就是未再调用poll()方法,则C

    2023年04月09日
    浏览(68)
  • 【Kafka系列】生产级集群参数配置

    目录 1. 概述 2. Bocker 端参数 2.1 存储相关配置 2.2 Zookeeper 相关配置 2.3 Brocker 连接相关配置 2.4 Topic 管理相关配置 2.5 数据存留 相关配置 3. Topic 级别参数 4. JVM 参数 5. 操作系统参数 6. 小结 本文将从 Bocker 端参数、Topic 级别参数、JVM 端参数、操作系统级别参数分别介绍 Kafka 集群

    2024年02月22日
    浏览(37)
  • 性能优化:Spark SQL中的谓词下推和列式存储

    Apache Spark是一个强大的分布式计算框架,Spark SQL是其一个核心模块,用于处理结构化数据。性能优化是大数据处理中的一个关键问题,本文将深入探讨Spark SQL中的两个性能优化技术:谓词下推(Predicate Pushdown)和列式存储(Columnar Storage),以提高查询性能和降低资源消耗。

    2024年02月02日
    浏览(44)
  • 【Spark】配置参数关系-重要

    并行度指所有Executor可以同时执行的Task数, 每个Executor中的一个Core(线程,虚拟核数)同时只能执行一个Task, 所以  最大并行度 = Executor数量 * 每个Executor的Core数; eg:资源配置10个Executor节点,每个节点2个Core,那么同一时间可以并行计算的task数为20, 如果RDD有100个分区,

    2024年02月08日
    浏览(31)
  • spark sql 数据倾斜--join 同时开窗去重的问题优化

    背景: 需求:在一张查询日志表中,有百亿数据,需要join上维表,再根据几个字段进行去重 开窗去重和join 一定要分步进行 ,按照需求先做join再开窗,或者去重完成后在进行join。 dwd_tmp1 中存在百亿用户查询日志数据 数据倾斜 数据量超百亿,资源给到200 * 2c * 20G,执行引擎

    2024年02月11日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包