Spark数据倾斜场景及解决思路

这篇具有很好参考价值的文章主要介绍了Spark数据倾斜场景及解决思路。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

数据倾斜发生时的现象

绝大多数 task 执行得都非常快,但个别 task 执行极慢。

数据倾斜发生的原理

在进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等操 作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。 因此出现数据倾斜的时候,Spark 作业看起来会运行得非常缓慢,甚至可能因为 某个 task 处理的数据量过大导致内存溢出。而整个 stage 的运行速度也 由运行最慢的那个 task 所决定。 10条 / 100 万条

如何定位导致数据倾斜的代码

数据倾斜只会发生在 shuffle 过程中。 可能会触 发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、 join、cogroup、repartition 等。

某个 task 执行特别慢的情况

就是数据倾斜发生在第几个 stage 中。 用 yarn-client 模式 提交,那么本地是直接可以看到 log 的,可以在 log 中找到当前运行到了第几个 stage, 用 yarn-cluster 模式提交,则可以通过 Spark Web UI 来查看当前运行到 了第几个 stage。
在 Spark Web UI 上深入看一下当前这个 stage 各个 task 分配的数据量,从而进一 步确定是不是 task 分配的数据不均匀导致了数据倾斜。
知道数据倾斜发生在哪一个 stage 之后,接着我们就需要根据 stage 划分原理, 推算出来发生倾斜的那个 stage 对应代码中的哪一部分,这部分代码中肯定会有 一个 shuffle 类算子。一般shuffle类算子会切分stage。

某个 task 莫名其妙内存溢出的情况

看 yarn-client 模式下本地 log 的异常栈,或者是通过 YARN 查看 yarn-cluster 模式下的 log 中的异常栈。一般来说,通过异常栈信息就可以定位到代码中哪一行发生了 内存溢出。然后在那行代码附近找找,一般也会有 shuffle 类算子,此时很可能 就是这个算子导致了数据倾斜。 但是内存溢出不一定就是发生了数据倾斜,也有可能是代码存在bug。
查看导致数据倾斜的 key 的数据分布情况
知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了 shuffle 操作并 且导致了数据倾斜的 RDD/Hive 表,查看一下其中 key 的分布情况。这主要是为 之后选择哪一种技术方案提供依据。

  • 如果是 Spark SQL 中的 group by、join 语句导致的数据倾斜,那么就查询一下 SQL 中使用的表的 key 分布情况。
  • 如果是对 Spark RDD 执行 shuffle 算子导致的数据倾斜,那么可以在 Spark 作业中加入查看 key 分布的代码,比如 RDD.countByKey() 。对统计出来的次数,collect/take 到客户端打印一下,就可以 看到 key 的分布情况。

数据倾斜解决方案

1. Hive 表中的数据本身很不均匀 -使用 Hive ETL 预处理数据

Hive ETL 预先对数据按照 key 进行聚合,或者是预先和其他表进行 join, 然后在 Spark 作业中针对的数据源就不是原来的 Hive 表了,而是预处理后的 Hive 表。此时由于数据已经预先进行过聚合或 join 操作了,那么在 Spark 作业 中也就不需要使用原先的 shuffle 类算子执行这类操作了。
这种方案从根源上解决了数据倾斜,因为彻底避免了在 Spark 中 执行 shuffle 类算子,那么肯定就不会有数据倾斜的问题了。 但是指标不治本,Hive ETL也会发生数据倾斜。

2. 导致倾斜的 key 就少数几个,而且对计算本身的影响并不大的-过滤少数导致倾斜的 key

少数几个数据量特别多的 key,对作业的执行和 计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个 key。 将导致数据倾斜的 key 给过滤掉之后,这些 key 就不会参与计算 了,自然不可能产生数据倾斜。

3. 必须要对数据倾斜迎难而上 -提高 shuffle 操作的并行度

对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。 增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据 。
缺点:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况, 比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这 个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还 是会发生数据倾斜的。

4. 聚合类 (reduceByKey /group by )的shuffle-两阶段聚合(局部聚合+全局聚合)

这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚 合,先给每个 key 都打上一个随机数,比如 10 以内的随机数,此时原先一样的 key 就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1), 就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打 上随机数后的数据,执行 reduceByKey 等聚合操作,进行局部聚合,那么局部聚 合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个 key 的前缀给去 掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结 果了,比如(hello, 4)。
将原本相同的 key 通过附加随机前缀的方式,变成多个不同的 key, 就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合,进而解 决单个 task 处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合, 就可以得到最终的结果。

5. 小表join大表-将 reduce join 转为 map join

将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的 内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个 RDD 执行 map 类算子,在算子函数内,从 Broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两 个 RDD 的数据用你需要的方式连接起来。 普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于 会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个 RDD 是比较小的,则可以采用广播小 RDD 全量数据 +map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。

6. 两个表数据量都比较大,但是其中一个少数几个 key 的数据量过大 ,另一个可以均匀-采样倾斜 key 并分拆 join 操作

  • 对包含少数几个数据量过大的 key 的那个 RDD,通过 sample 算子采样,计算出来数据量最大的是哪几个 key。
  • 然后将这几个 key 对应的数据从原来的 RDD 中拆分出来,形成一个单独的 RDD,并给每个 key 都打上 n 以内的随机数作为前缀,而不会导致倾斜的 大部分 key 形成另外一个 RDD。
  • 接着将需要 join 的另一个 RDD,也过滤出来那几个倾斜 key 对应的数据 并形成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺 序附加一个 0~n 的前缀,不会导致倾斜的大部分 key 也形成另外一个 RDD。
  • 再将附加了随机前缀的独立 RDD 与另一个膨胀 n 倍的独立 RDD 进行 join, 此时就可以将原先相同的 key 打散成 n 份,分散到多个 task 中去进行 join 了。
  • 而另外两个普通的 RDD 就照常 join 即可。
  • 最后将两次 join 的结果使用 union 算子合并起来即可,就是最终的 join 结果。
    对于 join 导致的数据倾斜,如果只是某几个 key 导致了倾斜, 可以将少数几个 key 分拆成独立 RDD,并附加随机前缀打散成 n 份去进行 join, 此时这几个 key 对应的数据就不会集中在少数几个 task 上,而是分散到多个 task 进行 join 了。

7. join 操作时,RDD 中有大量的 key 导致数据倾斜,分拆 key 也没什么意义 -使用随机前缀和扩容 RDD

  • 首先查看 RDD/Hive 表中 的数据分布情况,找到那个造成数据倾斜的 RDD/Hive 表,比如有多个 key 都对应了超过 1 万条数据。
  • 然后将该 RDD 的每条数据都打上一个 n 以内的随机前缀。
  • 同时对另外一个正常的 RDD 进行扩容,将每条数据都扩容成 n 条数据,扩容出来的每条-数据都依次打上一个 0~n 的前缀。
  • 最后将两个处理后的 RDD 进行 join 即可。
    相当于上面一种情况不分拆。

参考资料

《五分钟学大数据-Spark数据倾斜及解决方案》文章来源地址https://www.toymoban.com/news/detail-423163.html

到了这里,关于Spark数据倾斜场景及解决思路的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 万字解决Flink|Spark|Hive 数据倾斜

    此篇主要总结到Hive,Flink,Spark出现数据倾斜的表现,原因和解决办法。首先会让大家认识到不同框架或者计算引擎处理倾斜的方案。最后你会发现计算框架只是“异曲”,文末总结才是“同工之妙”。点击收藏与分享,工作和涨薪用得到!!! 数据倾斜最笼统概念就是数据的

    2024年02月03日
    浏览(45)
  • Hive数据倾斜常见场景及解决方案(超全!!!)

    Hive数据倾斜常见问题和解决方案 目录 前言 一、Explain 二、数据倾斜 1.什么是数据倾斜?它的主要表现? 2.产生数据倾斜的常见原因 一.join时:首先是大表关联小表,容易发生数据倾斜 二.join时:空key过多,或者相同key过多 三.join时:不同数据类型关联产生数据倾斜 四.join时

    2024年02月03日
    浏览(44)
  • 基于MapReduce的Hive数据倾斜场景以及解决方案

    通常认为当所有的map task全部完成,并且99%的reduce task完成,只剩下一个或者少数几个reduce task一直在执行,这种情况下一般都是发生了数据倾斜。 即为在整个计算过程中,大量相同的key被分配到了同一个reduce任务上造成。Hive的数据倾斜本质上是MapReduce计算引擎的数据倾斜,

    2024年02月13日
    浏览(52)
  • spark 数据倾斜处理

    1. 对多次使用的RDD进行持久化 同常内存够的时候建议使用:MEMORY_ONLY 如果内存不够的时候使用 通常建议使用:MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。 2. 使用高性能的算子 3. 广播大变量 4. 使用Kryo优化序列化性能 Kryo序列化器介绍: Spark支持使用Kryo序列化机制。Kryo序列化

    2024年02月11日
    浏览(48)
  • Hive & Spark & Flink 数据倾斜

    绝大部分任务都很快完成,只有一个或者少数几个任务执行的很慢甚至最终执行失败, 这样的现象为数据倾斜现象。 任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 redu

    2024年02月07日
    浏览(41)
  • spark 的group by ,join数据倾斜调优

    spark任务中最常见的耗时原因就是数据分布不均匀,从而导致有些task运行时间很长,长尾效应导致的整个job运行耗时很长 首先我们要定位数据倾斜,我们可以通过在spark ui界面中查看某个stage下的task的耗时,如果发现某些task耗时很长,对应要处理的数据很多,证明有数据倾斜

    2024年02月21日
    浏览(46)
  • 【详解】Spark数据倾斜问题由基础到深入详解-完美理解-费元星

    数据倾斜定义:顾名思义,就是大量相似或相同数据聚集在一个块的节点里,导致计算和资源分配不均导致的计算缓慢(长尾)问题。 数据倾斜原因: count(distinct field) group by  NULL 空值 Shuffle (概率最高、发生最普遍的数据倾斜问题,本文重点讲述这个) ##########################

    2024年02月20日
    浏览(40)
  • spark sql 数据倾斜--join 同时开窗去重的问题优化

    背景: 需求:在一张查询日志表中,有百亿数据,需要join上维表,再根据几个字段进行去重 开窗去重和join 一定要分步进行 ,按照需求先做join再开窗,或者去重完成后在进行join。 dwd_tmp1 中存在百亿用户查询日志数据 数据倾斜 数据量超百亿,资源给到200 * 2c * 20G,执行引擎

    2024年02月11日
    浏览(57)
  • 基于MapReduce的Hive数据倾斜场景以及调优方案

    通常认为当所有的map task全部完成,并且99%的reduce task完成,只剩下一个或者少数几个reduce task一直在执行,这种情况下一般都是发生了数据倾斜。 即为在整个计算过程中,大量相同的key被分配到了同一个reduce任务上造成。Hive的数据倾斜本质上是MapReduce计算引擎的数据倾斜,

    2024年02月12日
    浏览(51)
  • ArcGIS Pro 转换Smart3D生成的倾斜3D模型数据osgb——创建集成网格场景图层包

    最近在做Arcgis 批处理的一些工作,然后再学习Python的同时,偶然觉得arcgis Pro是个好东西呢?然后结合近期的Smart3D倾斜3D模型数据,是否可以在arcgis里查看呢?带着这样的疑问和好奇,开始了arcgis Pro的学习,从安装到自学。找到了方法。 就是使用arcgis Pro创建集成网格场景图

    2023年04月19日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包