Flink 深入理解任务执行计划,即Graph生成过程(源码解读)

这篇具有很好参考价值的文章主要介绍了Flink 深入理解任务执行计划,即Graph生成过程(源码解读)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
我们先看一下,Flink 是如何描述作业的执行计划的。以这个 DataStream 作业为例,Flink 会基于它先生成一个 StreamGraph。这是一个有向无环图,图中的节点对应着计算逻辑,图中的边则对应着数据的分发方式。

Flink 会根据节点的并行度以及他们之间的连边方式,把一些计算节点进行链接合并,最终形成 JobGraph,从而降低计算节点间的数据传输开销。这个操作的目的是,是为了降低计算节点之间的数据传输开销。StreamGraph 和 JobGraph 都是在编译阶段生成的。JobGraph 会提交给 Flink Job Manager,从而启动和执行作业。

在执行作业前,Flink 会生成 ExecutionGraph。这个 ExecutionGraph 是根据 JobGraph 中的节点并行度,展开生成的。我们知道,Flink 是一个分布式计算框架。而 ExecutionGraph 的每一个节点,都对应着一个需要部署到 TaskManager 上进行执行的任务,每一条边都对应着任务的输入和输出。所以说,它是作业的物理执行计划。

Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
这个物理执行计划,描述了任务的计算逻辑、所需资源和并行度,同时也描述任务产出数据的划分方式,此外还描述了任务对数据的依赖关系以及数据传输方式。

通过它,Flink 就能知道如何创建和调度作业的所有任务,从而完成作业的执行。

Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器

1生成StreamGraph

  • StreamGraph在client中生成,是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
  • 起始入口是CliFronted也就是客户端类,他会遍历transformations
    根据遍历出的算子的类型 ,oneInput类型生成StreamNode,partition类型生成边StreamEdge,顶点和边,分别添加到相对应的list中。
  • 之后根据指定的算子分区策略partitioner,和顺序决定每个节点的入和出的StreamNode和partitioner, 创建出edge
  • 将该streamEdge添加到上游的输出 下游的输入。
  • 最终 中 env.setPipeline(streamGraph) 把streamGraph->pipline 将streamGraph添加到pipline中,供下游使用生成JobGraph
  • 另外根据 StreamGraphGenerator函数,也就是 streamGraph的构造函数 ,他把算法由是否做shuffle来进行了 两种不同的构造方法 transformOneInputTransform 和 transformPartition
/**
 * Transforms one {@code StreamTransformation}.
 *
 * <p>This checks whether we already transformed it and exits early in that case. If not it
 * delegates to one of the transformation specific methods.
 */
private Collection<Integer> transform(StreamTransformation<?> transform) {
	if (alreadyTransformed.containsKey(transform)) {
		return alreadyTransformed.get(transform);
	}
	LOG.debug("Transforming " + transform);
	if (transform.getMaxParallelism() <= 0) {
		// if the max parallelism hasn't been set, then first use the job wide max parallelism
		// from the ExecutionConfig.
		int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
		if (globalMaxParallelismFromConfig > 0) {
			transform.setMaxParallelism(globalMaxParallelismFromConfig);
		}
	}
	// call at least once to trigger exceptions about MissingTypeInfo
	transform.getOutputType();
	Collection<Integer> transformedIds;
	if (transform instanceof OneInputTransformation<?, ?>) {
		transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
	} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
		transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
	} else if (transform instanceof SourceTransformation<?>) {
		transformedIds = transformSource((SourceTransformation<?>) transform);
	} else if (transform instanceof SinkTransformation<?>) {
		transformedIds = transformSink((SinkTransformation<?>) transform);
	} else if (transform instanceof UnionTransformation<?>) {
		transformedIds = transformUnion((UnionTransformation<?>) transform);
	} else if (transform instanceof SplitTransformation<?>) {
		transformedIds = transformSplit((SplitTransformation<?>) transform);
	} else if (transform instanceof SelectTransformation<?>) {
		transformedIds = transformSelect((SelectTransformation<?>) transform);
	} else if (transform instanceof FeedbackTransformation<?>) {
		transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
	} else if (transform instanceof CoFeedbackTransformation<?>) {
		transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
	} else if (transform instanceof PartitionTransformation<?>) {
		transformedIds = transformPartition((PartitionTransformation<?>) transform);
	} else if (transform instanceof SideOutputTransformation<?>) {
		transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
	} else {
		throw new IllegalStateException("Unknown transformation: " + transform);
	}
	// need this check because the iterate transformation adds itself before
	// transforming the feedback edges
	if (!alreadyTransformed.containsKey(transform)) {
		alreadyTransformed.put(transform, transformedIds);
	}
	if (transform.getBufferTimeout() >= 0) {
		streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
	}
	if (transform.getUid() != null) {
		streamGraph.setTransformationUID(transform.getId(), transform.getUid());
	}
	if (transform.getUserProvidedNodeHash() != null) {
		streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
	}
	if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
		streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
	}
	return transformedIds;
}

算子类型分为两种,通过算子是否有做shuffle来讲算子分为 oneInput类型 与 partition类型 两类;
oneInput类型对应operator; partition对应keyby

[(transformations是一个list,依次存放了 用户代码里的算法), 所有的算子(如map)都会调用名为transform的一个方法,会把算子存到transformations中,从而构建成一棵树]

    /**
     * Method for passing user defined operators created by the given factory along with the type
     * information that will transform the DataStream.
     *
     * <p>This method uses the rather new operator factories and should only be used when custom
     * factories are needed.
     *
     * @param operatorName name of the operator, for logging purposes
     * @param outTypeInfo the output type of the operator
     * @param operatorFactory the factory for the operator.
     * @param <R> type of the return stream
     * @return the data stream constructed.
     */
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperatorFactory<T, R> operatorFactory) {

        return doTransform(operatorName, outTypeInfo, operatorFactory);
    }

    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform =
                new OneInputTransformation<>(
                        this.transformation,
                        operatorName,
                        operatorFactory,
                        outTypeInfo,
                        environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream =
                new SingleOutputStreamOperator(environment, resultTransform);

        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }
    /**
     * Adds an operator to the list of operators that should be executed when calling {@link
     * #execute}.
     *
     * <p>When calling {@link #execute()} only the operators that where previously added to the list
     * are executed.
     *
     * <p>This is not meant to be used by users. The API methods that create operators must call
     * this method.
     */
    @Internal
    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }

可以看到上述所有算子都被按执行顺序添加进了transformations 这个 List 中,接下来就是根据算子是否有座shuffle操作来把算子分成两类,分别是StreamNode的非shuffle算子, 以及上下游做shuffle的streamEdge操作,并且将streamEdge操作 与 StreamNode 绑定,来形成一个有前后顺序的DAG图

    protected StreamNode addNode(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class<? extends AbstractInvokable> vertexClass,
            StreamOperatorFactory<?> operatorFactory,
            String operatorName) {

        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }

        StreamNode vertex =
                new StreamNode(
                        vertexID,
                        slotSharingGroup,
                        coLocationGroup,
                        operatorFactory,
                        operatorName,
                        vertexClass);

        streamNodes.put(vertexID, vertex);

        return vertex;
    }
    
    public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
        addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                null,
                new ArrayList<String>(),
                null,
                null);
    }
    
    private void addEdgeInternal(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag,
            ShuffleMode shuffleMode) {

        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    null,
                    outputTag,
                    shuffleMode);
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            shuffleMode = virtualPartitionNodes.get(virtualId).f2;
            addEdgeInternal(
                    upStreamVertexID,
                    downStreamVertexID,
                    typeNumber,
                    partitioner,
                    outputNames,
                    outputTag,
                    shuffleMode);
        } else {
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);

            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            if (partitioner == null
                    && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }

            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException(
                            "Forward partitioning does not allow "
                                    + "change of parallelism. Upstream operation: "
                                    + upstreamNode
                                    + " parallelism: "
                                    + upstreamNode.getParallelism()
                                    + ", downstream operation: "
                                    + downstreamNode
                                    + " parallelism: "
                                    + downstreamNode.getParallelism()
                                    + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

            if (shuffleMode == null) {
                shuffleMode = ShuffleMode.UNDEFINED;
            }

            StreamEdge edge =
                    new StreamEdge(
                            upstreamNode,
                            downstreamNode,
                            typeNumber,
                            partitioner,
                            outputTag,
                            shuffleMode);

            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }

构造器通过调用上述相关的操作,来构造出StreamGraph,完成第一步操作,StreamGraph 继承 pipline

 public StreamGraph generate() {
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
        configureStreamGraph(streamGraph);

        alreadyTransformed = new HashMap<>();

        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

2生成JobGraph

  • 同样在client中生成,StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点Jobvertex,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
  • 在这里引入一个概念,chain 连接。可以理解为对数据处理的优化,将能够一次完成的算子操作合并到一起,什么样的算子可以chain到一起 请参考本人其他文章

简单的说 jobGaph就做了三个动作
1 StreamNode 转成 JobVertex
2 StreamEdge 转成 JobEdge
3 JobVertex 与 JobEdge 之间创建IntermediateDataset来连接

setChain 遍历edge 将能chain到一起的边 连到一起 ,后递归调用createchain

4 创建StreamConfig起始作业顶点
5 设置Jobvertex的StreamConfig,基本上是序列化StreamNode中的配置到StreamConfig中,把chain的起始节点以及不是chain中的节点 标记成 chain start
6 遍历StreamEdge 将当前节点(headOfChain)与所有出边相连
7 通过StreamEdge构建出JobEdge,创建IntermediateDataset,用来将JobVertex和JobEdge相连
8 将chain中所有子节点的streamConfig写入到headOfChain节点的 CHAINED_TASK_CONFIG配置中 每个JobVertex都会对应一个可序列化的StreamConfig,用来发送给JobManager和TaskManager。最后在TaskManager中起Task时,需要从这里反序列化出锁需要的配置信息,其中就包括了含有用户代码的StreamOperator
**总结一下 **

  1. 每个JobVertex都会对应一个可序列化的StreamConfig,用来发送给JobManager和TaskManager。最后在TaskManager中起Task时,需要从这里反序列化出锁需要的配置信息,其中就包括了含有用户代码的StreamOperator

    2. setChaining会对source调用createChain方法,该方法会递归调用下游节点,从而构建出node chains。createChain会分析当前节点的出边,根据operator
    Chains中的chainable条件,将出边分成chainable和noChainable两类,并分别递归调用自身方法。之后会将StreamNode中的配置信息序列化到StreamConfig中。如果当前不是chain中的子节点,则会构建JobVertex和JobEdge相连。如果是chain中的子节点,则会将StreaConfig添加到该chain和config集合中。一个node
    chains,除了headOfChain
    node会生成对应的JobVertex,其余的nodes都是以序列化的形式写入到StreamConfig中,并保存到headOfChain的CHAINED_TASK_CONFIG配置项中。直到部署时,才会取出并生成对应的ChainOperators。

3生成ExecutionGraph:

JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
JM创建中,创建Scheduler中,createAndRestoreExecutionGaph ,创建或恢复ExecutionGraph,创建 ExecutionGraph:

1 对JobGraph进行拓扑排序,获取虽有的JobVertex列表 
2核心逻辑:将拓扑排序过的JobGraph添加到executionGaph数据结构中
     2.1 实例化执行图节点,根据每一个jobvertex,创建对应的ExecutionJobVertex
     2.2 将创建的ExecutionJobVertex,与前置的IntermediateResult连接起来
	     2.2.1 获取输入的JobEdge列表 JobVertex.getInputs
	     2.2.2 通过ID获取当前JobEdge的输入所对应的IntermediateResult
	     2.2.3 将IntermediateResult 加入到当前ExecutionJobVertex的输入中
	     2.2.4 为IntermediateResult 注册 consumer ,就是当前节点
	     2.2.5 将ExecutionJobVertex 与 IntermediateResult 关联起来(现在要连接ExecutionJobVertex与IntermediateResult 但是中间的,ExecutionEdge还未确认 ,需要确认出ExecutionEdge 才能画出完整的ExecutionGraph图
 对照图片来理解)
 			2.2.5.1 分区策略只有forward的方式的情况下,pattern才是POINTWISE,否则均为all_to_all
 			2.2.5.2 构造ExecutionEdge ,IntermediateResultPartition.length为长度   

先确定ExecutionJobVertex 在确定IntermediateResultPartition, 然后才构建ExecutionEdge:
数据流转就成为了 IntermediateResultPartition -> ExecutionEdge -> ExecutionJobVertex

4生成物理执行图:

调度器生成物理执行图,JobManager 根据 Executi onGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

题外话

单元测试中可以通过 PackagedProgramUtils.createJobGraph 获取jobgraph 这种方法 通过构造PackagedProgram.newBuilder把用户代码jar包里的main反射调用,获取pipline,将translateToJobGraph来得到jobGraph

   public static JobGraph buildJobGraph(JobParamsInfo jobParamsInfo) throws Exception {
        Properties confProperties = jobParamsInfo.getConfProperties();
        int parallelism = MathUtil.getIntegerVal(confProperties.getProperty(ConfigConstrant.SQL_ENV_PARALLELISM, "1"));
        String flinkConfDir = jobParamsInfo.getFlinkConfDir();

        String[] execArgs = jobParamsInfo.getExecArgs();
        File coreJarFile = new File(findLocalCoreJarPath(jobParamsInfo.getLocalPluginRoot(), jobParamsInfo.getPluginLoadMode()));
        SavepointRestoreSettings savepointRestoreSettings = dealSavepointRestoreSettings(jobParamsInfo.getConfProperties());
		//通过coreJarFile中main的内容,转换为StreamGraph,并封装进pipline中,给到program返回
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(coreJarFile)
                .setArguments(execArgs)
                .setSavepointRestoreSettings(savepointRestoreSettings)
                .build();
        Configuration flinkConfig = getFlinkConfiguration(flinkConfDir, confProperties);
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, flinkConfig, parallelism, false);
        return jobGraph;
    }



   /**
     * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
     * PackagedProgram}.
     *
     * @param packagedProgram to extract the JobGraph from
     * @param configuration to use for the optimizer and job graph generator
     * @param defaultParallelism for the JobGraph
     * @param jobID the pre-generated job id
     * @return JobGraph extracted from the PackagedProgram
     * @throws ProgramInvocationException if the JobGraph generation failed
     */
    public static JobGraph createJobGraph(
            PackagedProgram packagedProgram,
            Configuration configuration,
            int defaultParallelism,
            @Nullable JobID jobID,
            boolean suppressOutput)
            throws ProgramInvocationException {
        final Pipeline pipeline =
                getPipelineFromProgram(
                        packagedProgram, configuration, defaultParallelism, suppressOutput);
        final JobGraph jobGraph =
                FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
                        packagedProgram.getUserCodeClassLoader(),
                        pipeline,
                        configuration,
                        defaultParallelism);
        if (jobID != null) {
            jobGraph.setJobID(jobID);
        }
        jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
        jobGraph.setClasspaths(packagedProgram.getClasspaths());
        jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());

        return jobGraph;
    }

run提交jar包 通过PipelineExecutorUtils.getJobGraph ,用传进来的pipline,将translateToJobGraph来得到jobGraph
StreamGraph 中 会生成pipline -> JobGraph使用pipline 生成

StreamGraphTranslator
    @Override
    public JobGraph translateToJobGraph(
            Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
        checkArgument(
                pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");

        StreamGraph streamGraph = (StreamGraph) pipeline;
        return streamGraph.getJobGraph(null);
    }
    

5 批处理的物理执行计划

如前面所说,它是在作业运行前就已经确定的,是静态的。而 Flink 难以在作业执行前,预判什么样的计划参数更合理。所以,这些执行计划参数,只能依赖用户提前指定,也就是需要手动调优。

对于批作业,由于其分阶段执行的特性,在执行一个阶段前,理论上 Flink 是可以获得很多有用的信息的,比如其消费的数据量大小、这些数据的分布模式、当前的可用资源等等。

基于这些信息,我们可以让 Flink 对执行计划动态的进行调优,从而获得更好的执行效率。并且,由于 Flink 可以自动的进行这些调优,也可以让用户从手动调优中解放出来。

这就是 Flink 批处理作业的自适应执行计划。
Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器

为了支持自适应执行计划,最核心的一点,是需要一个可以动态调整的执行拓扑。所以,我们改造了 ExecutionGraph,使其支持渐进式构建。

Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
具体的来说,就是让 ExecutionGraph 一开始只包含 Source 节点,随着执行的推进,再逐渐的加入后续的节点和连边。

这样,Flink 就有机会对尚未加入的执行节点和连边进行调整。

Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
但在这个地方,有遭遇了一个阻碍。因为在原来的作业执行中,上游节点执行是依赖于下游节点的并行度的。具体来说,是因为上游在产出数据时,会根据下游并行度,对数据进行划分(sub-partition);这样,每个下游任务就可以直接消费其对应的那一个数据分区。然而,在动态执行计划的场景下,下游节点的并行度是不确定的。

为了解决这个问题,flink改造了节点数据的划分逻辑,使其不再根据下游节点的并行度,而是根据其最大并行度进行划分。同时,我们也改造了节点消费数据的逻辑,使其不再只消费单一分区,而是可以消费一组连续的数据分区(sub-partition range)。

通过这样的方式,上游节点执行得以和下游节点的并行度解耦,动态执行拓扑也得以实现。
Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器

在支持了动态执行拓扑后,我们引入了 Adaptive Batch Scheduler 来支持自适应执行计划。

与原有调度器不同的地方在于,Adaptive Batch Scheduler 会基于动态执行拓扑进行作业管控,持续收集运行时的信息,定制后续的执行计划。Flink 会基于执行计划,动态生成执行节点和连边,以此来更新执行拓扑。
Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
在上述框架下,我们为 Flink 增加了自动决定并行度的能力。用户只需要配置希望单个执行节点处理的数据量, Flink 就可以根据该阶段需要处理的数据量,自动推导该阶段的节点并行度。

相比起传统的为每个作业单独配置并行度,自动决定并行度有这些优点:一是配置简单,无需为每个作业单独配置,一项配置可以适用于很多作业;二是可以自动的适配每天变化的数据量,当数据量较大时,作业并行度可以大一些,从而保障作业的产出时间;三是可以细粒度的调整作业的并行度,提高资源利用率。Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
但是自动决定并行度,数据可能分布不均。为了解决这个问题,我们在自动决定并行度的基础上,进行了自动均衡下发数据的改进。

这个改进会采集 sub-partition 粒度的数据量,并以此来决定执行节点的并行度,以及每个执行节点应该消费哪些分区数据。从而尽可能让下游各执行节点消费的数据,接近用户配置的预期值。

相比起自动决定并行度,这样的方式不但让下游数据量更均衡,而且能够缓解数据倾斜的影响。这个功能正在开发中,会随着 Flink 1.17 发布。

同源实例的并行执行

Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
接下来,讲一讲同源实例的并行执行。

同源实例是指,属于同一个执行节点的执行实例。执行拓扑是由执行节点组成,各节点会创建执行实例,将其部署到 TaskManager 上进行执行。

当前,每个执行节点在某一时刻只能有一个执行实例,只有当该实例失败(或被取消)后,节点才会创建一个新的执行实例。这也意味着,同源执行实例只能串行执行。
Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
在生产中,热点机器是无法避免的,混部集群、密集回刷,都可能导致一台机器的负载高、IO 繁忙。其上执行的数据处理任务可能异常缓慢,导致批作业产出时间难以得到保障。

Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
预测执行,是一种已经得到普遍的认可、用来解决这类问题的方法。

其基本思路是,为热点机器上的慢任务创建新的执行实例,并部署在正常的机器节点上。这些预测执行实例和对应的原始实例,具有相同的输入和产出。其中,最先完成的实例会被承认,其他相应实例会被取消。

因此,为了支持预测执行,Flink 必须支持多个同源实例并行执行。为了支持同源实例并行执行,进行了下列改进。
Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
首先,我们重新梳理了执行节点的状态。

当前,执行节点的状态和其当前唯一执行实例是一一对应的。然而,如果一个节点可以同时存在多个执行实例,这样的映射方式就会出现问题。

为此,我们重新定义了执行节点与执行实例的状态映射,取执行实例中最接近 FINISH 状态的状态作为执行节点的状态。这样既可以兼容单执行实例场景,也能支持多个同源实例并行执行的场景。
Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
其次,我们保证了 Source 的同源执行实例,总是会读取到相同的数据。

大体上来说,就是我们在框架层为每个 Source 执行节点增加一个列表,来维护分配给它的数据分片。该节点的所有执行实例都会共享这一个列表,只是会各自维护一个不同的下标,来记录其处理到的数据分片进度。

这样的改动的好处是,大部分现有 Source 不需要额外的修改,就可以进行预测执行。只有当 Source 使用了自定义事件的情况下,它们才需要实现一个额外的接口,用以保证各个事件可以被分发给正确的执行实例。
Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
在接下来的 Flink 1.17 中,我们也会支持 Sink 的同源执行实例并行执行。

其关键点在于避免不同 Sink 之间的执行冲突,特别是要避免因此产生的数据不一致,因为 Sink 是需要向外部系统进行写入的。

由于 Sink 的写入逻辑隐藏在各个 Sink 的实现中,我们无法像 Source 一样在框架层统一避免写入冲突。所以我们向 Sink 层暴露了执行实例标识(attemptNumber),使 Sink 可以自行避免冲突。

同时为了安全起见,我们默认不会允许 Sink 的同源执行实例并行执行,除非 Sink 显式声明支持同源执行实例并行执行。Flink 深入理解任务执行计划,即Graph生成过程(源码解读),flink技术原理,flink,java,服务器
在此基础上,我们为 Flink引入了预测执行机制。主要包括三个核心组件。

首先是慢任务检测器。它会定期进行检测,综合任务处理的数据量,以及其执行时长,评判任务是否是慢任务。当发现慢任务时,它会通知给批处理调度器。

其次是批处理调度器。在收到慢任务通知时,它会通知黑名单处理器,对慢任务所在的机器进行加黑。并且,只要慢任务同源执行的实例数量,没有超过用户配置上限,它会为其拉起并部署新的执行实例。当任意执行实例成功完成时,调度器会取消掉其他的同源执行实例。

最后是黑名单处理器。Flink 可以利用其加黑机器。当机器节点被加黑后,后续的任务不会被部署到该机器。为了支持预测执行,我们支持了软加黑的方式,即加黑机器上已经存在的任务,可以继续执行而不会因为加黑被取消掉。文章来源地址https://www.toymoban.com/news/detail-835265.html

到了这里,关于Flink 深入理解任务执行计划,即Graph生成过程(源码解读)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • windows任务计划不执行的解决方法

    如果遇到不执行的情况,可以尝试一下操作 重新创建任务计划,创建时,先设置“只在用户登录时执行”,执行一次,时间在2分钟后后。 看下,如果执行成功,再改为“不管用户是否登录都要执行”,设置重复执行。 2023.9.21 发现:只要设置“重复任务”的“持续时间”为“

    2024年02月08日
    浏览(42)
  • Linux--crontab命令详解--循环执行的计划任务

    循环执行任务是由cron(crond)这个系统服务来控制的。用户想要建立循环的计划任务时,使用的是crontab这个命令,为了避免安全性的问题,和at一样,我们可以限制使用crontab的账号,可以使用的配置文件有: /etc/cron.allow 将可以使用crontab的账号写入,不在这个文件中的账户则不能使用cr

    2024年02月16日
    浏览(47)
  • Doris的执行计划生成、分发与执行

    目录 一、概述 三、执行计划的生成 四、执行计划的分发 五、执行计划的执行 六、关于PipeLine 七、Stream Load 的执行计划 八、举个例子 执行SQL的代码入口为StmtExecutor::execute() 在Doris的FE端,与大多数数据库系统一样,要从SQL或某种http请求,生成执行计划,从SQL生成,一开始是

    2024年02月12日
    浏览(34)
  • 【源码分析】一个flink job的sql到底是如何执行的(一):flink sql底层是如何调用connector实现物理执行计划的

    我们以一条sql为例分析下flink sql与connector是如何配合执行的,本文我们先分析 sql-sqlnode-validate-operation:是如何找到对应的connector实例的 relnode-execGraph:是如何组装node为Graph,在哪找到connector实例的 之后的文章将会继续分析: translateToPlanInternal是如何串联connector其他方法的

    2024年01月16日
    浏览(45)
  • 深入浅出【图卷积神经网络GCN】从 邻接矩阵、特征值矩阵、单位阵、度矩阵 入手,深刻理解融合邻居节点(信息) | GCN从公式到代码实现 全过程 | 在Cora数据集上实现节点分类任务

      这个世界虽然破破烂烂,可总有一些人在缝缝补补,以耀眼的光芒照耀这片大地。   🎯作者主页: 追光者♂🔥          🌸个人简介:   💖[1] 计算机专业硕士研究生💖   🌟[2] 2022年度博客之星人工智能领域TOP4🌟   🏅[3] 阿里云社区特邀专家博主🏅   🏆[4] CSDN-人

    2024年02月13日
    浏览(42)
  • 十三、Flink使用local模式执行任务 并开启Flink的webUI

    1、概述 1)webUI依赖 2)调用StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); 3)可以通过configuration配置webUI的端口号 2、代码实现 3、执行结果 1)输入测试数据 控制台输出执行结果 2)localhost:8081查看webUI

    2024年02月10日
    浏览(37)
  • 深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年01月24日
    浏览(46)
  • 深入理解 Flink(一)Flink 架构设计原理

    深入理解 Flink 系列文章已完结,总共八篇文章,直达链接: 深入理解 Flink (一)Flink 架构设计原理 深入理解 Flink (二)Flink StateBackend 和 Checkpoint 容错深入分析 深入理解 Flink (三)Flink 内核基础设施源码级原理详解 深入理解 Flink (四)Flink Time+WaterMark+Window 深入分析 深入

    2024年02月02日
    浏览(43)
  • flink执行任务运行10h以后挂掉并且报错

    问题描述 flink运行jar包任务,运行几个小时或者1天以后,任务就会挂掉!!! 第一个错误是 2023-02-01 23:43:08,083 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Window(TumblingEventTimeWindows(60000), EventTimeTrigger, getHvcDownLine) - Sink: Unnamed (1/1) (8672ad64cfc4ddce37756e60242432be) switched from RUN

    2024年02月10日
    浏览(44)
  • 深入理解 Flink(七)Flink Slot 管理详解

    JobMaster 中封装了一个 DefaultScheduler,在 DefaultScheduler.startSchedulingInternal() 方法中生成 ExecutionGraph 以执行调度。 资源调度的大体流程如下: Register:当 TaskExecutor 启动之后,会向 ResourceManager 注册自己(TaskExecutor)和自己内部的 Slot(TaskManagerSlot)。 Status Report:TaskExecutor 启动之

    2024年01月21日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包