0基础学习PyFlink——用户自定义函数之UDTF

这篇具有很好参考价值的文章主要介绍了0基础学习PyFlink——用户自定义函数之UDTF。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF
0基础学习PyFlink——用户自定义函数之UDTF,大数据,flink,大数据,python

表值函数

我们对比下UDF和UDTF

def udf(f: Union[Callable, ScalarFunction, Type] = None,
        input_types: Union[List[DataType], DataType, str, List[str]] = None,
        result_type: Union[DataType, str] = None,
        deterministic: bool = None, 
        name: str = None, 
        func_type: str = "general",
        udf_type: str = None
        ) -> Union[UserDefinedScalarFunctionWrapper, Callable]:
def udtf(f: Union[Callable, TableFunction, Type] = None,
         input_types: Union[List[DataType], DataType, str, List[str]] = None,
         result_types: Union[List[DataType], DataType, str, List[str]] = None,
         deterministic: bool = None,
         name: str = None
         ) -> Union[UserDefinedTableFunctionWrapper, Callable]:

可以发现:

  • UDF比UDTF多了func_type和udf_type参数;
  • UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str];

特别是最后一点,可以认为是UDF和UDTF在应用上的主要区别。
换种更容易理解的说法是:UDTF可以返回任意数量的行作为输出而不是像UDF那样返回单个值(行)。
举一个例子:

word_count_data = ["A", "B", "C", "a", "C"] 

我们希望统计上面这些字符的个数,以及小写后字符的个数。这样A的个数是1,a的个数是2(因为a算一个,A小写后又算一个)。C的个数是2,g的个数是2。
这就要求统计算法在遇到大写字母时,需要统计大小写两种字母;而遇到小写字母时,只需要统计小写字母。

    @udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)
    def rowFunc(row):
        if row[0].isupper():
            yield row[0]
            yield row[0].lower()
        else:
            yield row[0]

yield关键字返回的是generator生成器。Table API对rowFunc的调用最终会生成[“A”,“a”,“B”,“b”,“C”,“c”,“a”,“C”,“c”]。
和调用UDF不同的是,需要使用flat_map来调用UDTF。flat即为“打平”,可以生动的理解为将多维降为一维。

    tab_trans=tab_source.flat_map(rowFunc)
    tab_trans.execute().print()
+--------------------------------+
|                             f0 |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

由于我们没有指定经过处理的值所属的字段名称,于是会使用默认的f0作为字段名。我们可以使用alias来给它别名下。

    tab_trans_alias=tab_trans.alias('trans_word')
    tab_trans_alias.execute().print()
+--------------------------------+
|                     trans_word |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

最后我们就可以用这个新的表做字数统计计算文章来源地址https://www.toymoban.com/news/detail-724057.html

    tab_trans_alias.group_by(col('trans_word')) \
        .select(col('trans_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()
+I[A, 1]
+I[a, 2]
+I[B, 1]
+I[b, 1]
+I[C, 2]
+I[c, 2]

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunction

word_count_data = ["A", "B", "C", "a", "C"]  
    
def word_count():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])
    tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)

    # define the sink schema
    sink_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING().not_null()) \
        .column("count", DataTypes.BIGINT()) \
        .primary_key("word") \
        .build()
        
    # Create a sink descriptor
    sink_descriptor = TableDescriptor.for_connector('print')\
        .schema(sink_schema) \
        .build()
    
    t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)
    
    @udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)
    def rowFunc(row):
        if row[0].isupper():
            yield row[0]
            yield row[0].lower()
        else:
            yield row[0]

    tab_trans=tab_source.flat_map(rowFunc)
    tab_trans.execute().print()
    tab_trans_alias=tab_trans.alias('trans_word')
    tab_trans_alias.execute().print()
    tab_trans_alias.group_by(col('trans_word')) \
        .select(col('trans_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

if __name__ == '__main__':
    word_count()

到了这里,关于0基础学习PyFlink——用户自定义函数之UDTF的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统

    在 《0基础学习PyFlink——使用PyFlink的SQL进行字数统计》一文中,我们直接执行了Select查询操作,在终端中直接看到了查询结果。 在生产环境,我们往往要将计算结果保存到外部系统中,比如Mysql等。这个时候我们就要使用Sink。 Sink用于将Reduce结果输出到外部系统。它也是通过

    2024年02月08日
    浏览(29)
  • 0基础学习PyFlink——事件时间和运行时间的窗口

    在 《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》一文中,我们使用的是运行时间(Tumbling ProcessingTime Windows)作为窗口的参考时间: 而得到的结果也是不稳定的。 这是因为每次运行时,CPU等系统资源的繁忙程度是不一样的,这就影响了最后的运行结果。 为了让结果稳

    2024年02月05日
    浏览(42)
  • 0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

    在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分 那么有没有办法让上图中(B,2)和(D,5)也会被计算呢? 这就可以使用本节介绍的时间滚动窗口。它不

    2024年02月06日
    浏览(32)
  • Python零基础学习7.1—Python自定义函数的定义和调用

    函数是组织好的,可重复使用的,用来实现单一,或相关联功能的代码段。 函数能提高应用的模块性,和代码的重复利用率。Python提供了许多内建函数,比如print()。但我们也可以自己创建函数来实现一些功能,这被叫做用户自定义函数。 下面来看几个例题: 任务描述 本关

    2024年02月04日
    浏览(42)
  • Hive UDF、UDAF和UDTF函数详解

    在 Hive 中,可以编写和使用不同类型的自定义函数,包括 UDF(User-Defined Functions)、UDAF(User-Defined Aggregate Functions)和 UDTF(User-Defined Table Functions)。这些自定义函数允许你扩展 Hive 的功能,以执行自定义的数据处理操作。 UDF(User-Defined Functions) : 用途:UDF 用于处理一行数

    2024年02月10日
    浏览(29)
  • Python零基础学习7.2—Python自定义函数的综合应用

    本章我们加大一点难度,来让大家更好的掌握Python函数的使用技巧 来看例题: 任务描述 本关任务:素数问题函数。 (1)实现isPrime()函数,参数为整数。如果是素数,返回True,否则返回False。 (2)在(1)的基础上,编写一个函数listPrime(),该函数可以接受任意个数数据,返

    2024年02月03日
    浏览(41)
  • C++ 学习 ::【基础篇:14】:C++ 类的基本成员函数:析构函数的作用 及 自定义析构函数情形

    本系列 C++ 相关文章 仅为笔者学习笔记记录,用自己的理解记录学习!C++ 学习系列将分为三个阶段: 基础篇、STL 篇、高阶数据结构与算法篇 ,相关重点内容如下: 基础篇 : 类与对象 (涉及C++的三大特性等); STL 篇 : 学习使用 C++ 提供的 STL 相关库 ; 高阶数据结构与算

    2024年02月07日
    浏览(36)
  • hive学习笔记之十一:UDTF

    为了验证UDTF的功能,咱们要先把表和数据都准备好: 新建名为t16的表: create table t16( person_name string, string_field string ) row format delimited fields terminated by ‘|’ stored as textfile; 本地新建文本文件016.txt,内容如下: tom|1:province:guangdong jerry|2:city:shenzhen john|3 导入数据: load data loca

    2024年04月08日
    浏览(25)
  • pyflink map函数例子

    [root@master pyflink]# cat k102.py  from pyflink.datastream import StreamExecutionEnvironment # 创建 StreamExecutionEnvironment 对象 env = StreamExecutionEnvironment.get_execution_environment() # 读取文件,创建 DataStream 对象 data_stream = env.read_text_file(\\\'/root/pyflink/test.log\\\') # 对每行数据添加字符串 \\\'aaaa\\\' new_stream = data_st

    2024年02月07日
    浏览(32)
  • SQL Server用户定义的函数(UDF)使用详解

    与编程语言中的函数一样,SQL Server 用户定义函数是接受参数、执行操作(如复杂计算)并将该操作的结果作为值返回的例程。返回值可以是单个标量值,也可以是结果集。 模块化编程。可以创建一次函数,将其存储在数据库中,并在程序中调用它任意次数。可以独立于程序

    2023年04月12日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包