源码解析Flink源节点数据读取是如何与checkpoint串行执行

这篇具有很好参考价值的文章主要介绍了源码解析Flink源节点数据读取是如何与checkpoint串行执行。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

源码解析Flink源节点数据读取是如何与checkpoint串行执行

Flink版本:1.13.6

前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。

本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,最后得出结论:源节点Checkpoint时和源节点读取数据时,都需要抢SourceStreamTask类中lock变量的锁,最终实现串行执行checkpoint与写数据

Checkpoint阶段

Checkpoint在StreamTask的performCheckpoint方法中执行,该方法调用过程如下

// 在StreamTask类中 执行checkpoint操作
private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetricsBuilder checkpointMetrics )
            throws Exception {
        if (isRunning) {
            //使用actionExecutor 同步触发checkpoint
            actionExecutor.runThrowing(
                    () -> {
    					....//经过一系列检查
                        subtaskCheckpointCoordinator.checkpointState(
                                checkpointMetaData,
                                checkpointOptions,
                                checkpointMetrics,
                                operatorChain,
                                this::isRunning);
                    });
            return true;
        } else {
    		....
        }
    }

从上述代码可以看出,Checkpoint执行是由actionExecutor执行器执行

StreamTask类变量actionExecutor的实现和初始化

StreamTask类变量actionExecution的实现

通过代码注释可以知道该执行器的实现是StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor;从SynchronizedStreamTaskActionExecutor源代码可知,该执行器每次执行都需要获得mutex对象锁

  /**
     * All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another
     * thread) must be executed through this executor to ensure that we don't have concurrent method
     * calls that void consistent checkpoints.
     *
     * <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link
     * StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor
     * SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.
     */
private final StreamTaskActionExecutor actionExecutor;


class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {
    private final Object mutex;

    public SynchronizedStreamTaskActionExecutor(Object mutex) {
        this.mutex = mutex;
    }

    @Override
    public void run(RunnableWithException runnable) throws Exception {
        synchronized (mutex) {
            runnable.run();
        }
    }
}

StreamTask变量actionExecution初始化

actionExecutor变量在StreamTask中定义,在构造方法中初始化;该构造方法由SourceStreamTask调用,并传入SynchronizedStreamTaskActionExecutor对象,代码如下所示

//   SourceStreamTask的方法
private SourceStreamTask(Environment env, Object lock) throws Exception {
    //调用的StreamTask构造函数,传入SynchronizedStreamTaskActionExecutor对象
    super(
            env,
            null,
            FatalExitExceptionHandler.INSTANCE,
            //初始化actionExecutor
            StreamTaskActionExecutor.synchronizedExecutor(lock));
    //将lock对象赋值给类变量lock
    this.lock = Preconditions.checkNotNull(lock);
    this.sourceThread = new LegacySourceFunctionThread();

    getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}

//  StreamTask的方法
protected StreamTask(
        Environment environment,
        @Nullable TimerService timerService,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
    	//初始化actionExecutor
        StreamTaskActionExecutor actionExecutor)
        throws Exception {
    this(
            environment,
            timerService,
            uncaughtExceptionHandler,
            actionExecutor,
            new TaskMailboxImpl(Thread.currentThread()));
}

protected StreamTask(
        Environment environment,
        @Nullable TimerService timerService,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
        StreamTaskActionExecutor actionExecutor,
        TaskMailbox mailbox)
        throws Exception {
    super(environment);
    this.configuration = new StreamConfig(getTaskConfiguration());
    this.recordWriter = createRecordWriterDelegate(configuration, environment);
    //初始化actionExecutor
    this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
    this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
    .......}
小结

actionExecutor执行器每次执行都需要获得mutex对象,mutex对象就是SourceStreamTask类中的lock对象;即算子每次执行Checkpoint时都需要获得SourceStreamTask类中lock对象锁才能进行

数据读取阶段

在执行Checkpoint时控制读取源端,则控制点必定是在调用SourceContext的collect方法时

@Override
public void run(SourceContext<String> ctx) throws Exception {
    int i = 0;
    while (true) {
		//在这个方法里处理
        ctx.collect(String.valueOf(i));
    }
}

点击collection查看实现,选择NonTimestampContext查看代码,collect()实现如下

@Override
public void collect(T element) {
    synchronized (lock) {
        output.collect(reuse.replace(element));
    }
}

所以这里控制数据读取发送是通过lock来控制,lock是如何初始化的?

通过NonTimestampContext构造方法可以定位到StreamSourceContexts->getSourceContext方法;

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
        TimeCharacteristic timeCharacteristic,
        ProcessingTimeService processingTimeService,
        Object checkpointLock,
        StreamStatusMaintainer streamStatusMaintainer,
        Output<StreamRecord<OUT>> output,
        long watermarkInterval,
        long idleTimeout) {

    final SourceFunction.SourceContext<OUT> ctx;
    switch (timeCharacteristic) {
		....
        case ProcessingTime:
            //初始化NonTimestampContext
            ctx = new NonTimestampContext<>(checkpointLock, output);
            break;
        default:
            throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
    }
    return ctx;
}

向上追踪,在StreamSource类中调用getSourceContext:

public void run(
        final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final Output<StreamRecord<OUT>> collector,
        final OperatorChain<?, ?> operatorChain)
        throws Exception {
        ....
        this.ctx =
        
        StreamSourceContexts.getSourceContext(
                timeCharacteristic,
                getProcessingTimeService(),
                lockingObject,
                streamStatusMaintainer,
                collector,
                watermarkInterval,
                -1);
        ....
        }
// 再向上最终run方法的调用点->是由内部方法run调用
public void run(
        final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final OperatorChain<?, ?> operatorChain)
        throws Exception {

    run(lockingObject, streamStatusMaintainer, output, operatorChain);
}

//再向上最终run方法的调用点->SourceStreamTask 调用run 然后再代用mainOpterator run方法
@Override
public void run() {
    try {
        // 使用的是类变量lock
        mainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);
        if (!wasStoppedExternally && !isCanceled()) {
            synchronized (lock) {
                operatorChain.setIgnoreEndOfInput(false);
            }
        }
        completionFuture.complete(null);
    } catch (Throwable t) {
        // Note, t can be also an InterruptedException
        completionFuture.completeExceptionally(t);
    }
}
小结

所以在源端写数据时,必须获得SourceStreamTask中的类变量lock的锁才能进行写数据;类变量lock刚好和执行器时同一个对象

总结

flink的source算子在Checkpoint时,是通过锁对象SourceStreamTask.lock,来控制源端数据产生和Checkpoint的有序进行文章来源地址https://www.toymoban.com/news/detail-631151.html

到了这里,关于源码解析Flink源节点数据读取是如何与checkpoint串行执行的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink 从savepoint、checkpoint中恢复数据

    提示:flink checkpoint重启:   flink作业因为故障导致restart strategy失败或升级flink版本重新发布任务,这时就需要从最近的checkpoint恢复。一般而言有两种方案,第一种方案是 开启checkpoint且任务取消时不删除checkpoint (调整参数execution.checkpointing.externalized-checkpoint-retention),第

    2024年02月10日
    浏览(31)
  • flink正常消费kafka数据,flink没有做checkpoint,kafka位点没有提交

    1、背景 flink消费kafka数据,多并发,实现双流join 2、现象 (1)flink任务消费kafka数据,其中数据正常消费,kafka显示消息堆积,位点没有提交,并且flink任务没有做checkpoint (2)其中一个流的subtask显示finished (3)无背压 3、问题原因 (1)其中一个topic分区为1 (2)配置的并行

    2024年02月13日
    浏览(38)
  • 【大数据】Flink 架构(五):检查点 Checkpoint(看完即懂)

    《 Flink 架构 》系列(已完结),共包含以下 6 篇文章: Flink 架构(一):系统架构 Flink 架构(二):数据传输 Flink 架构(三):事件时间处理 Flink 架构(四):状态管理 Flink 架构(五):检查点 Checkpoint(看完即懂) Flink 架构(六):保存点 Savepoint 😊 如果您觉得这篇

    2024年02月19日
    浏览(40)
  • 【Unity 3D】C#从JSON文件中读取、解析、保存数据(附源码)

    JSON是一种轻量级的数据交换格式,采用完全独立于编程语言的文本格式存储和表示数据,简洁和清晰的层次结构使JSON成为理想的数据交换语言,易于读者阅读和编写,同时也易于机器解析和生成,并有效的提高网络传输效率 生成JSON数据实例代码如下 下面的代码将JSON中数据

    2024年02月11日
    浏览(58)
  • Flink系列之:使用Flink CDC从数据库采集数据,设置checkpoint支持数据采集中断恢复,保证数据不丢失

    博主相关技术博客: Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库 Flink系列之:使用Flink Mysql CDC基于Flink SQL同步mysql数据到StarRocks数据库

    2024年02月11日
    浏览(56)
  • 深入解析 Flink CDC 增量快照读取机制

    Flink CDC 1.x 使用 Debezium 引擎集成来实现数据采集,支持全量加增量模式,确保数据的一致性。然而,这种集成存在一些痛点需要注意: 一致性通过加锁保证 :在保证数据一致性时,Debezium 需要对读取的库或表加锁。全局锁可能导致数据库出现挂起情况,而表级锁会影响表的

    2024年02月03日
    浏览(31)
  • Java如何快速读取&解析JSON数据(文件),获取想要的内容?

    手打不易,如果转摘,请注明出处! 注明原文: https://zhangxiaofan.blog.csdn.net/article/details/132764186 目录 前言 准备工作 Json数据(示例) 解析Json文件 第一步:创建一个空类 第二步:使用 Gsonformat 插件  第三步:复制Json内容,创建对应类 第四步:读取Json文件,提取目标数据

    2024年02月05日
    浏览(70)
  • Flink非对齐checkpoint原理(Flink Unaligned Checkpoint)

    为什么提出Unaligned Checkpoint(UC)? 因为反压严重时会导致Checkpoint失败,可能导致如下问题 恢复时间长-服务效率低 非幂等和非事务会导致数据重复 持续反压导致任务加入死循环(可能导致数据丢失,例如超过kafka的过期时间无法重置offset) UC的原理 UC有两个阶段(UC主要是

    2024年02月14日
    浏览(43)
  • Flink 如何定位反压节点?

    Flink Web UI 自带的反压监控 —— 直接方式 Flink Web UI 的反压监控提供了 Subtask 级别 的反压监控。监控的原理是 通过Thread.getStackTrace() 采集在 TaskManager 上正在运行的所有线程,收集在缓冲区请求中阻塞的线程数(意味着下游阻塞),并计算缓冲区阻塞线程数与总线程数的比值

    2024年02月10日
    浏览(35)
  • 【Flink】 FlinkCDC读取Mysql( DataStream 方式)(带完整源码,直接可使用)

    简介:     FlinkCDC读取Mysql数据源,程序中使用了自定义反序列化器,完整的Flink结构,开箱即用。 本工程提供 1、项目源码及详细注释,简单修改即可用在实际生产代码 2、成功编译截图 3、自己编译过程中可能出现的问题 4、mysql建表语句及测试数据 5、修复FlinkCDC读取Mysql数

    2024年02月07日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包