Spark Sql之dropDuplicates去重

这篇具有很好参考价值的文章主要介绍了Spark Sql之dropDuplicates去重。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

算子介绍

dropDuplicates去重原则:按数据行的顺序保留每行数据出现的第一条
dropDuplicates 在Spark源码里面提供了以下几个方法重载:

 def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns) 

这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。

def dropDuplicates(colNames: Seq[String]) 

传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条

def dropDuplicates(colNames: Array[String]) 

传入的参数是一个数组,然后方法会把数组转换为序列然后再调用第二个方法

def dropDuplicates(col1: String, cols: String*)

传入的参数为字符串,在方法体内会把你传入的字符串组合成一个序列再调用第二个方法。

示例

新建一个dataFrame

val conf = new SparkConf().setAppName("TTyb").setMaster("local")
val sc = new SparkContext(conf)
val spark = new SQLContext(sc)
val dataFrame = spark.createDataFrame(Seq(
  (1, 1, "2", "5"),
  (2, 2, "3", "6"),
  (2, 2, "35", "68"),
  (2, 2, "34", "67"),
  (2, 2, "38", "68"),
  (3, 2, "36", "69"),
  (1, 3, "4", null)
)).toDF("id", "label", "col1", "col2")

按照id和lable来去重。此时利用distinct是得不到想要的效果的
利用 dropDuplicates 可以实现

dataFrame.dropDuplicates(Seq("id","label")).show(false)

+---+-----+----+----+
|id |label|col1|col2|
+---+-----+----+----+
|2  |2    |3   |6   |
|1  |1    |2   |5   |
|1  |3    |4   |null|
|3  |2    |36  |69  |
+---+-----+----+----+

问题

问题: 分区多时对导致去重出现问题
需求:求最早时间段的那条数据

原数据:
+---------+----------+
|columnVal|dateStr   |
+---------+----------+
|0        |2019-01-01|
|1        |2019-01-01|
|2        |2019-01-01|
|3        |2019-01-01|
|4        |2019-01-01|
|0        |2019-02-01|
|1        |2019-02-01|
|2        |2019-02-01|
|3        |2019-02-01|
|4        |2019-02-01|
|0        |2019-03-01|
|1        |2019-03-01|
|2        |2019-03-01|
|3        |2019-03-01|
|4        |2019-03-01|
+---------+----------+

期望结果:
+---------+----------+
|columnVal|dateStr   |
+---------+----------+
|0        |2019-01-01|
|1        |2019-01-01|
|3        |2019-01-01|
|2        |2019-01-01|
|4        |2019-01-01|
+---------+----------+

错误复现:

# 可以看到下面效果跟预期 完全不符合

master("local[4]") //此时分区为4
df.dropDuplicates(Seq("columnVal","dateStr")).show(false)
+---------+----------+
|columnVal|dateStr   |
+---------+----------+
|3        |2019-02-01|
|1        |2019-02-01|
|4        |2019-03-01|
|0        |2019-01-01|
|4        |2019-01-01|
|0        |2019-02-01|
|4        |2019-02-01|
|3        |2019-01-01|
|1        |2019-01-01|
|2        |2019-01-01|
|3        |2019-03-01|
|2        |2019-02-01|
|0        |2019-03-01|
|1        |2019-03-01|
|2        |2019-03-01|
+---------+----------+

解决

方式一:

df.orderBy(col("dateStr")) //全局有序 只有一个reduce处理所有数据
      .dropDuplicates("columnVal")
      .show(false)

方式二:(基于方式一的优化)

    df.sortWithinPartitions(col("dateStr")) //每个分区有序
      .orderBy(col("dateStr")) //全局有序 只有一个reduce处理所有数据 此时压力会很小
      .dropDuplicates("columnVal") //去重
      .show(false)

dropDuplicates和distinct

  • distinct源码:
def distinct(): Dataset[T] = dropDuplicates()

def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)

可以看到底层调用的是dropDuplicates(this.columns)
而columns: Array[String] = schema.fields.map(_.name)
其实就是获取df的所有字段传进去。

参考

https://blog.csdn.net/qq_39900031/article/details/115797287文章来源地址https://www.toymoban.com/news/detail-526537.html

到了这里,关于Spark Sql之dropDuplicates去重的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark参数配置和调优,Spark-SQL、Config

    一、Hive-SQL / Spark-SQL参数配置和调优 二、shell脚本spark-submit参数配置 三、sparkSession中配置参数

    2024年02月13日
    浏览(38)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目录 0. 相关文章链接 1. 创建表 1.1. 启动spark-sql 1.2. 建表参数 1.3. 创建非分区表 1.4. 创建分区表 1.5. 在已有的hudi表上创建新表 1.6. 通过CTAS (Create Table As Select)建表 2. 插入数据 2.1. 向非分区表插入数据 2.2. 向分区表动态分区插入数据 2.3. 向分区表静态分区插入数据 2.4

    2024年02月06日
    浏览(35)
  • Spark的dropDuplicates或distinct 对数据去重

    消除重复的数据可以通过使用 distinct 和 dropDuplicates 两个方法。 distinct 是所有的列进行去重的操作,假如你的 DataFrame里面有10列,那么只有这10列完全相同才会去重。 使用distinct:返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果

    2024年02月16日
    浏览(27)
  • Spark-SQL连接Hive的五种方法

    若使用Spark内嵌的Hive,直接使用即可,什么都不需要做(在实际生产活动中,很少会使用这一模式) 步骤: 将Hive中conf/下的hive-site.xml拷贝到Spark的conf/目录下; 把Mysql的驱动copy到jars/目录下; 如果访问不到hdfs,则将core-site.xml和hdfs-site.xml拷贝到conf/目录下; 重启spark-shell;

    2024年02月16日
    浏览(31)
  • 在 spark-sql / spark-shell / hive / beeline 中粘贴 sql、程序脚本时的常见错误

    《大数据平台架构与原型实现:数据中台建设实战》一书由博主历时三年精心创作,现已通过知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描

    2024年02月14日
    浏览(25)
  • Spark-SQL连接JDBC的方式及代码写法

    提示:文章内容仅供参考! 目录 一、数据加载与保存 通用方式: 加载数据: 保存数据: 二、Parquet 加载数据: 保存数据: 三、JSON 四、CSV  五、MySQL SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的

    2024年02月13日
    浏览(24)
  • spark-sql: insert overwrite分区表问题

    用spark-sql,insert overwrite分区表时发现两个比较麻烦的问题: 从目标表select出来再insert overwrite目标表时报错:Error in query: Cannot overwrite a path that is also being read from. 从其他表select出来再insert overwrite目标表时,其他分区都被删除了. 印象中这两个问题也出现过,但凭经验和感觉,

    2024年02月11日
    浏览(31)
  • Hudi Spark-SQL增量查询数据几种方式

    由于项目上主要用Hive查询Hudi,所以之前总结过一篇:Hive增量查询Hudi表。最近可能会有Spark SQL增量查询Hudi表的需求,并且我发现目前用纯Spark SQL的形式还不能直接增量查询Hudi表,于是进行学习总结一下。 先看一下官方文档上Spark SQL增量查询的方式,地址:https://hudi.apache.or

    2024年02月11日
    浏览(30)
  • spark-sql处理json字符串的常用函数

    整理了spark-sql处理json字符串的几个函数: 1 get_json_object 解析不含数组的 json   2 from_json  解析json 3 schema_of_json 提供生成json格式的方法 4 explode   把JSONArray转为多行 get_json_object(string json_string, string path) :适合最外层为{}的json解析。  第一个参数是json对象变量,也就是含j

    2023年04月08日
    浏览(31)
  • spark-sql数据重复之File Output Committer问题

      我们先来回顾下之前介绍过的三种Committer:FileOutputCommitter V1、FileOutputCommitter V2、S3A Committer,其基本代表了整体的演进趋势。 核心代码讲解详细参照:Spark CommitCoordinator 保证数据一致性 OutputCommitter commitTask commitJob mapreduce.fileoutputcommitter.algorithm.version | 技术世界 | committask

    2024年02月14日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包