Flink Job 执行流程

这篇具有很好参考价值的文章主要介绍了Flink Job 执行流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink On Yarn 模式

Flink Job 执行流程,Flink,flink,大数据,java,面试,后端,性能优化,spark

基于Yarn层面的架构类似 Spark on Yarn模式,都是由Client提交AppRM上面去运行,然后 RM分配第一个container去运行AM,然后由AM去负责资源的监督和管理。需要说明的是,FlinkYarn模式更加类似Spark on Yarncluster模式,在cluster模式中,dirver将作为AM中的一个线程去运行。Flink on Yarn模式也是会将JobManager启动在container里面,去做个driver类似的任务调度和分配,Yarn AMFlink JobManager在同一个Container,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在Yarn集群上,Flink Yarn Client就可以提交Flink JobFlink JobManager,并进行后续的映射、调度和计算处理。

Fink on Yarn 的缺陷

【1】资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下降时无法归还空闲的资源,在负载上升时也无法动态扩展。
【2】On-Yarn模式下,所有的container都是固定大小的,导致无法根据作业需求来调整container的结构。譬如CPU密集的作业或需要更多的核,但不需要太多内存,固定结构的container会导致内存被浪费。
【3】与容器管理基础设施的交互比较笨拙,需要两个步骤来启动Flink作业:1.启动Flink守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器部署的一部分,那么将不再需要步骤2。
【4】On-Yarn模式下,作业管理页面会在作业完成后消失不可访问。
【5】Flink推荐 per job clusters 的部署方式,但是又支持可以在一个集群上运行多个作业的session模式,令人疑惑。

Flink版本1.5中引入了DispatcherDispatcher是在新设计里引入的一个新概念。Dispatcher会从Client端接受作业提交请求并代表它在集群管理器上启动作业。引入Dispatcher的原因主要有两点:
【1】一些集群管理器需要一个中心化的作业生成和监控实例;
【2】能够实现Standalone模式下JobManager的角色,且等待作业提交。在一些案例中,Dispatcher是可选的Yarn或者不兼容的kubernetes

资源调度模型重构下的 Flink On Yarn 模式

Flink Job 执行流程,Flink,flink,大数据,java,面试,后端,性能优化,spark

客户端提交JobGraph以及依赖jar包到YarnResourceManager,接着Yarn ResourceManager分配第一个container以此来启动AppMasterApplication Master中会启动一个FlinkResourceManager以及JobManagerJobManager会根据JobGraph生成的ExecutionGraph以及物理执行计划向FlinkResourceManager申请slotFlinkResoourceManager会管理这些slot以及请求,如果没有可用slot就向YarnResourceManager申请containercontainer启动以后会注册到FlinkResourceManager,最后JobManager会将subTask deploy到对应containerslot中去。
Flink Job 执行流程,Flink,flink,大数据,java,面试,后端,性能优化,spark

在有Dispatcher的模式下:会增加一个过程,就是Client会直接通过HTTP Server的方式,然后用Dispatcher将这个任务提交到Yarn ResourceManager中。

新框架具有四大优势,详情如下:
【1】client直接在Yarn上启动作业,而不需要先启动一个集群然后再提交作业到集群。因此client再提交作业后可以马上返回。
【2】所有的用户依赖库和配置文件都被直接放在应用的classpath,而不是用动态的用户代码classloader去加载。
【3】container在需要时才请求,不再使用时会被释放。
【4】“需要时申请”的container分配方式允许不同算子使用不同profile (CPU和内存结构)的container

新的资源调度框架下 single cluster job on Yarn 流程介绍

Flink Job 执行流程,Flink,flink,大数据,java,面试,后端,性能优化,spark

single cluster job on Yarn模式涉及三个实例对象:
【1】clifrontend Invoke App code;生成StreamGraph,然后转化为JobGraph
【2】YarnJobClusterEntrypoint(Master) 依次启动YarnResourceManagerMinDispatcherJobManagerRunner三者都服从分布式协同一致的策略;JobManagerRunnerJobGraph转化为ExecutionGraph,然后转化为物理执行任务Execution,然后进行deploydeploy过程会向 YarnResourceManager请求slot,如果有直接deploy到对应的YarnTaskExecutiontorslot里面,没有则向YarnResourceManager申请,带container启动以后deploy
【3】YarnTaskExecutorRunner (slave) 负责接收subTask,并运行。

整个任务运行代码调用流程如下图

Flink Job 执行流程,Flink,flink,大数据,java,面试,后端,性能优化,spark

subTask在执行时是怎么运行的?

调用StreamTaskinvoke方法,执行步骤如下:
【1】initializeState()operatorinitializeState()
【2】openAllOperators()operatoropen()方法;
【3】最后调用run方法来进行真正的任务处理;

我们来看下flatMap对应的OneInputStreamTaskrun方法具体是怎么处理的。

@Override
protected void run() throws Exception {
    // 在堆栈上缓存处理器引用,使代码更易于JIT
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

    while (running && inputProcessor.processInput()) {
        // 所有的工作都发生在“processInput”方法中
    }
}

最终是调用StreamInputProcessorprocessInput()做数据的处理,这里面包含用户的处理逻辑。

public boolean processInput() throws Exception {
    if (isFinished) {
        return false;
    }
    if (numRecordsIn == null) {
        try {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
           numRecordsIn = new SimpleCounter();
       }
   }
   while (true) {
       if (currentRecordDeserializer != null) {
           DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
           if (result.isBufferConsumed()) {
               currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
               currentRecordDeserializer = null;
           }
           if (result.isFullRecord()) {
               StreamElement recordOrMark = deserializationDelegate.getInstance();
               //处理watermark
               if (recordOrMark.isWatermark()) {
                   // handle watermark
                   //watermark处理逻辑,这里可能引起timer的trigger
                   statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                   continue;
               } else if (recordOrMark.isStreamStatus()) {
                   // handle stream status
                   statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                   continue;
                   //处理latency watermark
               } else if (recordOrMark.isLatencyMarker()) {
                   // handle latency marker
                   synchronized (lock) {
                       streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                   }
                   continue;
               } else {
                   //用户的真正的代码逻辑
                   // now we can do the actual processing
                   StreamRecord<IN> record = recordOrMark.asRecord();
                   synchronized (lock) {
                       numRecordsIn.inc();
                       streamOperator.setKeyContextElement1(record);
                       //处理数据
                       streamOperator.processElement(record);
                   }
                   return true;
               }
           }
       }
            
       //这里会进行checkpoint barrier的判断和对齐,以及不同partition 里面checkpoint barrier不一致时候的,数据buffer,
       final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
       if (bufferOrEvent != null) {
           if (bufferOrEvent.isBuffer()) {
               currentChannel = bufferOrEvent.getChannelIndex();
               currentRecordDeserializer = recordDeserializers[currentChannel];
               currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
           }
           else {
               // Event received
               final AbstractEvent event = bufferOrEvent.getEvent();
               if (event.getClass() != EndOfPartitionEvent.class) {
                   throw new IOException("Unexpected event: " + event);
               }
           }
       }
       else {
           isFinished = true;
           if (!barrierHandler.isEmpty()) {
               throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
           }
           return false;
       }
   }
}

streamOperator.processElement(record)最终会调用用户的代码处理逻辑,假如operatorStreamFlatMap的话。文章来源地址https://www.toymoban.com/news/detail-761846.html

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    collector.setTimestamp(element);
    userFunction.flatMap(element.getValue(), collector);//用户代码
}

到了这里,关于Flink Job 执行流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据-玩转数据-Flink状态后端(下)

    每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务(子任务)都会在本地维护其状态,以确保快速的状态访问。 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(

    2024年02月09日
    浏览(44)
  • 深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月16日
    浏览(42)
  • Flink stop 和 cancel停止 job 的区别 Flink 停止 job 的方式(stop 和 cancel)

    后边跟的任务id 是flink的任务ID,不是yarn的 注:stop方式停止任务对 source 有要求,source必须实现了StopableFunction接口,才可以优雅的停止job 取消任务。如果在 conf/flink-conf.yaml 里面配置了 state.savepoints.dir ,会保存savepoint, 否则不会保存 savepoint。 使用 命令方式 也可以在停止的

    2024年02月12日
    浏览(44)
  • 3、flink重要概念(api分层、角色、执行流程、执行图和编程模型)及dataset、datastream详细示例入门和提交任务至on yarn运行

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月12日
    浏览(44)
  • 大数据Flink(六十一):Flink流处理程序流程和项目准备

    文章目录 Flink流处理程序流程和项目准备 一、Flink流处理程序的一般流程

    2024年02月11日
    浏览(38)
  • 大数据Flink(五十二):Flink中的批和流以及性能比较

    文章目录 Flink中的批和流以及性能比较 ​​​​​​​​​​​​​​一、Flink中的批和流

    2024年02月15日
    浏览(42)
  • flink state原理,TTL,状态后端,数据倾斜一文全

    拿五个字做比喻:“铁锅炖大鹅”,铁锅是状态后端,大鹅是状态,Checkpoint 是炖的动作。 状态 :本质来说就是数据,在 Flink 中,其实就是 Flink 提供给用户的状态编程接口。比如 flink 中的 MapState,ValueState,ListState。 状态后端 :Flink 提供的用于管理状态的组件,状态后端决

    2024年02月22日
    浏览(49)
  • Flink实时大数据处理性能测试

    Flink是一个开源的流处理框架,用于实时大数据处理。它可以处理大量数据,提供低延迟和高吞吐量。Flink的性能测试是一项重要的任务,可以帮助我们了解其在实际应用中的表现。在本文中,我们将讨论Flink实时大数据处理性能测试的背景、核心概念、算法原理、代码实例、

    2024年03月18日
    浏览(49)
  • 【大数据面试】Flink面试题附答案

    目录 1、背压问题 2、Flink是如何支持批流一体的 3、Flink任务延迟高,想解决这个问题,你会如何入手 4、Flink的监控页面,有了解吗,主要关注那些指标? 5、你们之前Flink集群规模有多大?部署方式是什么?你了解哪些部署方式? 6、Flink如何做压测和监控 7、Flink checkpoint 的

    2024年01月16日
    浏览(50)
  • Flink:处理大规模复杂数据集的最佳实践深入探究Flink的数据处理和性能优化技术

    作者:禅与计算机程序设计艺术 随着互联网、移动互联网、物联网等新型网络技术的不断发展,企业对海量数据的处理日益依赖,而大数据分析、决策支持、风险控制等领域都需要海量的数据处理能力。如何高效、快速地处理海量数据、提升处理效率、降低成本,是当下处理

    2024年02月13日
    浏览(53)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包