Flink window 源码分析4:WindowState

这篇具有很好参考价值的文章主要介绍了Flink window 源码分析4:WindowState。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文源码为flink 1.18.0版本。
其他相关文章:
Flink window 源码分析1:窗口整体执行流程
Flink window 源码分析2:Window 的主要组件
Flink window 源码分析3:WindowOperator
Flink window 源码分析4:WindowState

reduce、aggregate 等函数中怎么使用 WindowState ?

主要考虑 reduce、aggregate 函数中的托管状态是在什么时候触发和使用的?使用时与WindowState有什么联系?

  1. 状态描述器
    若用户定义了 Evictor,则窗口中创建 ListState 描述器:
ListStateDescriptor<StreamRecord<T>> stateDesc =  
        new ListStateDescriptor<>(WINDOW_STATE_NAME, streamRecordSerializer);

在 process、apply 中创建 ListState 描述器:

ListStateDescriptor<T> stateDesc =  
        new ListStateDescriptor<>(  
                WINDOW_STATE_NAME, inputType.createSerializer(config));

在 reduce 中创建 ReducingState 描述器:

ReducingStateDescriptor<T> stateDesc =  
        new ReducingStateDescriptor<>(  
                WINDOW_STATE_NAME, reduceFunction, inputType.createSerializer(config));

在 aggregate 中创建 AggregatingState 描述器:

AggregatingStateDescriptor<T, ACC, V> stateDesc =  
        new AggregatingStateDescriptor<>(  
                WINDOW_STATE_NAME,  
                aggregateFunction,  
                accumulatorType.createSerializer(config));

在创建ReducingState、AggregatingState时,直接将用户定义的函数添加到状态中。
2. 创建状态
上述描述器会在 WindowOperator.open() 方法中使用。getOrCreateKeyedState() 会从状态后端中创建或检索相应的状态。最终会得到的状态类型是 InternalAppendingState 。
如果是会话窗口,会进一步将 windowState 转换为 windowMergingState, 其类型为 InternalMergingState。

// create (or restore) the state that hold the actual window contents  
// NOTE - the state may be null in the case of the overriding evicting window operator  
if (windowStateDescriptor != null) {  
    windowState =  
            (InternalAppendingState<K, W, IN, ACC, ACC>)  
                    getOrCreateKeyedState(windowSerializer, windowStateDescriptor);  
}
  1. 使用状态
    在 WindowOperator 类中的 processElement()、
windowState.setCurrentNamespace(stateWindow);  
windowState.add(element.getValue());
... # 判断窗口是否触发
if (triggerResult.isFire()) {  
    ACC contents = windowState.get();  
    if (contents == null) {  
        continue;  
    }  
    emitWindowContents(actualWindow, contents);  
}
if (triggerResult.isPurge()) {  
    windowState.clear();  
}

会调用 windowState 的 add() 和 get() 来添加元素或获取元素。这两个方法在接口 AppendingState 中做了定义,前面的那些类都继承该接口。不同类型的 State 对这两个函数的具体实现是不同的。

@PublicEvolving  
public interface AppendingState<IN, OUT> extends State {  
    OUT get() throws Exception;  
  
    void add(IN var1) throws Exception;  
}

为了可以在emitWindowContents()函数中统一调用用户自定义的代码,会将用户自定义的代码转换为 InternalIterableWindowFunction 类型,在该类型的 process() 方法中会执行用户定义的逻辑。若 windowState 是 ReducingState 或 AggregatingState 类型,则会提供“空”的 InternalIterableWindowFunction,因为逻辑已经绑定到 windowState 上了。

  • 若使用的 process()、apply() 方法,在调用add()方法时,则向ListState中添加数据。等待 emitWindowContents() 函数执行时,使用用户定义的Function处理数据。
  • 若使用的 reduce()、aggragate() 函数,在add()时进行聚合。emitWindowContents()函数执行时,直接将状态中聚合的数据进行提交。
  • 若用户定义了 Evictor,在调用add()方法时,则向ListState中添加数据。等待 emitWindowContents() 函数执行时,使用用户定义的Function处理数据。如果是使用的reduce()、aggragate()函数,那么会在这里遍历窗口的所有数据,反复执行用户自定义的函数。在执行用户窗口处理函数前后会执行用户定义的 Evictor 中的方法 evictBefore() 和 evictAfter() ,这两个函数中可能对窗口的历史数据做处理。
evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
... # 执行用户窗口处理函数
evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

不同滑动窗口的 state 是否会有重叠?

这节的代码在WindowOperator.processElement()中。
滑动窗口与滚动窗口的底层代码相同,区别只是 WindowAssigner 不同。
滑动窗口到达新数据时,该数据可能属于多个窗口,那么 WindowAssigner 会返回窗口集合:

final Collection<W> elementWindows =  
        windowAssigner.assignWindows(  
                element.getValue(), element.getTimestamp(), windowAssignerContext);

进而会访问该集合的每个元素(窗口),在每个窗口中都处理一次该元素:

for (W window : elementWindows) {
	# 判断数据是否迟到
	# 数据添加到 windowState中
	# 判断该数据是否触发窗口操作
	# 如果触发窗口操作,则进一步处理
}

在该循环中,每个窗口访问其状态时,为了区别,需要设置 namespace:

windowState.setCurrentNamespace(window);  
windowState.add(element.getValue());

每个窗口就是一个 namespace。namespace 不同,访问到的状态会有联系吗?状态存储和检索时通过 StateTable 管理,StateTable 具体是使用 StateMap 管理每个 state 。StateMap 的 value 就是 state 。key 和 namespace 会分别序列化成 byte,两个 byte 数组拼接起来作为 StateMap 的 key。所以,namespace 不同,状态也不同。所以每个 window 单独存储其 state。如果一个数据属于多个窗口,那么它会被复制多份存储。
下面是通过key和namespace获得数据存储位置的函数(不重要):

MemorySegment serializeToSegment(K key, N namespace) {  
    outputStream.reset();  
    try {  
        // serialize namespace  
        outputStream.setPosition(Integer.BYTES);  
        namespaceSerializer.serialize(namespace, outputView);  
    } catch (IOException e) {  
        throw new RuntimeException("Failed to serialize namespace", e);  
    }  
  
    int keyStartPos = outputStream.getPosition();  
    try {  
        // serialize key  
        outputStream.setPosition(keyStartPos + Integer.BYTES);  
        keySerializer.serialize(key, outputView);  
    } catch (IOException e) {  
        throw new RuntimeException("Failed to serialize key", e);  
    }  
  
    final byte[] result = outputStream.toByteArray();  
    final MemorySegment segment = MemorySegmentFactory.wrap(result);  
  
    // set length of namespace and key  
    segment.putInt(0, keyStartPos - Integer.BYTES);  
    segment.putInt(keyStartPos, result.length - keyStartPos - Integer.BYTES);  
  
    return segment;  
}

会话窗口比较特殊,会涉及到 windowState 的合并:

// merge the merged state windows into the newly resulting  
// state window  
windowMergingState.mergeNamespaces(  
        stateWindowResult, mergedStateWindows);

WindowState 与 用户自定义 KeyedState

两者创建过程不同(下面有两者创建状态的源码),但创建出的同一类型状态是一样的。因为在 getPartitionedState() 方法中,如果状态在以前没有创建过,则使用的就是 getOrCreateKeyedState() 方法进行创建,因此两方法所得到的状态是一样的。那么状态描述符一样的话,状态使用起来也就没差别

  • WindowState 创建使用 AbstractKeyedStateBackend.getOrCreateKeyedState() 方法 :
@Override  
@SuppressWarnings("unchecked")  
public <N, S extends State, V> S getOrCreateKeyedState(  
        final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)  
        throws Exception {  
    checkNotNull(namespaceSerializer, "Namespace serializer");  
    checkNotNull(  
            keySerializer,  
            "State key serializer has not been configured in the config. "  
                    + "This operation cannot use partitioned state.");  

	# 判断之前是否已经创建过该状态
    InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());  
    if (kvState == null) {  
        if (!stateDescriptor.isSerializerInitialized()) {  
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);  
        }  
        kvState =  
                LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(  
                        TtlStateFactory.createStateAndWrapWithTtlIfEnabled(  
                                namespaceSerializer, stateDescriptor, this, ttlTimeProvider),  
                        stateDescriptor,  
                        latencyTrackingStateConfig);  
        keyValueStatesByName.put(stateDescriptor.getName(), kvState);  
        publishQueryableStateIfEnabled(stateDescriptor, kvState);  
    }  
    return (S) kvState;  
}
  • KeyedState 创建使用 AbstractKeyedStateBackend.getPartitionedState() 方法 :
@SuppressWarnings("unchecked")  
@Override  
public <N, S extends State> S getPartitionedState(  
        final N namespace,  
        final TypeSerializer<N> namespaceSerializer,  
        final StateDescriptor<S, ?> stateDescriptor)  
        throws Exception {  
  
    checkNotNull(namespace, "Namespace");  

	# 如果上一次创建的与这一次是同一个name,则返回上次创建的
    if (lastName != null && lastName.equals(stateDescriptor.getName())) {  
        lastState.setCurrentNamespace(namespace);  
        return (S) lastState;  
    }  

	# 若之前已经创建过该状态,则返回之前创建的
	# 在 getOrCreateKeyedState 函数中也有该判断
    InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());  
    if (previous != null) {  
        lastState = previous;  
        lastState.setCurrentNamespace(namespace);  
        lastName = stateDescriptor.getName();  
        return (S) previous;  
    }  

	# 之前没创建过该状态,则创建个新的
	# 此处 getOrCreateKeyedState() 一定是 Create 行为,因为上面的if判断是假
    final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);  
    final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;  
  
    lastName = stateDescriptor.getName();  
    lastState = kvState;  
    kvState.setCurrentNamespace(namespace);  
  
    return state;  
}

WindowState 的数据存取是怎么实现的?

WindowState 在窗口中主要使用方法是 add()、get()、clear()。
下面主要考虑 Heap-backed。

  1. add()
  • HeapListState.add()
@Override  
public void add(V value) {  
    Preconditions.checkNotNull(value, "You cannot add null to a ListState.");  
  
    final N namespace = currentNamespace;  
  
    final StateTable<K, N, List<V>> map = stateTable;  
    List<V> list = map.get(namespace);  
  
    if (list == null) {  
        list = new ArrayList<>();  
        map.put(namespace, list);  
    }  
    list.add(value);  
}

这个比较简单。ListState 存储数据的数据结构为 ArrayList,这里只需要使用 ArrayList.add()方法添加数据即可。

  • HeapReducingState.add()
@Override  
public void add(V value) throws IOException {  
  
    if (value == null) {  
        clear();  
        return;  
    }  
  
    try {  
        stateTable.transform(currentNamespace, value, reduceTransformation);  
    } catch (Exception e) {  
        throw new IOException("Exception while applying ReduceFunction in reducing state", e);  
    }  
}

调用了 stateTable.transform() 方法修改状态值,reduceTransformation 是用户定义的聚合逻辑。按照下面访问具体逻辑:

stateTable.transform(currentNamespace, value, reduceTransformation)
-> getMapForKeyGroup(keyGroup).transform(key, namespace, value, transformation)
-> CopyOnWriteStateMap.transform(key, namespace, value, transformation)

则看到以下代码:

@Override  
public <T> void transform(  
        K key, N namespace, T value, StateTransformationFunction<S, T> transformation)  
        throws Exception {  
  
    final StateMapEntry<K, N, S> entry = putEntry(key, namespace);  
  
    // copy-on-write check for state  
    entry.state =  
            transformation.apply(  
                    (entry.stateVersion < highestRequiredSnapshotVersion)  
                            ? getStateSerializer().copy(entry.state)  
                            : entry.state,  
                    value);  
    entry.stateVersion = stateMapVersion;  
}

这里 transformation.apply() 将 entry.state 与新到达的数据 value 进行聚合,并写回 entry.state。

  • HeapAggregatingState.add()
    与 HeapReducingState.add() 几乎完全一样,不做赘述。
  1. get()
    三种状态的 get() 方法是相同的,如下面代码所示,调用 getInternal() 实现。ListState 通过 get() 函数获得一个迭代器,ReducingState 和 AggregatingState 则获得聚合值。
    HeapListState、HeapReducingState、HeapAggregatingState的get():
@Override  
public Iterable<V> get() {  
    return getInternal();  
}

这里是封装的 getInternal() 方法,按照下面访问具体的逻辑:

getInternal()
-> stateTable.get(namespace)
-> get(key, keyGroupIndex, namespace)
-> getMapForKeyGroup(keyGroupIndex).get(key, namespace)
-> CopyOnWriteStateMap.get(key, namespace)

则看到以下代码:

@Override  
public S get(K key, N namespace) {  
  
    final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);  
    final int requiredVersion = highestRequiredSnapshotVersion;  
    final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);  
    int index = hash & (tab.length - 1);  
  
    for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {  
        final K eKey = e.key;  
        final N eNamespace = e.namespace;  
        if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {  
  
            // copy-on-write check for state  
            if (e.stateVersion < requiredVersion) {  
                // copy-on-write check for entry  
                if (e.entryVersion < requiredVersion) {  
                    e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);  
                }  
                e.stateVersion = stateMapVersion;  
                e.state = getStateSerializer().copy(e.state);  
            }  
  
            return e.state;  
        }  
    }  
  
    return null;  
}

这里就是通过 StateMap 直接获取状态的值。
3. clear()
三种状态的 clear() 方法也是一样的。
AbstractHeapState.clear():

@Override  
public final void clear() {  
    stateTable.remove(currentNamespace);  
}

可按照下面访问具体逻辑:

stateTable.remove(namespace)
-> remove(key, keyGroupIndex(), namespace)
-> getMapForKeyGroup(keyGroupIndex).remove(key, namespace)
-> CopyOnWriteStateMap.remove(key, namespace)
-> removeEntry(key, namespace)

则可看到如下代码:文章来源地址https://www.toymoban.com/news/detail-823710.html

private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {  
  
    final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);  
    final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);  
    int index = hash & (tab.length - 1);  
  
    for (StateMapEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) {  
        if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {  
            if (prev == null) {  
                tab[index] = e.next;  
            } else {  
                // copy-on-write check for entry  
                if (prev.entryVersion < highestRequiredSnapshotVersion) {  
                    prev = handleChainedEntryCopyOnWrite(tab, index, prev);  
                }  
                prev.next = e.next;  
            }  
            ++modCount;  
            if (tab == primaryTable) {  
                --primaryTableSize;  
            } else {  
                --incrementalRehashTableSize;  
            }  
            return e;  
        }  
    }  
    return null;  
}

到了这里,关于Flink window 源码分析4:WindowState的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink源码分析 - yaml解析

    flink版本: flink-1.12.1      代码位置:  org.apache.flink.configuration.GlobalConfiguration 主要看下解析yaml文件的方法:  org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource

    2024年01月18日
    浏览(43)
  • Flink|《Flink 官方文档 - DataStream API - 用户自定义 Functions》学习笔记 + 源码分析

    学习文档:Flink 官方文档 - DataStream API - 用户自定义 Functions 学习笔记如下: 用户可以通过实现接口来完成自定义 Functions。 实现接口并使用的样例: 使用匿名类实现的样例: 使用 Lambda 表达式实现(Java 8)样例: 所有的 Flink 函数类都有其 Rich 版本,在 Rick function 中,可以获

    2024年01月18日
    浏览(46)
  • flink源码分析-获取JVM最大堆内存

    flink版本: flink-1.11.2 代码位置: org.apache.flink.runtime.util.EnvironmentInformation#getMaxJvmHeapMemory 如果设置了-Xmx参数,就返回这个参数,如果没设置就返回机器物理内存的1/4.  这里主要看各个机器内存的获取方法。 进入getSizeOfPhysicalMemory()方法,里面有获取各种操作系统物理内存的方法

    2024年02月15日
    浏览(37)
  • flink源码分析之功能组件(五)-高可用组件

         本系列是flink源码分析的第二个系列,上一个《flink源码分析之集群与资源》分析集群与资源,本系列分析功能组件,kubeclient,rpc,心跳,高可用,slotpool,rest,metrics,future。      本文解释高可用组件,包括两项服务, 主节点选举 和 主节点变更通知 *     高可用服

    2024年02月01日
    浏览(46)
  • flink源码分析-获取最大可以打开的文件句柄

    flink版本: flink-1.11.2 代码位置: org.apache.flink.runtime.util.EnvironmentInformation 调用位置:   taskmanager启动类:   org.apache.flink.runtime.taskexecutor.TaskManagerRunner 注意,该方法主要调用了com.sun.management.UnixOperatingSystemMXBean接口下的getMaxFileDescriptorCount方法,所以一定要在Sun/Oracle的JDK下才能使用

    2024年02月10日
    浏览(37)
  • flink集群与资源@k8s源码分析-集群

    本文是flink集群与资源@k8s源码分析系列的第二篇-集群 下面详细分析各用例 k8s集群支持session和application模式,job模式将会被废弃,本文分析session模式集群 Configuration作为配置容器,几乎所有的构建需要从配置类获取配置项,这里不显示关联关系 1. 用户命令行执行kubernates-ses

    2024年02月07日
    浏览(46)
  • flink集群与资源@k8s源码分析-运行时

    运行时提供了Flink作业运行过程依赖的基础执行环境,包含Dispatcher、ResourceManager、JobManager和TaskManager等核心组件,本节分析资源相关运行时组件构建和启动。 flink没有使用spring,缺少ioc的构建过程相当复杂,所有依赖手动关联和置入,为了共享组件,flink使用了很多中间持有

    2024年02月07日
    浏览(38)
  • 【源码分析】一个flink job的sql到底是如何执行的(一):flink sql底层是如何调用connector实现物理执行计划的

    我们以一条sql为例分析下flink sql与connector是如何配合执行的,本文我们先分析 sql-sqlnode-validate-operation:是如何找到对应的connector实例的 relnode-execGraph:是如何组装node为Graph,在哪找到connector实例的 之后的文章将会继续分析: translateToPlanInternal是如何串联connector其他方法的

    2024年01月16日
    浏览(43)
  • flink集群与资源@k8s源码分析-资源III 声明式资源管理

    资源分析分3部分,资源请求,资源提供,声明式资源管理,本文是第三部分 声明式资源管理 检查资源需求/检查资源声明是flink 声明式资源管理 的核心方法 上面的资源场景分为两类, 提出资源需求 和 提供资源 , 检查资源请求/检查资源声明是交汇点,处理资源请求,该分

    2024年02月07日
    浏览(39)
  • 【大数据】Flink 详解(六):源码篇 Ⅰ

    《 Flink 详解 》系列(已完结),共包含以下 10 10 10 篇文章: 【大数据】Flink 详解(一):基础篇(架构、并行度、算子) 【大数据】Flink 详解(二):核心篇 Ⅰ(窗口、WaterMark) 【大数据】Flink 详解(三):核心篇 Ⅱ(状态 State) 【大数据】Flink 详解(四):核心篇

    2024年02月10日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包