Hive & Spark & Flink 数据倾斜

这篇具有很好参考价值的文章主要介绍了Hive & Spark & Flink 数据倾斜。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

定义现象

绝大部分任务都很快完成,只有一个或者少数几个任务执行的很慢甚至最终执行失败,
这样的现象为数据倾斜现象。
任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大于平均时长(木桶原理)。

Hive

单表数据

  • 对于某些不需要计算的数据可以优先过滤

  • 当任务重存在 group by 的聚合操作时,开启参数设置

是否在 Map 端进行聚合,默认为 True
set hive.map.aggr = true;
在 Map 端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
有数据倾斜的时候进行负载均衡(默认是 false)
set hive.groupby.skewindata = true;

  • 增加 reduce 数据

每个 Reduce 处理的数据量默认是 256MB
set hive.exec.reducers.bytes.per.reducer = 256000000
每个任务最大的 reduce 数,默认为 1009
set hive.exec.reducers.max = 1009
计算 reducer 数的公式
N=min(参数 2,总输入数据量/参数 1)(参数 2 指的是上面的 1009,参数 1 值得是 256M)

设置每个 job 的 Reduce 个数
set mapreduce.job.reduces = 15;

多表join数据

  • 使用参数

join 的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000;
如果是 join 过程出现倾斜应该设置为 true
set hive.optimize.skewjoin=false;

如果开启了,在 Join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key(默认 100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果。通过hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个 job 的 mapper 数量,默认 10000。

set hive.skewjoin.mapjoin.map.tasks=10000;

  • MapJoin

设置自动选择 MapJoin
set hive.auto.convert.join=true; #默认为 true
大表小表的阈值设置(默认 25M 以下认为是小表):
set hive.mapjoin.smalltable.filesize=25000000;

MapJoin 是将 Join 双方比较小的表直接分发到各个 Map 进程的内存中,在 Map 进程中进行 Join 操作,这样就不用进行 Reduce 步骤,从而提高了速度。

Spark

数据倾斜一般是发生在 shuffle 类的算子,比如 distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup 等,涉及到数据重分区,如果其中某一个 key 数量特别大,就发生了数据倾斜。

单表数据

  • 可以在 shuffle 之前的 map 端做预聚合操作。即在 shuffle 前将同一分区内所属同 key 的记录先进行一个预结算,再将结果进行 shuffle,发送到 reduce 端做一个汇总。
    比如 : reduceByKey 替代 groupByKey、两阶段聚合(加盐局部聚合+去盐全局聚合)等。

多表Join 数据

  • 大小表的Join,使用 广播机制。小表足够小,可被加载进 Driver 并通过 Broadcast 方法广播到各个 Executor 中。
  • 拆分大 key 打散大表,扩容小表。
  1. 将数据倾斜的 key 和没有数据倾斜的 key 分别为两个数据集;
  2. 对数据倾斜的 key 在其加上随机前缀,然后与另一个表的相同 key 也做随机前缀后 join。
  3. 对正常的数据做join,对倾斜的数据做join,然后 union。
方案 简介
HiveETL预处理 Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行 join,然后再 Spark作业中针对的数据就不是原来的表,而是预处理后的表,在Spark时就不需要进行原先的shuffle操作了
过滤少数导致数据倾斜的key 如果少数几个数据量特别多的 key 对作业的执行和计算结果不重要,那么直接结果掉它们
提高shuffle 并行度 提高shuffle类算子并行度
两阶段聚合 先随机加前缀预聚合,第二次去掉前缀再聚合
将reduce join 转成 map join 小表广播到大表所在 executor进行 mapjoin
采样倾斜key并分拆join 对少数key的数据加上随机前缀,另一个表也膨胀为加上随机前缀,然后进行 join,再与正常join数据进行 union
使用随机前缀和扩容RDD进行join 将有大量数据倾斜的 key,每条都打上一个随机前缀,将另一个RDD彭场,然后两个RDD进行join

Flink

  • 使用 LocalKeyBy
    通过在 KeyBy 之前积攒一定数量的数据,然后进行聚合,减少下游的数据量,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈
  • shuffle 之前发送数据倾斜
    由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些partition 的数据量较少。需要让 Flink 任务强制进行 shuffle。使用 shuffle、rebalance 或 rescale算子即可将数据均匀分配,从而解决数据倾斜的问题。
  • keyBy 后的窗口聚合操作存在数据倾斜
    因为使用了窗口,变成了有界数据(攒批)的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
    ➢ 第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
    注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二
    阶段分组依据,避免不同窗口的结果聚合到一起)
    ➢ 第二阶段聚合:按照原来的 key 及 windowEnd 作 keyby、聚合

总结

综上所述,对数据倾斜的问题,首先要判断该 key 是否会对结果产生影响,对其进行过滤或者打上随机 key。然后还可以通过随机前缀的两阶段处理 和 增加 reduce, map,减少 shuffle,重分区(Flink)等。文章来源地址https://www.toymoban.com/news/detail-466409.html

到了这里,关于Hive & Spark & Flink 数据倾斜的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Doris-05-集成Spark、Flink、Datax,以及数据湖分析(JDBC、ODBC、ES、Hive、多源数据目录Catalog)

    准备表和数据: Spark 读写 Doris Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。 代码库地址:https://github.com/apache/incubator-doris-spark-connector 支持从 Doris 中读取数据 支持 Spark DataFrame 批量/流式 写入 Doris 可以将 Doris 表映射为 DataFra

    2024年02月06日
    浏览(47)
  • 处理大数据的基础架构,OLTP和OLAP的区别,数据库与Hadoop、Spark、Hive和Flink大数据技术

    2022找工作是学历、能力和运气的超强结合体,遇到寒冬,大厂不招人,可能很多算法学生都得去找开发,测开 测开的话,你就得学数据库,sql,oracle,尤其sql要学,当然,像很多金融企业、安全机构啥的,他们必须要用oracle数据库 这oracle比sql安全,强大多了,所以你需要学

    2024年02月08日
    浏览(47)
  • Hive数据存储格式有哪些?TextFile、SequenceFile、RCFile、ORCFile、Parquet有什么区别?为什么绝大多数都使用ORCFile、Parquet格式?

    Hive 的数据存储,是 Hive 操作数据的基础。 选择一个合适的底层数据存储文件格式,即使在不改变当前 Hive SQL 的情况下,性能也能得到数量级的提升 。 这种优化方式对 MySQL 等关系型数据库有些类似,选择不同的数据存储引擎,代表着不同的数据组织方式,对于数据库的表现

    2024年02月02日
    浏览(43)
  • spark 数据倾斜处理

    1. 对多次使用的RDD进行持久化 同常内存够的时候建议使用:MEMORY_ONLY 如果内存不够的时候使用 通常建议使用:MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。 2. 使用高性能的算子 3. 广播大变量 4. 使用Kryo优化序列化性能 Kryo序列化器介绍: Spark支持使用Kryo序列化机制。Kryo序列化

    2024年02月11日
    浏览(36)
  • Spark数据倾斜及解决方法

    数据倾斜是指少量的Task运行大量的数据,可能会导致OOM。数据过量是所有的Task都很慢。避免数据倾斜的方式主要有: 按照Key分组后,一组数据拼接成一个字符串,这样一个Key只有一条数据了。这个方式个人觉得有点僵硬。 增大或缩小Key的粒度:增大粒度一个Key包含更多的数

    2024年02月15日
    浏览(26)
  • Hive 数据倾斜

    数据倾斜:数据分布不均匀,造成数据大量的集中到一点,造成数据热点。主要表现为任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差

    2024年04月28日
    浏览(26)
  • Hive-数据倾斜

    在计算各省份的GMV时,有可能会发生数据倾斜,解决办法如下: 分组聚合 预聚合思想 map-side(预聚合在map里面) skew-groupby(多个reduce阶段进行汇总):先对倾斜的key加上随机数,均匀分发到不同的reduce,进行一次聚合,然后去掉随机数,再发到一个reduce进行聚合。 表与表的

    2024年02月14日
    浏览(30)
  • Spark数据倾斜解决方案一:源数据预处理和过滤倾斜key

    为什么把源数据预处理和过滤掉倾斜的key两种处理倾斜的方式写到一起? 因为这两种方式在实际的项目中场景较少而且单一,对于数据源预处理,比如原本要在spark中进行聚合或join的操作,提前到hive中去做,这种方式虽然解决了spark中数据倾斜的问题,但是hive中依然也会存

    2024年02月09日
    浏览(33)
  • Spark数据倾斜问题分析和解决

    一、背景 首先需要掌握 Spark DAG、stage、task的相关概念 Spark的job、stage和task的机制论述 - 知乎 task数量和rdd 分区数相关 running task数=executor-core* num-executors (如果running task 没有达到乘积最大,一般是队列资源不足) https://www.cnblogs.com/muyue123/p/14036648.html 二、任务慢的原因分析 找到

    2024年02月03日
    浏览(63)
  • Spark数据倾斜场景及解决思路

    绝大多数 task 执行得都非常快,但个别 task 执行极慢。 在进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等操 作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。 因此出现数据倾斜的

    2023年04月24日
    浏览(24)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包