Flink中aggregate[AggregateFunction]的使用及讲解

这篇具有很好参考价值的文章主要介绍了Flink中aggregate[AggregateFunction]的使用及讲解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink的aggregate()方法一般是通过实现AggregateFunction接口对数据流进行聚合计算的场景。例如,在使用 Flink 的DataStream API时,用户经常需要对输入数据进行分组操作,并按照一组 key对数据进行汇总、运算或聚合计算。对于这些场景,可以使用 aggregate()方法来实现聚合计算。通过指定一个AggregateFunction类型的函数作为聚合操作来调用aggregate()方法,可以对元素流进行聚合和处理,生成新的输出流。在具体应用中,根据不同的业务需求,可以根据实际情况选择不同类型的AggregateFunction来完成聚合计算任务。
接下来先对AggregateFunction中的需要实现的4个方法进行说明
1. createAccumulator()
此方法用于创建累加器,并将其初始化为默认值
2. add()
此方法将输入的元素添加到累加器,返回更新后的累加器
3. getResult()
此方法用于从累加器中提取操作的结果
4. merge()
此方法将两个累加器合并为一个新的累加器

下面在通过代码实例说明AggregateFunction的使用,这里都以Tuple2类型作为举例说明

  • 求平均值
public static class AverageAggregate implements AggregateFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {

        @Override
        public Tuple2<String, Double> createAccumulator() {
        // 先将累加器进行初始化,这里给了一个""作为key, 0.0作为值
            return Tuple2.of("", 0.0); 
        }

        @Override
        public Tuple2<String, Double> add(Tuple2<String, Double> value, Tuple2<String, Double> accumulator) {
        // 这里的实现是将输入的元素和累加器中的元素相加,并返回一个新的元素
            return Tuple2.of(value.f0, accumulator.f1 + value.f1); 
        }

        @Override
        public Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) {
        // 这里返回一个包含平均值的 Tuple2 对象,这里是将累加器中的元素除以2,然后返回一个新元素。
            return Tuple2.of(accumulator.f0, accumulator.f1 / 2.0);
        }

        @Override
        public Tuple2<String, Double> merge(Tuple2<String, Double> a, Tuple2<String, Double> b) {
        // 这里是将两个累加器中的元素相加并除以2,然后返回一个新的元素对。
            return Tuple2.of(a.f0, (a.f1 + b.f1) / 2);
        }
    }
  • 求最大值
public class MaxAggregateFunction implements AggregateFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {
    @Override
    public Tuple2<String, Double> createAccumulator() {
        return Tuple2.of("", Double.MIN_VALUE); // 将累加器初始化为最小值
    }

    @Override
    public Tuple2<String, Double> add(Tuple2<String, Double> value, Tuple2<String, Double> accumulator) {
        if (value.f1 > accumulator.f1) {
            return Tuple2.of(value.f0, value.f1);
        } else {
            return accumulator;
        }
    }

    @Override
    public Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) {
        return accumulator; // 返回最大值
    }

    @Override
    public Tuple2<String, Double> merge(Tuple2<String, Double> a,
            Tuple2<String, Double> b) {
        if (a.f1 > b.f1) {
            return a;
        } else {
            return b;
        }
    }
}
  • 求最小值
public class MinAggregateFunction implements AggregateFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {
    @Override
    public Tuple2<String, Double> createAccumulator() {
        return Tuple2.of("", Double.MAX_VALUE); // 将累加器初始化为最大值
    }

    @Override
    public Tuple2<String, Double> add(Tuple2<String, Double> value, Tuple2<String, Double> accumulator) {
        if (value.f1 < accumulator.f1) {
            return Tuple2.of(value.f0, value.f1);
        } else {
            return accumulator;
        }
    }

    @Override
    public Tuple2<String, Double> getResult(Tuple2<String, Double> accumulator) {
        return accumulator; // 返回最小值
    }

    @Override
    public Tuple2<String, Double> merge(Tuple2<String, Double> a,
            Tuple2<String, Double> b) {
        if (a.f1 < b.f1) {
            return a;
        } else {
            return b;
        }
    }
}
  • 求和
public class SumAggregateFunction implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
    @Override
    public Double createAccumulator() {
        return 0.0; // 将累加器初始化为0
    }

    @Override
    public Double add(Tuple2<String, Double> value, Double accumulator) {
        return value.f1 + accumulator; // 将输入元素和累加器中的元素相加
    }

    @Override
    public Double getResult(Double accumulator) {
        return accumulator; // 返回总和
    }

    @Override
    public Double merge(Double a,
            Double b) {
        return a + b; // 合并两个累加器中的元素相加
    }
}

以上代码就是通过实现AggregateFunction接口,自定义不同的逻辑达到求平均值、最大值、最小值、总和的目的。

  • 方法调用演示
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/2/1
 * @Description: 测试
 **/
public class Demo1 {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 这里以kafka作为数据源
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("lx01:9092")
                .setTopics("topic-01")
                .setGroupId("g02")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource");
        // 先将数据转成需要的Tuple2的形式
        SingleOutputStreamOperator<Tuple2<String, Double>> mapStream = stream.map((MapFunction<String, Tuple2<String, Double>>) value -> {
            JSONObject data = JSONObject.parseObject(JSONObject.parseObject(value).get("data").toString());
            return Tuple2.of(data.getString("gender"), data.getDouble("salary"));
        }).returns(TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {
        }));

        // 这里先通过keyBy将数据根据性别进行分组,然后5秒为一个窗口,再求不同性别对应的工资平均值
        SingleOutputStreamOperator<Tuple2<String, Double>> avg = mapStream.keyBy((KeySelector<Tuple2<String, Double>, String>) value -> {
            String key = value.f0;
            return key;
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AverageAggregate()); // 这里调用平均值的AggregateFunction

        avg.print();
        env.execute();
    }

  • AggregateFunction的这4个方法在flink中执行的原理
    在 Flink 中AggregateFunction的这四个方法在执行过程中会被转化为对应的内部Function对象,用于Flink的运行时执行计算。

    1. 在数据输入流进入 Flink 的过程中,Flink会为每一个key创建对应的累加器。键值对流会按照键所在的组进行分区,然后把每一个组的所有元素分配到一个task slot中,并为每个key创建一个累加器。累加器的类型是任务的一个状态的函数,Flink根据累加器函数的类型来决定使用哪种累加器。
    2. 当每个数据元素输入到累加器中时,add()方法会被调用。add()方法对输入的元素进行变换,然后更新累加器中的结果,返回新的结果给Flink 的相应算子。
    3. 在结果计算完毕后,getResult()方法将被调用,并将结果返回给 Flink。最后,如果有多个累加器需要合并的情况,Flink 会调用merge()方法将结果进行合并。通过这样的执行机制,AggregateFunction对象可以更加灵活快捷地处理数据。
  • 累加器的选择
    当 Flink 创建累加器时,它会根据AggregateFunction的类型来确定使用哪种类型的累加器。具体来说,Flink支持两种类型的累加器:heap-basedincremental
    heap-based累加器需要在内存中存储完整的所有元素,对于数据量较小的情况,它可以提供最好的性能。对于数据量更大的情况,它可能会导致内存不足的问题。
    incremental累加器可以在输入元素上进行增量操作,并在内存中保存仅仅是必要的元素。它可以处理更大的数据量,并且在内存使用上更加高效。在使用增量式累加器时,用户需要重写accumulate()retract()方法。
    根据AggregateFunction的类型,Flink会自动选择合适的累加器类型来进行计算,以提高计算的效率和性能。

以上就是对aggregate方法的使用讲解及简单的原理介绍文章来源地址https://www.toymoban.com/news/detail-617313.html

到了这里,关于Flink中aggregate[AggregateFunction]的使用及讲解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据Flink(一百零三):SQL 表值聚合函数(Table Aggregate Function)

    文章目录 SQL 表值聚合函数(Table Aggregate Function) Python UDTAF,即 Python TableAggregateFunction。Python UDTAF 用来针对一组数据进行聚合运算,比如同一个 window 下的多条数据、或者同一个 key 下的多条数据等,与 Python UDAF 不同的是,针对同一组输入数据,Python UDTAF 可以产生 0 条、1 条

    2024年02月07日
    浏览(34)
  • Flink基本原理剖析讲解

    Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级

    2024年02月16日
    浏览(30)
  • 使用Java代码远程提交flink任务

    导入依赖 参数格式参考: {     \\\"jarPath\\\":\\\"C:\\\\flink-1.13.5\\\\examples\\\\streaming\\\\WordCount.jar\\\",     \\\"parallelism\\\":1,     \\\"entryPointClassName\\\":\\\"org.apache.flink.streaming.examples.wordcount.WordCount\\\" }

    2024年02月11日
    浏览(37)
  • 使用java写一个对接flink的例子

    Maven依赖: 其中, flink.version 和 scala.binary.version 都需要替换为实际使用的版本号。 模拟数据生成: 这个程序使用 Flink 的 generateSequence() 方法生成 1000 个从 0 到 999 的数字作为模拟数据,将它们转化为字符串并拼接成键值对,然后使用 Flink 的 Kafka 生产者将数据写入到 Kafka 的

    2024年02月15日
    浏览(32)
  • PyFlink使用教程,Flink,Python,Java

    环境准备 环境要求 文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/ 打开 Anaconda3 Prompt 下载的包存储在 Anaconda3envsPyFlink-1.17.1Libsite-packages PyFlink 案例 从Flink 1.11版本开始, PyFlink 作业支持在 Windows 系统上运行,因此您也可以在 Windows 上开发和调试

    2024年04月14日
    浏览(16)
  • Flink-Window详细讲解

    当谈到实时数据处理和流式计算,Apache Flink 是一个备受推崇的工具,它提供了丰富的功能来处理连续的数据流。其中,窗口(Window)是 Flink 中一个关键的概念,它使得我们能够在有限的数据集上执行各种计算和分析操作。本文将深入介绍 Flink 窗口的不同类型、使用方法以及

    2024年02月13日
    浏览(28)
  • 实例讲解Flink 流处理程序编程模型

    摘要: 在深入了解 Flink 实时数据处理程序的开发之前,先通过一个简单示例来了解使用 Flink 的 DataStream API 构建有状态流应用程序的过程。 本文分享自华为云社区《Flink 实例:Flink 流处理程序编程模型》,作者:TiAmoZhang 。 在深入了解 Flink 实时数据处理程序的开发之前,先

    2024年02月08日
    浏览(37)
  • Flink-Window详细讲解-countWindow

    一.countWindow和countWindowall区别 1.countWindow : 如果您使用 countWindow(5) ,这意味着您将数据流划分成多个大小为 5 的窗口。划分后的窗口如下: 窗口 1: [1, 2, 3, 4, 5] 窗口 2: [6, 7, 8, 9, 10] 当每个窗口中的元素数量达到 5 时,将触发计算。这意味着窗口 1 中的计算会在处理 5 个元素后

    2024年02月09日
    浏览(27)
  • 使用flink实现《实时数据分析》的案例 java版

    本文档介绍了使用Java和Flink实现实时数据分析的案例。该案例使用Flink的流处理功能,从Kafka主题中读取数据,进行实时处理和分析,并将结果输出到Elasticsearch中。 Java 8 Flink 1.13.2 Kafka 2.8.0 Elasticsearch 7.13.4 本案例使用Kafka作为数据源,从一个名为 user_behavior 的主题中读取数据。

    2024年02月08日
    浏览(31)
  • 使用flink实现《实时监控和日志分析》的案例 java版

    本文档介绍了使用Java和Flink实现实时监控和日志分析的案例。该案例旨在通过实时监控和日志分析来提高系统的可靠性和性能。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kjPKQuIf-1686052913444)(./architecture.png)] 如上图所示,该系统由以下组件组成

    2024年02月06日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包