flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数

这篇具有很好参考价值的文章主要介绍了flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数,flink技术原理,sql,数据库
flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数,flink技术原理,sql,数据库
flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数,flink技术原理,sql,数据库

1、在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为 Scala 的 Table API 注册函数。

2、函数通过调用 registerFunction()方法在 TableEnvironment 中注册。当用户定义的函数 被注册时,它被插入到 TableEnvironment 的函数目录中,
这样 Table API 或 SQL 解析器就可 以识别并正确地解释它

函数总结,函数总分为四大类
flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数,flink技术原理,sql,数据库

1、标量函数

用户定义的标量函数,可以将 0、1 或多个标量值,映射到新的标量值。
为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类 Scalar Function, 并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定, 求值方法必须公开声明并命名为 eval(直接 def 声明,没有 override)。求值方法的参数类型 和返回类型,确定了标量函数的参数和返回类型。

package table.tableUdf

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
 * @author yangwj
 * @date 2021/1/15 23:40
 * @version 1.0
 */
object ScalarFunctionTest {
  def main(args: Array[String]): Unit = {
    //1、创建表执行环境、就得使用流式环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)

    //2、连接外部系统,读取数据,注册表
    //2.1读取文件
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(inputFile)
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
      override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
    })
    //tp.proctime 处理时间,注意,使用表达式,一定要引用隐式转换,否则无法使用
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id,  'temperature, 'timestamp.rowtime as 'ts)

    //调用自定义hash函数,对id进行hash运算
    //1、table api
    //首先new一个UDF的实例
    val hashCode = new HashCode(23)
    val apiResult: Table = sensorTable
      .select('id, 'ts, hashCode('id))


    //2、sql调用
    //需要在环境注册UDF
    tableEnv.createTemporaryView("sensor",sensorTable)
    tableEnv.registerFunction("hashCode",hashCode)
    val udfResult: Table = tableEnv.sqlQuery(
      """
        |select id,ts,hashCode(id)
        |from sensor
      """.stripMargin)

    apiResult.toAppendStream[Row].print("apiResult")
    udfResult.toAppendStream[Row].print("udfResult")
    env.execute("udf test")
  }
}

//自定义标量函数
class  HashCode(factor:Int) extends ScalarFunction{
    //必须叫 eval
    def  eval(s:String): Int ={
          s.hashCode * factor - 10000
    }
}

2、表函数

  • 1、与用户定义的标量函数类似,用户定义的表函数,可以将 0、1 或多个标量值作为输入 参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。
    、为了定义一个表函数,必须扩展 org.apache.flink.table.functions 中的基类 TableFunction 并实现(一个或多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval。  求值方法的参数类型,决定表函数的所有有效参数。
  • 2、返回表的类型由 TableFunction 的泛型类型确定。求值方法使用 protected collect(T)方 法发出输出行。
  • 3、在 Table API 中,Table 函数需要与.joinLateral 或.leftOuterJoinLateral 一起使用。
  • 4、joinLateral 算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它 的表达式)计算得到的所有行连接起来。
  • 5、而 leftOuterJoinLateral 算子,则是左外连接,它同样会将外部表中的每一行与表函数计 算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。
  • 6、在 SQL 中,则需要使用 Lateral Table(),或者带有 ON TRUE 条件的左连接
package guigu.table.udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

/**
 * @program: demo
 * @description: 表函数:一行对应多行(表)数据输出
 * @author: yang
 * @create: 2021-01-16 16:07
 */
object tableFunc {
  def main(args: Array[String]): Unit = {
    //1、基于流执行环境创建table执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //2、读取文件,注册表视图
    tableEnv.connect(new FileSystem().path("E:\\java\\demo\\src\\main\\resources\\file\\data5.csv"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("ts",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("sensorTable")

    //3、table api
    val split = new Split("_")     // new一个UDF实例
    val sensorTable: Table = tableEnv.from("sensorTable")
    val resutTable = sensorTable.joinLateral(split('id) as ('word,'length))
      .select('id,'ts,'word,'length)

    //4、sql 实现
    tableEnv.registerFunction("split",split)
    val sqlResult: Table = tableEnv.sqlQuery(
      """
        |select id ,ts ,word ,length
        |from sensorTable,
        |lateral table ( split(id) ) as splitid(word,length) # splitid 为 split和字段的id的组合
      """.stripMargin)

    resutTable.toAppendStream[(Row)].print("api")

    sqlResult.toAppendStream[(Row)].print("sql")

    env.execute("table function")

  }
}

//输出类型(String,Int)
class Split(separator:String) extends TableFunction[(String,Int)]{
  def eval(str:String): Unit ={
    str.split(separator).foreach(
      word => collect((word,word.length))
    )
  }
}

3、聚合函数

  • 1、用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的 数据,聚合成一个标量值。用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实 现的

  • 2、AggregateFunction 的工作原理如下:
    首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过 调用 AggregateFunction 的 createAccumulator()方法创建空累加器。
    随后,对每个输入行调用函数的 accumulate()方法来更新累加器。
    处理完所有行后,将调用函数的 getValue()方法来计算并返回最终结果。

  • 3、AggregationFunction 要求必须实现的方法:createAccumulator() 、accumulate()、 getValue()

  • 4、除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询 更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口
    (session group window)的上下文中,则 merge()方法是必需。 retract() merge() resetAccumulator()

package guigu.table.udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row

/**
 * @program: demo
 * @description: 聚合函数:多行数据聚合输出一行数据
 * @author: yang
 * @create: 2021-01-16 16:41
 */
object aggFunc {

  def main(args: Array[String]): Unit = {

    //1、基于流执行环境创建table执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //2、读取文件,注册表视图
    tableEnv.connect(new FileSystem().path("E:\\java\\demo\\src\\main\\resources\\file\\data5.csv"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("ts",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("sensorTable")

    val sensorTable: Table = tableEnv.from("sensorTable")
    //table api
    val aggTemp = new AggTemp()
    val apiResult: Table = sensorTable.groupBy('id).aggregate(aggTemp('temperature) as 'vagTemp).select('id, 'vagTemp)

    //sql 实现
    tableEnv.registerFunction("avgTemp",aggTemp)
    val sqlResult: Table = tableEnv.sqlQuery(
      """
        |select id,avgTemp(temperature)
        |from sensorTable
        |group by id
      """.stripMargin)

    apiResult.toRetractStream[Row].print("apiResult")
    sqlResult.toRetractStream[Row].print("sqlResult")

    env.execute("agg Func")

  }

}

//定义一个类,专门用于聚合的状态
class AvgTempAcc{
  var sum :Double = 0.0
  var count:Int = 0

}

//自定义一个聚合函数,求每个传感器的平均温度值,保存状态(tempSum,tempCount)
class AggTemp extends AggregateFunction[Double,AvgTempAcc]{

  //处理计算函数
  def accumulate(accumulator:AvgTempAcc,temp:Double): Unit ={
      accumulator.sum += temp
      accumulator.count += 1
  }

  //计算函数
  override def getValue(accumulator: AvgTempAcc): Double = accumulator.sum / accumulator.count

  //初始化函数
  override def createAccumulator(): AvgTempAcc = new AvgTempAcc
}

4、表聚合函数

  • 1、用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一 个表中数据,聚合为具有多行和多列的结果表。
    这跟 AggregateFunction 非常类似,只是之 前聚合结果是一个标量值,现在变成了一张表。用户定义的表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的。

  • 2、TableAggregateFunction 的工作原理如下:
    首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。 通过调用 TableAggregateFunction的createAccumulator()方法可以创建空累加器。
    随后,对每个输入行调用函数的 accumulate()方法来更新累加器。
    处理完所有行后,将调用函数的 emitValue()方法来计算并返回最终结果。

  • 3、AggregationFunction 要求必须实现的方法: createAccumulator() 、accumulate()
    除了上述方法之外,还有一些可选择实现的方法:retract()、 merge() 、resetAccumulator()、 emitValue()、emitUpdateWithRetract()文章来源地址https://www.toymoban.com/news/detail-837021.html

package guigu.table.udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, FlatAggregateTable, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

/**
 * @program: demo
 * @description: 多行数据聚合输出多行数据
 * @author: yang
 * @create: 2021-01-16 18:48
 */
object tableAggFunc {
  def main(args: Array[String]): Unit = {
    //1、基于流执行环境创建table执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //2、读取文件,注册表视图
    tableEnv.connect(new FileSystem().path("E:\\java\\demo\\src\\main\\resources\\file\\data5.csv"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("ts",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("sensorTable")

    val sensorTable: Table = tableEnv.from("sensorTable")
    //table api
    val top2Temp = new Top2Temp()

    val resultTable: Table = sensorTable.groupBy('id).flatAggregate(top2Temp('temperature) as('temp, 'rank))
        .select('id,'temp,'rank)

    resultTable.toRetractStream[Row].print("flat agg")

    env.execute(" table agg func")
  }
}

//定义一个类,表示表聚合函数的状态
class  Top2TempAcc{
    var highestTemp:Double = Double.MinValue
    var secondHighestTemp:Double = Double.MinValue

}

//自定义表聚合函数,提取所有温度值中最高的两个温度,输出(temp,rank)
class Top2Temp extends TableAggregateFunction[(Double,Int),Top2TempAcc]{
  //初始化函数
  override def createAccumulator(): Top2TempAcc = new Top2TempAcc()

  //实现计算聚合结果的函数accumulate
  //注意:方法名称必须叫accumulate
  def accumulate(acc:Top2TempAcc,temp:Double): Unit ={
    //判断当前温度值是否比状态值大
    if(temp > acc.highestTemp){
      //如果比最高温度还高,排在第一,原来的顺到第二位
      acc.secondHighestTemp = acc.highestTemp
      acc.highestTemp = temp
    }else if(temp > acc.secondHighestTemp){
      //如果在最高和第二高之间,那么直接替换第二高温度
      acc.secondHighestTemp = temp
    }
  }

  //实现一个输出结果的方法,最终处理完表中所有的数据时调用
  //注意:方法名称必须叫emitValue
  def emitValue(acc:Top2TempAcc,out:Collector[(Double,Int)]): Unit ={
    out.collect((acc.highestTemp,1))
    out.collect((acc.secondHighestTemp,2))
  }


}

到了这里,关于flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SQL Server用户定义的函数(UDF)使用详解

    与编程语言中的函数一样,SQL Server 用户定义函数是接受参数、执行操作(如复杂计算)并将该操作的结果作为值返回的例程。返回值可以是单个标量值,也可以是结果集。 模块化编程。可以创建一次函数,将其存储在数据库中,并在程序中调用它任意次数。可以独立于程序

    2023年04月12日
    浏览(42)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

    用户自定义函数( user-defined function , UDF ),即用户可以根据自身需求,重新实现算子的逻辑。 用户自定义函数分为: 函数类 、 匿名函数 、 富函数类 。 Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction 、 FilterFunction 、 ReduceFunction 等。所

    2024年01月23日
    浏览(47)
  • 0基础学习PyFlink——用户自定义函数之UDTF

    在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF 我们对比下UDF和UDTF 可以发现: UDF比UDTF多了func_type和udf_type参数; UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str]; 特别是最后一点,可以认为是UDF和UD

    2024年02月07日
    浏览(33)
  • SparkSQL函数定义——UDF函数,窗口函数

    目录 1 定义UDF函数  1.1  返回值是数组类型的UDF定义 1.2 返回字典类型的UDF定义 2 窗口函数 目前python仅支持UDF 两种定义方式: 1. sparksession.udf.register() 注册的UDF可以用于DSL和SQL 返回值用于DSL风格,传参内给的名字用于SQL风格         方法一语法: udf对象 =  sparksession.ud

    2024年02月06日
    浏览(50)
  • Hive UDF自定义函数上线速记

    待补充 1.1 提交jar包至hdfs 使用命令or webui 上传jar到hdfs,命令的话格式如下 hdfs dfs -put [Linux目录] [hdfs目录] 示例: 1.2 将 JAR 文件添加到 Hive 中 注意hdfs路径前面要加上 hdfs://namenode , 如果是hdfs集群的话换成 dfs.nameservices 的值 1.3 注册永久 UDF 函数 hdfs://namenode 和1.2步骤保持一致 CR

    2024年02月11日
    浏览(39)
  • 0基础学习PyFlink——用户自定义函数之UDF

    PyFlink中关于用户定义方法有: UDF:用户自定义函数。 UDTF:用户自定义表值函数。 UDAF:用户自定义聚合函数。 UDTAF:用户自定义表值聚合函数。 这些字母可以拆解如下: UD表示User Defined(用户自定义); F表示Function(方法); T表示Table(表); A表示Aggregate(聚合); Aggr

    2024年02月08日
    浏览(46)
  • 44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例--网上有些说法好像是错误的

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月09日
    浏览(43)
  • Spark SQL 内置函数

    1、10类内置函数 Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions中。这些函数主要分为10类:UDF函数、聚合函数、日期函数、排序函数、非聚合函数、数学函数、混杂函数、窗口函数、字符串函数、集合函数,大部分函数与Hive中相同。 2、两种使用方式 使用内置函

    2024年02月09日
    浏览(43)
  • 大数据Flink(九十九):SQL 函数的解析顺序和系统内置函数

    文章目录 SQL 函数的解析顺序和系统内置函数

    2024年02月07日
    浏览(45)
  • 【Python从入门到人工智能】16个必会的Python内置函数(5)——数据转换与计算(详细语法参考 + 参数说明 + 具体示例)

      你的思绪就像这池水,朋友,稍有外界触动就很难清澈明朗,可如果让它静下来,答案顿时变得清晰了。   🎯 作者主页 : 追光者♂ 🔥          🌸 个人简介 : 计算机专业硕士研究生 💖、 2022年CSDN博客之星人工智能领域TOP4 🌟、 阿里云社区特邀专家博主 🏅、 C

    2023年04月13日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包