Flink源码之State创建流程

这篇具有很好参考价值的文章主要介绍了Flink源码之State创建流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink源码之State创建流程,BigData,flink,大数据

StreamOperatorStateHandler

在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateHandler类型的成员变量, StreamOperatorStateHandler对象变量封装了keyedStatedBackend和operatorStateBackend,用于统一管理SteamOperator的状态。

 OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
    	AbstractStreamOperator::initializeState(StreamTaskStateInitializer) 
			StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackend
			StreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理
			StreamOperatorStateHandler::initializeOperatorState
		    StateInitializationContextImpl::new //封装DefaultKeyedStateStore和OperatorStateStore
			CheckpointedStreamOperator::initializeState(StateInitializationContext)//调用用户定义函数中的initializeState方法,可获取Operator State
		StreamingRuntimeContext::setKeyedStateStore

Flink中主要有两种StateBackend:

  • HashMapStateBackend //内存
  • EmbeddedRocksDBStateBackend //内存+磁盘

每个StreamTask一个StateBackend成员变量,在构造函数中进行初始化,通过用户代码中设置或StateBackendLoader::loadStateBackendFromConfig从配置中加载,默认为HashMapStateBackend。简单起见,以HashMapStateBackend为例剖析创建KeyedStatedBackend和OperatorStateBackend以及处理数据流时是如何使用KeyedState和OperatorState的。

OperatorState

OperatorState创建流程:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
    AbstractStreamOperator::initializeState
        StreamTaskStateInitializerImpl::streamOperatorStateContext
            StreamTaskStateInitializerImpl::operatorStateBackend
            HashMapStateBackend::createOperatorStateBackend //创建DefaultOperatorStateBackend
        StreamOperatorStateHandler::new //创建StreamOperatorStateHandler
        StreamOperatorStateHandler::initializeOperatorState //调用CheckpointedFunction::initializeState
        	StateInitializationContextImpl::new //该实例可getOperatorStateStore

使用Operator State的用户业务代码需要实现CheckpointedFunction接口,该接口中有以两个下方法:

void initializeState(FunctionInitializationContext context) throws Exception;

void snapshotState(FunctionSnapshotContext context) throws Exception;

其中initializeState方法则会被StreamOperatorStateHandler.initializeOperatorState 调用,在initializeState方法中可使用

FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)
DefaultOperatorStateBackend::getListState::new
	PartitionableListState::new  //内部是ArrayList

因此通过OperatorStateStore获取的ListState内部本质上是一个ArrayList, 业务代码中可以调用add方法向这个内部List添加元素,由StateBackend管理每个Operator State,这样就实现了一个分布式状态管理,借助Checkpoint可以实现状态持久化及容灾恢复。

OperatorStateStore有三个获取状态方法:

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
            throws Exception

KeyedState

KeyedState创建流程如下:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
    AbstractStreamOperator::initializeState
        StreamTaskStateInitializerImpl::streamOperatorStateContext
            StreamTaskStateInitializerImpl::keyedStatedBackend
            HashMapStateBackend::createKeyedStateBackend //创建HeapKeyedStateBackend
            	HeapKeyedStateBackendBuilder::build
            		InternalKeyContextImpl::new //用于保存当前正在处理的key
            		
        StreamOperatorStateHandler::new //创建StreamOperatorStateHandler
            DefaultKeyedStateStore::new //创建DefaultKeyedStateStore
        StreamingRuntimeContext::setKeyedStateStore //设置keyedStateStore成员变量
    AbstractStreamUdfOperator::open
    	FunctionUtils::openFunction
    		RichFunction::open

KeyedStateStore保存在StreamingRuntimeContext中,使用KeyedState时,用户自定义函数实现RichFunction接口,在open方法中调用getRuntimeContext().getState方法获取状态:

getRuntimeContext().getState() //获取ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedState
    LatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabled
    TtlStateFactory::createStateAndWrapWithTtlIfEnabled //包装TTL
    HeapKeyedStateBackend::createInternalState
    HeapKeyedStateBackend::tryRegisterStateTable //这里很关键,对每个State创建一个StateTable
    	CopyOnWriteStateTable::new//异步快照,这里传递了当前KeyedStateBackend的InternalKeyContext
    	StateTable::new //根据当前Task管理的KeyGroups数量创建StateMap数组
    	CopyOnWriteStateTable::createStateMap //一个KeyGroup一个StateMap
    	CopyOnWriteStateMap::new //存储key及其对应的状态
   HeapValueState::create
   		HeapValueState::new //有个成员变量指向存储当前state的CopyOnWriteStateMap
   	HeapValueState::setCurrentNamespace  //默认为VoidNamespace

KeyedState有以下几种类型

ValueState<T> getState(ValueStateDescriptor<T> stateProperties) 获取HeapValueState

ListState<T> getListState(ListStateDescriptor<T> stateProperties)获取HeapListState

MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties)获取HeapMapState

getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties)获取HeapAggregatingState

getReducingState(ReducingStateDescriptor<T> stateProperties)获取HeapReducingState

RocksDBStateBackend

EmbeddedRocksDBStateBackend 管理OperatorState与HashMapStateBackend 一样,也是通过DefaultOperatorStateBackend进行管理的。

EmbeddedRocksDBStateBackend 管理KeyedState则是使用RocksDBKeyedStateBackend实现,这样可以借助磁盘加内存进行大状态管理:

RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState

总结

Flink内置状态管理是相比其他分布式流式处理系统最大的优势之一,不用借助外部存储组件,就可实现高效可靠的分布式状态管理,极大降低了学习和使用成本。文章来源地址https://www.toymoban.com/news/detail-657241.html

到了这里,关于Flink源码之State创建流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink源码之JobMaster启动流程

    Flink中Graph转换流程如下: Flink Job提交时各种类型Graph转换流程中,JobGraph是Client端形成StreamGraph后经过Operator Chain优化后形成的,然后提交给JobManager的Restserver,最终转发给JobManager的Dispatcher处理。 本文主要解析从JobGraph转换为ExecutionGraph过程,执行栈如下: 在整个提交过程中

    2024年02月13日
    浏览(44)
  • Flink源码之TaskManager启动流程

    从启动命令flink-daemon.sh可以看出TaskManger入口类为org.apache.flink.runtime.taskexecutor.TaskManagerRunner 在TaskManagerRunner构造函数中,可以看出与JobManger类似,也是先构造出一些公共服务: 这些服务在构造TaskExecutor时作为构造函数参数传入 构造TaskExecutor前会先构造TaskManagerServices辅助Task

    2024年02月13日
    浏览(33)
  • Flink 学习七 Flink 状态(flink state)

    流式计算逻辑中,比如sum,max; 需要记录和后面计算使用到一些历史的累计数据, 状态就是 :用户在程序逻辑中用于记录信息的变量 在Flink 中 ,状态state 不仅仅是要记录状态;在程序运行中如果失败,是需要重新恢复,所以这个状态也是需要持久化;一遍后续程序继续运行 1.1 row state 我

    2024年02月09日
    浏览(41)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(1) - Keyed State

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(48)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator state

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月22日
    浏览(101)
  • Flink window 源码分析1:窗口整体执行流程

    注:本文源码为flink 1.18.0版本。 其他相关文章: Flink window 源码分析1:窗口整体执行流程 Flink window 源码分析2:Window 的主要组件 Flink window 源码分析3:WindowOperator Flink window 源码分析4:WindowState Window 本质上就是借助状态后端缓存着一定时间段内的数据,然后在达到某些条件

    2024年01月16日
    浏览(47)
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例 - 完整版

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月03日
    浏览(45)
  • Flink State 状态管理

    状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容: 状态数据的存储和访问 在Task内部,如何高效地保存状态数据和使用状态数据。 状态数据的备份和恢复 作业失败是无法避免的,那么就要考虑如何高效地将状态

    2024年01月17日
    浏览(47)
  • flink学习之state

    state作用 保留当前key的历史状态。 state用法 ListStateInteger vipList = getRuntimeContext().getListState(new ListStateDescriptorInteger(\\\"vipList\\\", TypeInformation.of(Integer.class))); 有valueState listState mapstate 。冒失没有setstate state案例 比如起点的小说不能被下载。别人只能通过截屏,提取文字的方式盗版小

    2024年02月09日
    浏览(37)
  • flink 的 State

    目录 一、前言 二、什么是State 2.1:什么时候需要历史数据 2.2:为什么要容错,以及checkpoint如何进行容错 2.3:state basckend 又是什么 三、有哪些常见的是 State 四、 State的使用 五、State backend 5.1  MemoryStateBackend: 5.2  FsStatebackend: 5.3  RocksDBStateBackend: 六、Checkpoint 七、 Deep

    2023年04月18日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包