SparkSQL中控制文件输出数量

这篇具有很好参考价值的文章主要介绍了SparkSQL中控制文件输出数量。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在生产中,无论是通过SQL语句或者Scala/Java等代码的方式使用Spark SQL处理数据,在Spark SQL写数据时,往往会遇到生成的小文件过多的问题,而管理这些大量的小文件,是一件非常头疼的事情。
大量的小文件会影响Hadoop集群管理或者Spark在处理数据时的稳定性:

  1. Spark SQL写Hive或者直接写入HDFS,过多的小文件会对NameNode内存管理等产生巨大的压力,会影响整个集群的稳定运行。
  2. 容易导致task数过多,如果超过参数spark.driver.maxResultSize的配置(默认1g),会抛出类似如下的异常,影响任务的处理。

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

当然可以通过调大spark.driver.maxResultSize的默认配置来解决问题,但如果不能从源头上解决小文件问题,以后还可能遇到类似的问题。

下面通过一个例子,Spark SQL写数据时,导致产生分区数"剧增"的典型场景,通过分区数"剧增",以及Spark中task数和分区数的关系等,来倒推小文件过多的可能原因(这里的分区数是指生成的DataSet/RDD的分区数,不是Hive分区表的分区概念):

现象

  1. 对表test_tab进行写入操作
  2. t1的分区数是100,t2的分区数是200,union all后生成的tmp分区数是300
  3. test_tab产生的小文件数基本也在300左右
select * from t1 union all select * from t2 as tmp;
insert overwrite table test_tab select * from tmp;

分析

1)执行上述insert操作时的分区并行度,主要受tmp的分区数(对应一个DataSet)影响,

2)tmp的分区数主要受t1、t2以及union all的影响

3)暂且不考虑t1或t2是物理表还是经过其他处理生成的临时表,它们的分区数是确定的,这里主要看经过union all处理后,生成的tmp的分区数和t1、t2的分区数有何关系?

4)Spark SQL语句中的union all对应到DataSet中即为unionAll算子,底层调用union算子

Spark RDD中的union算子对union产生的新的RDD的分区数是如何受被union的多个RDD的影响的,这里直接给出结论:

RDD在调用union算子时,最终生成的RDD分区数分两种情况:

  1. union的RDD分区器已定义并且它们的分区器相同
    多个符RDD具有相同额分区器,union后产生的RDD的分区器与父RDD相同且分区数也相同。比如,n个RDD的分区器相同且是defined,分区数是m个。那么这n个RDD最终union生成的一个RDD的分区数仍是m,分区器也是相同的
  2. 不满足第一种情况,则通过union生成的RDD的分区数为父RDD的分区数之和。

同样的这种机制也可以套用到Spark SQL中的DataSet上,那么就很好解释了tmp的分区数为什么等于t1和t2的分区数的和。

最后,Spark中一个task处理一个分区从而也会影响最终生成的文件数。

当然上述只是以Spark SQL中的一个场景阐述了小文件产生过多的原因之一(分区数过多)。在数仓建设中,产生小文件过多的原因有很多种,比如:

  1. 流式处理中,每个批次的处理执行保存操作也会产生很多小文件

  2. 为了解决数据更新问题,同一份数据保存了不同的几个状态,也容易导致文件数过多

那么如何解决这种小文件的问题呢?

  1. 通过repartition或coalesce算子控制最后的DataSet的分区数
    注意repartition和coalesce的区别。
  2. 将Hive风格的Coalesce and Repartition Hint 应用到Spark SQL需要注意这种方式对Spark的版本有要求,建议在Spark2.4.X及以上版本使用,示例:
--提示名称不区分大小写
INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...

Coalesce Hint减少了分区数,它仅合并分区 ,因此最大程度地减少了数据移动,但须注意内存不足容易OOM。
Repartition Hint可以增加或减少分区数量,它执行数据的完全shuffle,并确保数据平均分配。
repartition增加了一个新的stage,因此它不会影响现有阶段的并行性;相反,coalesce会影响现有阶段的并行性,因为它不会添加新stage。
该写法还支持多个插入查询和命名子查询。

  1. 小文件定期合并
    可以定时通过异步的方式针对Hive分区表的每一个分区中的小文件进行合并操作。

补充

如何在纯写SQL的场景下,如何用Spark SQL做数据导入时候,控制小文件的数量。

  • 对于原始数据进行按照分区字段进行shuffle,可以规避小文件问题。但有可能引入数据倾斜的问题;

  • 可以通过distribute by ss_sold_date_sk, cast(rand() * N as int),N值可以在文件数量和倾斜度之间做权衡;

  • 知道倾斜键的情况下,可以将原始数据分成几个部分处理,不倾斜的按照分区键shuffle,倾斜部分可以按照rand函数来shuffle;

  • 对于Spark 2.4 以上版本的用户,也可以使用HINT 详情,链接如下:
    https://issues.apache.org/jira/browse/SPARK-24940

  • 对于Spark 3.0 以上版本的用户,可以使用自适应查询(AQE)功能,设置spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled为true,Spark就会在计算过程中自动帮助用户合并小文件,更加方便和智能。

# 自适应执行框架的开关
spark.sql.adaptive.enabled = true
# reduce个数区间最小值
spark.sql.adaptive.minNumPostShufflePartitions = 1
# reduce个数区间最大值
spark.sql.adaptive.maxNumPostShufflePartitions = 500 
# 动态调整reduce个数的partition大小依据,如果设置为64 MB,则reduce阶段每个task最少处理64 MB的数据
spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 67108864
# 动态调整reduce个数的partition条数依据,如设置20000000则reduce阶段每个task最少处理20000000条的数据
spark.sql.adaptive.shuffle.targetPostShuffleRowCount = 20000000
spark.sql.auto.repartition = true

# 以下配置是针对join操作进行的性能优化
spark.sql.adaptive.join.enabled = true 
spark.sql.adaptive.skewJoin.enabled = true 
spark.shuffle.consolidateFiles = true 
spark.shuffle.service.enabled = true
spark.sql.adaptive.allowAdditionalShuffle = true

必须要出发shuffle,如果任务中只有map task,需要通过group by 或者distribute 触发shuffle的执行,只有触发shuffle,才能使用adaptive解决小文件问题。

如何避免Spark SQL做数据导入时产生大量小文件

SparkSQL自适应执行

参考文章:

hdfs 小文件如何处理

spark sql合并小文件_Spark SQL小文件问题在OPPO的解决方案

spark sql合并小文件_Spark SQL 小文件问题产生原因分析以及处理方案

使用Spark sql 合并 Flink 写Hive表的小文件文章来源地址https://www.toymoban.com/news/detail-473931.html

到了这里,关于SparkSQL中控制文件输出数量的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SparkSQL列数量比较多引发的Too many arguments in method signature in class file问题

    我在写一个Spark程序的时候,做两个表的关联,其中一个表为feature表,一共有96个特征,我使用下面的 代码片 的时候 在运行的时候会报错 在运行日志里面还有下面的错误提示 增加一行: .config(\\\"spark.sql.codegen.wholeStage\\\", value = false) 这个错误一般是由于 JVM 的方法参数限制所导

    2024年02月08日
    浏览(39)
  • Logback日志记录只在控制台输出sql,未写入日志文件【解决】

    原因:持久层框架对于Log接口实现方式不一样,日记记录的位置及展示方式也也不一样 可以分别配置到两个环境中,dev用StdOutImpl,prod用Slf4jImpl或者其他的都行,具体需要看他是如何实现的,使用logger对象输出的都是会写入日志的,使用System.out或err的只会在控制台显示,以下是实验

    2024年02月09日
    浏览(64)
  • 卷积神经网络每一层输出的形状、通道数、特征图数量以及过滤器数量的理解与计算。

    参考: http://t.csdn.cn/8ApfD ‘http://t.csdn.cn/ZmEOJ 核心观点:  前一层的通道数(特征图数量),决定这一层过滤器的深度; 这一层过滤器的数量,决定这一层输出的通道数(特征图数量) 神经网络每一层输出矩阵的形状一般是4个维度[y1, y2, y3, y4] y1 通常是batch_size,就是每一圈丢

    2023年04月09日
    浏览(53)
  • 微信小程序动态控制tabbar的数量,uniApp动态控制tabbar的数量

    需求分析 :         小程序登录进来有2种身份,每种身份看到的页面不一样,而且tabbar的数量也不一样,这个时候就需要用到微信小程序的自定义tabbar, 自定义tabbar和原生tabbar在用户体验上差不多,几乎看不出有什么区别,废话不多说直接上代码 创建一个文件夹 custom-ta

    2024年02月13日
    浏览(45)
  • .net 6 web api项目添加日志(Serilog)管理,将日志输出到控制台、文件、数据库

    1.在nuget安装下面几个包 Serilog Serilog.AspNetCore //用于日志输出到控制台 Serilog.Formatting.Compact //用于日志输出到mysql数据库 Serilog.Sinks.MySQL //用于日志输出到文件 Serilog.Sinks.RollingFile 2.在Program.cs文件配置日志参数,依赖注入 别忘了在appsettings.json添加数据库连接字符串 3. 使用日志

    2024年02月10日
    浏览(82)
  • Linux命令:重复多次后台运行且不保存输出,查看命令对应的进程数量

    要在后台重复运行 Linux 命令并查看对应的进程数量,你可以使用循环结构和后台运行符号 `` 结合起来。以下是一个示例: ```bash for i in {1..3}; do     your_command /dev/null 21 done ``` 命令 `your_command /dev/null 21 ` 的含义如下: 1. `` 符号表示重定向输出。在这个命令中,`your_command` 的标

    2024年02月22日
    浏览(47)
  • STM32高级定时器输出指定数量PWM(STM32CubeMx配置)

    高级定时器中有一个重复计数器,本实验输出指定个数PWM就是利用了重复计数器的特性,先来看看重复计数器的特性是什么: 计数器每次上溢或下溢都能使重复计数器减1,减到0时,再发生一次溢出就会产生更新事件 这是什么意思呢,这里举个例子比如说我设定重复计数器的

    2024年02月02日
    浏览(88)
  • 控制goroutine 的并发执行数量

    正常项目,协程数量超过十万就需要引起重视。如果有上百万goroutine,一般是有问题的。 但并不是说协程数量的上限是100多w 1048575的来自类似如下的demo代码: 执行后,很快报错 panic: too many concurrent operations on a single file or socket (max 1048575) 但这个是因为fmt.Printf导致的: 对单个

    2024年02月11日
    浏览(37)
  • springboot日志使用 SLF4J+Logback 实现(springboot默认的日志实现),日志打印到控制台及日志输出到指定文件

    还是直接上代码 @Slf4j 这玩意 默认支持 不用引入 yml 配置文件 下面分享 xml 方式 在 资源目录下创建 logback-spring.xml 粘贴走 即可 重启 看控制台变化 还有磁盘 有没有写入 坑 : 我在创建的时候 发现 xml 没有生效 排查了半天 发现 在创建 logback-spring.xml 这个文件的时候 我不是手

    2024年04月22日
    浏览(80)
  • 23、springboot日志使用入门-- SLF4J+Logback 实现(springboot默认的日志实现),日志打印到控制台及日志输出到指定文件

    就是springboot的默认的日志依赖实现。创建项目的时候存在这个依赖里面。 下面的日志实现就是基于 SLF4J+Logback SLF4J+Logback:SLF4J是门面,Logback是实现 设置日志的级别,可通过以下方式: ▲ 改变Spring Boot的核心日志级别 ▲ 改变程序组件(包括所有各种框架)的核心日志级别

    2024年02月03日
    浏览(61)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包