0基础学习PyFlink——用户自定义函数之UDTAF

这篇具有很好参考价值的文章主要介绍了0基础学习PyFlink——用户自定义函数之UDTAF。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在前面几篇文章中,我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数:UDTAF——用户自定义表值聚合函数。
0基础学习PyFlink——用户自定义函数之UDTAF,大数据,数据库,flink,sql,python

UDTAF

UDTAF函数即具备了UDTF的特点,也具备UDAF的特点。即它可以像《0基础学习PyFlink——用户自定义函数之UDTF》介绍的UDTF那样可以返回任意数量的行作为输,又可以像《0基础学习PyFlink——用户自定义函数之UDAF》介绍的UDAF那样通过聚合的数据(多组)计算出一个值
举一个例子:我们拿到一个学生成绩表,每行包括:

  • 学生姓名
  • 英语成绩
  • 数学成绩
  • 年级

现在我们需要把这张表调整为:

  • 学生姓名
  • 成绩
  • 科目
  • 科目年级平均成绩
  • 年级
    0基础学习PyFlink——用户自定义函数之UDTAF,大数据,数据库,flink,sql,python
    将一行中的“英语成绩”和“数学成绩”,拆成“成绩”和“科目”,相当于把一行数据拆解成多行,如上图左侧“张三”只有一行,而右侧有两行“张三”信息。这种拆解操作就需要T类型的用户自定义函数,比如UDTF和UDTAF。
    而我们需要计算一个年级一科的平均成绩,比如1年级英语的平均成绩,则需要按年级聚合之后再做计算。这个就需要A类型的用户自定义函数,比如UDAF和UDTAF。
    同时要满足上述两种技术方案的就是UDTAF。我们先看下主体代码,它和《0基础学习PyFlink——用户自定义函数之UDAF》中的很像。但是有两个重要区别:
  • 要设置成in_streaming_mode模式,否则会报错
  • udtaf要修饰一个对象,而非一个方法;
def calc():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_streaming_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])
    students_score = [
        ("张三", 80.0, 60.0, "1"),
        ("李四", 75.0, 95.0, "1"),
        ("王五", 90.0, 90.0, "2"),
        ("赵六", 85.0, 70.0, "2"),
        ("孙七", 60.0, 0.0, "3"),
    ]
    tab_source = t_env.from_elements(students_score, row_type_tab_source)
    
    split_class = udtaf(SplitClass())
    tab_source.group_by(col('grade')) \
        .flat_aggregate(split_class) \
        .select(col('*')) \
        .execute().print()

TableAggregateFunction的实现

用于计算的类要继承于TableAggregateFunction,即UDTAF中的TAF。

class SplitClass(TableAggregateFunction):
    _class_keys = ["english", "math"]

我们需要通过get_result_type告诉框架,UDTAF函数返回的是什么类型的数据。一般我们都是构造一个行类型——ROW,然后定义其每个字段的值和类型:

  • name:string类型,用户姓名;
  • score:float类型,考分;
  • avg score:float类型,科目年级平均分数;
  • class:sting类型,科目名称;

累加器

accumulator(累加器)是用于参与计算的中间数据。比如这个案例中,我们会向让accumulator保存拆解后的数据(即一行拆解成多行后的数据),然后再计算各年级每科的平均成绩。

定义

    def get_accumulator_type(self):
        return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])) 

因为只是为了保存展开的数据,于是我们只用定义均值计算之前的字段:

  • name:string类型,姓名;
  • score:float类型,分数;
  • class:string类型,科目名称;

创建

刚开始时,我们让其是一个空数组,对应上定义中的ARRAY类型。

    def create_accumulator(self):
        return []

累加

我们对科目进行遍历,进行行的拆分。即将(“张三”, 80.0, 60.0, “1”)拆解成(“张三”, 80.0, “english”)和(“张三”, 60.0, “math”)这样的两组数据。

    def accumulate(self, accumulator, row):
        for i in self._class_keys:
            accumulator.append(Row(row["name"], row[i], i))

返回

类型

    def get_result_type(self):
        return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])

可以看到result_type(返回类型)和accumulator_type(累加器类型)是不一样的(也可以一样,主要看怎么计算规则)。前者比后者多了“学科年级平均分”(avg score),这就更加接近我们希望获得的最终结果。
这些字段和我们目标字段只差一个grade(年级)。因为原始表中有grade,且我们会通过grade聚类,所以最终我们可以获得这个信息,而不用在这儿定义。
需要注意的是,虽然表值类型函数返回的是一组数据(若干Row),但是这儿只是返回Row的具体定义,而不是ARRAY[Row]。

计算

    def emit_value(self, accumulator):
        rows = []
        for i in self._class_keys: 
            total = 0.0
            student_count = 0
            for y in accumulator:
                # y[2] y[]"class"]
                if i == y[2]:
                    # y[1] y["score"]
                    total = total + y[1]
                    student_count = student_count + 1
            avg_score = total / student_count
            for y in accumulator:
                if i == y[2]:
                    rows.append(Row(y[0], y[1], avg_score, y[2]))
        for x in rows:   
            yield x

这个函数会在最后执行,它会通过累加器中的数据计算“学科年级平均分”,然后构造和“返回类型”一直的Row到rows数组中。最后通过yeild关键字返回一个生成器,我们可以将其看成还是一组Row,即拆解后的结果。

最后我们看下结果

+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                          grade |                           name |                          score |                      avg score |                          class |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                              1 |                           张三 |                           80.0 |                           77.5 |                        english |
| +I |                              1 |                           李四 |                           75.0 |                           77.5 |                        english |
| +I |                              1 |                           张三 |                           60.0 |                           77.5 |                           math |
| +I |                              1 |                           李四 |                           95.0 |                           77.5 |                           math |
| +I |                              2 |                           王五 |                           90.0 |                           87.5 |                        english |
| +I |                              2 |                           赵六 |                           85.0 |                           87.5 |                        english |
| +I |                              2 |                           王五 |                           90.0 |                           80.0 |                           math |
| +I |                              2 |                           赵六 |                           70.0 |                           80.0 |                           math |
| +I |                              3 |                           孙七 |                           60.0 |                           60.0 |                        english |
| +I |                              3 |                           孙七 |                            0.0 |                            0.0 |                           math |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
10 rows in set

0基础学习PyFlink——用户自定义函数之UDTAF,大数据,数据库,flink,sql,python
0基础学习PyFlink——用户自定义函数之UDTAF,大数据,数据库,flink,sql,python
0基础学习PyFlink——用户自定义函数之UDTAF,大数据,数据库,flink,sql,python文章来源地址https://www.toymoban.com/news/detail-717299.html

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf,TableAggregateFunction
import pandas as pd
from pyflink.table.udf import UserDefinedFunction
from typing import List

class SplitClass(TableAggregateFunction):
    _class_keys = ["english", "math"]

    def emit_value(self, accumulator):
        rows = []
        for i in self._class_keys: 
            total = 0.0
            student_count = 0
            for y in accumulator:
                if i == y[2]:
                    total = total + y[1]
                    student_count = student_count + 1
            avg_score = total / student_count
            for y in accumulator:
                if i == y[2]:
                    rows.append(Row(y[0], y[1], avg_score, y[2]))
        return rows

    def create_accumulator(self):
        return []

    def accumulate(self, accumulator, row):
        for i in self._class_keys:
            accumulator.append(Row(row["name"], row[i], i))

    def get_accumulator_type(self):
        return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())]))  

    def get_result_type(self):
        return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])

    
def calc():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_streaming_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])
    students_score = [
        ("张三", 80.0, 60.0, "1"),
        ("李四", 75.0, 95.0, "1"),
        ("王五", 90.0, 90.0, "2"),
        ("赵六", 85.0, 70.0, "2"),
        ("孙七", 60.0, 0.0, "3"),
    ]
    tab_source = t_env.from_elements(students_score, row_type_tab_source)
    
    split_class = udtaf(SplitClass())
    tab_source.group_by(col('grade')) \
        .flat_aggregate(split_class) \
        .select(col('*')) \
        .execute().print()
    
if __name__ == '__main__':
    calc()

到了这里,关于0基础学习PyFlink——用户自定义函数之UDTAF的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 0基础学习PyFlink——事件时间和运行时间的窗口

    在 《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》一文中,我们使用的是运行时间(Tumbling ProcessingTime Windows)作为窗口的参考时间: 而得到的结果也是不稳定的。 这是因为每次运行时,CPU等系统资源的繁忙程度是不一样的,这就影响了最后的运行结果。 为了让结果稳

    2024年02月05日
    浏览(53)
  • 0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

    在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分 那么有没有办法让上图中(B,2)和(D,5)也会被计算呢? 这就可以使用本节介绍的时间滚动窗口。它不

    2024年02月06日
    浏览(42)
  • Python零基础学习7.1—Python自定义函数的定义和调用

    函数是组织好的,可重复使用的,用来实现单一,或相关联功能的代码段。 函数能提高应用的模块性,和代码的重复利用率。Python提供了许多内建函数,比如print()。但我们也可以自己创建函数来实现一些功能,这被叫做用户自定义函数。 下面来看几个例题: 任务描述 本关

    2024年02月04日
    浏览(53)
  • Python零基础学习7.2—Python自定义函数的综合应用

    本章我们加大一点难度,来让大家更好的掌握Python函数的使用技巧 来看例题: 任务描述 本关任务:素数问题函数。 (1)实现isPrime()函数,参数为整数。如果是素数,返回True,否则返回False。 (2)在(1)的基础上,编写一个函数listPrime(),该函数可以接受任意个数数据,返

    2024年02月03日
    浏览(52)
  • C++ 学习 ::【基础篇:14】:C++ 类的基本成员函数:析构函数的作用 及 自定义析构函数情形

    本系列 C++ 相关文章 仅为笔者学习笔记记录,用自己的理解记录学习!C++ 学习系列将分为三个阶段: 基础篇、STL 篇、高阶数据结构与算法篇 ,相关重点内容如下: 基础篇 : 类与对象 (涉及C++的三大特性等); STL 篇 : 学习使用 C++ 提供的 STL 相关库 ; 高阶数据结构与算

    2024年02月07日
    浏览(45)
  • pyflink map函数例子

    [root@master pyflink]# cat k102.py  from pyflink.datastream import StreamExecutionEnvironment # 创建 StreamExecutionEnvironment 对象 env = StreamExecutionEnvironment.get_execution_environment() # 读取文件,创建 DataStream 对象 data_stream = env.read_text_file(\\\'/root/pyflink/test.log\\\') # 对每行数据添加字符串 \\\'aaaa\\\' new_stream = data_st

    2024年02月07日
    浏览(39)
  • SQL Server用户定义的函数(UDF)使用详解

    与编程语言中的函数一样,SQL Server 用户定义函数是接受参数、执行操作(如复杂计算)并将该操作的结果作为值返回的例程。返回值可以是单个标量值,也可以是结果集。 模块化编程。可以创建一次函数,将其存储在数据库中,并在程序中调用它任意次数。可以独立于程序

    2023年04月12日
    浏览(43)
  • 16 | Spark SQL 的 UDF(用户自定义函数)

    UDF(用户自定义函数) :Spark SQL 允许用户定义自定义函数,以便在 SQL 查询或 DataFrame 操作中使用。这些 UDF 可以扩展 Spark SQL 的功能,使用户能够执行更复杂的数据操作。 示例:

    2024年02月10日
    浏览(41)
  • Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流

                           星光下的赶路人star的个人主页                        欲买桂花同载酒,终不似,少年游 计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并—这就是

    2024年02月07日
    浏览(45)
  • FPGA基础知识-用户自定义原语

    目录 学习目标 学习内容 1.UDP的组成 2.UDP定义规则 3.表示组合逻辑的UDP 4.表示时序逻辑的UDP 5.UDP表中的缩写符号 6.UDP设计指南  学习时间 学习总结 提示:这里可以添加学习目标 理解编写UDP的规则,明白UDP的各个组成部分。 学会编写表示时序和表示组合逻辑的两种不同的UDP, 理

    2024年02月11日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包