count distinct在spark中的运行机制

这篇具有很好参考价值的文章主要介绍了count distinct在spark中的运行机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

预备 数据和执行语句

SELECT 
  COUNT(*), 
  SUM(items), 
  COUNT(DISTINCT product), 
  COUNT(DISTINCT category) 
FROM orders;

假设源数据分布在两个1核的结点上,数据就8行
count distinct在spark中的运行机制,spark,大数据

Expand

spark把count distinct操作转换成count操作。

第一步是对每个要count distinct的列,生成新的行(这里是product和category列),当然原来不需要distinct聚合的列也在。

原来items列不需要distinct,product和category列要distinct,所以数据膨胀了2倍。原来8条数据,现在是8*(1+2)=24条

count distinct在spark中的运行机制,spark,大数据

spark加了gid这一列,值为0代表所有非distinct聚合(这里是count(*)和sum(items)),值为1和2分别代表其他distinct聚合(这里1代表product,2代表category)。

NULL是怎么赋值的:对输入列来说,每行只有1个非空值。在spark的物理执行计划中,可以看到操作是这样的

  Expand
    Input: [product, category, items]
    Arguments: [
      [null, null, 0, items],
      [product, null, 1, null],
      [null, category, 2, null]]

第一次HashAggregate

Spark使用所有count distinct的列和gid作为关键字(product、category和gid)对行进行局部散列,并对非distinct的聚合(count(*)和SUM(items))执行局部局部聚合:

相当于执行了select product,category,gid,count(*) cnt,sum(items) items from 膨胀后的表 group by product,category,gid

count distinct在spark中的运行机制,spark,大数据

这可以使得膨胀后的数据变小。

如果不同值的数量比较少,减少的数据是相当可观的,最终结果可能比原始数据还要少。

可以看到原来每个结点上有4行,膨胀后是12行,局部聚合后变成了6行。

Shuffle and Second HashAggregate

在每个结点内部HashAggregate后,经过shuffle后变成这样

count distinct在spark中的运行机制,spark,大数据

重新再每个结点做局部shuffle,得到

(相当于执行了select product,category,gid,count(*) cnt,sum(items) items from 膨胀后的表 group by product,category,gid

count distinct在spark中的运行机制,spark,大数据

这一步使得所有键都变成了唯一的。

最后结果

现在所有行可以合并成一个partition,再次HashAggregation,但这次不用group by product, category和gid

count distinct在spark中的运行机制,spark,大数据

现在再也没有重复值了,简单的count和根据gid筛选就可以得到想要的count distinct结果

  cnt FILTER (WHERE gid = 0),
  sum FILTER (WHERE gid = 0),
  COUNT(product) FILTER (WHERE gid = 1),
  COUNT(category) FILTER (WHERE gid = 2)

Result:

  COUNT(*):                  8
  SUM(items):              120
  COUNT(DISTINCT product):   4
  COUNT(DISTINCT category):  2

性能

  • 如果不同值的数量比较少,那么即使膨胀后,最后要shuffle的行也很少,这样因为spark局部聚合的原因,count distinct是相对比较快的
  • 如果不同值的数量很多,并且你在一个语句中使用多个count distinct对不同的列。那么要shuffle行因为膨胀会很多,局部聚合也不能有效遏制数据的膨胀,那么要让查询语句成功执行需要消耗更多的executor内存。

原文

Distributed COUNT DISTINCT – How it Works in Spark, Multiple COUNT DISTINCT, Transform to COUNT with Expand, Exploded Shuffle, Partial Aggregations – Large-Scale Data Engineering in Cloud (cloudsqale.com)文章来源地址https://www.toymoban.com/news/detail-765797.html

到了这里,关于count distinct在spark中的运行机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark on Hive及 Spark SQL的运行机制

    代码中集成Hive: Spark SQL底层依然运行的是Spark RDD的程序,所以说Spark RDD程序的运行的流程,在Spark SQL中依然是存在的,只不过在这个流程的基础上增加了从SQL翻译为RDD的过程 Spark SQL的运行机制,其实就是在描述如何将Spark SQL翻译为RDD程序 Catalyst内部具体的执行流程: 专业术

    2024年01月23日
    浏览(50)
  • Apache Spark中的广播变量分发机制

    Apache Spark中的广播变量提供了一种机制,允许用户在集群中共享只读变量,并且每个任务都可以访问这个变量,而不需要在每次任务之间重新发送该变量。这种机制特别适用于在所有节点上都需要访问同一份只读数据集的情况,因为它可以显著减少网络通信的开销。 以下是广

    2024年01月24日
    浏览(51)
  • Spark大数据处理讲课笔记--- RDD持久化机制

    理解RDD持久化的必要性 了解RDD的存储级别 学会如何查看RDD缓存 Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。 Spark中

    2024年02月06日
    浏览(45)
  • 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】

    视频地址: 尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程、案例实操)】 尚硅谷大数据技术Spark教程-笔记03【SparkSQL(概述、核心编程、

    2023年04月21日
    浏览(48)
  • 云计算与大数据笔记之Spark【重点:流水线机制】

    图片和部分笔记来自于厦门大学-林子雨-大数据技术原理与应用(第3版) 配套PPT Storm、Hadoop和Spark都是处理大数据的框架,但它们各自在设计上有着不同的侧重点,这导致了它们在实际应用中的不同定位。 主要组件 :Hadoop Distributed File System (HDFS) 和 MapReduce。 设计理念 :主要

    2024年04月16日
    浏览(44)
  • 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境)】

    视频地址: 尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程、案例实操)】 尚硅谷大数据技术Spark教程-笔记03【SparkSQL(概述、核心编程、

    2023年04月15日
    浏览(55)
  • MySQL 中的 distinct 和 group by 哪个效率更高?

    在语义相同,有索引的情况下:group by和distinct都能使用索引,效率相同。 在语义相同,无索引的情况下:distinct效率高于group by。原因是distinct 和 group by都会进行分组操作,但group by可能会进行排序,触发filesort,导致sql执行效率低下。 DISTINCT 用于返回唯一不同的值。

    2024年01月23日
    浏览(42)
  • mysql中的count(1)、count(*)、count(id)哪个更快?

    1 首先我们要先了解count()方法的原理 count()方法的作用就是计算当前sql语句所能查到的 非NUL L的行数, mysql分为server层和存储引擎层,具体结构如下: 常见的存储引擎是InnoDB、myisam。 为什么要介绍引擎种类呢?因为count()方法在不同的存储引擎下,他的实现方式是不一样的。

    2024年02月07日
    浏览(39)
  • 大数据 HDFS 的历史、特性、适用场景 运行机制、数据布局、读写流程、容错机制等

    作者:禅与计算机程序设计艺术 大数据的快速增长、高并发、海量数据、多样化的数据源、动态变化的数据特征,给数据的分析、挖掘带来了巨大的挑战。而HDFS就是存储大数据的一个关键组件。HDFS是一个分布式文件系统,主要用来存储和处理超大规模的数据集。HDFS可以方便

    2024年02月08日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包