Flink 归约聚合(reduce)

这篇具有很好参考价值的文章主要介绍了Flink 归约聚合(reduce)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

如果说简单聚合是对一些特定统计需求的实现,那么 reduce 算子就是一个一般化的聚合统计操作了。从大名鼎鼎的 MapReduce 开始,我们对 reduce 操作就不陌生:它可以对已有的
数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元
素数据类型,所以输出类型和输入类型是一样的。调用 KeyedStream 的 reduce 方法时,需要传入一个参数,实现 ReduceFunction 接口。接口在源码中的定义如下:

@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {

    /**
     * The core method of ReduceFunction, combining two values into one value of the same type. The
     * reduce function is consecutively applied to all values of a group until only a single value
     * remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再
将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据,这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果”
作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

其实,reduce 的语义是针对列表进行规约操作,运算规则由 ReduceFunction 中的 reduce方法来定义,而在 ReduceFunction 内部会维护一个初始值为空的累加器,注意累加器的类型
和输入元素的类型相同,当第一条元素到来时,累加器的值更新为第一条元素的值,当新的元素到来时,新元素会和累加器进行累加操作,这里的累加操作就是 reduce 函数定义的运算规
则。然后将更新以后的累加器的值向下游输出。

我们可以单独定义一个函数类实现 ReduceFunction 接口,也可以直接传入一个匿名类。当然,同样也可以通过传入 Lambda 表达式实现类似的功能。与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStrema。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。下面我们来看一个稍复杂的例子。

我们将数据流按照用户 id 进行分区,然后用一个 reduce 算子实现 sum 的功能,统计每个用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce 算子实现 maxBy 的功能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。

package com.rosh.flink.test;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;


/**
 * 我们将数据流按照用户 id 进行分区,然后用一个 reduce 算子实现 sum 的功能,统计每个
 * 用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce 算子实现 maxBy 的功能,
 * 记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。
 */
public class TransReduceTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //随机生成数据
        Random random = new Random();
        List<Integer> userIds = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            userIds.add(random.nextInt(5));
        }
        DataStreamSource<Integer> userIdDS = env.fromCollection(userIds);

        //每个ID访问记录一次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> map(Integer value) throws Exception {
                return new Tuple2<>(value, 1L);
            }
        });


        //统计每个user访问多少次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
            }
        });
        sumDS.print("sumDS  ->>>>>>>>>>>>>");


        //把所有分区合并,求出最大的访问量
        SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                if (value1.f1 > value2.f1) {
                    return value1;
                } else {
                    return value2;
                }
            }
        });
        maxDS.print("maxDS ->>>>>>>>>>>");

        env.execute("TransReduceTest");
    }

}

Flink 归约聚合(reduce)文章来源地址https://www.toymoban.com/news/detail-435121.html

到了这里,关于Flink 归约聚合(reduce)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink流处理案例:实时数据聚合

    Apache Flink是一个流处理框架,可以处理大规模数据流,实现实时数据处理和分析。Flink支持各种数据源和接口,如Kafka、HDFS、TCP流等,可以实现高吞吐量、低延迟的流处理。 在本文中,我们将通过一个实际的Flink流处理案例来讲解Flink的核心概念、算法原理和最佳实践。我们将

    2024年02月19日
    浏览(45)
  • Flink配置Yarn日志聚合、配置历史日志。

    对于已经结束的yarn应用,flink进程已经退出无法提供webui服务。所以需要通过JobHistoryServer查看保留在yarn上的日志。 下面就给大家分享一下我在配置方面的经历吧。 1.yarn配置聚合日志 编辑 :yarn-site.xml 说明 : 开启后任务执行 “完毕” 后,才会上传日志至hdfs 查询 :yarn lo

    2024年02月10日
    浏览(36)
  • Flink学习20:聚合算子(sum,max,min)

    常见的聚合算子 sum,max,min等 聚合算子可以在在keyedStream 流上进行滚动的聚合(即累计的操作),而且同一个 keyedStream 流上只能调用一次 聚合算子      

    2024年02月07日
    浏览(63)
  • Flink---5、聚合算子、用户自定义函数、物理分区算子、分流、合流

                           星光下的赶路人star的个人主页                        欲买桂花同载酒,终不似,少年游 计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并—这就是

    2024年02月07日
    浏览(46)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(2)转换算子(Transformation)【基本转换算子、聚合算子】

    数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。 map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个 “一 一映射”,消费一个元素就产出一个元素 。 我们只

    2024年01月23日
    浏览(48)
  • 大数据Flink(一百零二):SQL 聚合函数(Aggregate Function)

    文章目录 SQL 聚合函数(Aggregate Function) Python UDAF,即 Python AggregateFunction。Python UDAF 用来针对一组数据进行聚合运算,比如同一个 window 下的多条数据、或者同一个 key 下的多条数据等。针对同一组输入数据,Python AggregateFunction 产生一条输出数据。比如以下示例,定义了一个

    2024年02月08日
    浏览(40)
  • Flink系列Table API和SQL之:滚动窗口、滑动窗口、累计窗口、分组聚合

    有了时间属性,接下来就可以定义窗口进行计算了。窗口可以将无界流切割成大小有限的桶(bucket)来做计算,通过截取有限数据集来处理无限的流数据。在DataStream API中提供了对不同类型的窗口进行定义和处理的接口,而在Table API和SQL中,类似的功能也都可以实现。 在Flink 1

    2023年04月27日
    浏览(57)
  • 大数据Flink(一百零三):SQL 表值聚合函数(Table Aggregate Function)

    文章目录 SQL 表值聚合函数(Table Aggregate Function) Python UDTAF,即 Python TableAggregateFunction。Python UDTAF 用来针对一组数据进行聚合运算,比如同一个 window 下的多条数据、或者同一个 key 下的多条数据等,与 Python UDAF 不同的是,针对同一组输入数据,Python UDTAF 可以产生 0 条、1 条

    2024年02月07日
    浏览(40)
  • flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

    ⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。 ⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。 经过测试 在fl

    2024年02月22日
    浏览(49)
  • Elasticsearch简单搜索以及聚合分析

    1.批量索引文档 如果你有大量文档要索引,你能通过 批量 API ( bulk API ) 来批量提交它们。批量文档操作比单独提交请求显著更快,因为它极简了网络往返。 最佳的批量数量取决于许多因素:文档的大小和复杂度、索引和搜索的负载以及集群可用资源。一种好的方式是批量

    2024年02月04日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包