1 键控状态
在 Flink 中有如下 5 种键控状态(Keyed State),这些状态仅能在键控数据流(Keyed Stream)的算子(operator)上使用。键控流使用键(key)对数据流中的记录进行分区,同时也会对状态进行分区。要创建键控流,只需要在 DataStream 上使用 keyBy()
方法指定键即可。
具体地,这 5 种键控状态如下:
-
ValueState<T>
:保存一个值;这个值可以通过update(T)
进行更新,通过T value()
进行检索。 -
MapState<UK, UV>
:保存一个映射列表;可以使用put(UK, UV)
或putAll(Map<UK, UV>)
添加映射,使用get(UK)
来检索特定的 key,使用entires()
、keys()
、values()
分别检索映射、键和值的可迭代视图,使用isEmpty()
判断是否包含任何键值对。 -
ListState<T>
:保存一个元素的列表;可以通过add(T)
或者addAll(List<T>)
添加元素,通过Iterable<T> get()
获取整个列表,通过update(List<T>)
覆盖当前的列表。 -
ReducingState<T>
:保存一个值,表示添加到状态的所有值聚合后的结果;使用add(T)
添加元素,并使用提供的reduceFunction
进行聚合。 -
AggregatingState<IN, OUT>
:保存一个值,表示添加到状态的所有值聚合后的结果;使用add(T)
添加元素,并使用提供的AggregateFunction
进行聚合。与ReducingState
不同的时,聚合类型可能与添加到状态的元素类型不同。
所有的类型状态还有一个 clear()
方法,用于清除当前键的状态数据。
键控状态具有如下特性:
- 实现以上 5 个接口的状态对象仅用于与状态交互,状态本身不一定存储在内存中,还可能在磁盘或其他位置。
- 从状态中获取的值取决于输入元素所代表的 Key,在不同 key 上调用同一个接口,可能得到不同的值。
2 键控状态的源码
2.1 State
State
接口是所有状态接口的基类,其中只定义了一个 clear()
方法,用于移除当前 key 下的状态。
源码|Github|org.apache.flink.api.common.state.State
@PublicEvolving
public interface State {
/** Removes the value mapped under the current key. */
void clear();
}
2.2 ValueState:每个 key 存储一个值
ValueState<T>
接口是用于保存一个值的状态,其中定义了 value()
方法和 update(T value)
方法用于查询和更新当前 key 的状态;泛型 V
是存储的值的类型。
源码|Github|org.apache.flink.api.common.state.ValueState
@PublicEvolving
public interface ValueState<T> extends State {
T value() throws IOException;
void update(T value) throws IOException;
}
2.3 MapState:每个 key 存储一个映射
MapState<UK, UV>
接口用于保存一个映射,泛型 UK
是映射的键的类型,泛型 UV
是映射的值的类型。其方法与 Java 的 Map
接口类似,具体包含如下方法:
-
get(UK key)
:获取状态中key
对应的值 -
put(UK key, UV value)
:将键值对key
/value
写入到状态中 -
putAll(Map<UK, UV> map)
:将map
中的所有键值对写入到状态中 -
remove(UK key)
:移除状态中key
及其对应的值 -
contains(UK key)
:查询状态中是否包含key
-
entries()
/iterator()
:遍历状态中的所有键值对 -
keys()
:遍历状态中的所有键 -
values()
:遍历状态中的所有值 -
isEmpty()
:查询状态的映射是否为空
源码|Github|org.apache.flink.api.common.state.MapState
@PublicEvolving
public interface MapState<UK, UV> extends State {
UV get(UK key) throws Exception;
void put(UK key, UV value) throws Exception;
void putAll(Map<UK, UV> map) throws Exception;
void remove(UK key) throws Exception;
boolean contains(UK key) throws Exception;
Iterable<Map.Entry<UK, UV>> entries() throws Exception;
Iterable<UK> keys() throws Exception;
Iterable<UV> values() throws Exception;
Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
boolean isEmpty() throws Exception;
}
2.4 AppendingState 及其子类:每个 key 存储一个累加状态
AppendingState<IN, OUT>
接口定义了一个具有类似累加器性质的状态,其泛型 IN
为每次添加元素的类型,OUT
为结果的类型。其中包含 2 个方法,add(IN value)
方法用于向累加器添加元素,get()
方法用于获取当前累加器的值。
源码|Github|org.apache.flink.api.common.state.AppendingState
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {
OUT get() throws Exception;
void add(IN value) throws Exception;
}
MergingState<IN, OUT>
接口继承了 AppendingState<IN, OUT>
接口,要求在实现时支持类似累加器的合并运算,即将两个 MergingState
实例合并为包含 2 个实例信息的 1 个 MergingState
实例。
源码|Github|org.apache.flink.api.common.state.MergingState
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> {}
ListState<T>
、ReducingState<T>
和 AggregatingState<IN, OUT>
均继承了 MergingState<IN, OUT>
。
2.4.1 ListState
:输入元素,输出可迭代对象
ListState<T>
的 add(T value)
接受一个 T
类型的元素,get()
方法返回一个 Iterable<T>
。同时,额外定义了 2 个方法:
-
update(List<T> values)
:使用values
替换到当前状态中的列表 -
addAll(List<T> values)
:将values
添加到当前状态的列表中
源码|Github|org.apache.flink.api.common.state.ListState
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {
void update(List<T> values) throws Exception;
void addAll(List<T> values) throws Exception;
}
2.4.2 ReducingState
:输入和输出类型一致
ReducingState<T>
接口并没有定义新的方法,但是调整了 MergingState
接口的泛型类型,add(T value)
接受一个 T
类型的元素,get()
方法返回一个 T
类型的对象。
源码|Github|org.apache.flink.api.common.state.ReducingState
@PublicEvolving
public interface ReducingState<T> extends MergingState<T, T> {}
2.4.3 AggregatingState
:在每个输入元素后直接聚合
AggregatingState<IN, OUT>
接口并没有定义额外的逻辑,add(IN value)
接受一个 IN
类型的元素,get()
方法返回一个 OUT
类型的对象。但是规定在每个元素输入后,都需要直接将该元素聚合到当前的最终结果上。
源码|Github|org.apache.flink.api.common.state.AggregatingState
@PublicEvolving
public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {}
3 使用键控状态
在使用键控状态时,必须创建一个 StateDescriptor
,并在 UDF 的 open()
方法中,并从 RuntimeContext
中获取该状态的状态句柄(state handle)。在状态句柄中,记录了状态名称、状态所持有值的类型以及用户所指定的函数。根据不同的状态类型,可以创建 ValueStateDescriptor
、ListStateDescriptor
、AggregatingStateDescriptor
、ReducingStateDescriptor
或 MapStateDescriptor
。
样例|获取 Tuple2<Long, Long>
类型 ValueState
的状态句柄
private transient ValueState<Tuple2<Long, Long>> sum; // 定义状态句柄
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // 状态名称
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 状态的类型信息
Tuple2.of(0L, 0L)); // 状态的默认值
sum = getRuntimeContext().getState(descriptor);
}
在使用状态时,直接使用状态句柄中的方法即可。
样例|使用 Tuple2<Long, Long>
类型的 ValueState
// 访问 ValueState 类型状态的值
Tuple2<Long, Long> currentSum = sum.value();
// 更新 ValueState 类型状态的值
sum.update(currentSum);
因为状态句柄需要通过 RuntimeContext
获取,因此只能在富函数中使用。富函数的详细介绍见 “3 UDF 接口与富函数”。
下面来看一个 Flink 官方文档中的状态使用样例。
样例|使用 ValueState
计算每 2 个相邻元素的平均值发往下游的 UDF 函数的样例
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(value -> value.f0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
定义 UDF 的 ValueState
类型的对象属性 sum
,其值为一个元组,元组中第一个元素用于存储计数结果,第二个元素存储求和结果。
在 open()
方法中,创建 ValueStateDescriptor
并定义了状态名称、状态类型和状态的初始值,在获取到 ValueState
类型的状态句柄后,将其状态存储到实例属性 sum
中。文章来源:https://www.toymoban.com/news/detail-831094.html
使用了 ValueState
接口的 value()
、update()
、clear()
方法获取、更新、清除当前的值。文章来源地址https://www.toymoban.com/news/detail-831094.html
到了这里,关于Flink 源码剖析|键控状态的 API 层的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!