我们先看一下,Flink 是如何描述作业的执行计划的。以这个 DataStream 作业为例,Flink 会基于它先生成一个 StreamGraph。这是一个有向无环图,图中的节点对应着计算逻辑,图中的边则对应着数据的分发方式。
Flink 会根据节点的并行度以及他们之间的连边方式,把一些计算节点进行链接合并,最终形成 JobGraph,从而降低计算节点间的数据传输开销。这个操作的目的是,是为了降低计算节点之间的数据传输开销。StreamGraph 和 JobGraph 都是在编译阶段生成的。JobGraph 会提交给 Flink Job Manager,从而启动和执行作业。
在执行作业前,Flink 会生成 ExecutionGraph。这个 ExecutionGraph 是根据 JobGraph 中的节点并行度,展开生成的。我们知道,Flink 是一个分布式计算框架。而 ExecutionGraph 的每一个节点,都对应着一个需要部署到 TaskManager 上进行执行的任务,每一条边都对应着任务的输入和输出。所以说,它是作业的物理执行计划。
这个物理执行计划,描述了任务的计算逻辑、所需资源和并行度,同时也描述任务产出数据的划分方式,此外还描述了任务对数据的依赖关系以及数据传输方式。
通过它,Flink 就能知道如何创建和调度作业的所有任务,从而完成作业的执行。
1生成StreamGraph
- StreamGraph在client中生成,是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
- 起始入口是CliFronted也就是客户端类,他会遍历transformations
根据遍历出的算子的类型 ,oneInput类型生成StreamNode,partition类型生成边StreamEdge,顶点和边,分别添加到相对应的list中。 - 之后根据指定的算子分区策略partitioner,和顺序决定每个节点的入和出的StreamNode和partitioner, 创建出edge
- 将该streamEdge添加到上游的输出 下游的输入。
- 最终 中 env.setPipeline(streamGraph) 把streamGraph->pipline 将streamGraph添加到pipline中,供下游使用生成JobGraph
- 另外根据 StreamGraphGenerator函数,也就是 streamGraph的构造函数 ,他把算法由是否做shuffle来进行了 两种不同的构造方法 transformOneInputTransform 和 transformPartition
/**
* Transforms one {@code StreamTransformation}.
*
* <p>This checks whether we already transformed it and exits early in that case. If not it
* delegates to one of the transformation specific methods.
*/
private Collection<Integer> transform(StreamTransformation<?> transform) {
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}
return transformedIds;
}
算子类型分为两种,通过算子是否有做shuffle来讲算子分为 oneInput类型 与 partition类型 两类;
oneInput类型对应operator; partition对应keyby
[(transformations是一个list,依次存放了 用户代码里的算法), 所有的算子(如map)都会调用名为transform的一个方法,会把算子存到transformations中,从而构建成一棵树]
/**
* Method for passing user defined operators created by the given factory along with the type
* information that will transform the DataStream.
*
* <p>This method uses the rather new operator factories and should only be used when custom
* factories are needed.
*
* @param operatorName name of the operator, for logging purposes
* @param outTypeInfo the output type of the operator
* @param operatorFactory the factory for the operator.
* @param <R> type of the return stream
* @return the data stream constructed.
*/
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperatorFactory<T, R> operatorFactory) {
return doTransform(operatorName, outTypeInfo, operatorFactory);
}
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform =
new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream =
new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
/**
* Adds an operator to the list of operators that should be executed when calling {@link
* #execute}.
*
* <p>When calling {@link #execute()} only the operators that where previously added to the list
* are executed.
*
* <p>This is not meant to be used by users. The API methods that create operators must call
* this method.
*/
@Internal
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
可以看到上述所有算子都被按执行顺序添加进了transformations 这个 List 中,接下来就是根据算子是否有座shuffle操作来把算子分成两类,分别是StreamNode的非shuffle算子, 以及上下游做shuffle的streamEdge操作,并且将streamEdge操作 与 StreamNode 绑定,来形成一个有前后顺序的DAG图
protected StreamNode addNode(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperatorFactory<?> operatorFactory,
String operatorName) {
if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
StreamNode vertex =
new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
vertexClass);
streamNodes.put(vertexID, vertex);
return vertex;
}
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
null,
new ArrayList<String>(),
null,
null);
}
private void addEdgeInternal(
Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
ShuffleMode shuffleMode) {
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
null,
outputTag,
shuffleMode);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
outputNames,
outputTag,
shuffleMode);
} else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
if (partitioner == null
&& upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException(
"Forward partitioning does not allow "
+ "change of parallelism. Upstream operation: "
+ upstreamNode
+ " parallelism: "
+ upstreamNode.getParallelism()
+ ", downstream operation: "
+ downstreamNode
+ " parallelism: "
+ downstreamNode.getParallelism()
+ " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
if (shuffleMode == null) {
shuffleMode = ShuffleMode.UNDEFINED;
}
StreamEdge edge =
new StreamEdge(
upstreamNode,
downstreamNode,
typeNumber,
partitioner,
outputTag,
shuffleMode);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
构造器通过调用上述相关的操作,来构造出StreamGraph,完成第一步操作,StreamGraph 继承 pipline
public StreamGraph generate() {
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
configureStreamGraph(streamGraph);
alreadyTransformed = new HashMap<>();
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
final StreamGraph builtStreamGraph = streamGraph;
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
return builtStreamGraph;
}
2生成JobGraph
- 同样在client中生成,StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点Jobvertex,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
- 在这里引入一个概念,chain 连接。可以理解为对数据处理的优化,将能够一次完成的算子操作合并到一起,什么样的算子可以chain到一起 请参考本人其他文章
简单的说 jobGaph就做了三个动作
1 StreamNode 转成 JobVertex
2 StreamEdge 转成 JobEdge
3 JobVertex 与 JobEdge 之间创建IntermediateDataset来连接
setChain 遍历edge 将能chain到一起的边 连到一起 ,后递归调用createchain
4 创建StreamConfig起始作业顶点
5 设置Jobvertex的StreamConfig,基本上是序列化StreamNode中的配置到StreamConfig中,把chain的起始节点以及不是chain中的节点 标记成 chain start
6 遍历StreamEdge 将当前节点(headOfChain)与所有出边相连
7 通过StreamEdge构建出JobEdge,创建IntermediateDataset,用来将JobVertex和JobEdge相连
8 将chain中所有子节点的streamConfig写入到headOfChain节点的 CHAINED_TASK_CONFIG配置中 每个JobVertex都会对应一个可序列化的StreamConfig,用来发送给JobManager和TaskManager。最后在TaskManager中起Task时,需要从这里反序列化出锁需要的配置信息,其中就包括了含有用户代码的StreamOperator
**总结一下 **
- 每个JobVertex都会对应一个可序列化的StreamConfig,用来发送给JobManager和TaskManager。最后在TaskManager中起Task时,需要从这里反序列化出锁需要的配置信息,其中就包括了含有用户代码的StreamOperator
。
2. setChaining会对source调用createChain方法,该方法会递归调用下游节点,从而构建出node chains。createChain会分析当前节点的出边,根据operator
Chains中的chainable条件,将出边分成chainable和noChainable两类,并分别递归调用自身方法。之后会将StreamNode中的配置信息序列化到StreamConfig中。如果当前不是chain中的子节点,则会构建JobVertex和JobEdge相连。如果是chain中的子节点,则会将StreaConfig添加到该chain和config集合中。一个node
chains,除了headOfChain
node会生成对应的JobVertex,其余的nodes都是以序列化的形式写入到StreamConfig中,并保存到headOfChain的CHAINED_TASK_CONFIG配置项中。直到部署时,才会取出并生成对应的ChainOperators。
3生成ExecutionGraph:
JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
JM创建中,创建Scheduler中,createAndRestoreExecutionGaph ,创建或恢复ExecutionGraph,创建 ExecutionGraph:
1 对JobGraph进行拓扑排序,获取虽有的JobVertex列表
2核心逻辑:将拓扑排序过的JobGraph添加到executionGaph数据结构中
2.1 实例化执行图节点,根据每一个jobvertex,创建对应的ExecutionJobVertex
2.2 将创建的ExecutionJobVertex,与前置的IntermediateResult连接起来
2.2.1 获取输入的JobEdge列表 JobVertex.getInputs
2.2.2 通过ID获取当前JobEdge的输入所对应的IntermediateResult
2.2.3 将IntermediateResult 加入到当前ExecutionJobVertex的输入中
2.2.4 为IntermediateResult 注册 consumer ,就是当前节点
2.2.5 将ExecutionJobVertex 与 IntermediateResult 关联起来(现在要连接ExecutionJobVertex与IntermediateResult 但是中间的,ExecutionEdge还未确认 ,需要确认出ExecutionEdge 才能画出完整的ExecutionGraph图
对照图片来理解)
2.2.5.1 分区策略只有forward的方式的情况下,pattern才是POINTWISE,否则均为all_to_all
2.2.5.2 构造ExecutionEdge ,IntermediateResultPartition.length为长度
先确定ExecutionJobVertex 在确定IntermediateResultPartition, 然后才构建ExecutionEdge:
数据流转就成为了 IntermediateResultPartition -> ExecutionEdge -> ExecutionJobVertex
4生成物理执行图:
调度器生成物理执行图,JobManager 根据 Executi onGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
题外话:
单元测试中可以通过 PackagedProgramUtils.createJobGraph 获取jobgraph 这种方法 通过构造PackagedProgram.newBuilder把用户代码jar包里的main反射调用,获取pipline,将translateToJobGraph来得到jobGraph
public static JobGraph buildJobGraph(JobParamsInfo jobParamsInfo) throws Exception {
Properties confProperties = jobParamsInfo.getConfProperties();
int parallelism = MathUtil.getIntegerVal(confProperties.getProperty(ConfigConstrant.SQL_ENV_PARALLELISM, "1"));
String flinkConfDir = jobParamsInfo.getFlinkConfDir();
String[] execArgs = jobParamsInfo.getExecArgs();
File coreJarFile = new File(findLocalCoreJarPath(jobParamsInfo.getLocalPluginRoot(), jobParamsInfo.getPluginLoadMode()));
SavepointRestoreSettings savepointRestoreSettings = dealSavepointRestoreSettings(jobParamsInfo.getConfProperties());
//通过coreJarFile中main的内容,转换为StreamGraph,并封装进pipline中,给到program返回
PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(coreJarFile)
.setArguments(execArgs)
.setSavepointRestoreSettings(savepointRestoreSettings)
.build();
Configuration flinkConfig = getFlinkConfiguration(flinkConfDir, confProperties);
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, flinkConfig, parallelism, false);
return jobGraph;
}
/**
* Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
* PackagedProgram}.
*
* @param packagedProgram to extract the JobGraph from
* @param configuration to use for the optimizer and job graph generator
* @param defaultParallelism for the JobGraph
* @param jobID the pre-generated job id
* @return JobGraph extracted from the PackagedProgram
* @throws ProgramInvocationException if the JobGraph generation failed
*/
public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
int defaultParallelism,
@Nullable JobID jobID,
boolean suppressOutput)
throws ProgramInvocationException {
final Pipeline pipeline =
getPipelineFromProgram(
packagedProgram, configuration, defaultParallelism, suppressOutput);
final JobGraph jobGraph =
FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
packagedProgram.getUserCodeClassLoader(),
pipeline,
configuration,
defaultParallelism);
if (jobID != null) {
jobGraph.setJobID(jobID);
}
jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
jobGraph.setClasspaths(packagedProgram.getClasspaths());
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
return jobGraph;
}
run提交jar包 通过PipelineExecutorUtils.getJobGraph ,用传进来的pipline,将translateToJobGraph来得到jobGraph
StreamGraph 中 会生成pipline -> JobGraph使用pipline 生成
StreamGraphTranslator
@Override
public JobGraph translateToJobGraph(
Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
checkArgument(
pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");
StreamGraph streamGraph = (StreamGraph) pipeline;
return streamGraph.getJobGraph(null);
}
5 批处理的物理执行计划
如前面所说,它是在作业运行前就已经确定的,是静态的。而 Flink 难以在作业执行前,预判什么样的计划参数更合理。所以,这些执行计划参数,只能依赖用户提前指定,也就是需要手动调优。
对于批作业,由于其分阶段执行的特性,在执行一个阶段前,理论上 Flink 是可以获得很多有用的信息的,比如其消费的数据量大小、这些数据的分布模式、当前的可用资源等等。
基于这些信息,我们可以让 Flink 对执行计划动态的进行调优,从而获得更好的执行效率。并且,由于 Flink 可以自动的进行这些调优,也可以让用户从手动调优中解放出来。
这就是 Flink 批处理作业的自适应执行计划。
为了支持自适应执行计划,最核心的一点,是需要一个可以动态调整的执行拓扑。所以,我们改造了 ExecutionGraph,使其支持渐进式构建。
具体的来说,就是让 ExecutionGraph 一开始只包含 Source 节点,随着执行的推进,再逐渐的加入后续的节点和连边。
这样,Flink 就有机会对尚未加入的执行节点和连边进行调整。
但在这个地方,有遭遇了一个阻碍。因为在原来的作业执行中,上游节点执行是依赖于下游节点的并行度的。具体来说,是因为上游在产出数据时,会根据下游并行度,对数据进行划分(sub-partition);这样,每个下游任务就可以直接消费其对应的那一个数据分区。然而,在动态执行计划的场景下,下游节点的并行度是不确定的。
为了解决这个问题,flink改造了节点数据的划分逻辑,使其不再根据下游节点的并行度,而是根据其最大并行度进行划分。同时,我们也改造了节点消费数据的逻辑,使其不再只消费单一分区,而是可以消费一组连续的数据分区(sub-partition range)。
通过这样的方式,上游节点执行得以和下游节点的并行度解耦,动态执行拓扑也得以实现。
在支持了动态执行拓扑后,我们引入了 Adaptive Batch Scheduler 来支持自适应执行计划。
与原有调度器不同的地方在于,Adaptive Batch Scheduler 会基于动态执行拓扑进行作业管控,持续收集运行时的信息,定制后续的执行计划。Flink 会基于执行计划,动态生成执行节点和连边,以此来更新执行拓扑。
在上述框架下,我们为 Flink 增加了自动决定并行度的能力。用户只需要配置希望单个执行节点处理的数据量, Flink 就可以根据该阶段需要处理的数据量,自动推导该阶段的节点并行度。
相比起传统的为每个作业单独配置并行度,自动决定并行度有这些优点:一是配置简单,无需为每个作业单独配置,一项配置可以适用于很多作业;二是可以自动的适配每天变化的数据量,当数据量较大时,作业并行度可以大一些,从而保障作业的产出时间;三是可以细粒度的调整作业的并行度,提高资源利用率。
但是自动决定并行度,数据可能分布不均。为了解决这个问题,我们在自动决定并行度的基础上,进行了自动均衡下发数据的改进。
这个改进会采集 sub-partition 粒度的数据量,并以此来决定执行节点的并行度,以及每个执行节点应该消费哪些分区数据。从而尽可能让下游各执行节点消费的数据,接近用户配置的预期值。
相比起自动决定并行度,这样的方式不但让下游数据量更均衡,而且能够缓解数据倾斜的影响。这个功能正在开发中,会随着 Flink 1.17 发布。
同源实例的并行执行
接下来,讲一讲同源实例的并行执行。
同源实例是指,属于同一个执行节点的执行实例。执行拓扑是由执行节点组成,各节点会创建执行实例,将其部署到 TaskManager 上进行执行。
当前,每个执行节点在某一时刻只能有一个执行实例,只有当该实例失败(或被取消)后,节点才会创建一个新的执行实例。这也意味着,同源执行实例只能串行执行。
在生产中,热点机器是无法避免的,混部集群、密集回刷,都可能导致一台机器的负载高、IO 繁忙。其上执行的数据处理任务可能异常缓慢,导致批作业产出时间难以得到保障。
预测执行,是一种已经得到普遍的认可、用来解决这类问题的方法。
其基本思路是,为热点机器上的慢任务创建新的执行实例,并部署在正常的机器节点上。这些预测执行实例和对应的原始实例,具有相同的输入和产出。其中,最先完成的实例会被承认,其他相应实例会被取消。
因此,为了支持预测执行,Flink 必须支持多个同源实例并行执行。为了支持同源实例并行执行,进行了下列改进。
首先,我们重新梳理了执行节点的状态。
当前,执行节点的状态和其当前唯一执行实例是一一对应的。然而,如果一个节点可以同时存在多个执行实例,这样的映射方式就会出现问题。
为此,我们重新定义了执行节点与执行实例的状态映射,取执行实例中最接近 FINISH 状态的状态作为执行节点的状态。这样既可以兼容单执行实例场景,也能支持多个同源实例并行执行的场景。
其次,我们保证了 Source 的同源执行实例,总是会读取到相同的数据。
大体上来说,就是我们在框架层为每个 Source 执行节点增加一个列表,来维护分配给它的数据分片。该节点的所有执行实例都会共享这一个列表,只是会各自维护一个不同的下标,来记录其处理到的数据分片进度。
这样的改动的好处是,大部分现有 Source 不需要额外的修改,就可以进行预测执行。只有当 Source 使用了自定义事件的情况下,它们才需要实现一个额外的接口,用以保证各个事件可以被分发给正确的执行实例。
在接下来的 Flink 1.17 中,我们也会支持 Sink 的同源执行实例并行执行。
其关键点在于避免不同 Sink 之间的执行冲突,特别是要避免因此产生的数据不一致,因为 Sink 是需要向外部系统进行写入的。
由于 Sink 的写入逻辑隐藏在各个 Sink 的实现中,我们无法像 Source 一样在框架层统一避免写入冲突。所以我们向 Sink 层暴露了执行实例标识(attemptNumber),使 Sink 可以自行避免冲突。
同时为了安全起见,我们默认不会允许 Sink 的同源执行实例并行执行,除非 Sink 显式声明支持同源执行实例并行执行。
在此基础上,我们为 Flink引入了预测执行机制。主要包括三个核心组件。
首先是慢任务检测器。它会定期进行检测,综合任务处理的数据量,以及其执行时长,评判任务是否是慢任务。当发现慢任务时,它会通知给批处理调度器。
其次是批处理调度器。在收到慢任务通知时,它会通知黑名单处理器,对慢任务所在的机器进行加黑。并且,只要慢任务同源执行的实例数量,没有超过用户配置上限,它会为其拉起并部署新的执行实例。当任意执行实例成功完成时,调度器会取消掉其他的同源执行实例。文章来源:https://www.toymoban.com/news/detail-835265.html
最后是黑名单处理器。Flink 可以利用其加黑机器。当机器节点被加黑后,后续的任务不会被部署到该机器。为了支持预测执行,我们支持了软加黑的方式,即加黑机器上已经存在的任务,可以继续执行而不会因为加黑被取消掉。文章来源地址https://www.toymoban.com/news/detail-835265.html
到了这里,关于Flink 深入理解任务执行计划,即Graph生成过程(源码解读)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!