源码解析FlinkKafkaConsumer支持周期性水位线发送

这篇具有很好参考价值的文章主要介绍了源码解析FlinkKafkaConsumer支持周期性水位线发送。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

背景

当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程

FlinkKafkaConsumer水位线发送

1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动

        // if we have periodic watermarks, kick off the interval scheduler
        if (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {
            PeriodicWatermarkEmitter<T, KPH> periodicEmitter =
                    new PeriodicWatermarkEmitter<>(
                            checkpointLock,
                            subscribedPartitionStates,
                            watermarkOutputMultiplexer,
                            processingTimeProvider,
                            autoWatermarkInterval);

            periodicEmitter.start();
        }

2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行

        public void start() {
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) {

            synchronized (checkpointLock) {
                for (KafkaTopicPartitionState<?, ?> state : allPartitions) {
                    // 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值
                    state.onPeriodicEmit();
                }
				//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的
                watermarkOutputMultiplexer.onPeriodicEmit();
            }

            // schedule the next watermark
            timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
        }
    }

3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();
        if (next != null) {
            output.emitWatermark(new Watermark(next.getTimestamp()));
        }
    }
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:
        public DeferredOutput(OutputState state) {
            this.state = state;
        }

        @Override
        public void emitWatermark(Watermark watermark) {
            state.setWatermark(watermark.getTimestamp());
        }
所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线
        /**
         * Returns true if the watermark was advanced, that is if the new watermark is larger than
         * the previous one.
         *
         * <p>Setting a watermark will clear the idleness flag.
         */
        public boolean setWatermark(long watermark) {
            this.idle = false;
            final boolean updated = watermark > this.watermark;
            // 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退
            this.watermark = Math.max(watermark, this.watermark);
            return updated;
        }        

4.对应算子任务组合当前任务消费的所有分区水位线的方法文章来源地址https://www.toymoban.com/news/detail-718112.html

private void updateCombinedWatermark() {
        long minimumOverAllOutputs = Long.MAX_VALUE;

        boolean hasOutputs = false;
        boolean allIdle = true;
        for (OutputState outputState : watermarkOutputs) {
            if (!outputState.isIdle()) {
                minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());
                allIdle = false;
            }
            hasOutputs = true;
        }

        // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
        // at its initial Long.MAX_VALUE state and we must not emit that
        // 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???
        if (!hasOutputs) {
            return;
        }

        if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进
            underlyingOutput.markIdle();
        } else if (minimumOverAllOutputs > combinedWatermark) {
            combinedWatermark = minimumOverAllOutputs;
            underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));
        }
    }```

    

到了这里,关于源码解析FlinkKafkaConsumer支持周期性水位线发送的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink生成水位线记录方式--周期性水位线生成器

    在flink基于事件的时间处理中,水位线记录的生成是一个很重要的环节,本文就来记录下几种水位线记录的生成方式的其中一种:周期性水位线生成器 1.1 BoundedOutOfOrdernessTimeStampExtractor 他会接收一个表示最大延迟的参数,比如1分钟,意味着如果到达的元素的事件时间和之前到

    2024年02月07日
    浏览(42)
  • Abaqus CAE 2018插件使用详解:基于周期性边界条件定义3D几何模型的实践指南**

    注:这篇文章是为了帮助读者更好地理解和使用Abaqus CAE 2018的插件来定义周期性边界条件。所提供的信息是基于我个人的理解和实践,如有不准确或者有更好的建议,欢迎读者们指正和交流。 一、 引言 在进行无限或半无限域建模时,周期性边界条件可为我们提供了一种模拟

    2024年02月11日
    浏览(35)
  • 使用Dream3D和MATLAB从综合构建微结构到创建具有周期性边界条件的Abaqus输入文件的一站式解决方案

    声明 :本文中的所有内容仅供学术研究和讨论,不保证完全无误。对于使用本文内容可能产生的任何后果,作者不承担任何责任。希望大家在使用时,结合自己的实际情况进行酌情调整。 当我们面临材料力学问题,包括材料的疲劳、断裂和塑性等行为的仿真时,一个常见的

    2024年02月10日
    浏览(51)
  • 为什么RIP使用UDP,OSPF使用IP,而BGP使用TCP?为什么RIP周期性地和邻站交换路由信息而BGP却不这样做?

    RIP只和邻站交换信息,使用UDP无可靠保障,但开销小,可以满足RIP要求; OSPF使用可靠的洪泛法,直接使用IP,灵活、开销小; BGP需要交换整个路由表和更新信息,TCP提供可靠交付以减少带宽消耗; RIP使用不保证可靠交付的UDP,因此必须不断地(周期性地)和邻站交换信息才

    2024年02月02日
    浏览(43)
  • 【正点原子STM32】RTC实时时钟(RTC方案、BCD码、时间戳、RTC相关寄存器和HAL库驱动、RTC基本配置步骤、RTC基本驱动步骤、时间设置和读取、RTC闹钟配置和RTC周期性自动唤醒配置)

    一、RTC简介 二、STM32 RTC框图介绍 2.1、STM32 F1 RTC结构框图 2.2、STM32 F4 / F7 / H7 RTC结构框图 三、RTC相关寄存器介绍 3.1、RTC基本配置步骤 3.2、RTC相关寄存器(F1) 3.3、RTC相关寄存器(F4 / F7 / H7) 四、RTC相关HAL库驱动介绍 4.1、RTC相关HAL库驱动(F1) 4.2、RTC相关HAL库驱动(F4 / F7 /

    2024年03月27日
    浏览(56)
  • 【框架源码】Spring源码解析之Bean生命周期流程

    观看本文前,我们先思考一个问题,什么是Spring的bean的生命周期?这也是我们在面试的时候,面试官常问的一个问题。 在没有Spring之前,我们创建对象的时候,采用new的方式,当对象不在被使用的时候,由Java的垃圾回收机制回收。 而 Spring 中的对象是 bean,bean 和普通的 J

    2024年02月09日
    浏览(31)
  • Spring之Bean生命周期源码解析

    ClassPathBeanDefinitionScanner.java ClassPathScanningCandidateComponentProvider.java 通过组件索引寻找 这里的 componentsIndex 在初始化的时候会尝试解析 META-INF/spring.components 文件中的配置信息 把断点打在 ClassPathScanningCandidateComponentProvider 的 setResourceLoader 方法上调试可以看到堆栈 可以看到,的确

    2024年02月11日
    浏览(36)
  • 【Spring】Spring之Bean生命周期源码解析

    什么是bean的生命周期 是指bean在spring中是如何生成,如何销毁的; spring创建对象的过程,就是IOC(控制反转)的过程; JFR Java Flight Record,java飞行记录,类似于飞机的黑匣子,是JVM内置的基于事件的JDK监控记录框架,主要用于问题定位和持续监控; 入口代码: Spring启动的时

    2024年02月15日
    浏览(36)
  • 【深入Spring源码解析:解密Bean的生命周期】

    Spring是Java企业级应用开发领域的一颗明星,它提供了很多方便开发人员的工具和思想。在分布式系统中,Spring的分布式远程协作方案,比如REST、Web服务以及消息传递等,也是不可或缺的。 你知道吗?在我们使用Spring时,容器中存放的所有对象,在Spring启动的时候就完成了实

    2024年02月05日
    浏览(33)
  • 【Spring专题】Spring之Bean的生命周期源码解析——上(扫描生成BeanDefinition)

    由于Spring源码分析是一个前后联系比较强的过程,而且这边分析,也是按照代码顺序讲解的,所以不了解前置知识的情况下,大概率没办法看懂当前的内容。所以,特别推荐看看我前面的文章(自上而下次序): Spring底层核心原理解析——引导篇【学习难度: ★★☆☆☆ 】

    2024年02月13日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包