Flink ExecuteGraph构建源码解析

这篇具有很好参考价值的文章主要介绍了Flink ExecuteGraph构建源码解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

Flink ExecuteGraph构建源码解析,# Flink,flink,大数据,流计算

JobGraph构建过程中分析了JobGraph的构建过程,本文分析ExecutionGraph的构建过程。JobManager(JobMaster) 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph是JobGraph 的并行化版本,是调度层最核心的数据结构。


ExecutionGraph中的主要抽象概念

1、ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有
和并发度一样多的 ExecutionVertex。
2、ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输
出是IntermediateResultPartition。
3、IntermediateResult:和JobGraph中的IntermediateDataSet一一对应。一个
IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并发度。
4、IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是
ExecutionVertex,consumer是若干个ExecutionEdge。
5、ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,
target是ExecutionVertex。source和target都只能是一个。
6、Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下
ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过
ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过
ExecutionAttemptID 来确定消息接受者。

源码核心代码入口

ExecutionGraph executioinGraph = SchedulerBase.createAndRestoreExecutionGraph(
                        completedCheckpointStore,
                        checkpointsCleaner,
                        checkpointIdCounter,
                        initializationTimestamp,
                        mainThreadExecutor,
                        jobStatusListener,
                        vertexParallelismStore);

在 SchedulerBase 这个类的内部,有两个成员变量:一个是 JobGraph,一个是 ExecutioinGraph
在创建 SchedulerBase 的子类:DefaultScheduler 的实例对象的时候,会在 SchedulerBase 的构造
方法中去生成 ExecutionGraph。

源码核心流程:

DefaultExecutionGraphFactory.createAndRestoreExecutionGraph()
ExecutionGraph newExecutionGraph = createExecutionGraph(...)
DefaultExecutionGraphBuilder.buildGraph(jobGraph, ....)
// 创建 ExecutionGraph 对象
executionGraph = (prior != null) ? prior : new ExecutionGraph(...)
// 生成 JobGraph 的 JSON 表达形式
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
// 重点,从 JobGraph 构建 ExecutionGraph
executionGraph.attachJobGraph(sortedTopology);
// 遍历 JobVertex 执行并行化生成 ExecutioinVertex
for(JobVertex jobVertex : topologiallySorted) {
	// 每一个 JobVertex 对应到一个 ExecutionJobVertex
	ExecutionJobVertex ejv = new ExecutionJobVertex(jobGraph,
	jobVertex);
	ejv.connectToPredecessors(this.intermediateResults);
	List<JobEdge> inputs = jobVertex.getInputs();
	for(int num = 0; num < inputs.size(); num++) {
		JobEdge edge = inputs.get(num);
		IntermediateResult ires =intermediateDataSets.get(edgeID);
		this.inputs.add(ires);
		// 根据并行度来设置 ExecutionVertex
		for(int i = 0; i < parallelism; i++) {
			ExecutionVertex ev = taskVertices[i];
			ev.connectSource(num, ires, edge,consumerIndex);
		}
	}
}

DefaultExecutionGraphBuilder 详细代码如下:文章来源地址https://www.toymoban.com/news/detail-838564.html

public class DefaultExecutionGraphBuilder {

    public static DefaultExecutionGraph buildGraph(
            JobGraph jobGraph,
            Configuration jobManagerConfig,
            ScheduledExecutorService futureExecutor,
            Executor ioExecutor)  {
        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();
        final JobInformation jobInformation = new JobInformation(... );
        // create a new execution graph, if none exists so far
        final DefaultExecutionGraph executionGraph;
         executionGraph = new DefaultExecutionGraph( ....);

        // set the basic properties
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));

        // initialize the vertices that have a master initialization hook
        // file output formats create directories here, input formats create splits
        for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
                vertex.initializeOnMaster(
                        new SimpleInitializeOnMasterContext(
                                classLoader,
                                vertexParallelismStore
                                        .getParallelismInfo(vertex.getID())
                                        .getParallelism()));
        }

        // topologically sort the job vertices and attach the graph to the existing one
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();

        executionGraph.attachJobGraph(sortedTopology);

        // configure the state checkpointing
        if (isDynamicGraph) {
            // dynamic graph does not support checkpointing so we skip it
            log.warn("Skip setting up checkpointing for a job with dynamic graph.");
        } else if (isCheckpointingEnabled(jobGraph)) {
            JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();

            // load the state backend from the application settings
            final StateBackend applicationConfiguredBackend;
            final SerializedValue<StateBackend> serializedAppConfigured =
                    snapshotSettings.getDefaultStateBackend();

            if (serializedAppConfigured == null) {
                applicationConfiguredBackend = null;
            } else {
                try {
                    applicationConfiguredBackend =
                            serializedAppConfigured.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                            jobId, "Could not deserialize application-defined state backend.", e);
                }
            }

            final StateBackend rootBackend =
                     StateBackendLoader.fromApplicationOrConfigOrDefault(
                             applicationConfiguredBackend,
                             snapshotSettings.isChangelogStateBackendEnabled(),
                             jobManagerConfig,
                             classLoader,
                             log);


            // load the checkpoint storage from the application settings
            final CheckpointStorage applicationConfiguredStorage;
            final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
                    snapshotSettings.getDefaultCheckpointStorage();

            if (serializedAppConfiguredStorage == null) {
                applicationConfiguredStorage = null;
            } else {
                    applicationConfiguredStorage =                           serializedAppConfiguredStorage.deserializeValue(classLoader);


            final CheckpointStorage rootStorage;
            try {
                rootStorage =
                        CheckpointStorageLoader.load(
                                applicationConfiguredStorage,
                                null,
                                rootBackend,
                                jobManagerConfig,
                                classLoader,
                                log);
            } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(
                        jobId, "Could not instantiate configured checkpoint storage", e);
            }

            // instantiate the user-defined checkpoint hooks

            final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
                    snapshotSettings.getMasterHooks();
            final List<MasterTriggerRestoreHook<?>> hooks;

            if (serializedHooks == null) {
                hooks = Collections.emptyList();
            } else {
                final MasterTriggerRestoreHook.Factory[] hookFactories;
                try {
                    hookFactories = serializedHooks.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                            jobId, "Could not instantiate user-defined checkpoint hooks", e);
                }

                final Thread thread = Thread.currentThread();
                final ClassLoader originalClassLoader = thread.getContextClassLoader();
                thread.setContextClassLoader(classLoader);

                try {
                    hooks = new ArrayList<>(hookFactories.length);
                    for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                        hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                    }
                } finally {
                    thread.setContextClassLoader(originalClassLoader);
                }
            }

            final CheckpointCoordinatorConfiguration chkConfig =
                    snapshotSettings.getCheckpointCoordinatorConfiguration();
            String changelogStorage = jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE);

            executionGraph.enableCheckpointing(
                    chkConfig,
                    hooks,
                    checkpointIdCounter,
                    completedCheckpointStore,
                    rootBackend,
                    rootStorage,
                    checkpointStatsTrackerFactory.get(),
                    checkpointsCleaner,
                    jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE));
        }

        return executionGraph;
    }
}

到了这里,关于Flink ExecuteGraph构建源码解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【源码解析】flink sql执行源码概述:flink sql执行过程中有哪些阶段,这些阶段的源码大概位置在哪里

    本文大致分析了flink sql执行过程中的各个阶段的源码逻辑,这样可以在flink sql执行过程中, 能够定位到任务执行的某个阶段的代码大概分布在哪里,为更针对性的分析此阶段的细节逻辑打下基础,比如create 的逻辑是怎么执行的,select的逻辑是怎么生成的,优化逻辑都做了哪

    2024年02月04日
    浏览(41)
  • Flink源码解析四之任务调度和负载均衡

    jobmanager scheduler :这部分与 Flink 的任务调度有关。 CoLocationConstraint :这是一个约束类,用于确保某些算子的不同子任务在同一个 TaskManager 上运行。这通常用于状态共享或算子链的情况。 CoLocationGroup CoLocationGroupImpl :这些与 CoLocationConstraint 相关,定义了一组需要在同一个

    2024年02月06日
    浏览(47)
  • Flink源码解析八之任务调度和负载均衡

    jobmanager scheduler :这部分与 Flink 的任务调度有关。 CoLocationConstraint :这是一个约束类,用于确保某些算子的不同子任务在同一个 TaskManager 上运行。这通常用于状态共享或算子链的情况。 CoLocationGroup CoLocationGroupImpl :这些与 CoLocationConstraint 相关,定义了一组需要在同一个

    2024年02月05日
    浏览(43)
  • Flink流批一体计算(12):PyFlink Tabel API之构建作业

    目录 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表 2. 创建一个作业 3. 提交作业Submitting PyFlink Jobs 1.创建源表和结果表。 创建及注册表名分别为 source 和 sink 的表 其中,源表 source 有一列

    2024年02月13日
    浏览(43)
  • FlinkSQL-- sql-client及源码解析 -- flink-1.13.6

    本文基于flink-1.13.6 SQL Client: Init scripts and Statement Sets 这个版本极大地改进了 SQL 客户端的功能。现在 SQL Client 和 SQL 脚本都支持 通过Java 应用程序执行的几乎所有操作(从 TableEnvironment 以编程方式启动查询)。这意味着 SQL 用户在 SQL 部署中需要的代码少了很多。其中最核心的功能

    2023年04月27日
    浏览(41)
  • 基于 Flink 构建实时数据湖的实践

    本文整理自火山引擎云原生计算研发工程师王正和闵中元在本次 CommunityOverCode Asia 2023 数据湖专场中的《基于 Flink 构建实时数据湖的实践》主题演讲。 实时数据湖是现代数据架构的核心组成部分,随着数据湖技术的发展,用户对其也有了更高的需求:需要从多种数据源中导入

    2024年02月04日
    浏览(39)
  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(53)
  • Flink SQL 解析嵌套的 JSON 数据

    下面将会演示如何在 DDL 里面定义 Map、Array、Row 类型的数据,以及在 SQL 里面如何获里面的值。 数据格式如下: 上面的数据包含了 Map、Array、Row 等类型, 对于这样的数据格式,在建表 DDL 里面应该如何定义呢? 定义 DDL 解析 SQL SQL 运行的结果 以如下数据作为样例: 定义 获取 构造

    2024年02月10日
    浏览(46)
  • 构建高效实时数据流水线:Flink、Kafka 和 CnosDB 的完美组合

    当今的数据技术生态系统中,实时数据处理已经成为许多企业不可或缺的一部分。为了满足这种需求,Apache Flink、Apache Kafka和CnosDB等开源工具的结合应运而生,使得实时数据流的收集、处理和存储变得更加高效和可靠。本篇文章将介绍如何使用 Flink、Kafka 和 CnosDB 来构建一个

    2024年02月10日
    浏览(43)
  • 大数据Flink实时计算技术

    1、架构 2、应用场景 Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核

    2024年02月10日
    浏览(54)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包