1. count(distinct) 去重
sql中最简单的方式,当数据量小的时候性能还好.当数据量大的时候性能较差.因为distinct全局只有一个reduce任务来做去重操作,极容易发生数据倾斜的情况,整体运行效率较慢.
示例: (对uid去重)
select count(distinct uid) uv,name,age
from A
group by name,age
2. 双重group by 去重
双重group by将去重分成了两步,是分组聚合运算,group by操作能进行多个reduce任务并行处理,每个reduce都能收到一部分数据然后进行分组内去重,不再像distinct只有一个reduce进行全局去重.
并且双重group by使用了“分而治之”的思想,先局部聚合和再进行全局汇总,整体效率要比distinct高很多.
- 首先根据uid,name,age进行分区内去重
- 将分组内去重后的数据进行全局去重.
select count(uid) uv,name,age
from (
select uid,name,age
from A
group by uid,name,age
) a
group by name,age
3. row_number() over() 窗口函数去重
窗口函数(开窗)同样可实现去重功能,相比distinct和group by, row_number() over()要更加灵活,我们在日常需求中,经常遇到需要保留多条重复数据中的某一条或者某几条. 如:
- 相同的用户名中我们需要年龄最大的那一个
- 相同数据中我们需要时间戳最小的那几个
这时候,row_number() over()可以完美解决我们的问题
row_number() over(partiiton by xxx order by xxx [asc / desc])
# 相同用户名只取年龄最大的那一个
select uname
from (
select uname,row_number() over(partition by age desc) rn
from A
) s
where rn = 1
# 相同数据中,只取时间戳最小的3条数据
select uid
from (
select uid, row_number() over(partition by uid order by log_timestamp asc) rn
from A
) s
where rn <= 3
4. sortWithinPartitions + dropDuplicates
DataFrame中,可以先将分区内数据进行排序,然后通过dropDuplicates将重复的数据删除.
- sortWithinPartitions: 将分区内的数据进行排序,通过源码我们可以看出此算子和sql中sort by作用相同,并且此算子是对每个executor中数据进行排序,并不会发生shuffle,所以效率较高.但是此算子并没办法对排序的字段进行倒排(desc),默认是正排(asc).
- dropDuplicates: 此算子是将排序中重复的字段进行去重,最终只保留第一条.
本人理解,此操作和sublime中先排序再通过正则去重的思路完全一致
根据logid排序并进行去重,最终只保留一条logid
val newExt = extractResDf
.sortWithinPartitions(col("logid"))
.dropDuplicates("logid")
5. mapPartitions + HashSet分区内去重
因为业务需要,上面的效率都有些慢,无法满足日常跑数,我们是不是可以省去对分区内数据的排序操作,直接进行去重操作呢?
于是,我就想到了 mapPartitions算子 + HashSet进行分区内去重,就省去了排序的步骤,效率更高.
val testSchema: StructType = testDf.schema
val testEncoder: ExpressionEncoder[Row] = RowEncoder(testSchema)
testDf.mapPartitions(line => {
val rowBuf = new scala.collection.mutable.ArrayBuffer[Row]()
val logidSet = new mutable.HashSet[String]
line.foreach(row => {
if (logidSet.add(row.getAs[String]("logid"))) {
rowBuf.append(row)
}
})
rowBuf.iterator
})(testEncoder)
这里使用mapPartition遍历每个分区的数据, 然后每个分区中有一个HashSet,向HashSet中添加数据,如果文章来源:https://www.toymoban.com/news/detail-415023.html
logidSet.add(row.getAs[String]("logid"))
方法返回结果为true,则说明HashSet中不存在此logid,不进行去重; 如果返回结果为false,则说明此logid已经存在,不放入到ArrayBuffer中即可,最终就能实现每个logid只在分区内保留一条的结果.文章来源地址https://www.toymoban.com/news/detail-415023.html
- 注: 如果不加endocer,会报如下错:
- Error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
到了这里,关于spark5种去重方式,快速去重的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!