FlinkSQL聚合函数(Aggregate Function)详解

这篇具有很好参考价值的文章主要介绍了FlinkSQL聚合函数(Aggregate Function)详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。

FlinkSQL聚合函数(Aggregate Function)详解,Flink精通~SQLAPI使用,java,flink,sql

上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法

案例场景:

关于饮料的表,有三个字段,分别是 id、name、price,表⾥有 5 ⾏数据,找到所有饮料⾥最贵的饮料的价格,即执⾏⼀个 max() 聚合拿到结果,遍历所有 5 ⾏数据,最终结果就只有⼀个数值。

开发流程:

实现 AggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的;

必须实现以下⽅法:

  • Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,其存储了聚合的中间结果,⽐如在执⾏ max() 时会存储当前的 max 值;
  • accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,调⽤ accumulate() ⽅法更新 accumulator,处理每⼀条输⼊数据,方法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,⽅法的参数类型可以不同,并且⽀持变⻓参数。
  • Output输出参数 getValue(Acc accumulator) : 通过调⽤ getValue ⽅法来计算和返回最终的结果。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 在回撤流的场景下必须实现,在计算回撤数据时调⽤,如果没有实现会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景下必须要实现,此外,这个⽅法对于优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,在数据 shuffle 前先进⾏⼀次聚合计算。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型信息的⽅法:

默认情况下,⽤户的 Input 输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚合中间结果 createAccumulator() 的返回结果)、 Output输出参数数据类型( Output输出参数 getValue(Acc accumulator) 的 Output输出参数 )会被 Flink 使⽤反射获取到。

对于 accumulator 和 Output 输出参数类型,Flink SQL 的类型推导在遇到复杂类型时会推导出错误的结果(注意:Input输⼊参数 因为是上游算⼦传⼊的,类型信息是确认的,不会出现推导错误),⽐如⾮基本类型 POJO 的复杂类型。

同 ScalarFunction 和 TableFunction, AggregateFunction 提供了 AggregateFunction#getResultType() 和AggregateFunction#getAccumulatorType() 指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型是TypeInformation。

  • getResultType() : 即 Output 输出参数 getValue(Acc accumulator) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型。

案例: 加权平均值

  • 定义⼀个聚合函数来计算某⼀列的加权平均
  • 在 TableEnvironment 中注册函数
  • 在查询中使⽤函数

实现思路:

为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数,定义了类 WeightedAvgAccumulator 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,保证精确⼀次的语义。

WeightedAvg(聚合函数)的 accumulate ⽅法有三个输⼊参数,第⼀个是 WeightedAvgAccum accumulator,另外两个是⽤户⾃定义的输⼊:输⼊的值 ivalue 和 输⼊的权重 iweight,尽管 retract()、merge()、resetAccumulator() ⽅法在⼤多数聚合类型中都不是必须实现的,但在样例中提供了他们的实现,并且定义了 getResultType() 和 getAccumulatorType()。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import java.io.Serializable;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * 输入数据:
 * a,1,1
 * a,10,2
 *
 * 输出结果:
 * res1=>:1> +I[a, 1.0]
 * res2=>:1> +I[a, 1.0]
 * res3=>:1> +I[a, 1.0]
 *
 * res1=>:1> -U[a, 1.0]
 * res1=>:1> +U[a, 7.0]
 * res3=>:1> -U[a, 1.0]
 * res3=>:1> +U[a, 7.0]
 * res2=>:1> -U[a, 1.0]
 * res2=>:1> +U[a, 7.0]
 */
public class AggregateFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, Double, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, Double, Double>>() {
            @Override
            public Tuple3<String, Double, Double> map(String input) throws Exception {
                return new Tuple3<>(input.split(",")[0],
                        Double.parseDouble(input.split(",")[1]),
                        Double.parseDouble(input.split(",")[2]));
            }
        });

        Table table = tEnv.fromDataStream(tpStream, "field,iValue,iWeight");

        tEnv.createTemporaryView("SourceTable", table);

        Table res1 = tEnv.from("SourceTable")
                .groupBy($("field"))
                .select($("field"), call(WeightedAvg.class, $("iValue"), $("iWeight")));

        // 注册函数
        tEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);

        // Table API 调⽤函数
        Table res2 = tEnv.from("SourceTable")
                .groupBy($("field"))
                .select($("field"), call("WeightedAvg", $("iValue"), $("iWeight")));

        // SQL API 调⽤函数
        Table res3 = tEnv.sqlQuery("SELECT field, WeightedAvg(`iValue`, iWeight) FROM SourceTable GROUP BY field");

        tEnv.toChangelogStream(res1).print("res1=>");
        tEnv.toChangelogStream(res2).print("res2=>");
        tEnv.toChangelogStream(res3).print("res3=>");

        env.execute();

    }

    // ⾃定义⼀个计算权重 avg 的 accmulator
    public static class WeightedAvgAccumulator implements Serializable {
        public Double sum = 0.0;
        public Double count = 0.0;
    }

    // 输⼊:Long iValue, Integer iWeight
    public static class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {
        // 创建⼀个 accumulator
        @Override
        public WeightedAvgAccumulator createAccumulator() {
            return new WeightedAvgAccumulator();
        }

        public void accumulate(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {
            acc.sum += iValue * iWeight;
            acc.count += iWeight;
        }

        public void retract(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {
            acc.sum -= iValue * iWeight;
            acc.count -= iWeight;
        }

        // 获取返回结果
        @Override
        public Double getValue(WeightedAvgAccumulator acc) {
            if (acc.count == 0) {
                return null;
            } else {
                return acc.sum / acc.count;
            }
        }

        // Session window 使⽤这个⽅法将⼏个单独窗⼝的结果合并
        public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
            for (WeightedAvgAccumulator a : it) {
                acc.count += a.count;
                acc.sum += a.sum;
            }
        }

        public void resetAccumulator(WeightedAvgAccumulator acc) {
            acc.count = 0.0;
            acc.sum = 0.0;
        }
    }
}

测试结果:

FlinkSQL聚合函数(Aggregate Function)详解,Flink精通~SQLAPI使用,java,flink,sql文章来源地址https://www.toymoban.com/news/detail-759873.html

到了这里,关于FlinkSQL聚合函数(Aggregate Function)详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【数据库】通过实例讲清楚,Mongodb的增删查改,分组查询,聚合查询aggregate

    目录 一.基础概念 二.数据库的管理 1.创建数据库 2.删除数据库 二.集合的管理 1.显示所有集合 2.创建集合 3.删除当前集合 4.向集合中插入元素 三.文档的管理 1.文档插入 2.文档的更新 3.文档的删除 4.文档查询 (1)查询基本语法: (2)查询table2集合下的所有文档 (3)查询t

    2024年02月10日
    浏览(35)
  • Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流

                           星光下的赶路人star的个人主页                        欲买桂花同载酒,终不似,少年游 计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并—这就是

    2024年02月07日
    浏览(35)
  • Flink中aggregate[AggregateFunction]的使用及讲解

    Flink的 aggregate() 方法一般是通过实现 AggregateFunction 接口对数据流进行聚合计算的场景。例如,在使用 Flink 的 DataStream API 时,用户经常需要对输入数据进行分组操作,并按照一组 key 对数据进行汇总、运算或聚合计算。对于这些场景,可以使用 aggregate() 方法来实现聚合计算。

    2024年02月15日
    浏览(24)
  • FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析

    FlinkSQL处理如下实时数据需求: 实时聚合不同 类型/账号/发布时间 的各个指标数据,比如: 初始化/初始化后删除/初始化后取消/推送/成功/失败 的指标数据。要求实时产出指标数据,数据源是mysql cdc binlog数据。 其他配置 flink集群参数 检查点配置 job运行资源 管理节点(JM)

    2024年01月17日
    浏览(51)
  • Java8之Function函数、BiFunction函数详解

    众所周知,Java8提供了一下非常重要的函数式接口。今天我们就来讲讲其中一个函数式接口----- Function 接口。 下面的代码就是Function接口的全部代码。接下来我们逐个分析一下。 @FunctionalInterface 表明该接口是一个函数式接口 T, R T 代表参数,R 代表返回值。 第一个方法------R

    2024年02月11日
    浏览(24)
  • MySQL:聚合函数(全面详解)

    欢迎 免费报名 由CSDN主办的【Java基础及数据库Mysql的新星计划】(点击跳转) 我们特别为 Java基础 和 MySQL 这两个赛道开设了培训课程。 通过参与这个赛道,你将有机会与其他志同道合的学习者进行交流和互动,我们提供一系列精心设计的课程内容,旨在帮助你建立扎实的Java基

    2024年02月11日
    浏览(30)
  • Flink 学习十 FlinkSQL

    flink sql 基于flink core ,使用sql 语义方便快捷的进行结构化数据处理的上层库; 类似理解sparksql 和sparkcore , hive和mapreduce 1.1 工作流程 整体架构和工作流程 数据流,绑定元数据 schema ,注册成catalog 中的表 table / view 用户使用table Api / table sql 来表达计算逻辑 table-planner利用 apache calci

    2024年02月10日
    浏览(35)
  • Flink 优化(六) --------- FlinkSQL 调优

    FlinkSQL 官网配置参数: https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆炸。列举两个场景: ➢ FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在状态里,不

    2024年02月14日
    浏览(31)
  • Flink:FlinkSql解析嵌套Json

    日常开发中都是用的简便json格式,但是偶尔也会遇到嵌套json的时候,因此在用flinksql的时候就有点麻烦,下面用简单例子简单定义处理下 1,数据是网上摘抄,但包含里常用的大部分格式 {     \\\"afterColumns\\\": {         \\\"created\\\": \\\"1589186680\\\",         \\\"extra\\\": {             \\\"

    2023年04月09日
    浏览(26)
  • 损失函数(Loss Function)一文详解-分类问题常见损失函数Python代码实现+计算原理解析

    目录 前言 一、损失函数概述 二、损失函数分类 1.分类问题的损失函数

    2023年04月26日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包