1 累加器(Accumulator)
累加器是实现了 加法运算 功能和 合并运算(合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。
Flink 的累加器均实现了 Accumulator
接口,包括如下 2 个方法用于支持加法运算和合并最终结果:
-
add(V value)
:执行加法运算,将值V
累加到当前 UDF 的累加器中 -
merge(Accumulator<V, R> other)
:执行合并操作,将累加器other
与当前累加器合并
累加器的使用方法如下:
Step 1|在需要使用累加器的 UDF 中创建一个累加器对象(此处以计数器为例)
private IntCounter numLines = new IntCounter();
Step 2|在富函数的 open()
方法中注册累加器对象,在注册时需要定义累加器名称用于查询结果
getRuntimeContext().addAccumulator("num-lines", this.numLines);
Step 3|在 UDF 的任何地方(包括 open()
和 close()
方法中)使用累加器
this.numLines.add(1);
Step 4|最终整体结果会存储在由执行环境的 execute()
方法返回的 JobExecutionResult
对象中。
myJobExecutionResult.getAccumulatorResult("num-lines");
单个作业的所有累加器 共享一个命名空间,因此可以在不同算子(operator)的不同 UDF 中使用同一个累加器,Flink 会合并将所有具有相同名称的累加器。
注意:当前累加器的结果只有在整个作业结束后才可以使用。
2 Accumulator 接口和 SimpleAccumulator 接口
Flink 内置的所有累加器都实现了 Accumulator
接口。
如果需要自定义累加器,只需要实现 Accumulator
接口或 SimpleAccumulator
接口即可。
2.1 Accumulator<V, R>
在 Accumulator
接口中,共定义了以下 5 个方法:
-
add(V value)
:将value
累加到当前累加器中(加法运算) -
getLocalValue()
:获取当前 UDF 中的累加器的值 -
resetLocal()
:重置当前 UDF 中的累加器的值 -
merge(Accumulator<V, R> other)
:将other
合并到当前累加器中,用于在 Flink 系统内部合并多个累加器(合并运算) -
clone()
:复制累加器对象
其中包含 2 个泛型,V
类型表示每一次向累加器中的累加的值的类型,这个类型不要求是可序列化的;R
类型表示累加器结果的类型,这个类型必须是可序列化的。不要求累加的值与累加器结果的值类型相同,可以支持类似直方图、列表等场景。
因为累加器需要在不同字节之间复制、传输,所以累加器自己必须是可序列化的、可复制的。
源码|Github|org.apache.flink.api.common.accumulators.Accumulator
/**
* Accumulators collect distributed statistics or aggregates in a from user functions and operators.
* Each parallel instance creates and updates its own accumulator object, and the different parallel
* instances of the accumulator are later merged. merged by the system at the end of the job. The
* result can be obtained from the result of a job execution, or from the web runtime monitor.
*
* <p>The accumulators are inspired by the Hadoop/MapReduce counters.
*
* <p>The type added to the accumulator might differ from the type returned. This is the case e.g.
* for a set-accumulator: We add single objects, but the result is a set of objects.
*
* @param <V> Type of values that are added to the accumulator
* @param <R> Type of the accumulator result as it will be reported to the client
*/
@Public
public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
/** @param value The value to add to the accumulator object */
void add(V value);
/** @return local The local value from the current UDF context */
R getLocalValue();
/** Reset the local value. This only affects the current UDF context. */
void resetLocal();
/**
* Used by system internally to merge the collected parts of an accumulator at the end of the
* job.
*
* @param other Reference to accumulator to merge in.
*/
void merge(Accumulator<V, R> other);
/**
* Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
* throw a {@link java.lang.CloneNotSupportedException}
*
* @return The duplicated accumulator.
*/
Accumulator<V, R> clone();
}
2.2 SimpleAccumulator<T>
SimpleAccumulator
是简化版的 Accumulator
,它继承了 Accumulator<V, R>
,但是要求累加的值的类型与累加器的结果的类型必须相同,适用于一些相对简单的操作,例如计数器。
源码|Github|org.apache.flink.api.common.accumulators.SimpleAccumulator
/** Similar to Accumulator, but the type of items to add and the result value must be the same. */
@Public
public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T, T> {}
3 Flink 内置累加器
可以看到 Flink 内置的累加器,除 Histogram
、ListAccumulator
等累加器的累加类型与结果类型不同,直接实现了 Accumulator
接口以外,其他累加器均实现 SimpleAccumulator
接口。
3.1 内置累加器列表
这些 Flink 内置的累加器如下:
累加器类 | 累加器用途 | 输入类型 | 输出类型 |
---|---|---|---|
AverageAccumulator |
计算平均值 |
Double (double / long / int ) |
Double |
DoubleCounter |
计算 Double 类型的和 |
Double |
Double |
DoubleMaximum |
计算 Double 类型最大值 |
Double |
Double |
DoubleMinimum |
计算 Double 类型最小值 |
Double |
Double |
IntCounter |
计算 Integer 类型的和 |
Integer |
Integer |
IntMaximum |
计算 Integer 类型最大值 |
Integer |
Integer |
IntMinimum |
计算 Integer 类型最小值 |
Integer |
Integer |
LongCounter |
计算 Long 类型的和 |
Long |
Long |
LongMaximum |
计算 Long 类型的最大值 |
Long |
Long |
LongMinimum |
计算 Long 类型的最小值 |
Long |
Long |
Histogram |
直方图 | Integer |
TreeMap<Integer, Integer> |
ListAccumulator |
列表累加器(将元素存储到列表中) | T |
ArrayList<T> |
SerializedListAccumulator |
序列化的列表累加器(将元素序列化存储到列表中) | T |
ArrayList<byte[]> |
3.2 累加器实现样例
以上列表中累加器的实现逻辑是类似的,我们具体来看 AverageAccumulator
作为样例。
源码|Github|org.apache.flink.api.common.accumulators.AverageAccumulator
(部分)
@Public
public class AverageAccumulator implements SimpleAccumulator<Double> {
private long count;
private double sum;
@Override
public void add(Double value) {
this.count++;
this.sum += value;
}
@Override
public Double getLocalValue() {
if (this.count == 0) {
return 0.0;
}
return this.sum / this.count;
}
@Override
public void resetLocal() {
this.count = 0;
this.sum = 0;
}
@Override
public void merge(Accumulator<Double, Double> other) {
if (other instanceof AverageAccumulator) {
AverageAccumulator avg = (AverageAccumulator) other;
this.count += avg.count;
this.sum += avg.sum;
} else {
throw new IllegalArgumentException(
"The merged accumulator must be AverageAccumulator.");
}
}
@Override
public AverageAccumulator clone() {
AverageAccumulator average = new AverageAccumulator();
average.count = this.count;
average.sum = this.sum;
return average;
}
}
AverageAccumulator
在对象属性中存储了当前累加器中所有元素的和 sum
以及元素的数量 count
。
在调用 add(Double value)
方法执行加法运算时,累加 sum
和 count
。
在调用 merge(Accumulator<Double, Double> other)
方法执行合并运算时,将另一个累加器 other
的 count
和 sum
累加到当前累加器中。文章来源:https://www.toymoban.com/news/detail-833202.html
在调用 clone()
方法复制累加器对象时,创建一个新的 AverageAccumulator
对象,并将当前累加器的 count
和 sum
复制给该对象。文章来源地址https://www.toymoban.com/news/detail-833202.html
参考文档
- 《Flink 官方文档:应用开发 - DataStream API - 用户自定义 Functions》
到了这里,关于Flink 源码剖析|累加器的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!