Flink源码之StreamTask启动流程

这篇具有很好参考价值的文章主要介绍了Flink源码之StreamTask启动流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

每个ExecutionVertex分配Slot后,JobMaster就会向Slot所在的TaskExecutor提交RPC请求执行Task,接口为TaskExecutorGateway::submitTask

CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout); 

TaskDeploymentDescriptor 中包含当前Task的执行逻辑、Job信息、输入输出信息
Flink源码之StreamTask启动流程,BigData,flink,大数据

submitTask 方法核心就是构造org.apache.flink.runtime.taskmanager.Task实例,该实例继承自Runnable接口,有个Thread成员变量,构造完成后就启动线程执行Task逻辑。

TaskExecutor::submitTask
Task.startTaskThread
Task.run
Task.doRun
Task::setupPartitionsAndGates //初始化Task的输入输出
RuntimeEnvironment::new //封装task执行上下文信息
Task::loadAndInstantiateInvokable //TaskInvokables实例化
StreamTask::new
    StreamTask::createRecordWriterDelegate //创建Writer,为每个StreamEdge创建一个Writer
    StreamTask::createStateBackend //创建StateBackend,一个task一个StateBackend实例
    StreamTask::createCheckpointStorage
    SubtaskCheckpointCoordinatorImpl::new 
Task::restoreAndInvoke
TaskInvokable::restore 
TaskInvokable::invoke //处理输入元素
TaskInvokable::cleanUp

Task的Invokable Class是在StreamGraph中添加Operator形成StreamNode时确定的,对不同的算子有不同的InvokableClass:

  • SourceStreamTask.class //LegacySource算子
  • SourceOperatorStreamTask //Source算子
  • OneInputStreamTask.class //输入是一个算子
  • TwoInputStreamTask:class //输入是两个算子
  • MultipleInputStreamTask.class //输入有多个算子

以上这些类都继承自org.apache.flink.streaming.runtime.tasks.StreamTask

Flink源码之StreamTask启动流程,BigData,flink,大数据

在调用TaskInvokable::restore时会执行:

StreamTask::restore
StreamTask::restoreInternal //创建OperatorChain
RegularOperatorChain::new
OperatorChain::new
OperatorChain::createOutputCollector
OperatorChain::createOperatorChain
OperatorChain::createOperator
StreamOperatorFactoryUtil.createOperator  //创建Operator,在每个算子的StreamConfig中定义了每个Operator具体类型,比如StreamMap, StreamFlatMap
SimpleOperatorFactory::createStreamOperator //创建StreamOperator包装了用户函数,, StreamOperator包装了代码中用户函数,会调用用户函数中的open/close等生命周期函数
	AbstractUdfStreamOperator::setup
	AbstractStreamOperator::setup //设置用用自定义函数中的RuntimeContext成员变量
    	StreamingRuntimeContext::new  //
    
StreamTask::init //子类做初始化,创建InputGate、StreamTaskInput、DataOutput及InputProcessor
StreamTask::restoreGates
	StreamTask::createStreamTaskStateInitializer
		StreamTaskStateInitializerImpl::new //
    OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
    	AbstractStreamOperator::initializeState 
			StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackend
		StreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理
		StreamOperatorStateHandler::initializeOperatorState
		    StateInitializationContextImpl::new
			AbstractUdfStreamOperator::initializeState//调用用户定义函数中的initializeState方法,可获取Operator State
				StreamingFunctionUtils::restoreFunctionState
		StreamingRuntimeContext::setKeyedStateStore
	StreamOperator::open //调用getRuntimeContext().getState可获取keySate
StreamTask::invoke
StreamTask::runMailboxLoop
MailboxProcessor::runMailboxLoop
StreamTask::processInput

整个过程在StreamTask.java的注释中有说明:

 * -- invoke()
 *       |
 *       +----> Create basic utils (config, etc) and load the chain of operators
 *       +----> operators.setup()
 *       +----> task specific init()
 *       +----> initialize-operator-states()
 *       +----> open-operators()
 *       +----> run()
 *       +----> finish-operators()
 *       +----> close-operators()
 *       +----> common cleanup
 *       +----> task specific cleanup()
  1. 首先创建OperatorChain,依次创建出每个StreamOperator
  2. 调用Operator的setup方法,初始化StreamingRuntimeContext
  3. 调用子类init方法初始化
  4. 调用initializeState初始化每个算子的状态,此时会为每个StreamOperator创建keyedStatedBackend和operatorStateBackend,然后会调用用户定义函数中的initializeState方法,用于创建Operator State
  5. 调用算子的open方法,便于用户在自定义函数open中进行初始化,比如初始化keyState
  6. 调用processInput处理流中数据

SourceStreamTask重载了StreamTask::processInput,该方法中直接起一个线程调用SourceFunction::run方法。

OneInputStreamTask则不同,它重载了StreamTask的init方法,在init方法中创建了StreamOneInputProcessor

OneInputStreamTask::init
OneInputStreamTask::createCheckpointedInputGate
OneInputStreamTask::createDataOutput //创建StreamTaskNetworkOutput
OneInputStreamTask::createTaskInput //创建StreamTaskNetworkInput
StreamOneInputProcessor::new

在StreamTask::processInput则是调用InputProcessor::processInput不断读取数据进行处理

StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext //循环不断从buffer中读取StreamElement
处理
AbstractStreamTaskNetworkInput::processElement
    StreamTaskNetworkOutput::emitRecord //调用operator的setKeyContextElement和processElement
        OneInputStreamOperator::setKeyContextElement
        AbstractStreamOperator::setKeyContextElement1
        AbstractStreamOperator::setCurrentKey //
            StreamOperatorStateHandler::setCurrentKey //设置状态当前key
        Input::processElement  //调用StreamOperator的processElement方法

以上Task从提交到起线程执行起来的整个过程,在初始化过程中为每个StreamOperator进行状态后端的初始化相当重要,后续处理流的过程中会使用这些状态后端存储管理状态。文章来源地址https://www.toymoban.com/news/detail-648966.html

到了这里,关于Flink源码之StreamTask启动流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 说说Flink on yarn的启动流程

    核心流程 FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container,如果有,则 上传一些flink的jar和配置文件到HDFS ,这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件。 接着yarn client会首先向RM 申请一个container来作为ApplicationMas

    2024年02月10日
    浏览(39)
  • 深入理解 Flink(五)Flink Standalone 集群启动源码剖析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    浏览(52)
  • Flink源码之State创建流程

    StreamOperatorStateHandler 在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateH

    2024年02月12日
    浏览(90)
  • Flink源码之Checkpoint执行流程

    Checkpoint完整流程如上图所示: JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPoint SourceTask向下游广播CheckpointBarrier SouceTask完成状态快照后向JobMaster发送快照结果 非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结果 JobMaster保存SubTask快照结果 JobMaster收到所

    2024年02月11日
    浏览(39)
  • Flink window 源码分析1:窗口整体执行流程

    注:本文源码为flink 1.18.0版本。 其他相关文章: Flink window 源码分析1:窗口整体执行流程 Flink window 源码分析2:Window 的主要组件 Flink window 源码分析3:WindowOperator Flink window 源码分析4:WindowState Window 本质上就是借助状态后端缓存着一定时间段内的数据,然后在达到某些条件

    2024年01月16日
    浏览(47)
  • 大数据Flink(六十一):Flink流处理程序流程和项目准备

    文章目录 Flink流处理程序流程和项目准备 一、Flink流处理程序的一般流程

    2024年02月11日
    浏览(39)
  • 【大数据】Flink 详解(六):源码篇 Ⅰ

    《 Flink 详解 》系列(已完结),共包含以下 10 10 10 篇文章: 【大数据】Flink 详解(一):基础篇(架构、并行度、算子) 【大数据】Flink 详解(二):核心篇 Ⅰ(窗口、WaterMark) 【大数据】Flink 详解(三):核心篇 Ⅱ(状态 State) 【大数据】Flink 详解(四):核心篇

    2024年02月10日
    浏览(33)
  • 【Flink】Flink提交流程

    我们通常在学习的时候需要掌握大数据组件的原理以便更好的掌握这个大数据组件,Flink实际生产开发过程中最常见的就是提交到yarn上进行调度,模式使用的 Per-Job模式,下面我们就给大家讲下Flink提交Per-Job任务到yarn上的流程,流程图如下  (1)客户端将作业提交给 YARN 的资

    2024年02月11日
    浏览(33)
  • 源码解析Flink源节点数据读取是如何与checkpoint串行执行

    源码解析Flink源节点数据读取是如何与checkpoint串行执行 Flink版本:1.13.6 前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。 本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,

    2024年02月14日
    浏览(54)
  • 【Flink】详解Flink任务提交流程

    通常我们会使用 bin/flink run -t yarn-per-job -c com.xxx.xxx.WordCount/WordCount.jar 方式启动任务;我们看一下 flink 文件中到底做了什么,以下是其部分源码 可以看到,第一步将相对地址转换成绝对地址;第二步获取 Flink 配置信息,这个信息放在 bin 目录下的 config. sh 中;第三步获取 JV

    2024年02月14日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包