【Flink状态管理(六)】Checkpoint的触发方式(1)通过CheckpointCoordinator触发算子的Checkpoint操作

这篇具有很好参考价值的文章主要介绍了【Flink状态管理(六)】Checkpoint的触发方式(1)通过CheckpointCoordinator触发算子的Checkpoint操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Checkpoint的触发方式有两种

  • 一种是数据源节点中的Checkpoint操作触发,通过CheckpointCoordinator组件进行协调和控制。 CheckpointCoordinator通过注册定时器的方式按照配置的时间间隔触发数据源节点的Checkpoint操作。数据源节点会向下游算子发出Checkpoint Barrier事件,供下游节点使用。
  • 另一种是下游算子节点根据上游发送的Checkpoint Barrier事件控制算子中Checkpoint操作的触发时机,即只有接收到所有上游Barrier事件后,才会触发本节点的Checkpoint操作。

本文先介绍通过CheckpointCoordinator触发算子的Checkpoint操作

CheckpointCoordinator在整个作业中扮演了Checkpoint协调者的角色,负责在数据源节点触发Checkpoint以及整个作业的Checkpoint管理,并且CheckpointCoordinator组件会接收TaskMananger在Checkpoint执行完成后返回的Ack消息。

 

一. 启动CheckpointCoordinator

当作业的JobStatus转换为Running时,通知CheckpointCoordinatorDeActivator监听器启动CheckpointCoordinator服务。

如代码CheckpointCoordinatorDeActivator.jobStatusChanges()方法主要包含如下逻辑。

> 1. 当`newJobStatus == JobStatus.RUNNING`时,立即调用
> coordinator.startCheckpointScheduler()方法启动整个Job的调度器
> CheckpointCoordinator,此时Checkpoint的触发依靠CheckpointCoordinator进行协调。
> 
> 2.JobStatus为其他类型状态时,调用coordinator.stopCheckpointScheduler()方法,
> 停止当前Job中的Checkpoint操作。



public class CheckpointCoordinatorDeActivator implements JobStatusListener {
   private final CheckpointCoordinator coordinator;
   public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
      this.coordinator = checkNotNull(coordinator);
   }
   @Override
   public void jobStatusChanges(JobID jobId,JobStatus newJobStatus, long timestamp,
                              Throwable error) {
      if (newJobStatus == JobStatus.RUNNING) {
         // 启动Checkpoint调度程序
         coordinator.startCheckpointScheduler();
      } else {
         // 直接停止CheckpointScheduler
         coordinator.stopCheckpointScheduler();
      }
   }
}

 

二. 开启CheckpointScheduler线程

接下来在CheckpointCoordinator.startCheckpointScheduler()方法中调用scheduleTriggerWithDelay()方法进行后续操作,向创建好的checkpointCoordinatorTimer线程池添加定时调度执行的Runnable线程。

如代码所示:

在CheckpointCoordinator.scheduleTriggerWithDelay()方法中指定baseInterval参数,设定执行Checkpoint操作的时间间隔,通过定时器周期性地触发ScheduledTrigger线程,Checkpoint的具体操作在ScheduledTrigger线程中实现。

private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
   return timer.scheduleAtFixedRate(
      new ScheduledTrigger(),
      initDelay, baseInterval, TimeUnit.MILLISECONDS);
}

 

三. 触发Checkpoint

如代码,ScheduledTrigger也是CheckpointCoordinator的内部类,实现了Runnable接口。在ScheduledTrigger.run()方法中调用了CheckpointCoordinator.triggerCheckpoint()方法触发和执行Checkpoint操作。

private final class ScheduledTrigger implements Runnable {
   @Override
   public void run() {
      try {
         // 调用triggerCheckpoint()方法触发Checkpoint操作
         triggerCheckpoint(System.currentTimeMillis(), true);
      }
      catch (Exception e) {
         LOG.error("Exception while triggering checkpoint for job {}.", job, e);
      }
   }
}

CheckpointCoordinator.triggerCheckpoint()方法包含的执行逻辑非常多,这里重点介绍其中的主要逻辑。根据CheckpointCoordinator触发Checkpoint操作的过程分为以下几个部分。

1. Checkpoint执行前的工作

  1. 首先检查Checkpoint的执行环境和参数,满足条件后触发执行Checkpoint操作。Checkpoint执行过程分为异步和同步两种:

调用preCheckBeforeTriggeringCheckpoint()方法进行一些前置检查,主要包括检查CheckpointCoordinator当前的状态是否为shutdown、Checkpoint尝试次数是否超过配置的最大值。

  1. 构建执行和触发Checkpoint操作对应的Task节点实例的Execution集合,其中tasksToTrigger数组中存储了触发Checkpoint操作的ExecutionVertex元素,实际上就是所有的数据源节点。

CheckpointCoordinator仅会触发数据源节点的Checkpoint操作,其他节点则是通过Barrier对齐的方式触发的。

  1. 构建需要发送Ack消息的ExecutionVertex集合,主要是从tasksToWaitFor集合中转换而来。

tasksToWaitFor中存储了ExecutonGraph中所有的ExecutionVertex,也就是说每个ExecutionVertex节点对应的Task实例都需要向CheckpointCoordinator中汇报Ack消息

// 主要做前置检查
   synchronized (lock) {
      preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
   }
   // 创建需要执行的Task对应的Execution集合
  Execution[] executions = new Execution[tasksToTrigger.length];
   // 遍历tasksToTrigger集合,构建Execution集合
   for (int i = 0; i < tasksToTrigger.length; i++) {
   //获取Task对应的Execution集合
      Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
      if (ee == null) {
      // 如果Task对应的Execution集合为空,代表Task没有被执行,则抛出异常
         LOG.info("Checkpoint triggering task {} of job {} is not being 
            executed at the moment. Aborting checkpoint.", tasksToTrigger[i].
            getTaskNameWithSubtaskIndex(), job);
         throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
      } else if (ee.getState() == ExecutionState.RUNNING) {
         // 如果ExecutionState为RUNNING,则添加到executions集合中
      executions[i] = ee;
      } else {
      // 如果其他ExecutionState不为RUNNING,则抛出异常
         LOG.info("Checkpoint triggering task {} of job {} is not in state {} 
           but {} instead. Aborting checkpoint.",
             tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
             job,
             ExecutionState.RUNNING,
             ee.getState());
         throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
      }
   }
   // 组装用于需要发送Ack消息的Task集合
   Map<ExecutionAttemptID, ExecutionVertex> ackTasks = 
      new HashMap<>(tasksToWaitFor.length);
   for (ExecutionVertex ev : tasksToWaitFor) {
      Execution ee = ev.getCurrentExecutionAttempt();
      if (ee != null) {
         ackTasks.put(ee.getAttemptId(), ev);
      } else {
         LOG.info("Checkpoint acknowledging task {} of job {} is not being 
            executed at the moment. Aborting checkpoint.", ev.getTaskNameWith
               SubtaskIndex(), job);
         throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
      }
}

 

2. 创建PendingCheckpoint

在执行Checkpoint操作之前,需要构建PendingCheckpoint对象,从字面意思上讲就是挂起Checkpoint操作。

从开始执行Checkpoint操作直到Task实例返回Ack确认成功消息,Checkpoint会一直处于Pending状态,确保Checkpoint能被成功执行。

如代码逻辑:

  1. Checkpoint有唯一的checkpointID标记,根据高可用模式选择不同的计数器。

如果基于ZooKeeper实现了高可用集群,会调用ZooKeeperCheckpointIDCounter实现checkpointID计数;如果是非高可用集群,则会通过StandaloneCheckpointIDCounter完成checkpointID计数。

  1. 创建checkpointStorageLocation,用于定义Checkpoint过程中状态快照数据存放的位置。

checkpointStorageLocation通过checkpointStorage创建和初始化,不同的checkpointStorage实现创建的checkpointStorageLocation会有所不同。

  1. 创建PendingCheckpoint对象。

包括checkpointID、ackTasks以及checkpointStorageLocation等参数信息。将创建好的PendingCheckpoint存储在pendingCheckpoints集合中,并异步执行PendingCheckpoint操作。

final CheckpointStorageLocation checkpointStorageLocation;
final long checkpointID;
try {
   //通过checkpointIdCounter获取checkpointID
   checkpointID = checkpointIdCounter.getAndIncrement();
      // 获取checkpointStorageLocation
   checkpointStorageLocation = props.isSavepoint() ?
         checkpointStorage
      .initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
         checkpointStorage.initializeLocationForCheckpoint(checkpointID);
}
// 省略部分代码
// 创建PendingCheckpoint对象
final PendingCheckpoint checkpoint = new PendingCheckpoint(
   job,
   checkpointID,
   timestamp,
   ackTasks,
   masterHooks.keySet(),
   props,
   checkpointStorageLocation,
   executor);

 

3. Checkpoint的触发与执行

在CheckpointCoordinator.triggerCheckpoint()方法中,会在synchronized(lock)模块内定义和执行Checkpoint操作的具体逻辑,主要包含如下步骤。

  1. 获取coordinator对象锁,对TriggeringCheckpoint对象进行预检查,主要包括检查CheckpointCoordinator状态和PendingCheckpoint尝试次数等。

  2. 将PendingCheckpoint存储在pendingCheckpoints键值对中,使用定时器创建cancellerHandle对象,cancellerHandle用于清理过期的Checkpoint操作。

通过checkpoint.setCancellerHandle()方法设置Checkpoint的CancellerHandle,设置成功则返回True,如果失败则返回false,说明当前Checkpoint已经被释放。

  1. 调用并执行MasterHook。可以通过实现MasterHook函数,准备外部系统环境或触发相应的系统操作。

  2. 遍历执行executions集合中的Execution节点,判断props.isSynchronous()方法是否为True,如果为True则调用triggerSynchronousSavepoint()方法同步执行Checkpoint操作。
    其他情况则调用triggerCheckpoint()方法异步执行Checkpoint操作。

// 获取coordinator-wide lock
synchronized (lock) {
   // TriggeringCheckpoint检查
   preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
   LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, 
      job);
      // 将checkpoint存储在pendingCheckpoints KV集合中
   pendingCheckpoints.put(checkpointID, checkpoint);
      // 调度canceller线程,清理过期的Checkpoint对象
   ScheduledFuture<?> cancellerHandle = timer.schedule(
         canceller,
         checkpointTimeout, TimeUnit.MILLISECONDS);
      // 确定Checkpoint是否已经被释放
   if (!checkpoint.setCancellerHandle(cancellerHandle)) {
      cancellerHandle.cancel(false);
   }
   // 调用MasterHook方法
   for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
      final MasterState masterState =
         MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
            .get(checkpointTimeout, TimeUnit.MILLISECONDS);
      checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
   }
   Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
}
// 创建CheckpointOptions
final CheckpointOptions checkpointOptions = new CheckpointOptions(
      props.getCheckpointType(),
      checkpointStorageLocation.getLocationReference());
// 分别执行executions中的Execution节点
for (Execution execution: executions) {
   if (props.isSynchronous()) {
      // 如果是同步执行,则调用triggerSynchronousSavepoint()方法
      execution.triggerSynchronousSavepoint(checkpointID, timestamp, 
                                            checkpointOptions,
                                            advanceToEndOfTime);
   } else {
      // 其他情况则调用triggerCheckpoint()异步方法执行
      execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
   }
}
// 返回Checkpoint中的CompletionFuture对象
numUnsuccessfulCheckpointsTriggers.set(0);
return checkpoint.getCompletionFuture();

以上就完成了在CheckpointCoordinator中触发Checkpoint的全部操作,具体的执行过程调用Execution完成。

 

四. Task节点的Checkpoint操作

在Execution.triggerCheckpoint()方法中实际上调用triggerCheckpointHelper()方法完成Execution对应的Task节点的Checkpoint操作,并通过Task实例触发数据源节点的Checkpoint操作,如代码所示。

1. 触发准备

  1. 获取当前Execution分配的LogicalSlot,如果LogicalSlot不为空,说明Execution成功分配到Slot计算资源,否则说明Execution中没有资源,Execution对应的Task实例不会被执行和启动。

  2. 调用TaskManagerGateway.triggerCheckpoint()的RPC方法,触发和执行指定Task的Checkpoint操作。

  3. TaskExecutor收到来自CheckpointCoordinator的Checkpoint触发请求后,会在TaskExecutor实例中完成对应Task实例的Checkpoint操作。

private void triggerCheckpointHelper(long checkpointId, 
                                     long timestamp, 
                                     CheckpointOptions checkpointOptions, 
                                     boolean advanceToEndOfEventTime) {
   final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
   if (advanceToEndOfEventTime 
       && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
       throw new IllegalArgumentException("Only synchronous savepoints are 
         allowed to advance the watermark to MAX.");
   }
      // 获取当前Execution分配的LogicalSlot资源
   final LogicalSlot slot = assignedResource;
   // 如果LogicalSlot不为空,说明Execution运行正常
   if (slot != null) {
      // 通过slot获取TaskManagerGateway对象
      final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            // 调用triggerCheckpoint()方法
      taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), 
                                           checkpointId, timestamp, 
                                           checkpointOptions,
                                           advanceToEndOfEventTime);
   } else {
      // 否则说明Execution中没有资源,不再执行Execution对应的Task实例
      LOG.debug("The execution has no slot assigned. This indicates that the 
      execution is no longer running.");
   }
}

 

2. 调用TaskExecutor执行Checkpoint操作

TaskExecutor接收到来自CheckpointCoordinator的Checkpoint触发请求后,立即根据Execution信息确认Task实例线程,并且调用Task实例触发和执行数据源节点的Checkpoint操作。如代码,TaskExecutor.triggerCheckpoint()方法逻辑如下。

  1. 检查CheckpointType的类型,CheckpointType共有三种类型,分别为CHECKPOINT、SAVEPOINT和SYNC_SAVEPOINT,且只有在同步Savepoints操作时才能调整Watermark为MAX。

  2. 从taskSlotTable中获取Execution对应的Task实例,如果Task实例不为空,则调用task.triggerCheckpointBarrier()方法执行Task实例中的Checkpoint操作。

  3. 如果Task实例为空,说明Task目前处于异常,无法执行Checkpoint操作。此时调用FutureUtils.completedExceptionally()方法,并封装CheckpointException异常信息,返回给管理节点的CheckpointCoordinator进行处理。

public CompletableFuture<Acknowledge> triggerCheckpoint(
      ExecutionAttemptID executionAttemptID,
      long checkpointId,
      long checkpointTimestamp,
      CheckpointOptions checkpointOptions,
      boolean advanceToEndOfEventTime) {
   log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, 
      checkpointTimestamp, executionAttemptID);
      //检查CheckpointType,确保只有同步的savepoint操作才能将Watermark调整为MAX
   final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
   if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && 
       checkpointType.isSavepoint())) {
      throw new IllegalArgumentException("Only synchronous savepoints are 
         allowed to advance the watermark to MAX.");
   }
      // 从taskSlotTable中获取当前Execution对应的Task
   final Task task = taskSlotTable.getTask(executionAttemptID);
   // 如果task不为空,则调用triggerCheckpointBarrier()方法
   if (task != null) {
      task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, 
         checkpointOptions, advanceToEndOfEventTime);
   // 返回CompletableFuture对象
      return CompletableFuture.completedFuture(Acknowledge.get());
   } else {
      final String message = "TaskManager received a checkpoint request for 
         unknown task " + executionAttemptID + '.';
      // 如果task为空,则返回CheckpointException异常
      log.debug(message);
      return FutureUtils.completedExceptionally(
          new CheckpointException(message,
CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
   }
}

 

五. 在StreamTask中执行Checkpoint操作

在执行Task.triggerCheckpointBarrier()方法时,会借助AbstractInvokable中提供的triggerCheckpointAsync()方法触发并执行StreamTask中的Checkpoint操作。

public Future<Boolean> triggerCheckpointAsync(
      CheckpointMetaData checkpointMetaData,
      CheckpointOptions checkpointOptions,
      boolean advanceToEndOfEventTime) {
   // 异步提交Checkpoint操作
   return mailboxProcessor.getMainMailboxExecutor().submit(    
      () -> triggerCheckpoint(checkpointMetaData, 
                              checkpointOptions, advanceToEndOfEventTime),
      "checkpoint %s with %s",
      checkpointMetaData,
      checkpointOptions);
}

StreamTask.triggerCheckpoint()方法主要逻辑如下。

  1. 调用StreamTask.performCheckpoint()方法执行Checkpoint并返回success信息,用于判断Checkpoint操作是否成功执行。
  2. 如果success信息为False,表明Checkpoint操作没有成功执行,此时调用declineCheckpoint()方法回退。
boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, 
                                    checkpointMetrics, advanceToEndOfEventTime);
if (!success) {
   declineCheckpoint(checkpointMetaData.getCheckpointId());
}
return success;

在StreamTask.performCheckpoint()方法中,主要执行了Task实例的Checkpoint操作,该方法除了会通过CheckpointCoordinator触发之外,在下游算子通过CheckpointBarrier对齐触发Checkpoint操作时,也会调用该方法执行具体Task的Checkpoint操作。

 
下篇我们继续看CheckpointBarrier对齐触发Checkpoint的流程,了解StreamTask中performCheckpoint()方法如何执行Checkpoint操作,实现状态数据快照与持久化操作。

 

参考:《Flink设计与实现:核心原理与源码解析》–张利兵文章来源地址https://www.toymoban.com/news/detail-834237.html

到了这里,关于【Flink状态管理(六)】Checkpoint的触发方式(1)通过CheckpointCoordinator触发算子的Checkpoint操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink分流,合流,状态,checkpoint和精准一次笔记

    第8章 分流 1.使用侧输出流 2.合流 2.1 union :使用 ProcessFunction 处理合流后的数据 2.2 Connect : 两条流的格式可以不一样, map操作使用CoMapFunction,process 传入:CoProcessFunction 2.2 BroadcastConnectedStream keyBy 进行了按键分区,那么要传入的就是 KeyedBroadcastProcessFunction; 如果没有按键分

    2024年02月12日
    浏览(35)
  • el-date-picker实现通过其他方式触发日期选择器

    el-date-picker 目前只能通过点击input输入框触发日期选择器,项目中需要通过其他方式触发日期选择器同时把input输入框去掉,如点击另一个按钮事件 来触发日期选择器框展开。 该模块由于后端接口数据传输限制 在前面文章里做了些许改动。 需求左右切换 可以快速找到年份,

    2023年04月08日
    浏览(39)
  • Flink中的状态管理

    在Flink中,算子任务可以分为 有状态 和 无状态 两种状态。 无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。 例如 Map 、 Filter 、 FlatMap 都是属于 无状态算子 。  而 有状态的算子任务,就是除了当前数据外,还需要一些其他的数据来得到

    2024年01月23日
    浏览(42)
  • Flink State 状态管理

    状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容: 状态数据的存储和访问 在Task内部,如何高效地保存状态数据和使用状态数据。 状态数据的备份和恢复 作业失败是无法避免的,那么就要考虑如何高效地将状态

    2024年01月17日
    浏览(44)
  • Flink非对齐checkpoint原理(Flink Unaligned Checkpoint)

    为什么提出Unaligned Checkpoint(UC)? 因为反压严重时会导致Checkpoint失败,可能导致如下问题 恢复时间长-服务效率低 非幂等和非事务会导致数据重复 持续反压导致任务加入死循环(可能导致数据丢失,例如超过kafka的过期时间无法重置offset) UC的原理 UC有两个阶段(UC主要是

    2024年02月14日
    浏览(45)
  • 《Flink学习笔记》——第八章 状态管理

    8.1 Flink中的状态 8.1.1 概述 在Flink中,算子任务可以分为无状态和有状态两种情况。 **无状态的算子:**每个事件不依赖其它数据,自己处理完就输出,也不需要依赖中间结果。例如:打印操作,每个数据只需要它本身就可以完成。 **有状态的算子:**事件需要依赖中间或者外

    2024年02月11日
    浏览(40)
  • 【大数据】Flink 架构(四):状态管理

    《 Flink 架构 》系列(已完结),共包含以下 6 篇文章: Flink 架构(一):系统架构 Flink 架构(二):数据传输 Flink 架构(三):事件时间处理 Flink 架构(四):状态管理 Flink 架构(五):检查点 Checkpoint(看完即懂) Flink 架构(六):保存点 Savepoint 😊 如果您觉得这篇

    2024年02月19日
    浏览(42)
  • Flink状态管理与检查点机制

    本专栏案例代码和数据集链接:  https://download.csdn.net/download/shangjg03/88477960 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用: 具体而言,Flink 又将状态 (State) 分为 Keyed State 与 O

    2024年02月07日
    浏览(43)
  • Flink 2.0 状态管理存算分离架构演进

    2024年01月16日
    浏览(43)
  • 【状态管理|概述】Flink的状态管理:为什么需要state、怎么保存state、对于state过大怎么处理

    按照数据的划分和扩张方式,Flink中大致分为2类: Keyed States:记录每个Key对应的状态值 因为一个任务的并行度有多少,就会有多少个子任务,当key的范围大于并行度时,就会出现一个subTask上可能包含多个Key(),但不同Task上不会出现相同的Key(解决了shuffle的问题?)   常

    2024年02月01日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包