Flink源码之Checkpoint执行流程

这篇具有很好参考价值的文章主要介绍了Flink源码之Checkpoint执行流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink源码之Checkpoint执行流程,BigData,flink,大数据

Checkpoint完整流程如上图所示:

  1. JobMaster的CheckpointCoordinator向所有SourceTask发送RPC触发一次CheckPoint
  2. SourceTask向下游广播CheckpointBarrier
  3. SouceTask完成状态快照后向JobMaster发送快照结果
  4. 非SouceTask在Barrier对齐后完成状态快照向JobMaster发送快照结果
  5. JobMaster保存SubTask快照结果
  6. JobMaster收到所有SubTask快照结果后保存快照信息,想SubTask通知Checkpoint完成

以下对整个流程具体说明。

CheckpointCoordinator

JobMaster将JobGraph转换为ExecutionGraph时,如果开启Checkpoint,会为ExecutionGraph生成一个CheckpointCoordinator

DefaultExecutionGraphBuilder.buildGraph//在此会将JobGraph转换为ExecutionGraph
    DefaultExecutionGraph::new
    DefaultExecutionGraph::attachJobGraph //创建ExecutionJobVertex
    	DefaultExecutionTopology.fromExecutionGraph //创建ExecutionTopology
    DefaultExecutionGraph::enableCheckpointing //创建CheckpointCoordinator
    	DefaultExecutionGraph::createCheckpointPlanCalculator//创建DefaultCheckpointPlanCalculator
    	CheckpointCoordinator::new 

CheckpointCoordinator封装了StateBackend和CheckpointStorage

StateBackend负责管理状态:

  • HashMapStateBackend //内存
  • EmbeddedRocksDBStateBackend //内存+磁盘

CheckpointStorage则是负责存储StateBackend管理的状态:

  • JobManagerCheckpointStorage //checkpoint state放入JobManager内存
  • FileSystemCheckpointStorage //配置state.checkpoints.dir时

在为StreamTask构造SubtaskCheckpointCoordinatorImpl时会调用:

CheckpointStorage::createCheckpointStorage

创建CheckpointStorageAccess用于执行Checkpoint时解析状态存储位置

  • MemoryBackendCheckpointStorageAccess //对应JobManagerCheckpointStorage
  • FsCheckpointStorageAccess //对应FileSystemCheckpointStorage

CheckpointCoordinator在执行状态快照时会调用

CheckpointStorageAccess::resolveCheckpointStorageLocation

生成CheckpointStreamFactory用于生成读写状态数据流

  • MemCheckpointStreamFactory //对应JobManagerCheckpointStorage
  • FsCheckpointStreamFactory //对应FileSystemCheckpointStorage

Checkpoint触发流程

JobMaster状态转换为running后,通过CheckpointCoordinator向SourceTask发送TriggerCheckpoint

JobMaster端触发流程
JobMaster::start  //RPCServer启动
JobMaster::onStart
JobMaster::startJobExecution
JobMaster::startJobMasterServices //获取RM地址后与RM建立连接
JobMaster::startScheduling
SchedulerBase::startScheduling
DefaultScheduler::startSchedulingInternal
SchedulerBase::transitionToRunning
	DefaultExecutionGraph::transitionToRunning //调用ExecutionGraph监听器通知状态变化
		CheckpointCoordinatorDeActivator::jobStatusChanges//触发checkpoint
			CheckpointCoordinator::startCheckpointScheduler
				CheckpointCoordinator::scheduleTriggerWithDelay //定时不断触发Checkpoint
				CheckpointCoordinator::triggerCheckpoint
				CheckpointCoordinator::startTriggeringCheckpoint
				DefaultCheckpointPlanCalculator::calculateCheckpointPlan//Plan中会隔离出SourceTask作为作为Trigger Checkpoint的入口
				CheckpointCoordinator::createPendingCheckpoint
				CheckpointCoordinator::triggerCheckpointRequest
				CheckpointCoordinator::triggerTasks 
					Execution::triggerCheckpoint //向每个SourceTask发送TriggerCheckpoint请求
                    Execution::triggerCheckpointHelper
                    TaskManagerGateway::triggerCheckpoint//向TaskExecutor发RPC
StreamTask端执行流程
SourceTask

SourceTask由JobMaster RPC直接触发,执行时先广播CheckpointBarrier,然后对状态执行异步快照

TaskExecutor::triggerCheckpoint
Task::triggerCheckpointBarrier
AbstractInvokable::triggerCheckpointAsync
SourceStreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsyncInMailbox
StreamTask::performCheckpoint
SubtaskCheckpointCoordinatorImpl::checkpointState
	OperatorChain.broadcastEvent //广播CheckpointBarrier
CheckpointStorage::createCheckpointStorage//为JobId创建CheckpointStorageAccess
SubtaskCheckpointCoordinatorImpl::takeSnapshotSync
CheckpointStorageWorkerView::resolveCheckpointStorageLocation//CheckpointStorageAccess创建 CheckpointStreamFactory
	OperatorChain::snapshotState //对每个Operator
		RegularOperatorChain::buildOperatorSnapshotFutures
		RegularOperatorChain::checkpointStreamOperator
			AbstractStreamOperator::snapshotState
			StreamOperatorStateHandler::snapshotState//调用Operator/Keyed Backend的snapshot
				StateSnapshotContextSynchronousImpl::new
				AbstractUdfStreamOperator::snapshotState //调用UDF中snapshotState方法,一般用于更新OperatorState
				DefaultOperatorStateBackend::snapshot
					SnapshotStrategyRunner::snapshot
					  DefaultOperatorStateBackendSnapshotStrategy::syncPrepareResources//深copy operator state,便于后续进行异步快照
					  DefaultOperatorStateBackendSnapshotStrategy::asyncSnapshot//异步快照					  	  CheckpointStateOutputStream::closeAndGetHandle
						OperatorStreamStateHandle::new //包装元信息及数据StreamStateHandle
					
				HeapKeyedStateBackend::snapshot
					SnapshotStrategyRunner::snapshot
					    HeapSnapshotStrategy::syncPrepareResources
						HeapSnapshotStrategy::asyncSnapshot //采用COWSateTable异步快照
							CheckpointStateOutputStream::closeAndGetHandle
							KeyGroupsStateHandle::new //包装KeyGroup及数据StreamStateHandle
SubtaskCheckpointCoordinatorImpl::finishAndReportAsync //向JobMaster发送checkpoint的结果
	AsyncCheckpointRunnable::new 
	AsyncCheckpointRunnable::run
		AsyncCheckpointRunnable::finalizeNonFinishedSnapshots
			OperatorSnapshotFinalizer::new //等待TaskSnapshot状态信息序列化完成
		AsyncCheckpointRunnable::reportCompletedSnapshotStates
			TaskStateManagerImpl::reportTaskStateSnapshots
				RpcCheckpointResponder::acknowledgeCheckpoint//向JobMaster发送Ack,带上State信息
非SourceTask

在StreamTask启动后调用StreamTask::processInput不断读取数据进行处理, 非SourceTask在收到上游的CheckpointBarrier对齐后触发Checkpoint,

StreamTask::processInput
StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext //循环不断从buffer中读取StreamElement
处理
	CheckpointedInputGate::pollNext
	CheckpointedInputGate::handleEvent
		SingleCheckpointBarrierHandler::processBarrier
		SingleCheckpointBarrierHandler::markCheckpointAlignedAndTransformState
			WaitingForFirstBarrier::barrierReceived
			AbstractAlignedBarrierHandlerState::barrierReceived
			 SingleCheckpointBarrierHandler.ControllerImpl::allBarriersReceived//判断对齐
			 AbstractAlignedBarrierHandlerState::triggerGlobalCheckpoint
			  SingleCheckpointBarrierHandler.ControllerImpl::triggerGlobalCheckpoint
			  SingleCheckpointBarrierHandler::triggerCheckpoint
			  	CheckpointBarrierHandler::notifyCheckpoint //触发StreamTask Checkpoint
			  		StreamTask::triggerCheckpointOnBarrier
			  			StreamTask::performCheckpoint //后续调用过程与SourceTask一样
			  			SubtaskCheckpointCoordinatorImpl::checkpointState   		

根据调用栈看出,非SourceStreamTask执行Checkpoint只是触发时机不同,SourceTask由JobMaster RPC定时不断触发,非SourceTask则是在上游的CheckpointBarrier对齐后触发Checkpoint,最终执行逻辑都是将当前算子的信息写入CheckpointStorage后向JobMaster发送确认信息。

StreamTask向JobMaster ACK信息中包含状态元信息及StreamStateHandle,根据状态存储位置分为:

  • ByteStreamStateHandle //对应JobManagerCheckpointStorage,将状态序列化为byte[]发送给JobMaster
  • FileStateHandle //对应FileSystemCheckpointStorage,将状态写入文件系统后将文件路径发送给JobMaster
JobMaster端完成流程

JobMaster收到StreamTask的acknowledgeCheckpoint后:

JobMaster::acknowledgeCheckpoint
SchedulerBase::acknowledgeCheckpoint
ExecutionGraphHandler::acknowledgeCheckpoint
CheckpointCoordinator::receiveAcknowledgeMessage
	PendingCheckpoint::acknowledgeTask //某一个Task的确认
		PendingCheckpoint::updateOperatorState//更新SubTask状态信息
	CheckpointCoordinator::completePendingCheckpoint//所有Task Ack后
		PendingCheckpoint::finalizeCheckpoint
			Checkpoints.storeCheckpointMetadata//保存CheckpointMetadata
				CompletedCheckpoint::new
		CheckpointCoordinator::sendAcknowledgeMessages//向Task通知Checkpoint完成消息
			ExecutionVertex::notifyCheckpointComplete
				TaskManagerGateway.notifyCheckpointComplete

JobMaster收到所有StreamTask的Checkpoint状态信息后,标志一次Checkpoint完成,这时会通知StreamTask CheckPoint完成消息,便于SubTask监听Checkpoint完成后做后续动作。文章来源地址https://www.toymoban.com/news/detail-666680.html

到了这里,关于Flink源码之Checkpoint执行流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink 从savepoint、checkpoint中恢复数据

    提示:flink checkpoint重启:   flink作业因为故障导致restart strategy失败或升级flink版本重新发布任务,这时就需要从最近的checkpoint恢复。一般而言有两种方案,第一种方案是 开启checkpoint且任务取消时不删除checkpoint (调整参数execution.checkpointing.externalized-checkpoint-retention),第

    2024年02月10日
    浏览(26)
  • flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交

    1、背景 flink消费kafka数据,多并发,实现双流join 2、现象 (1)flink任务消费kafka数据,其中数据正常消费,kafka显示消息堆积,位点没有提交,并且flink任务没有做checkpoint (2)其中一个流的subtask显示finished (3)无背压 3、问题原因 (1)其中一个topic分区为1 (2)配置的并行

    2024年02月13日
    浏览(31)
  • flink如何利用checkpoint保证数据状态一致性

    这本质上是一『尽力而为』的方法。保证数据或事件最多由应用程序中的所有算子处理一次。 这意味着如果数据在被流应用程序完全处理之前发生丢失,则不会进行其他重试或者重新发送。下图中的例子说明了这种情况。 应用程序中的所有算子都保证数据或事件至少被处理

    2024年02月21日
    浏览(36)
  • Flink Job 执行流程

    ​ 基于 Yarn 层面的架构类似 Spark on Yarn 模式 ,都是由 Client 提交 App 到 RM 上面去运行,然后 RM 分配第一个 container 去运行 AM ,然后由 AM 去负责资源的监督和管理 。需要说明的是, Flink 的 Yarn 模式更加类似 Spark on Yarn 的 cluster 模式,在 cluster 模式中, dirver 将作为 AM 中的一

    2024年02月04日
    浏览(31)
  • Flink系列之:使用Flink CDC从数据库采集数据,设置checkpoint支持数据采集中断恢复,保证数据不丢失

    博主相关技术博客: Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql数据到StarRocks数据库

    2024年02月11日
    浏览(41)
  • 【大数据】Flink 架构(五):检查点 Checkpoint(看完即懂)

    《 Flink 架构 》系列(已完结),共包含以下 6 篇文章: Flink 架构(一):系统架构 Flink 架构(二):数据传输 Flink 架构(三):事件时间处理 Flink 架构(四):状态管理 Flink 架构(五):检查点 Checkpoint(看完即懂) Flink 架构(六):保存点 Savepoint 😊 如果您觉得这篇

    2024年02月19日
    浏览(24)
  • 【Flink精讲】Flink内核源码分析:命令执行入口

    官方推荐per-job模式,一个job一个集群,提交时yarn才分配集群资源; 主要的进程:JobManager、TaskManager、Client 提交命令:bin/flink run -t yarn-per-job  /opt/module/flink-1.12.0/examples/streaming/SocketWindowWordCount.jar --port 9999 Per-job进程: CliFronted、YarnJobClusterEncrypoint、TaskExecutorRunner=TaskManagerRunn

    2024年02月21日
    浏览(33)
  • Flink非对齐checkpoint原理(Flink Unaligned Checkpoint)

    为什么提出Unaligned Checkpoint(UC)? 因为反压严重时会导致Checkpoint失败,可能导致如下问题 恢复时间长-服务效率低 非幂等和非事务会导致数据重复 持续反压导致任务加入死循环(可能导致数据丢失,例如超过kafka的过期时间无法重置offset) UC的原理 UC有两个阶段(UC主要是

    2024年02月14日
    浏览(34)
  • 【源码解析】flink sql执行源码概述:flink sql执行过程中有哪些阶段,这些阶段的源码大概位置在哪里

    本文大致分析了flink sql执行过程中的各个阶段的源码逻辑,这样可以在flink sql执行过程中, 能够定位到任务执行的某个阶段的代码大概分布在哪里,为更针对性的分析此阶段的细节逻辑打下基础,比如create 的逻辑是怎么执行的,select的逻辑是怎么生成的,优化逻辑都做了哪

    2024年02月04日
    浏览(28)
  • Flink源码-Task执行

    上一节我们分析到了Execution的生成,然后调用taskManagerGateway.submitTask方法提交task,提交的时候会将executionVertex封装成TaskDeploymentDescriptor,task的提交与执行涉及到了flink多个组件的配合,之前没有详细讲过,可能有的小伙伴有点不太清楚,这里我们花点时间介绍一下。 1.JobManager

    2024年02月03日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包