0基础学习PyFlink——用户自定义函数之UDAF

这篇具有很好参考价值的文章主要介绍了0基础学习PyFlink——用户自定义函数之UDAF。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在前面几篇文章中,我们学习了非聚合类的用户自定义函数。这节我们将介绍最简单的聚合函数UDAF。
0基础学习PyFlink——用户自定义函数之UDAF,大数据,flink,python,大数据

UDAF

我们对比下UDAF和UDF的定义

def udaf(f: Union[Callable, AggregateFunction, Type] = None,
         input_types: Union[List[DataType], DataType, str, List[str]] = None,
         result_type: Union[DataType, str] = None, 
         accumulator_type: Union[DataType, str] = None,
         deterministic: bool = None, 
         name: str = None,
         func_type: str = "general") -> Union[UserDefinedAggregateFunctionWrapper, Callable]:
def udf(f: Union[Callable, ScalarFunction, Type] = None,
        input_types: Union[List[DataType], DataType, str, List[str]] = None,
        result_type: Union[DataType, str] = None,
        deterministic: bool = None, 
        name: str = None, 
        func_type: str = "general",
        udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

可以发现:

  • udaf比udf多了一个参数accumulator_type
  • udaf比udf少了一个参数udf_type

accumulator中文是“累加器”。我们可以将其看成聚合过后(比如GroupBy)的成批数据,每批都要走一次函数。
举一个例子:我们对图中左侧的成绩单,使用人名(name)进行聚类,然后计算出最高分数。即算出每个人考出的最高分数是多少。
0基础学习PyFlink——用户自定义函数之UDAF,大数据,flink,python,大数据
如图所示,聚合后的数据每个都会经过accumulator计算。计算出来的值的类型就是accumulator_type。这个类型的数据是中间态,它并不是最终UDAF返回的数据类型——result_type。具体这块的知识我们会在后面讲解。
为了方便讲解,我们就以上面例子来讲解其使用。先贴出准备的代码:

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunction

    
def calc():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('score', DataTypes.FLOAT()), DataTypes.FIELD('class', DataTypes.STRING())])
    students_score = [
        ("张三", 80.0, "English"),
        ("李四", 75.0, "English"),
        ("王五", 90.0, "English"),
        ("赵六", 85.0, "English"),
        ("张三", 60.0, "Math"),
        ("李四", 95.0, "Math"),
        ("王五", 90.0, "Math"),
        ("赵六", 70.0, "Math"),
        ("孙七", 60.0, "Math"),
    ]
    tab_source = t_env.from_elements(students_score, row_type_tab_source )

我们在tab_source表中录入了学生的成绩信息,其中包括姓名(name)、成绩(score)和科目(class)。文章来源地址https://www.toymoban.com/news/detail-720275.html

入参并非表中一行(Row)的集合

计算每个人考了几门课

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合的个数并返回
  3. 别名UDTF返回的列名
  4. select出数据
@udaf(result_type=DataTypes.ROW([DataTypes.FIELD("count", DataTypes.BIGINT())]), func_type="pandas")
    def exam_count(pandas_df: pd.DataFrame):
        return Row(pandas_df.count())

    tab_student_exam_count = tab_source.group_by(col('name')) \
        .aggregate(exam_count(col('name')).alias("count")) \
        .select(col('name'), col('count')) 
    tab_student_exam_count.execute().print()
+--------------------------------+----------------------+
|                           name |                count |
+--------------------------------+----------------------+
|                           孙七 |                    1 |
|                           张三 |                    2 |
|                           李四 |                    2 |
|                           王五 |                    2 |
|                           赵六 |                    2 |
+--------------------------------+----------------------+
5 rows in set

计算每门课有几个人考试

  1. 按姓名(class)聚类
  2. UDTF统计聚类后集合的个数并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("count", DataTypes.BIGINT())]), func_type="pandas")
    def exam_count(pandas_df: pd.DataFrame):
        return Row(pandas_df.count())
    
    tab_class_exam_count = tab_source.group_by(col('class')) \
        .aggregate(exam_count(col('class')).alias("count")) \
        .select(col('class'), col('count')) 
    tab_class_exam_count.execute().print()
+--------------------------------+----------------------+
|                          class |                count |
+--------------------------------+----------------------+
|                        English |                    4 |
|                           Math |                    5 |
+--------------------------------+----------------------+
2 rows in set

计算每个人的平均分

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合的均值并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("avg", DataTypes.FLOAT())]), func_type="pandas")
    def avg_score(pandas_df: pd.DataFrame):
        return Row(pandas_df.mean())

    tab_student_avg_score = tab_source.group_by(col('name')) \
        .aggregate(avg_score(col('score')).alias("avg")) \
        .select(col('name'), col('avg')) 
    tab_student_avg_score.execute().print()
+--------------------------------+--------------------------------+
|                           name |                            avg |
+--------------------------------+--------------------------------+
|                           孙七 |                           60.0 |
|                           张三 |                           70.0 |
|                           李四 |                           85.0 |
|                           王五 |                           90.0 |
|                           赵六 |                           77.5 |
+--------------------------------+--------------------------------+
5 rows in set

计算每课的平均分

  1. 按姓名(class)聚类
  2. UDTF统计聚类后集合的均值并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("avg", DataTypes.FLOAT())]), func_type="pandas")
    def avg_score(pandas_df: pd.DataFrame):
        return Row(pandas_df.mean())

    tab_class_avg_score = tab_source.group_by(col('class')) \
        .aggregate(avg_score(col('score')).alias("avg")) \
        .select(col('class'), col('avg')) 
    tab_class_avg_score.execute().print()
+--------------------------------+--------------------------------+
|                          class |                            avg |
+--------------------------------+--------------------------------+
|                        English |                           82.5 |
|                           Math |                           75.0 |
+--------------------------------+--------------------------------+
2 rows in set

计算每个人的最高分和最低分

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合的最大值和最小值,并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("min", DataTypes.FLOAT())]), func_type="pandas")
    def max_min_score(pandas_df: pd.DataFrame):
        return Row(pandas_df.max(), pandas_df.min())

    tab_student_max_min_score = tab_source.group_by(col('name')) \
        .aggregate(max_min_score(col('score')).alias("max", "min")) \
        .select(col('name'), col('max'), col('min')) 
    tab_student_max_min_score.execute().print()
+--------------------------------+--------------------------------+--------------------------------+
|                           name |                            max |                            min |
+--------------------------------+--------------------------------+--------------------------------+
|                           孙七 |                           60.0 |                           60.0 |
|                           张三 |                           80.0 |                           60.0 |
|                           李四 |                           95.0 |                           75.0 |
|                           王五 |                           90.0 |                           90.0 |
|                           赵六 |                           85.0 |                           70.0 |
+--------------------------------+--------------------------------+--------------------------------+
5 rows in set

入参是表中一行(Row)的集合

计算每个人的最高分、最低分以及所属的课程

  1. 按姓名(name)聚类
  2. UDTF统计聚类后集合中分数最大值、最小值;分数最大值所在行的课程名,和分数最小值所在行的课程名,并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")
    def max_min_score_with_class(pandas_df: pd.DataFrame):
        return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "class"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "class"])

    tab_student_max_min_score = tab_source.group_by(col('name')) \
        .aggregate(max_min_score_with_class.alias("max", "class(max)", "min", "class(min)")) \
        .select(col('name'), col('max'), col('class(max)'), col('min'), col('class(min)')) 
    tab_student_max_min_score.execute().print()
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                           name |                            max |                     class(max) |                            min |                     class(min) |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                           孙七 |                           60.0 |                           Math |                           60.0 |                           Math |
|                           张三 |                           80.0 |                        English |                           60.0 |                           Math |
|                           李四 |                           95.0 |                           Math |                           75.0 |                        English |
|                           王五 |                           90.0 |                        English |                           90.0 |                        English |
|                           赵六 |                           85.0 |                        English |                           70.0 |                           Math |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
5 rows in set

计算每课的最高分数、最低分数以及所属人

  1. 按姓名(class)聚类
  2. UDTF统计聚类后集合中分数最大值、最小值;分数最大值所在行的人名,和分数最小值所在行的人名,并返回
  3. 别名UDTF返回的列名
  4. select出数据
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")
    def max_min_score_with_name(pandas_df: pd.DataFrame):
        return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "name"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "name"])
    
    tab_class_max_min_score = tab_source.group_by(col('class')) \
        .aggregate(max_min_score_with_name.alias("max", "name(max)", "min", "name(min)")) \
        .select(col('class'), col('max'), col('name(max)'), col('min'), col('name(min)')) 
    tab_class_max_min_score.execute().print()
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                          class |                            max |                      name(max) |                            min |                      name(min) |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|                        English |                           90.0 |                           王五 |                           75.0 |                           李四 |
|                           Math |                           95.0 |                           李四 |                           60.0 |                           张三 |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
2 rows in set

完整代码

入参并非表中一行(Row)的集合

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunction

    
def calc():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('score', DataTypes.FLOAT()), DataTypes.FIELD('class', DataTypes.STRING())])
    students_score = [
        ("张三", 80.0, "English"),
        ("李四", 75.0, "English"),
        ("王五", 90.0, "English"),
        ("赵六", 85.0, "English"),
        ("张三", 60.0, "Math"),
        ("李四", 95.0, "Math"),
        ("王五", 90.0, "Math"),
        ("赵六", 70.0, "Math"),
        ("孙七", 60.0, "Math"),
    ]
    tab_source = t_env.from_elements(students_score, row_type_tab_source )
        
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("count", DataTypes.BIGINT())]), func_type="pandas")
    def exam_count(pandas_df: pd.DataFrame):
        return Row(pandas_df.count())

    tab_student_exam_count = tab_source.group_by(col('name')) \
        .aggregate(exam_count(col('name')).alias("count")) \
        .select(col('name'), col('count')) 
    tab_student_exam_count.execute().print()
    
    
    tab_class_exam_count = tab_source.group_by(col('class')) \
        .aggregate(exam_count(col('class')).alias("count")) \
        .select(col('class'), col('count')) 
    tab_class_exam_count.execute().print()
    
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("avg", DataTypes.FLOAT())]), func_type="pandas")
    def avg_score(pandas_df: pd.DataFrame):
        return Row(pandas_df.mean())

    tab_student_avg_score = tab_source.group_by(col('name')) \
        .aggregate(avg_score(col('score')).alias("avg")) \
        .select(col('name'), col('avg')) 
    tab_student_avg_score.execute().print()
    
    tab_class_avg_score = tab_source.group_by(col('class')) \
        .aggregate(avg_score(col('score')).alias("avg")) \
        .select(col('class'), col('avg')) 
    tab_class_avg_score.execute().print()
    
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("min", DataTypes.FLOAT())]), func_type="pandas")
    def max_min_score(pandas_df: pd.DataFrame):
        return Row(pandas_df.max(), pandas_df.min())

    tab_student_max_min_score = tab_source.group_by(col('name')) \
        .aggregate(max_min_score(col('score')).alias("max", "min")) \
        .select(col('name'), col('max'), col('min')) 
    tab_student_max_min_score.execute().print()
    
    
if __name__ == '__main__':
    calc()

入参是表中一行(Row)的集合

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunction

    
def calc():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('score', DataTypes.FLOAT()), DataTypes.FIELD('class', DataTypes.STRING())])
    students_score = [
        ("张三", 80.0, "English"),
        ("李四", 75.0, "English"),
        ("王五", 90.0, "English"),
        ("赵六", 85.0, "English"),
        ("张三", 60.0, "Math"),
        ("李四", 95.0, "Math"),
        ("王五", 90.0, "Math"),
        ("赵六", 70.0, "Math"),
        ("孙七", 60.0, "Math"),
    ]
    tab_source = t_env.from_elements(students_score, row_type_tab_source )
    
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")
    def max_min_score_with_class(pandas_df: pd.DataFrame):
        return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "class"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "class"])

    tab_student_max_min_score = tab_source.group_by(col('name')) \
        .aggregate(max_min_score_with_class.alias("max", "class(max)", "min", "class(min)")) \
        .select(col('name'), col('max'), col('class(max)'), col('min'), col('class(min)')) 
    tab_student_max_min_score.execute().print()
    
    @udaf(result_type=DataTypes.ROW([DataTypes.FIELD("max", DataTypes.FLOAT()), DataTypes.FIELD("max tag", DataTypes.STRING()), DataTypes.FIELD("min", DataTypes.FLOAT()), DataTypes.FIELD("min tag", DataTypes.STRING())]), func_type="pandas")
    def max_min_score_with_name(pandas_df: pd.DataFrame):
        return Row(pandas_df["score"].max(), pandas_df.loc[pandas_df["score"].idxmax(), "name"], pandas_df["score"].min(), pandas_df.loc[pandas_df["score"].idxmin(), "name"])
    
    tab_class_max_min_score = tab_source.group_by(col('class')) \
        .aggregate(max_min_score_with_name.alias("max", "name(max)", "min", "name(min)")) \
        .select(col('class'), col('max'), col('name(max)'), col('min'), col('name(min)')) 
    tab_class_max_min_score.execute().print()
    
if __name__ == '__main__':
    calc()

到了这里,关于0基础学习PyFlink——用户自定义函数之UDAF的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

    在 《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。 Sink用于将Reduce结果输出到外部系统。它也是通过

    2024年02月08日
    浏览(36)
  • 0基础学习PyFlink——事件时间和运行时间的窗口

    在 《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》一文中,我们使用的是运行时间(Tumbling ProcessingTime Windows)作为窗口的参考时间: 而得到的结果也是不稳定的。 这是因为每次运行时,CPU等系统资源的繁忙程度是不一样的,这就影响了最后的运行结果。 为了让结果稳

    2024年02月05日
    浏览(52)
  • 0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

    在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分 那么有没有办法让上图中(B,2)和(D,5)也会被计算呢? 这就可以使用本节介绍的时间滚动窗口。它不

    2024年02月06日
    浏览(41)
  • Python零基础学习7.1—Python自定义函数的定义和调用

    函数是组织好的,可重复使用的,用来实现单一,或相关联功能的代码段。 函数能提高应用的模块性,和代码的重复利用率。Python提供了许多内建函数,比如print()。但我们也可以自己创建函数来实现一些功能,这被叫做用户自定义函数。 下面来看几个例题: 任务描述 本关

    2024年02月04日
    浏览(53)
  • Hive UDF、UDAF和UDTF函数详解

    在 Hive 中,可以编写和使用不同类型的自定义函数,包括 UDF(User-Defined Functions)、UDAF(User-Defined Aggregate Functions)和 UDTF(User-Defined Table Functions)。这些自定义函数允许你扩展 Hive 的功能,以执行自定义的数据处理操作。 UDF(User-Defined Functions) : 用途:UDF 用于处理一行数

    2024年02月10日
    浏览(38)
  • Python零基础学习7.2—Python自定义函数的综合应用

    本章我们加大一点难度,来让大家更好的掌握Python函数的使用技巧 来看例题: 任务描述 本关任务:素数问题函数。 (1)实现isPrime()函数,参数为整数。如果是素数,返回True,否则返回False。 (2)在(1)的基础上,编写一个函数listPrime(),该函数可以接受任意个数数据,返

    2024年02月03日
    浏览(52)
  • C++ 学习 ::【基础篇:14】:C++ 类的基本成员函数:析构函数的作用 及 自定义析构函数情形

    本系列 C++ 相关文章 仅为笔者学习笔记记录,用自己的理解记录学习!C++ 学习系列将分为三个阶段: 基础篇、STL 篇、高阶数据结构与算法篇 ,相关重点内容如下: 基础篇 : 类与对象 (涉及C++的三大特性等); STL 篇 : 学习使用 C++ 提供的 STL 相关库 ; 高阶数据结构与算

    2024年02月07日
    浏览(45)
  • pyflink map函数例子

    [root@master pyflink]# cat k102.py  from pyflink.datastream import StreamExecutionEnvironment # 创建 StreamExecutionEnvironment 对象 env = StreamExecutionEnvironment.get_execution_environment() # 读取文件,创建 DataStream 对象 data_stream = env.read_text_file(\\\'/root/pyflink/test.log\\\') # 对每行数据添加字符串 \\\'aaaa\\\' new_stream = data_st

    2024年02月07日
    浏览(39)
  • SQL Server用户定义的函数(UDF)使用详解

    与编程语言中的函数一样,SQL Server 用户定义函数是接受参数、执行操作(如复杂计算)并将该操作的结果作为值返回的例程。返回值可以是单个标量值,也可以是结果集。 模块化编程。可以创建一次函数,将其存储在数据库中,并在程序中调用它任意次数。可以独立于程序

    2023年04月12日
    浏览(43)
  • 16 | Spark SQL 的 UDF(用户自定义函数)

    UDF(用户自定义函数) :Spark SQL 允许用户定义自定义函数,以便在 SQL 查询或 DataFrame 操作中使用。这些 UDF 可以扩展 Spark SQL 的功能,使用户能够执行更复杂的数据操作。 示例:

    2024年02月10日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包