Spark SQL自定义collect_list分组排序

这篇具有很好参考价值的文章主要介绍了Spark SQL自定义collect_list分组排序。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

想要在spark sql中对group by + concat_ws()的字段进行排序,可以参考如下方法。
原始数据如下:

+---+-----+----+
|id |name |type|
+---+-----+----+
|1  |name1|p   |
|2  |name2|p   |
|3  |name3|p   |
|1  |x1   |q   |
|2  |x2   |q   |
|3  |x3   |q   |
+---+-----+----+

目标数据如下:

+----+---------------------+
|type|value_list           |
+----+---------------------+
|p   |[name3, name2, name1]|
|q   |[x3, x2, x1]         |
+----+---------------------+

spark-shell:

val df=Seq((1,"name1","p"),(2,"name2","p"),(3,"name3","p"),(1,"x1","q"),(2,"x2","q"),(3,"x3","q")).toDF("id","name","type")
df.show(false)

1.使用开窗函数

df.createOrReplaceTempView("test")
spark.sql("select type,max(c) as c1 from (select type,concat_ws('&',collect_list(trim(name)) over(partition by type order by id desc)) as c  from test) as x group by type ")

因为使用开窗函数本身会使用比较多的资源,
这种方式在大数据量下性能会比较慢,所以尝试下面的操作。

2.使用struct和sort_array(array,asc?true,flase)的方式来进行,效率高些:

val df3=spark.sql("select type, concat_ws('&',sort_array(collect_list(struct(id,name)),false).name) as c from test group by type ")
df3.show(false)

例如:计算一个结果形如:

user_id    stk_id:action_type:amount:price:time   stk_id:action_type:amount:price:time   stk_id:action_type:amount:price:time   stk_id:action_type:amount:price:time 

需要按照time 升序排,则:

Dataset<Row> splitStkView = session.sql("select client_id, innercode, entrust_bs, business_amount, business_price, trade_date from\n" +
                "(select client_id,\n" +
                "       split(action,':')[0] as innercode,\n" +
                "       split(action,':')[1] as entrust_bs,\n" +
                "       split(action,':')[2] as business_amount,\n" +
                "       split(action,':')[3] as business_price,\n" +
                "       split(action,':')[4] as trade_date,\n" +
                "       ROW_NUMBER() OVER(PARTITION BY split(action,':')[0] ORDER BY split(action,':')[4] DESC) AS rn\n" +
                "from stk_temp)\n" +
                "where rn <= 5000");
        splitStkView.createOrReplaceTempView("splitStkView");
        Dataset<Row> groupStkView = session.sql("select client_id, CONCAT(innercode, ':', entrust_bs, ':', business_amount, ':', business_price, ':', trade_date) as behive, trade_date from splitStkView");
        groupStkView.createOrReplaceTempView("groupStkView");
        Dataset<Row> resultData = session.sql("SELECT client_id, concat_ws('\t',sort_array(collect_list(struct(trade_date, behive)),true).behive) as behives FROM groupStkView GROUP BY client_id");
        

3.udf的方式文章来源地址https://www.toymoban.com/news/detail-789066.html

import org.apache.spark.sql.functions._
import org.apache.spark.sql._
val sortUdf = udf((rows: Seq[Row]) => {
  rows.map { case Row(id:Int, value:String) => (id, value) }
    .sortBy { case (id, value) => -id } //id if asc
    .map { case (id, value) => value }
})

val grouped = df.groupBy(col("type")).agg(collect_list(struct("id", "name")) as "id_name")
val r1 = grouped.select(col("type"), sortUdf(col("id_name")).alias("value_list"))
r1.show(false)

到了这里,关于Spark SQL自定义collect_list分组排序的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Collections工具类,可以使用collections工具类对代码中的list进行分组

    当我们需要对代码list进行分组的时候,有时候使用for循环会显得很蠢,那么使用Colletions工具类就会很方便,根据所需要的分类值,进行映射分组集合,以上是案例 toMap 返回map集合 MapLong, ListLong toList 返回list集合 tips stream流一样好用

    2024年03月23日
    浏览(35)
  • 【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎

    【大家好,我是爱干饭的猿,本文重点介绍、SparkSQL的运行流程、 SparkSQL的自动优化、Catalyst优化器、SparkSQL的执行流程、Spark On Hive原理配置、分布式SQL执行引擎概念、代码JDBC连接。 后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】 上一篇

    2024年02月04日
    浏览(48)
  • Java 1.8 List集合排序、去重、分组、过滤、合并、截取操作

    1、正序 2、逆序 3、根据某个属性或多个属性排序 多个属性排序:需要添加排序条件就在后面添加.thenComparing(UserVO::getxxx),它是在上一个条件的基础上进行排序 1、去重 2、根据某个属性去重(它将该字段还进行排序了) 3、根据某个属性去重(这个方法没有排序) 4、对多个

    2024年02月01日
    浏览(71)
  • 使用Collections.sort方法来对自定义对象进行排序

    Collections.sort方法可以用来对自定义对象进行排序。要实现这一点,需要在调用该方法时传入一个比较器,该比较器用于指定如何比较对象中的值。 举个例子,假设有一个自定义对象Person,其中包含姓名和年龄两个属性,我们可以使用以下代码对该对象列表进行按年龄排序:

    2024年02月13日
    浏览(46)
  • Java List集合去重、过滤、分组、获取数据、求最值、合并、排序、跳数据和遍历

    请各大网友尊重本人原创知识分享,谨记本人博客:南国以南i、 使用stream().map()提取List对象的某一列值及去重 使用 findAny() 和 findFirst() 获取第一条数据 我是南国以南i记录点滴每天成长一点点,学习是永无止境的!转载请附原文链接!!! 参考链接、参考链接

    2024年04月11日
    浏览(70)
  • MapReduce实战小案例(自定义排序、二次排序、分组、分区)

    MapReduce是什么? 我们来看官方文档的解释(我们下载的hadoop中有离线文档:hadoop-2.10.1/share/doc) Hadoop MapReduce 是一个易于编写应用程序的软件框架,它以可靠、容错的方式并行处理商业硬件的大型集群(数千个节点)上的大量数据(数 TB 数据集)。 这里我们可以提炼一下MapReduce的

    2024年02月07日
    浏览(42)
  • SQL基础语法 | 增删改查、分组、排序、limit

    创建数据库 查看数据库 选择数据库 删除数据库 创建表格 删除表格 创建schema模式 删除schema模式 删除一个空模式 删除一个模式及包含的所有对象 1. 增insert into sql语法 : INSERT INTO TABLE_NAME (字段名1, column2, column3,...columnN) VALUES (value1, value2, value3,...valueN) 2. 查询select SQL语法:

    2024年02月15日
    浏览(45)
  • 分组排序取第一条数据 SQL写法

    在数据库查询过程中经常遇到需要分组排序查询第一条数据的情况。例如,在消息列表中需要展示每个联系人最近的一条信息。 目前我接触到的解决方案有两种,分别是开窗函数 row_number 和变量法。 比较常用的解决方案是使用开窗函数 row_number() over(partition by xxx order by xxx)

    2024年04月25日
    浏览(33)
  • 【Sql】根据字段分组排序,取其第一条数据

    (1)问题描述 有时候我们需要对数据进行去重处理,例如查询结果里的文件名有重复,我们希望可以按照创建时间排序,最终结果里每个文件名只取创建时间最近的一个。 (2)有哪些问题 想到去重,可以想到使用distinct或者group by分组。但是这两者有个问题,例如我们查询

    2024年02月16日
    浏览(32)
  • 使用java8 新特性stream流对List<Map<String, Object>>集合进行遍历、过滤、查询、去重、排序、分组

    对于一个ListMapString, Object类型的数据,可以使用Java 8的新特性stream流来进行遍历、过滤、查询、去重、排序、分组等操作。 遍历: 过滤: 查询: 去重: 排序: 分组:

    2024年02月10日
    浏览(67)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包