spark5种去重方式,快速去重

这篇具有很好参考价值的文章主要介绍了spark5种去重方式,快速去重。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. count(distinct) 去重

sql中最简单的方式,当数据量小的时候性能还好.当数据量大的时候性能较差.因为distinct全局只有一个reduce任务来做去重操作,极容易发生数据倾斜的情况,整体运行效率较慢.
示例: (对uid去重)

select count(distinct uid) uv,name,age
from A
group by name,age

2. 双重group by 去重

双重group by将去重分成了两步,是分组聚合运算,group by操作能进行多个reduce任务并行处理,每个reduce都能收到一部分数据然后进行分组内去重,不再像distinct只有一个reduce进行全局去重.
并且双重group by使用了“分而治之”的思想,先局部聚合和再进行全局汇总,整体效率要比distinct高很多.

  1. 首先根据uid,name,age进行分区内去重
  2. 将分组内去重后的数据进行全局去重.
select count(uid) uv,name,age
from (
    select uid,name,age
    from A
    group by uid,name,age
) a
group by name,age

3. row_number() over() 窗口函数去重

窗口函数(开窗)同样可实现去重功能,相比distinct和group by, row_number() over()要更加灵活,我们在日常需求中,经常遇到需要保留多条重复数据中的某一条或者某几条. 如:

  1. 相同的用户名中我们需要年龄最大的那一个
  2. 相同数据中我们需要时间戳最小的那几个
    这时候,row_number() over()可以完美解决我们的问题
row_number() over(partiiton by xxx order by xxx [asc / desc])

# 相同用户名只取年龄最大的那一个
select uname
from (
	select uname,row_number() over(partition by age desc) rn
	from A
) s
where rn = 1

# 相同数据中,只取时间戳最小的3条数据
select uid
from (
	select uid, row_number() over(partition by uid order by log_timestamp asc) rn
	from A
) s
where rn <= 3

4. sortWithinPartitions + dropDuplicates

DataFrame中,可以先将分区内数据进行排序,然后通过dropDuplicates将重复的数据删除.

  • sortWithinPartitions: 将分区内的数据进行排序,通过源码我们可以看出此算子和sql中sort by作用相同,并且此算子是对每个executor中数据进行排序,并不会发生shuffle,所以效率较高.但是此算子并没办法对排序的字段进行倒排(desc),默认是正排(asc).
  • dropDuplicates: 此算子是将排序中重复的字段进行去重,最终只保留第一条.
    本人理解,此操作和sublime中先排序再通过正则去重的思路完全一致
    spark5种去重方式,快速去重
    根据logid排序并进行去重,最终只保留一条logid
val newExt = extractResDf
	.sortWithinPartitions(col("logid"))
	.dropDuplicates("logid")

5. mapPartitions + HashSet分区内去重

因为业务需要,上面的效率都有些慢,无法满足日常跑数,我们是不是可以省去对分区内数据的排序操作,直接进行去重操作呢?
于是,我就想到了 mapPartitions算子 + HashSet进行分区内去重,就省去了排序的步骤,效率更高.

val testSchema: StructType = testDf.schema
val testEncoder: ExpressionEncoder[Row] = RowEncoder(testSchema)
testDf.mapPartitions(line => {
  val rowBuf = new scala.collection.mutable.ArrayBuffer[Row]()
  val logidSet = new mutable.HashSet[String]
  line.foreach(row => {
    if (logidSet.add(row.getAs[String]("logid"))) {
      rowBuf.append(row)
    }
  })
  rowBuf.iterator
})(testEncoder)

这里使用mapPartition遍历每个分区的数据, 然后每个分区中有一个HashSet,向HashSet中添加数据,如果

logidSet.add(row.getAs[String]("logid"))

方法返回结果为true,则说明HashSet中不存在此logid,不进行去重; 如果返回结果为false,则说明此logid已经存在,不放入到ArrayBuffer中即可,最终就能实现每个logid只在分区内保留一条的结果.文章来源地址https://www.toymoban.com/news/detail-415023.html

  • 注: 如果不加endocer,会报如下错:
    spark5种去重方式,快速去重
  • Error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

到了这里,关于spark5种去重方式,快速去重的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 5. Hive的三种去重方法

    Hive的三种去重方法 1. distinct 注意事项: distinct 不能单独用于指定某一列,必须放在 select 中所有字段的最前面,否则会报错 distinct 是对 select 后面所有字段的组合进行去重 ,并不是只对紧跟其后的 column1 去重。distinct 的作用范围是整个 SELECT 子句的结果集 distinct 对 NULL 是不

    2024年02月13日
    浏览(45)
  • MongoDB——去重函数Distinct

    MongoDB的distinct方法可以用于检索指定字段的唯一值,以下是对MongoDB distinct的阐述: 一、distinct方法的语法 distinct方法包含三个参数,field表示要检索唯一值的字段,query表示检索时使用的查询语句,options包括projection和sort等选项。 二、distinct的基本用法 在collection中使用disti

    2024年01月24日
    浏览(41)
  • Mysql 数据库去重(distinct)

    有时需要查询出某个字段不重复的记录,这时可以使用mysql提供的distinct这个来过滤重复的记录,但是实际中我们往往用distinct来返回不重复字段的条数(count(distinct id)),其原因是distinct只能返回他的目标字段,而无法返回其他字段 distinct 这种方式会将全部内容存储在一

    2024年02月13日
    浏览(38)
  • MySQL中去重 distinct 和 group by 是如何去重的

     测试1:对name 字段进行过滤  测试2:对所有的字段进行去重。   对比测试1和测试2 发现, distinct 可以对 单一字段进行去重 ,当对所有的字段去重时,只有 不同数据的每个字段完全一样的数据被 去掉,其他只有一个字段重复的数据并没有变化,因此得出结论: distinct主要

    2024年02月07日
    浏览(50)
  • mysql distinct 和 group by 去重

    MySQL中常用去重复数据的方法是使用 distinct 或者 group by group by 分组后,如果没有对分组后的数据进行操作,如使用聚合函数/分组函数:count、sum、avg、max 、min,分组后直接显示该分组的第一条数据。 说明: 有一个事件评论表,针对每个事件,用户都可以发表评论,每发表一

    2024年02月04日
    浏览(39)
  • mybatis-plus使用sum,count,distinct等函数的方法

    通过mybatis-plus实现以下sql查询 mybatis-plus实现

    2024年02月12日
    浏览(45)
  • java stream distinct根据list某个字段去重

    java stream distinct根据list某个字段去重,普通List简单去重: 很显然这种满足不了需求,我们List里的是实体对象,这里的是字符串。 首先创建了一个Student类,该类包含id、name、age三个字段,使用了注解@Data,我们想根据学生的name去重。 一、Stream流 + TreeSet(推荐) 根据学生的

    2024年01月17日
    浏览(76)
  • hive中Distinct和group by去重的对比

            在Hive中, DISTINCT和GROUP BY都可以用于去重,但是它们背后的实现方式是不同的,因此它们的效率也是不同的。         DISTINCT是一种去重方法,它会扫描整个数据集,然后将重复的记录删除,只留下唯一的记录 。由于DISTINCT需要扫描整个数据集,因此它在处理

    2024年02月04日
    浏览(55)
  • 【数据库】PostgreSQL中使用`SELECT DISTINCT`和`SUBSTRING`函数实现去重查询

    在PostgreSQL中,我们可以使用 SELECT DISTINCT 和 SUBSTRING 函数来实现对某个字段进行去重查询。本文将介绍如何使用这两个函数来实现对 resource_version 字段的去重查询。 1. SELECT DISTINCT 语句 SELECT DISTINCT 语句用于从表中选择不重复的记录。如果没有指定列名,则会选择所有列。在本

    2024年02月14日
    浏览(42)
  • Spark Sql之dropDuplicates去重

    dropDuplicates去重原则:按数据行的顺序保留每行数据出现的第一条 dropDuplicates 在Spark源码里面提供了以下几个方法重载: 这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。 传入的参数是一个序列。你可以在序列中

    2024年02月12日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包