4.1 累加器(Accumulator)
累加器是实现了 加法运算 功能和 合并运算(合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。
Flink 的累加器均实现了 Accumulator
接口,包括如下 2 个方法用于支持加法运算和合并最终结果:
-
add(V value)
:执行加法运算,将值V
累加到当前 UDF 的累加器中 -
merge(Accumulator<V, R> other)
:执行合并操作,将累加器other
与当前累加器合并
累加器的使用方法如下:
Step 1|在需要使用累加器的 UDF 中创建一个累加器对象(此处以计数器为例)
private IntCounter numLines = new IntCounter();
Step 2|在富函数的 open()
方法中注册累加器对象,在注册时需要定义累加器名称用于查询结果
getRuntimeContext().addAccumulator("num-lines", this.numLines);
Step 3|在 UDF 的任何地方(包括 open()
和 close()
方法中)使用累加器
this.numLines.add(1);
Step 4|最终整体结果会存储在由执行环境的 execute()
方法返回的 JobExecutionResult
对象中。
myJobExecutionResult.getAccumulatorResult("num-lines");
单个作业的所有累加器 共享一个命名空间,因此可以在不同算子(operator)的不同 UDF 中使用同一个累加器,Flink 会合并将所有具有相同名称的累加器。
注意:当前累加器的结果只有在整个作业结束后才可以使用。
4.2 Accumulator 接口和 SimpleAccumulator 接口
Flink 内置的所有累加器都实现了 Accumulator
接口。
如果需要自定义累加器,只需要实现 Accumulator
接口或 SimpleAccumulator
接口即可。
4.2.1 Accumulator<V, R>
在 Accumulator
接口中,共定义了以下 5 个方法:
-
add(V value)
:将value
累加到当前累加器中(加法运算) -
getLocalValue()
:获取当前 UDF 中的累加器的值 -
resetLocal()
:重置当前 UDF 中的累加器的值 -
merge(Accumulator<V, R> other)
:将other
合并到当前累加器中,用于在 Flink 系统内部合并多个累加器(合并运算) -
clone()
:复制累加器对象
其中包含 2 个泛型,V
类型表示每一次向累加器中的累加的值的类型,这个类型不要求是可序列化的;R
类型表示累加器结果的类型,这个类型必须是可序列化的。不要求累加的值与累加器结果的值类型相同,可以支持类似直方图、列表等场景。
因为累加器需要在不同字节之间复制、传输,所以累加器自己必须是可序列化的、可复制的。
源码|Github|org.apache.flink.api.common.accumulators.Accumulator
/**
* Accumulators collect distributed statistics or aggregates in a from user functions and operators.
* Each parallel instance creates and updates its own accumulator object, and the different parallel
* instances of the accumulator are later merged. merged by the system at the end of the job. The
* result can be obtained from the result of a job execution, or from the web runtime monitor.
*
* <p>The accumulators are inspired by the Hadoop/MapReduce counters.
*
* <p>The type added to the accumulator might differ from the type returned. This is the case e.g.
* for a set-accumulator: We add single objects, but the result is a set of objects.
*
* @param <V> Type of values that are added to the accumulator
* @param <R> Type of the accumulator result as it will be reported to the client
*/
@Public
public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
/** @param value The value to add to the accumulator object */
void add(V value);
/** @return local The local value from the current UDF context */
R getLocalValue();
/** Reset the local value. This only affects the current UDF context. */
void resetLocal();
/**
* Used by system internally to merge the collected parts of an accumulator at the end of the
* job.
*
* @param other Reference to accumulator to merge in.
*/
void merge(Accumulator<V, R> other);
/**
* Duplicates the accumulator. All subclasses need to properly implement cloning and cannot
* throw a {@link java.lang.CloneNotSupportedException}
*
* @return The duplicated accumulator.
*/
Accumulator<V, R> clone();
}
4.2.2 SimpleAccumulator<T>
SimpleAccumulator
是简化版的 Accumulator
,它继承了 Accumulator<V, R>
,但是要求累加的值的类型与累加器的结果的类型必须相同,适用于一些相对简单的操作,例如计数器。
源码|Github|org.apache.flink.api.common.accumulators.SimpleAccumulator
/** Similar to Accumulator, but the type of items to add and the result value must be the same. */
@Public
public interface SimpleAccumulator<T extends Serializable> extends Accumulator<T, T> {}
4.3 Flink 内置累加器
可以看到 Flink 内置的累加器,除 Histogram
、ListAccumulator
等累加器的累加类型与结果类型不同,直接实现了 Accumulator
接口以外,其他累加器均实现 SimpleAccumulator
接口。
4.3.1 内置累加器列表
这些 Flink 内置的累加器如下:
累加器类 | 累加器用途 | 输入类型 | 输出类型 |
---|---|---|---|
AverageAccumulator |
计算平均值 |
Double (double / long / int ) |
Double |
DoubleCounter |
计算 Double 类型的和 |
Double |
Double |
DoubleMaximum |
计算 Double 类型最大值 |
Double |
Double |
DoubleMinimum |
计算 Double 类型最小值 |
Double |
Double |
IntCounter |
计算 Integer 类型的和 |
Integer |
Integer |
IntMaximum |
计算 Integer 类型最大值 |
Integer |
Integer |
IntMinimum |
计算 Integer 类型最小值 |
Integer |
Integer |
LongCounter |
计算 Long 类型的和 |
Long |
Long |
LongMaximum |
计算 Long 类型的最大值 |
Long |
Long |
LongMinimum |
计算 Long 类型的最小值 |
Long |
Long |
Histogram |
直方图 | Integer |
TreeMap<Integer, Integer> |
ListAccumulator |
列表累加器(将元素存储到列表中) | T |
ArrayList<T> |
SerializedListAccumulator |
序列化的列表累加器(将元素序列化存储到列表中) | T |
ArrayList<byte[]> |
4.3.2 累加器实现样例
以上列表中累加器的实现逻辑是类似的,我们具体来看 AverageAccumulator
作为样例。
源码|Github|org.apache.flink.api.common.accumulators.AverageAccumulator
(部分)
@Public
public class AverageAccumulator implements SimpleAccumulator<Double> {
private long count;
private double sum;
@Override
public void add(Double value) {
this.count++;
this.sum += value;
}
@Override
public Double getLocalValue() {
if (this.count == 0) {
return 0.0;
}
return this.sum / this.count;
}
@Override
public void resetLocal() {
this.count = 0;
this.sum = 0;
}
@Override
public void merge(Accumulator<Double, Double> other) {
if (other instanceof AverageAccumulator) {
AverageAccumulator avg = (AverageAccumulator) other;
this.count += avg.count;
this.sum += avg.sum;
} else {
throw new IllegalArgumentException(
"The merged accumulator must be AverageAccumulator.");
}
}
@Override
public AverageAccumulator clone() {
AverageAccumulator average = new AverageAccumulator();
average.count = this.count;
average.sum = this.sum;
return average;
}
}
AverageAccumulator
在对象属性中存储了当前累加器中所有元素的和 sum
以及元素的数量 count
。
在调用 add(Double value)
方法执行加法运算时,累加 sum
和 count
。
在调用 merge(Accumulator<Double, Double> other)
方法执行合并运算时,将另一个累加器 other
的 count
和 sum
累加到当前累加器中。
在调用 clone()
方法复制累加器对象时,创建一个新的 AverageAccumulator
对象,并将当前累加器的 count
和 sum
复制给该对象。
4.4 AccumulatorHelper 类中累加器相关工具方法
在 AccumulatorHelper
类中,有一些用于操作累加器的静态工具方法。下面,我们来看一下其中在部分外部被使用的方法:
4.4.1 mergeInto():合并 2 个累加器命名空间
在 Flink 中,每个作业的所有累加器共享一个命名空间,Flink 会合并具有相同名称的累加器。这个累加器的命名空间使用 Map<String, Accumulator<?, ?>>
类型存储,在 mergeInto()
方法的 target
和 toMerge
参数均为累加器的命名空间。
源码|Github|org.apache.flink.api.common.accumulators.AccumulatorHelper#mergeInto
public static void mergeInto(
Map<String, OptionalFailure<Accumulator<?, ?>>> target,
Map<String, Accumulator<?, ?>> toMerge) {
for (Map.Entry<String, Accumulator<?, ?>> otherEntry : toMerge.entrySet()) {
OptionalFailure<Accumulator<?, ?>> ownAccumulator = target.get(otherEntry.getKey());
if (ownAccumulator == null) {
// Create initial counter (copy!)
target.put(
otherEntry.getKey(),
wrapUnchecked(otherEntry.getKey(), () -> otherEntry.getValue().clone()));
} else if (ownAccumulator.isFailure()) {
continue;
} else {
Accumulator<?, ?> accumulator = ownAccumulator.getUnchecked();
// Both should have the same type
compareAccumulatorTypes(
otherEntry.getKey(),
accumulator.getClass(),
otherEntry.getValue().getClass());
// Merge target counter with other counter
target.put(
otherEntry.getKey(),
wrapUnchecked(
otherEntry.getKey(),
() -> mergeSingle(accumulator, otherEntry.getValue().clone())));
}
}
}
private static <V, R extends Serializable> Accumulator<V, R> mergeSingle(
Accumulator<?, ?> target, Accumulator<?, ?> toMerge) {
@SuppressWarnings("unchecked")
Accumulator<V, R> typedTarget = (Accumulator<V, R>) target;
@SuppressWarnings("unchecked")
Accumulator<V, R> typedToMerge = (Accumulator<V, R>) toMerge;
typedTarget.merge(typedToMerge);
return typedTarget;
}
参数 target
为需要合并到的累加器 Map,toMerge
参数为需要被合并的累加器的 Map,在合并时:
- 如果
target
中没有对应的累加器,则调用累加器的clone()
方法将toMerge
中的累加器复制到target
中 - 如果
target
中有对应的累加器,则先检查两个累加器的类型后,在mergeSingle
方法中调用累加器的merge()
方法将onMerge
中的累加器合并到target
的累加器中
源码阅读思路|在 Flink 官方文档中,提到单个作业的所有累加器共享一个命名空间,Flink 会合并所有具有相同名称的累加器。从功能上看,这个方法用于合并两个累加器的 Map,应该会在 Flink 执行作业过程中,接近结束作业时被调用。因此,搜索这个方法被调用的位置,可以找到 Flink 执行作业的逻辑。具体地,这个方法被调用的位置如下:
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#aggregateUserAccumulators
org.apache.flink.runtime.executiongraph.ExecutionJobVertex#getAggregatedUserAccumulatorsStringified
4.4.2 toResultMap():对累加器的命名空间计算结果
源码|Github|org.apache.flink.api.common.accumulators.AccumulatorHelper#toResultMap
public static Map<String, OptionalFailure<Object>> toResultMap(
Map<String, Accumulator<?, ?>> accumulators) {
Map<String, OptionalFailure<Object>> resultMap = new HashMap<>();
for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
resultMap.put(
entry.getKey(),
wrapUnchecked(entry.getKey(), () -> entry.getValue().getLocalValue()));
}
return resultMap;
}
在命名空间中,逐个调用累加器的 getLocalValue()
方法获取最终结果并写入到新的 Map
中。
4.4.3 deserializeAccumulators():将序列化的累加器反序列化
源码|Github|org.apache.flink.api.common.accumulators.AccumulatorHelper#deserializeAccumulators
public static Map<String, OptionalFailure<Object>> deserializeAccumulators(
Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
ClassLoader loader)
throws IOException, ClassNotFoundException {
if (serializedAccumulators == null || serializedAccumulators.isEmpty()) {
return Collections.emptyMap();
}
Map<String, OptionalFailure<Object>> accumulators =
CollectionUtil.newHashMapWithExpectedSize(serializedAccumulators.size());
for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> entry :
serializedAccumulators.entrySet()) {
OptionalFailure<Object> value = null;
if (entry.getValue() != null) {
value = entry.getValue().deserializeValue(loader);
}
accumulators.put(entry.getKey(), value);
}
return accumulators;
}
逐个遍历序列化的累加器中的每个键值对,调用 org.apache.flink.util.SerializedValue#deserializeValue
方法对序列化的值进行反序列化,并将结果添加到新的 HashMap
中。文章来源:https://www.toymoban.com/news/detail-840096.html
在实现上,因为反序列化后的 HashMap
中的元素数量与反序列化之前一致,所以可以通过在初始化新的 HashMap
时直接指定 HashMap
的容量,来避免扩容时带来的性能损耗。这里调用了 org.apache.flink.util.CollectionUtil#newHashMapWithExpectedSize
方法,该方法中创建了指定容积及扩容比例的 HashMap
。文章来源地址https://www.toymoban.com/news/detail-840096.html
参考文档
- 《Flink 官方文档:应用开发 - DataStream API - 用户自定义 Functions》
到了这里,关于Flink 源码剖析|4. 累加器与相关工具方法的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!