Spark_SQL函数定义(定义UDF函数、使用窗口函数)

这篇具有很好参考价值的文章主要介绍了Spark_SQL函数定义(定义UDF函数、使用窗口函数)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

                    一、UDF函数定义

        (1)函数定义

        (2)Spark支持定义函数

        (3)定义UDF函数

                (4)定义返回Array类型的UDF

        (5)定义返回字典类型的UDF

二、窗口函数

        (1)开窗函数简述

        (2)窗口函数的语法


一、UDF函数定义

        (1)函数定义

        无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。
        Hive中自定义函数有三种类型:

        第一种:UDF(User-Defined_-function)函数

                · 一对一的关系,输入一个值经过函数以后输出一个值;

                · 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

        第二种:UDAF(User-Defined Aggregation Function)聚合函数

                · 多对一的关系,输入多个值输出一个值,通常于groupBy联合使用;

        第三种:UDTF(User-Defined Table-Generating Functions)函数

                · 一对多的关系,输入一个值输出多个值(一行变多为行);

                · 用户自定义生成函数,有点像flatMap;

        (2)Spark支持定义函数

        目前来说Spark框架各个版本及各种语言对自定义函数的支持:在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF。

Spark版本及支持函数定义
Apache Spark Version Spark SQL UDF(Python,Java,Scala) Spark SQL UDAF(Java,Scala) Spark SQL UDF(R) Hive UDF,UDAF,UDTF
1.1-1.4
1.5 experimental
1.6
2.0
        (3)定义UDF函数

        ①sparksession.udf.register()

        注册的UDF可以用于DSL和SQL,返回值用于DSL风格,传参内给的名字用于SQL风格。

spark-sql创建hive udf,大数据,1024程序员节

        ②pyspark.sql.functions.udf

        仅能用于DSL风格

spark-sql创建hive udf,大数据,1024程序员节

        其中F是:from pyspark.sql import functions as F。其中,被注册为UDF的方法名是指具体的计算方法,如:def add(x, y): x + y  。 add就是将要被注册成UDF的方法名

# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])
    df = rdd.toDF(['num'])

    # TODO 1:方式1 sparksession.udf.register(),DSL和SQL风格均可使用
    # UDF的处理函数
    def num_ride_10(num):
        return num * 10
    # 参数1:注册的UDF的名称,这个UDF名称,仅可以用于SQL风格
    # 参数2:UDF的处理逻辑,是一个单独定义的方法
    # 参数3:声明UDF的返回值类型,注意:UDF注册时候,必要声明返回值类型,并且UDF的真实返回值一定要和声明的返回值一致
    # 当前这种方式定义的UDF,可以通过参数1的名称用于SQL风格,通过返回值对象用户的DSL风格
    udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())

    # SQL风格中使用
    # selectExpr 以SELECT的表达式执行,表达式SQL风格的表达式(字符串)
    # select方法,接受普通的字符串字段名,或者返回值时Column对象的计算
    df.selectExpr('udf1(num)').show()

    # DSL 风格使用
    # 返回值UDF对象,如果作为方法使用,传入的参数一定是Column对象
    df.select(udf2(df['num'])).show()

    # TODO 2:方式2注册,仅能用于DSL风格
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()

        方式1结果:

spark-sql创建hive udf,大数据,1024程序员节

        方式2结果:

spark-sql创建hive udf,大数据,1024程序员节

                (4)定义返回Array类型的UDF

        注意:数组或者list类型,可以使用spark的ArrayType来描述即可。

        注意:声明ArrayType要类似这样::ArrayType(StringType()),在ArrayType中传入数组内的数据类型。

# cording:utf8
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 构建一个RDD
    rdd = sc.parallelize([['hadoop spark flink'], ['hadoop flink java']])
    df = rdd.toDF(['line'])

    # 注册UDF,UDF的执行函数定义
    def split_line(data):
        return data.split(' ')

    # TODO 1:方式1 后见UDF
    udf2 = spark.udf.register('udf1', split_line, ArrayType(StringType()))

    # DLS 风格
    df.select(udf2(df['line'])).show()

    # SQL风格
    df.createTempView('lines')
    spark.sql('SELECT udf1(line) FROM lines').show(truncate=False)

    # TODO 2:方式的形式构建UDF
    udf3 = F.udf(split_line, ArrayType(StringType()))
    df.select(udf3(df['line'])).show(truncate=False)

spark-sql创建hive udf,大数据,1024程序员节        

        (5)定义返回字典类型的UDF

        注意:字典类型返回值,可以用StructType来进行描述,StructType是—个普通的Spark支持的结构化类型.
        只是可以用在:
                · DF中用于描述Schema
                · UDF中用于描述返回值是字典的数据

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # 假设 有三个数字: 1 2 3 在传入数字,返回数字所在序号对应的 字母 然后和数字结合组成dict返回
    # 例:传入1 返回{'num':1, 'letters': 'a'}
    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(['num'])

    # 注册UDF
    def process(data):
        return {'num': data, 'letters': string.ascii_letters[data]}

    '''
    UDF返回值是字典的话,需要用StructType来接收
    '''
    udf1 = spark.udf.register('udf1', process, StructType().add('num', IntegerType(), nullable=True).\
                              add('letters', StringType(), nullable=True))
    # SQL风格
    df.selectExpr('udf1(num)').show(truncate=False)
    # DSL风格
    df.select(udf1(df['num'])).show(truncate=False)

spark-sql创建hive udf,大数据,1024程序员节

        (6)通过RDD构建UDAF函数

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    df = rdd.map(lambda x: [x]).toDF(['num'])

    # 方法:使用RDD的mapPartitions 算子来完成聚合操作
    # 如果用mapPartitions API 完成UDAF聚合,一定要单分区
    single_partition_rdd = df.rdd.repartition(1)

    def process(iter):
        sum = 0
        for row in iter:
            sum += row['num']

        return [sum]    # 一定要嵌套list,因为mapPartitions方法要求返回值是list对象

    print(single_partition_rdd.mapPartitions(process).collect())

spark-sql创建hive udf,大数据,1024程序员节

二、窗口函数

        (1)开窗函数简述

        ●介绍

        开窗函数的引入是为了既显示聚集前的数据又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。 开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合),它对一组值进行操作,不需要使用GROUP BY子句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列。

        ●聚合函数和开窗函数

        聚合函数是将多行变成一行,count,avg...

        开窗函数是将一行变成多行;

        聚合函数如果要显示其他的列必须将列加入到group by中,开窗函数可以不使用group by,直接将所有信息显示出来。

        ●开窗函数分类

        1.聚合开窗函数 聚合函数(列)OVER(选项),这里的选项可以是PARTITION BY子句,但不可以是ORDER BY子句

        2.排序开窗函数 排序函数(列)OVER(选项),这里的选项可以是ORDER BY子句,也可以是OVER(PARTITION BY子句ORDER BY子句),但不可以是PARTITION BY子句。

        3.分区类型NTILE的窗口函数

        (2)窗口函数的语法

        窗口函数的语法:

spark-sql创建hive udf,大数据,1024程序员节

# cording:utf8
import string
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    spark = SparkSession.builder.appName('udf_define').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([
        ('张三', 'class_1', 99),
        ('王五', 'class_2', 35),
        ('王三', 'class_3', 57),
        ('王久', 'class_4', 12),
        ('王丽', 'class_5', 99),
        ('王娟', 'class_1', 90),
        ('王军', 'class_2', 91),
        ('王俊', 'class_3', 33),
        ('王君', 'class_4', 55),
        ('王珺', 'class_5', 66),
        ('郑颖', 'class_1', 11),
        ('郑辉', 'class_2', 33),
        ('张丽', 'class_3', 36),
        ('张张', 'class_4', 79),
        ('黄凯', 'class_5', 90),
        ('黄开', 'class_1', 90),
        ('黄恺', 'class_2', 90),
        ('王凯', 'class_3', 11),
        ('王凯杰', 'class_1', 11),
        ('王开杰', 'class_2', 3),
        ('王景亮', 'class_3', 99)])
    schema = StructType().add('name', StringType()).\
        add('class', StringType()).\
        add('score', IntegerType())
    df = rdd.toDF(schema)
    # 创建表
    df.createTempView('stu')

    # TODO 1:聚合窗口函数的演示
    spark.sql('''
        SELECT *, AVG(score) over() AS avg_socre FROM stu
    ''').show()

    # TODO 2: 排序相关的窗口函数计算
    # RANK over, DENSE_RANK over, ROW_NUMBER over
    spark.sql('''
        SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
        DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
        RANK() OVER(ORDER BY score) AS RANK
        FROM stu
    ''').show()

    # TODO NTILE
    spark.sql('''
        SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    ''').show()

        TODO1结果:

spark-sql创建hive udf,大数据,1024程序员节

        TODO2结果展示:

spark-sql创建hive udf,大数据,1024程序员节

        TODO3结果展示:

spark-sql创建hive udf,大数据,1024程序员节文章来源地址https://www.toymoban.com/news/detail-774358.html

到了这里,关于Spark_SQL函数定义(定义UDF函数、使用窗口函数)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数

    1、在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为 Scala 的 Table API 注册函数。 2、函数通过调用 registerFunction()方法在 TableEnvironment 中注册。当用户定义的函数 被注册时,它被插入到 TableEnvironment 的函数目录中, 这样 Table API 或 SQL 解

    2024年02月22日
    浏览(50)
  • spark-sql

    [root@localhost bin]# ./spark-sql Error: Failed to load class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. Failed to load main class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. You need to build Spark with -Phive and -Phive-thriftserver. 24/02/22 00:23:20 INFO ShutdownHookManager: Shutdown hook called 24/02/22 00:23:20 INFO Shutd

    2024年02月22日
    浏览(41)
  • Spark-SQL小结

    目录 一、RDD、DataFrame、DataSet的概念、区别联系、相互转换操作   1.RDD概念   2.DataFrame概念   3.DataSet概念   4.RDD、DataFrame、DataSet的区别联系   5.RDD、DataFrame、DataSet的相互转换操作    1 RDD-DataFrame、DataSet    2  DataFrame-RDD,DataSet    3 DataSet-RDD,DataFrame 二、Spark-SQL连接JDBC的方式

    2024年02月09日
    浏览(43)
  • spark-sql字段血缘实现

    Apache Spark是一个开源的大数据处理框架,它提供了一种高效、易于使用的方式来处理大规模数据集。在Spark中,数据是通过DataFrame和Dataset的形式进行操作的,这些数据结构包含了一系列的字段(也称为列)。字段血缘是Spark中的一个关键概念,它帮助我们理解数据的来源和流

    2024年02月02日
    浏览(50)
  • Hudi-集成Spark之spark-sql方式

    启动spark-sql 创建表 建表参数: 参数名 默认值 说明 primaryKey uuid 表的主键名,多个字段用逗号分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的预合并字段。同 hoodie.datasource.write.precombine.field type cow 创建的表类型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    浏览(44)
  • Spark参数配置和调优,Spark-SQL、Config

    一、Hive-SQL / Spark-SQL参数配置和调优 二、shell脚本spark-submit参数配置 三、sparkSession中配置参数

    2024年02月13日
    浏览(45)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目录 0. 相关文章链接 1. 创建表 1.1. 启动spark-sql 1.2. 建表参数 1.3. 创建非分区表 1.4. 创建分区表 1.5. 在已有的hudi表上创建新表 1.6. 通过CTAS (Create Table As Select)建表 2. 插入数据 2.1. 向非分区表插入数据 2.2. 向分区表动态分区插入数据 2.3. 向分区表静态分区插入数据 2.4

    2024年02月06日
    浏览(40)
  • Spark-SQL连接Hive的五种方法

    若使用Spark内嵌的Hive,直接使用即可,什么都不需要做(在实际生产活动中,很少会使用这一模式) 步骤: 将Hive中conf/下的hive-site.xml拷贝到Spark的conf/目录下; 把Mysql的驱动copy到jars/目录下; 如果访问不到hdfs,则将core-site.xml和hdfs-site.xml拷贝到conf/目录下; 重启spark-shell;

    2024年02月16日
    浏览(42)
  • spark-udf函数

    from pyspark.sql import SparkSession from pyspark.sql.types import * ss = SparkSession.builder.getOrCreate() df_csv = ss.read.csv(‘hdfs://node1:8020/user/hive/warehouse/data/stu.csv’, schema=‘name string,age int,gender string,phone string,email string,city string,address string’) df_csv.show() def func(email): username = email.split(‘@’)[0] email

    2024年01月22日
    浏览(35)
  • spark-sql: insert overwrite分区表问题

    用spark-sql,insert overwrite分区表时发现两个比较麻烦的问题: 从目标表select出来再insert overwrite目标表时报错:Error in query: Cannot overwrite a path that is also being read from. 从其他表select出来再insert overwrite目标表时,其他分区都被删除了. 印象中这两个问题也出现过,但凭经验和感觉,

    2024年02月11日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包