Flink 源码剖析|累加器

这篇具有很好参考价值的文章主要介绍了Flink 源码剖析|累加器。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 累加器(Accumulator)

累加器是实现了 加法运算 功能和 合并运算(合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。

Flink 的累加器均实现了 Accumulator 接口,包括如下 2 个方法用于支持加法运算和合并最终结果:

  • add(V value):执行加法运算,将值 V 累加到当前 UDF 的累加器中
  • merge(Accumulator<V, R> other):执行合并操作,将累加器 other 与当前累加器合并

累加器的使用方法如下:

Step 1|在需要使用累加器的 UDF 中创建一个累加器对象(此处以计数器为例)

private IntCounter numLines = new IntCounter();

Step 2|在富函数的 open() 方法中注册累加器对象,在注册时需要定义累加器名称用于查询结果

getRuntimeContext().addAccumulator("num-lines", this.numLines);

Step 3|在 UDF 的任何地方(包括 open()close() 方法中)使用累加器

this.numLines.add(1);

Step 4|最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器 共享一个命名空间,因此可以在不同算子(operator)的不同 UDF 中使用同一个累加器,Flink 会合并将所有具有相同名称的累加器。

注意:当前累加器的结果只有在整个作业结束后才可以使用。

2 Accumulator 接口和 SimpleAccumulator 接口

Flink 内置的所有累加器都实现了 Accumulator 接口。

如果需要自定义累加器,只需要实现 Accumulator 接口或 SimpleAccumulator 接口即可。

2.1 Accumulator<V, R>

Accumulator 接口中,共定义了以下 5 个方法:

  • add(V value):将 value 累加到当前累加器中(加法运算)
  • getLocalValue():获取当前 UDF 中的累加器的值
  • resetLocal():重置当前 UDF 中的累加器的值
  • merge(Accumulator<V, R> other):将 other 合并到当前累加器中,用于在 Flink 系统内部合并多个累加器(合并运算)
  • clone():复制累加器对象

其中包含 2 个泛型,V 类型表示每一次向累加器中的累加的值的类型,这个类型不要求是可序列化的;R 类型表示累加器结果的类型,这个类型必须是可序列化的。不要求累加的值与累加器结果的值类型相同,可以支持类似直方图、列表等场景。

因为累加器需要在不同字节之间复制、传输,所以累加器自己必须是可序列化的、可复制的。

源码|Github|org.apache.flink.api.common.accumulators.Accumulator

/**
 * Accumulators collect distributed statistics or aggregates in a from user functions and operators.
 * Each parallel instance creates and updates its own accumulator object, and the different parallel
 * instances of the accumulator are later merged. merged by the system at the end of the job. The
 * result can be obtained from the result of a job execution, or from the web runtime monitor.
 *
 * <p>The accumulators are inspired by the Hadoop/MapReduce counters.
 *
 * <p>The type added to the accumulator might differ from the type returned. This is the case e.g.
 * for a set-accumulator: We add single objects, but the result is a set of objects.
 *
 * @param <V> Type of values that are added to the accumulator
 * @param <R> Type of the accumulator result as it will be reported to the client
 */
@Public
public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
    /** @param value The value to add to the accumulator object */
    void add(V value);

    /** @return local The local value from the current UDF context */
    R getLocalValue();

    /** Reset the local value. This only affects the current UDF context. */
    void resetLocal();

    /**
     * Used by system internally to merge the collected parts of an accumulator at the end of the
     * job.
     *
     * @param other Reference to accumulator to merge in.
     */
    void merge(Accumulator<V, R> other);

    /**
     * Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
     * throw a {@link java.lang.CloneNotSupportedException}
     *
     * @return The duplicated accumulator.
     */
    Accumulator<V, R> clone();
}

2.2 SimpleAccumulator<T>

SimpleAccumulator 是简化版的 Accumulator,它继承了 Accumulator<V, R>,但是要求累加的值的类型与累加器的结果的类型必须相同,适用于一些相对简单的操作,例如计数器。

源码|Github|org.apache.flink.api.common.accumulators.SimpleAccumulator

/** Similar to Accumulator, but the type of items to add and the result value must be the same. */
@Public
public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T, T> {}

3 Flink 内置累加器

Flink 源码剖析|累加器,Flink,flink,Accumulator,累加器

可以看到 Flink 内置的累加器,除 HistogramListAccumulator 等累加器的累加类型与结果类型不同,直接实现了 Accumulator 接口以外,其他累加器均实现 SimpleAccumulator 接口。

3.1 内置累加器列表

这些 Flink 内置的累加器如下:

累加器类 累加器用途 输入类型 输出类型
AverageAccumulator 计算平均值 Doubledouble / long / int Double
DoubleCounter 计算 Double 类型的和 Double Double
DoubleMaximum 计算 Double 类型最大值 Double Double
DoubleMinimum 计算 Double 类型最小值 Double Double
IntCounter 计算 Integer 类型的和 Integer Integer
IntMaximum 计算 Integer 类型最大值 Integer Integer
IntMinimum 计算 Integer 类型最小值 Integer Integer
LongCounter 计算 Long 类型的和 Long Long
LongMaximum 计算 Long 类型的最大值 Long Long
LongMinimum 计算 Long 类型的最小值 Long Long
Histogram 直方图 Integer TreeMap<Integer, Integer>
ListAccumulator 列表累加器(将元素存储到列表中) T ArrayList<T>
SerializedListAccumulator 序列化的列表累加器(将元素序列化存储到列表中) T ArrayList<byte[]>

3.2 累加器实现样例

以上列表中累加器的实现逻辑是类似的,我们具体来看 AverageAccumulator 作为样例。

源码|Github|org.apache.flink.api.common.accumulators.AverageAccumulator(部分)

@Public
public class AverageAccumulator implements SimpleAccumulator<Double> {

    private long count;
    private double sum;

    @Override
    public void add(Double value) {
        this.count++;
        this.sum += value;
    }

    @Override
    public Double getLocalValue() {
        if (this.count == 0) {
            return 0.0;
        }
        return this.sum / this.count;
    }

    @Override
    public void resetLocal() {
        this.count = 0;
        this.sum = 0;
    }

    @Override
    public void merge(Accumulator<Double, Double> other) {
        if (other instanceof AverageAccumulator) {
            AverageAccumulator avg = (AverageAccumulator) other;
            this.count += avg.count;
            this.sum += avg.sum;
        } else {
            throw new IllegalArgumentException(
                    "The merged accumulator must be AverageAccumulator.");
        }
    }

    @Override
    public AverageAccumulator clone() {
        AverageAccumulator average = new AverageAccumulator();
        average.count = this.count;
        average.sum = this.sum;
        return average;
    }
}

AverageAccumulator 在对象属性中存储了当前累加器中所有元素的和 sum 以及元素的数量 count

在调用 add(Double value) 方法执行加法运算时,累加 sumcount

在调用 merge(Accumulator<Double, Double> other) 方法执行合并运算时,将另一个累加器 othercountsum 累加到当前累加器中。

在调用 clone() 方法复制累加器对象时,创建一个新的 AverageAccumulator 对象,并将当前累加器的 countsum 复制给该对象。文章来源地址https://www.toymoban.com/news/detail-833202.html

参考文档

  • 《Flink 官方文档:应用开发 - DataStream API - 用户自定义 Functions》

到了这里,关于Flink 源码剖析|累加器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark---累加器和广播变量

    累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。 运行结果: 我们预期是想要实现数据的累加,开始数据从Driver被传输到了Execut

    2024年01月18日
    浏览(39)
  • 计算机组成原理 累加器实验

    计算机组成原理实验环境 理解累加器的概念和作用。 连接运算器、存储器和累加器,熟悉计算机的数据通路。 掌握使用微命令执行各种操作的方法。 做好实验预习,读懂实验电路图,熟悉实验元器件的功能特性和使用方法。在实验之前设计好要使用的微命令,填入表 6-2 、

    2024年02月06日
    浏览(30)
  • Spark核心--checkpoint、 广播变量、累加器介绍

    rdd 的优化手段,可以提升计算速度。将计算过程中某个rdd保存在缓存或者hdfs上,在后面计算时,使用该rdd可以直接从缓存或者hdfs上直接读取数据 1-1 缓存使用 1、提升计算速度  2、容错 什么样的rdd需要缓存? 1、rdd的计算时间比较长,获取数据的计算比较复杂 2、rdd被频繁使

    2024年01月16日
    浏览(32)
  • Spark编程-共享变量(广播变量和累加器)

     Spark中的两个重要抽象一个是RDD,另一个就是共享变量。         在默认情况下, 当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本 。         但是,有时候,需要在多个任务之间共享变

    2024年02月16日
    浏览(39)
  • 【数字IC/FPGA】百度昆仑芯手撕代码--累加器

    已知一个加法器IP,其功能是计算两个数的和,但这个和延迟两个周期才会输出。现在有一串连续的数据输入,每个周期都不间断,试问最少需要例化几个上述的加法器IP,才可以实现累加的功能。 由于加法器两个周期后才能得到结果(再将该结果作为加法器的输入进行累加

    2024年02月09日
    浏览(29)
  • 大数据开发之Spark(累加器、广播变量、Top10热门品类实战)

    累加器:分布式共享只写变量。(executor和executor之间不能读数据) 累加器用来把executor端变量信息聚合到driver端。在driver中定义的一个变量,在executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回driver端进行合并计算。 1、累加器使用 1)

    2024年01月24日
    浏览(31)
  • 尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】

    视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01【SparkCore(概述、快速上手、运行环境、运行架构)】 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,RDD-核心属性-执行原理-基础编程-并行度与分区-转换算子)】 尚硅

    2024年02月01日
    浏览(74)
  • 《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )

    有如下需求,保证 account.withdraw 取款方法的线程安全 原有实现并不是线程安全的 测试代码 执行测试代码,某次执行结果 5.1.1 为么不安全 withdraw 方法是临界区,会存在线程安全问题 查看下字节码 多线程在执行过程中可能会出现指令的交错,从而结果错误! 5.1.2 解决思路1

    2023年04月12日
    浏览(32)
  • 深入理解 Flink(五)Flink Standalone 集群启动源码剖析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    浏览(39)
  • Flink 源码剖析|RuntimeContext 接口

    每个并行的实例都会包含一个 RuntimeContext 。 RuntimeContext 接口包含函数执行的上下文信息,提供了如下功能: 访问静态上下文信息(例如当前并行度) 添加及访问累加器 访问外部资源信息 访问广播变量和分布式缓存 访问并编辑状态 下面,我们逐类介绍 RuntimeContext 接口的方

    2024年02月22日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包