本文源码为flink 1.18.0版本。
其他相关文章:
Flink window 源码分析1:窗口整体执行流程
Flink window 源码分析2:Window 的主要组件
Flink window 源码分析3:WindowOperator
Flink window 源码分析4:WindowState
reduce、aggregate 等函数中怎么使用 WindowState ?
主要考虑 reduce、aggregate 函数中的托管状态是在什么时候触发和使用的?使用时与WindowState有什么联系?
- 状态描述器
若用户定义了 Evictor,则窗口中创建 ListState 描述器:
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>(WINDOW_STATE_NAME, streamRecordSerializer);
在 process、apply 中创建 ListState 描述器:
ListStateDescriptor<T> stateDesc =
new ListStateDescriptor<>(
WINDOW_STATE_NAME, inputType.createSerializer(config));
在 reduce 中创建 ReducingState 描述器:
ReducingStateDescriptor<T> stateDesc =
new ReducingStateDescriptor<>(
WINDOW_STATE_NAME, reduceFunction, inputType.createSerializer(config));
在 aggregate 中创建 AggregatingState 描述器:
AggregatingStateDescriptor<T, ACC, V> stateDesc =
new AggregatingStateDescriptor<>(
WINDOW_STATE_NAME,
aggregateFunction,
accumulatorType.createSerializer(config));
在创建ReducingState、AggregatingState时,直接将用户定义的函数添加到状态中。
2. 创建状态
上述描述器会在 WindowOperator.open() 方法中使用。getOrCreateKeyedState() 会从状态后端中创建或检索相应的状态。最终会得到的状态类型是 InternalAppendingState 。
如果是会话窗口,会进一步将 windowState 转换为 windowMergingState, 其类型为 InternalMergingState。
// create (or restore) the state that hold the actual window contents
// NOTE - the state may be null in the case of the overriding evicting window operator
if (windowStateDescriptor != null) {
windowState =
(InternalAppendingState<K, W, IN, ACC, ACC>)
getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
}
- 使用状态
在 WindowOperator 类中的 processElement()、
windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
... # 判断窗口是否触发
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(actualWindow, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
会调用 windowState 的 add() 和 get() 来添加元素或获取元素。这两个方法在接口 AppendingState 中做了定义,前面的那些类都继承该接口。不同类型的 State 对这两个函数的具体实现是不同的。
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {
OUT get() throws Exception;
void add(IN var1) throws Exception;
}
为了可以在emitWindowContents()函数中统一调用用户自定义的代码,会将用户自定义的代码转换为 InternalIterableWindowFunction 类型,在该类型的 process() 方法中会执行用户定义的逻辑。若 windowState 是 ReducingState 或 AggregatingState 类型,则会提供“空”的 InternalIterableWindowFunction,因为逻辑已经绑定到 windowState 上了。
- 若使用的 process()、apply() 方法,在调用add()方法时,则向ListState中添加数据。等待 emitWindowContents() 函数执行时,使用用户定义的Function处理数据。
- 若使用的 reduce()、aggragate() 函数,在add()时进行聚合。emitWindowContents()函数执行时,直接将状态中聚合的数据进行提交。
- 若用户定义了 Evictor,在调用add()方法时,则向ListState中添加数据。等待 emitWindowContents() 函数执行时,使用用户定义的Function处理数据。如果是使用的reduce()、aggragate()函数,那么会在这里遍历窗口的所有数据,反复执行用户自定义的函数。在执行用户窗口处理函数前后会执行用户定义的 Evictor 中的方法 evictBefore() 和 evictAfter() ,这两个函数中可能对窗口的历史数据做处理。
evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
... # 执行用户窗口处理函数
evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
不同滑动窗口的 state 是否会有重叠?
这节的代码在WindowOperator.processElement()中。
滑动窗口与滚动窗口的底层代码相同,区别只是 WindowAssigner 不同。
滑动窗口到达新数据时,该数据可能属于多个窗口,那么 WindowAssigner 会返回窗口集合:
final Collection<W> elementWindows =
windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
进而会访问该集合的每个元素(窗口),在每个窗口中都处理一次该元素:
for (W window : elementWindows) {
# 判断数据是否迟到
# 数据添加到 windowState中
# 判断该数据是否触发窗口操作
# 如果触发窗口操作,则进一步处理
}
在该循环中,每个窗口访问其状态时,为了区别,需要设置 namespace:
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
每个窗口就是一个 namespace。namespace 不同,访问到的状态会有联系吗?状态存储和检索时通过 StateTable 管理,StateTable 具体是使用 StateMap 管理每个 state 。StateMap 的 value 就是 state 。key 和 namespace 会分别序列化成 byte,两个 byte 数组拼接起来作为 StateMap 的 key。所以,namespace 不同,状态也不同。所以每个 window 单独存储其 state。如果一个数据属于多个窗口,那么它会被复制多份存储。
下面是通过key和namespace获得数据存储位置的函数(不重要):
MemorySegment serializeToSegment(K key, N namespace) {
outputStream.reset();
try {
// serialize namespace
outputStream.setPosition(Integer.BYTES);
namespaceSerializer.serialize(namespace, outputView);
} catch (IOException e) {
throw new RuntimeException("Failed to serialize namespace", e);
}
int keyStartPos = outputStream.getPosition();
try {
// serialize key
outputStream.setPosition(keyStartPos + Integer.BYTES);
keySerializer.serialize(key, outputView);
} catch (IOException e) {
throw new RuntimeException("Failed to serialize key", e);
}
final byte[] result = outputStream.toByteArray();
final MemorySegment segment = MemorySegmentFactory.wrap(result);
// set length of namespace and key
segment.putInt(0, keyStartPos - Integer.BYTES);
segment.putInt(keyStartPos, result.length - keyStartPos - Integer.BYTES);
return segment;
}
会话窗口比较特殊,会涉及到 windowState 的合并:
// merge the merged state windows into the newly resulting
// state window
windowMergingState.mergeNamespaces(
stateWindowResult, mergedStateWindows);
WindowState 与 用户自定义 KeyedState
两者创建过程不同(下面有两者创建状态的源码),但创建出的同一类型状态是一样的。因为在 getPartitionedState() 方法中,如果状态在以前没有创建过,则使用的就是 getOrCreateKeyedState() 方法进行创建,因此两方法所得到的状态是一样的。那么状态描述符一样的话,状态使用起来也就没差别。
- WindowState 创建使用 AbstractKeyedStateBackend.getOrCreateKeyedState() 方法 :
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
throws Exception {
checkNotNull(namespaceSerializer, "Namespace serializer");
checkNotNull(
keySerializer,
"State key serializer has not been configured in the config. "
+ "This operation cannot use partitioned state.");
# 判断之前是否已经创建过该状态
InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
kvState =
LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(
TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this, ttlTimeProvider),
stateDescriptor,
latencyTrackingStateConfig);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
publishQueryableStateIfEnabled(stateDescriptor, kvState);
}
return (S) kvState;
}
- KeyedState 创建使用 AbstractKeyedStateBackend.getPartitionedState() 方法 :
@SuppressWarnings("unchecked")
@Override
public <N, S extends State> S getPartitionedState(
final N namespace,
final TypeSerializer<N> namespaceSerializer,
final StateDescriptor<S, ?> stateDescriptor)
throws Exception {
checkNotNull(namespace, "Namespace");
# 如果上一次创建的与这一次是同一个name,则返回上次创建的
if (lastName != null && lastName.equals(stateDescriptor.getName())) {
lastState.setCurrentNamespace(namespace);
return (S) lastState;
}
# 若之前已经创建过该状态,则返回之前创建的
# 在 getOrCreateKeyedState 函数中也有该判断
InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
if (previous != null) {
lastState = previous;
lastState.setCurrentNamespace(namespace);
lastName = stateDescriptor.getName();
return (S) previous;
}
# 之前没创建过该状态,则创建个新的
# 此处 getOrCreateKeyedState() 一定是 Create 行为,因为上面的if判断是假
final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
lastName = stateDescriptor.getName();
lastState = kvState;
kvState.setCurrentNamespace(namespace);
return state;
}
WindowState 的数据存取是怎么实现的?
WindowState 在窗口中主要使用方法是 add()、get()、clear()。
下面主要考虑 Heap-backed。
- add()
- HeapListState.add()
@Override
public void add(V value) {
Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
final N namespace = currentNamespace;
final StateTable<K, N, List<V>> map = stateTable;
List<V> list = map.get(namespace);
if (list == null) {
list = new ArrayList<>();
map.put(namespace, list);
}
list.add(value);
}
这个比较简单。ListState 存储数据的数据结构为 ArrayList,这里只需要使用 ArrayList.add()方法添加数据即可。
- HeapReducingState.add()
@Override
public void add(V value) throws IOException {
if (value == null) {
clear();
return;
}
try {
stateTable.transform(currentNamespace, value, reduceTransformation);
} catch (Exception e) {
throw new IOException("Exception while applying ReduceFunction in reducing state", e);
}
}
调用了 stateTable.transform() 方法修改状态值,reduceTransformation 是用户定义的聚合逻辑。按照下面访问具体逻辑:
stateTable.transform(currentNamespace, value, reduceTransformation)
-> getMapForKeyGroup(keyGroup).transform(key, namespace, value, transformation)
-> CopyOnWriteStateMap.transform(key, namespace, value, transformation)
则看到以下代码:
@Override
public <T> void transform(
K key, N namespace, T value, StateTransformationFunction<S, T> transformation)
throws Exception {
final StateMapEntry<K, N, S> entry = putEntry(key, namespace);
// copy-on-write check for state
entry.state =
transformation.apply(
(entry.stateVersion < highestRequiredSnapshotVersion)
? getStateSerializer().copy(entry.state)
: entry.state,
value);
entry.stateVersion = stateMapVersion;
}
这里 transformation.apply() 将 entry.state 与新到达的数据 value 进行聚合,并写回 entry.state。
- HeapAggregatingState.add()
与 HeapReducingState.add() 几乎完全一样,不做赘述。
- get()
三种状态的 get() 方法是相同的,如下面代码所示,调用 getInternal() 实现。ListState 通过 get() 函数获得一个迭代器,ReducingState 和 AggregatingState 则获得聚合值。
HeapListState、HeapReducingState、HeapAggregatingState的get():
@Override
public Iterable<V> get() {
return getInternal();
}
这里是封装的 getInternal() 方法,按照下面访问具体的逻辑:
getInternal()
-> stateTable.get(namespace)
-> get(key, keyGroupIndex, namespace)
-> getMapForKeyGroup(keyGroupIndex).get(key, namespace)
-> CopyOnWriteStateMap.get(key, namespace)
则看到以下代码:
@Override
public S get(K key, N namespace) {
final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final int requiredVersion = highestRequiredSnapshotVersion;
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
int index = hash & (tab.length - 1);
for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
final K eKey = e.key;
final N eNamespace = e.namespace;
if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
// copy-on-write check for state
if (e.stateVersion < requiredVersion) {
// copy-on-write check for entry
if (e.entryVersion < requiredVersion) {
e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
}
e.stateVersion = stateMapVersion;
e.state = getStateSerializer().copy(e.state);
}
return e.state;
}
}
return null;
}
这里就是通过 StateMap 直接获取状态的值。
3. clear()
三种状态的 clear() 方法也是一样的。
AbstractHeapState.clear():
@Override
public final void clear() {
stateTable.remove(currentNamespace);
}
可按照下面访问具体逻辑:文章来源:https://www.toymoban.com/news/detail-823710.html
stateTable.remove(namespace)
-> remove(key, keyGroupIndex(), namespace)
-> getMapForKeyGroup(keyGroupIndex).remove(key, namespace)
-> CopyOnWriteStateMap.remove(key, namespace)
-> removeEntry(key, namespace)
则可看到如下代码:文章来源地址https://www.toymoban.com/news/detail-823710.html
private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {
final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
int index = hash & (tab.length - 1);
for (StateMapEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) {
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
if (prev == null) {
tab[index] = e.next;
} else {
// copy-on-write check for entry
if (prev.entryVersion < highestRequiredSnapshotVersion) {
prev = handleChainedEntryCopyOnWrite(tab, index, prev);
}
prev.next = e.next;
}
++modCount;
if (tab == primaryTable) {
--primaryTableSize;
} else {
--incrementalRehashTableSize;
}
return e;
}
}
return null;
}
到了这里,关于Flink window 源码分析4:WindowState的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!