Flink SQL自定义表值函数(Table Function)

这篇具有很好参考价值的文章主要介绍了Flink SQL自定义表值函数(Table Function)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用场景: 表值函数即 UDTF,⽤于进⼀条数据,出多条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.TableFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参是直接体现在 eval 函数签名中,出参是体现在 TableFunction 类的泛型参数 T 中

注意:

eval 是没有返回值的,和标量函数不同,Flink TableFunction 接⼝提供了 collect(T) 来发送输出的数据,如果体现在函数签名上,就成了标量函数,使⽤ collect(T) 能体现出 进⼀条数据 出多条数据。

在 SQL 中是⽤ SQL 中的 LATERAL TABLE() 配合 JOIN 、 LEFT JOIN xxx ON TRUE 使⽤。

开发案例:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

/**
 * 输入数据:
 * nc -lk 8888
 * a,bb,cc
 * 
 * 输出结果:
 * 
 * res1=>:5> +I[a,bb,cc, a, 1]
 * res1=>:7> +I[a,bb,cc, cc, 2]
 * res1=>:6> +I[a,bb,cc, bb, 2]
 * res8=>:4> +I[a,bb,cc, a, 1]
 * res8=>:5> +I[a,bb,cc, bb, 2]
 * res8=>:6> +I[a,bb,cc, cc, 2]
 * res4=>:3> +I[a,bb,cc, cc, 2]
 * res4=>:1> +I[a,bb,cc, a, 1]
 * res4=>:2> +I[a,bb,cc, bb, 2]
 * res7=>:8> +I[a,bb,cc, bb, 2]
 * res7=>:1> +I[a,bb,cc, cc, 2]
 * res7=>:7> +I[a,bb,cc, a, 1]
 * res2=>:2> +I[a,bb,cc, cc, 2]
 * res2=>:8> +I[a,bb,cc, a, 1]
 * res2=>:1> +I[a,bb,cc, bb, 2]
 * res6=>:1> +I[a,bb,cc, cc, 2]
 * res6=>:7> +I[a,bb,cc, a, 1]
 * res6=>:8> +I[a,bb,cc, bb, 2]
 * res3=>:6> +I[a,bb,cc, bb, 2]
 * res3=>:7> +I[a,bb,cc, cc, 2]
 * res3=>:5> +I[a,bb,cc, a, 1]
 * res5=>:7> +I[a,bb,cc, bb, 2]
 * res5=>:8> +I[a,bb,cc, cc, 2]
 * res5=>:6> +I[a,bb,cc, a, 1]
 */
public class TableFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        Table table = tEnv.fromDataStream(source, "field");

        tEnv.createTemporaryView("SourceTable", table);

        // 在 Table API ⾥可以直接调⽤ UDF
        Table res1 = tEnv.from("SourceTable")
                .joinLateral(call(SplitFunction.class, $("field")))
                .select($("field"), $("word"), $("length"));

        Table res2 = tEnv
                .from("SourceTable")
                .leftOuterJoinLateral(call(SplitFunction.class, $("field")))
                .select($("field"), $("word"), $("length"));


        // 在 Table API ⾥重命名 UDF 的结果字段
        Table res3 = tEnv.from("SourceTable")
                .leftOuterJoinLateral(call(SplitFunction.class, $("field")))
                .as("myField", "newWord", "newLength")
                .select($("myField"), $("newWord"), $("newLength"));

        // 注册函数
        tEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

        // 在 Table API ⾥调⽤注册好的 UDF
        Table res4 = tEnv
                .from("SourceTable")
                .joinLateral(call("SplitFunction", $("field")))
                .select($("field"), $("word"), $("length"));


        Table res5 = tEnv
                .from("SourceTable")
                .leftOuterJoinLateral(call("SplitFunction", $("field")))
                .select($("field"), $("word"), $("length"));

        // 在 SQL ⾥调⽤注册好的 UDF
        Table res6 = tEnv.sqlQuery(
                "SELECT field, word, length " +
                        "FROM SourceTable, LATERAL TABLE(SplitFunction(field))");

        Table res7 = tEnv.sqlQuery(
                "SELECT field, word, length " +
                        "FROM SourceTable " +
                        "LEFT JOIN LATERAL TABLE(SplitFunction(field)) ON TRUE");


        // 在 SQL ⾥重命名 UDF 字段
        Table res8 = tEnv.sqlQuery(
                "SELECT field, newWord, newLength " +
                        "FROM SourceTable " +
                        "LEFT JOIN LATERAL TABLE(SplitFunction(field)) AS T(newWord, newLength) ON TRUE");

        tEnv.toDataStream(res1).print("res1=>");
        tEnv.toDataStream(res2).print("res2=>");
        tEnv.toDataStream(res3).print("res3=>");
        tEnv.toDataStream(res4).print("res4=>");
        tEnv.toDataStream(res5).print("res5=>");
        tEnv.toDataStream(res6).print("res6=>");
        tEnv.toDataStream(res7).print("res7=>");
        tEnv.toDataStream(res8).print("res8=>");

        env.execute();
    }

    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class SplitFunction extends TableFunction<Row> {
        public void eval(String str) {
            for (String s : str.split(",")) {
                // 输出结果
                collect(Row.of(s, s.length()));
            }
        }
    }
}

注意: 如果使⽤ Scala 实现函数,不要使⽤ Scala 中 object 实现 UDF,Scala object 是单例的,可能会导致并发问题。

测试结果:

Flink SQL自定义表值函数(Table Function),Flink精通~SQLAPI使用,flink,sql,大数据文章来源地址https://www.toymoban.com/news/detail-753656.html

到了这里,关于Flink SQL自定义表值函数(Table Function)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink Table/Sql自定义Kudu Sink实战(其它Sink可参考)

    使用第三方的org.apache.bahir » flink-connector-kudu,batch模式写入数据到Kudu会有FlushMode相关问题 具体可以参考我的这篇博客通过Flink SQL操作创建Kudu表,并读写Kudu表数据 Flink的Dynamic table能够统一处理batch和streaming 实现自定义Source或Sink有两种方式: 通过对已有的connector进行拓展。比

    2024年02月14日
    浏览(43)
  • 32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月05日
    浏览(42)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月19日
    浏览(52)
  • 24、Flink 的table api与sql之Catalogs(java api操作分区与函数、表)-4

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月08日
    浏览(44)
  • Flink Temporal Join 系列 (4):用 Temporal Table Function 实现基于处理时间的关联

    博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧

    2024年04月23日
    浏览(36)
  • 44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例--网上有些说法好像是错误的

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月09日
    浏览(43)
  • 微软函数 for vba自定义函数Function

    \\\"XLAM\\\"    支持WPS、Office 2007及以上版本。 \\\"XLA\\\"    支持WPS、Office 2003及以上版本。 声明: 必须具有VBA运行环境。Office自带了VBA不必再安装;部分WPS须提前安装VBA插件(下载链接在下方)。 使用: 解压后,手动加载到 [开发工具] 下的 [EXCEL加载项] 中,然后在单元格输入 =\\\'函数名

    2024年03月15日
    浏览(48)
  • 【Flink SQL】Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API

    《 Flink SQL 基础概念 》系列,共包含以下 5 篇文章: Flink SQL 基础概念(一):SQL Table 运行环境、基本概念及常用 API Flink SQL 基础概念(二):数据类型 Flink SQL 基础概念(三):SQL 动态表 连续查询 Flink SQL 基础概念(四):SQL 的时间属性 Flink SQL 基础概念(五):SQL 时区问

    2024年03月21日
    浏览(72)
  • Java 中函数 Function 的使用和定义

    模块化和可复用性:将代码逻辑封装在函数中,可以 提高代码的模块化程度,使得代码更易于维护和重用 。通过函数,可以将通用的逻辑抽取出来,降低代码的重复性。 可组合性: 函数可以作为参数传递给其他函数,也可以作为返回值返回 ,从而实现代码的组合和复用。

    2024年04月09日
    浏览(38)
  • Flink Table API 与 SQL 编程整理

    Flink API 总共分为 4 层这里主要整理 Table API 的使用 Table API 是流处理和批处理通用的关系型 API , Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。 Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的, Table API 是 Scala 和 Java 语言集成式的 API 。与常规 SQL 语言

    2024年02月04日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包