Flink JobGraph构建过程

这篇具有很好参考价值的文章主要介绍了Flink JobGraph构建过程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

StreamGraph构建过程中分析了StreamGraph的构建过程,在StreamGraph构建完毕之后会对StreamGraph进行优化构建JobGraph,然后再提交JobGraph。优化过程中,Flink会尝试将尽可能多的StreamNode聚合在一个JobGraph节点中,通过合并创建JobVertex,并生成JobEdge,以减少数据在不同节点之间流动所产生的序列化、反序列化、网络传输的开销。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是IntermediateDataSet。

2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是JobVertex,consumer 是 JobEdge。

3、JobEdge:代表了job graph中的一条数据传输通道。source是IntermediateDataSet,target是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
Flink JobGraph构建过程,# Flink,flink,JobGraph


JobGraph创建的过程

AbstractJobClusterExecutor.execute -> PipelineExecutorUtils.getJobGraph  -> 
PipelineTranslator.translateToJobGraph -> StreamGraphTranslator.translateToJobGraph
 -> StreamGraph.getJobGraph ->  StreamingJobGraphGenerator.createJobGraph

createJobGraph()函数

private JobGraph createJobGraph() {
        preValidate();
        jobGraph.setJobType(streamGraph.getJobType());

        jobGraph.enableApproximateLocalRecovery(
                streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

        // 为节点生成确定性哈希,以便在提交时识别它们(如果它们没有更改)。.
        Map<Integer, byte[]> hashes =
                defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        setChaining(hashes, legacyHashes);

        setPhysicalEdges();

        markContainsSourcesOrSinks();

        setSlotSharingAndCoLocation();

        setManagedMemoryFraction(
                Collections.unmodifiableMap(jobVertices),
                Collections.unmodifiableMap(vertexConfigs),
                Collections.unmodifiableMap(chainedConfigs),
                id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
                id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

        configureCheckpointing();

        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

        final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
                JobGraphUtils.prepareUserArtifactEntries(
                        streamGraph.getUserArtifacts().stream()
                                .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                        jobGraph.getJobID());

        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                distributedCacheEntries.entrySet()) {
            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }

        // 在最后完成ExecutionConfig时设置它
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
        }

        jobGraph.setChangelogStateBackendEnabled(streamGraph.isChangelogStateBackendEnabled());

        addVertexIndexPrefixInVertexName();

        setVertexDescription();

        // Wait for the serialization of operator coordinators and stream config.
        try {
            FutureUtils.combineAll(
                            vertexConfigs.values().stream()
                                    .map(
                                            config ->
                                                    config.triggerSerializationAndReturnFuture(
                                                            serializationExecutor))
                                    .collect(Collectors.toList()))
                    .get();

            waitForSerializationFuturesAndUpdateJobVertices();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error in serialization.", e);
        }

        if (!streamGraph.getJobStatusHooks().isEmpty()) {
            jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
        }

        return jobGraph;
    }

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什
么样的情况的下 Operator 能chain 在一起呢?

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认HEAD!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是
ALWAYS!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的shuffle方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

构造边

private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {

        physicalEdgesInOrder.add(edge);

        Integer downStreamVertexID = edge.getTargetId();

        JobVertex headVertex = jobVertices.get(headOfChain);
        JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);

        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

        downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);

        StreamPartitioner<?> partitioner = output.getPartitioner();
        ResultPartitionType resultPartitionType = output.getPartitionType();

        if (resultPartitionType == ResultPartitionType.HYBRID_FULL
                || resultPartitionType == ResultPartitionType.HYBRID_SELECTIVE) {
            hasHybridResultPartition = true;
        }

        checkBufferTimeout(resultPartitionType, edge);

        JobEdge jobEdge;
        if (partitioner.isPointwise()) {
            jobEdge =
                    downStreamVertex.connectNewDataSetAsInput(
                            headVertex,
                            DistributionPattern.POINTWISE,
                            resultPartitionType,
                            opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
                            partitioner.isBroadcast());
        } else {
            jobEdge =
                    downStreamVertex.connectNewDataSetAsInput(
                            headVertex,
                            DistributionPattern.ALL_TO_ALL,
                            resultPartitionType,
                            opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
                            partitioner.isBroadcast());
        }

        // set strategy name so that web interface can show it.
        jobEdge.setShipStrategyName(partitioner.toString());
        jobEdge.setForward(partitioner instanceof ForwardPartitioner);
        jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
        jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "CONNECTED: {} - {} -> {}",
                    partitioner.getClass().getSimpleName(),
                    headOfChain,
                    downStreamVertexID);
        }
    }

总结

1、在StreamGraph构建完毕之后会开始构建JobGraph,然后再提交JobGraph。

2、StreamingJobGraphGenerator.createJobGraph()是构建JobGraph的核心实现,实现中首先会广度优先遍历StreamGraph,为其中的每个StreamNode生成一个Hash值,如果用户设置了operator的uid,那么就根据uid来生成Hash值,否则系统会自己为每个StreamNode生成一个Hash值。如果用户自己为operator提供了Hash值,也会拿来用。生成Hash值的作用主要应用在从checkpoint中的数据恢复

3、在生成Hash值之后,会调用setChaining()方法,创建operator chain、构建JobGraph顶点JobVertex、边JobEdge、中间结果集IntermediateDataSet的核心方法。

1)、创建StreamNode chain(operator chain)

从source开始,处理出边StreamEdge和target节点(edge的下游节点),递归的向下处理StreamEdge上和target StreamNode,直到找到那条过渡边,即不能再进行chain的那条边为止。那么这中间的StreamNode可以作为一个chain。这种递归向下的方式使得程序先chain完StreamGraph后面的节点,再处理头结点,类似于后序递归遍历。

2)、创建顶点JobVertex

顶点的创建在创建StreamNode chain的过程中,当已经完成了一个StreamNode chain的创建,在处理这个chain的头结点时会创建顶点JobVertex,顶点的JobVertexID根据头结点的Hash值而决定。同时JobVertex持有了chain上的所有operatorID。因为是后续遍历,所有JobVertex的创建过程是从后往前进行创建,即从sink端到source端

3)、创建边JobEdge和IntermediateDataSet

JobEdge的创建是在完成一个StreamNode chain,在处理头结点并创建完顶点JobVertex之后、根据头结点和过渡边进行connect操作时进行的,连接的是当前的JobVertex和下游的JobVertex,因为JobVertex的创建是由下至上的。

根据头结点和边从jobVertices中找到对应的JobGraph的上下游顶点JobVertex,获取过渡边的分区器,创建对应的中间结果集IntermediateDataSet和JobEdge。IntermediateDataSet由上游的顶点JobVertex创建,上游顶点JobVertex作为它的生产者producer,IntermediateDataSet作为上游顶点的输出。JobEdge中持有了中间结果集IntermediateDataSet和下游的顶点JobVertex的引用, JobEdge作为中间结果集IntermediateDataSet的消费者,JobEdge作为下游顶点JobVertex的input。整个过程就是
上游JobVertex——>IntermediateDataSet——>JobEdge——>下游JobVertex

4、接下来就是为顶点设置共享solt组、设置checkpoint配置等操作了,最后返回JobGraph,JobGraph的构建就完毕了文章来源地址https://www.toymoban.com/news/detail-841007.html

到了这里,关于Flink JobGraph构建过程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 从零开始快速构建自己的Flink应用

    本文介绍如何在 mac 下快速构建属于自己的 Flink 应用。 在 mac 上使用homebrew安装 flink: 查看安装的位置: 进入安装目录,启动 flink 集群: 进入 web 页面:http://localhost:8081/ 基于模板直接构建一个项目: 在项目的 DataStreamJob 类实现如下计数的功能: 在上面的例子中,我们使用

    2024年02月20日
    浏览(51)
  • 记一次Flink通过Kafka写入MySQL的过程

    一、前言 总体思路:source --transform --sink ,即从source获取相应的数据来源,然后进行数据转换,将数据从比较乱的格式,转换成我们需要的格式,转换处理后,然后进行sink功能,也就是将数据写入的相应的数据库DB中或者写入Hive的HDFS文件存储。 思路: pom部分放到最后面。 二

    2024年01月24日
    浏览(52)
  • flink 实时数仓构建与开发[记录一些坑]

    1、业务库使用pg数据库, 业务数据可以改动任意时间段数据 2、监听采集业务库数据,实时捕捉业务库数据变更,同时实时变更目标表和报表数据 实时数据流图与分层设计说明 1、debezium采集pg库表数据同步到kafka 【kafka模式】 2、flink 消费kafka写入pg或kafka 【upset-kafka,新版k

    2024年02月16日
    浏览(40)
  • GaussDB(DWS)基于Flink的实时数仓构建

    本文分享自华为云社区《GaussDB(DWS)基于Flink的实时数仓构建》,作者:胡辣汤。 大数据时代,厂商对实时数据分析的诉求越来越强烈,数据分析时效从T+1时效趋向于T+0时效,为了给客户提供极速分析查询能力,华为云数仓GaussDB(DWS)基于流处理框架Flink实现了实时数仓构建。在

    2024年04月22日
    浏览(43)
  • 【源码解析】flink sql执行源码概述:flink sql执行过程中有哪些阶段,这些阶段的源码大概位置在哪里

    本文大致分析了flink sql执行过程中的各个阶段的源码逻辑,这样可以在flink sql执行过程中, 能够定位到任务执行的某个阶段的代码大概分布在哪里,为更针对性的分析此阶段的细节逻辑打下基础,比如create 的逻辑是怎么执行的,select的逻辑是怎么生成的,优化逻辑都做了哪

    2024年02月04日
    浏览(42)
  • Flink 深入理解任务执行计划,即Graph生成过程(源码解读)

    我们先看一下,Flink 是如何描述作业的执行计划的。以这个 DataStream 作业为例,Flink 会基于它先生成一个 StreamGraph。这是一个有向无环图,图中的节点对应着计算逻辑,图中的边则对应着数据的分发方式。 Flink 会根据节点的并行度以及他们之间的连边方式,把一些计算节点进

    2024年02月22日
    浏览(45)
  • CDH6.3.2 集成 Flink 1.17.0 失败过程

    目录 一:下载Flink,并制作parcel包 1.相关资源下载 2. 修改配置 准备工作一: 准备工作二: 3. 开始build 二:开始在CDH页面分发激活  三:CDH添加Flink-yarn 服务  四:启动不起来的问题解决 五:CDH6.3.2集群集成zookeeper3.6.3 六:重新适配Flink服务 环境说明: cdh版本:cdh6.3.2 组件版本信

    2024年01月17日
    浏览(36)
  • [大数据 Flink,Java实现不同数据库实时数据同步过程]

    目录 🌮前言: 🌮实现Mysql同步Es的过程包括以下步骤: 🌮配置Mysql数据库连接 🌮在Flink的配置文件中,添加Mysql数据库的连接信息。可以在flink-conf.yaml文件中添加如下配置: 🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代

    2024年02月10日
    浏览(45)
  • 基于 Flink CDC 构建 MySQL 到 Databend 的 实时数据同步

    这篇教程将展示如何基于 Flink CDC 快速构建 MySQL 到 Databend 的实时数据同步。本教程的演示都将在 Flink SQL CLI 中进行,只涉及 SQL,无需一行 Java/Scala 代码,也无需安装 IDE。 假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。 接下来的内容

    2024年02月10日
    浏览(53)
  • 实时数仓构建:Flink+OLAP查询的一些实践与思考

    今天是一篇架构分享内容。 以Flink为主的计算引擎配合OLAP查询分析引擎组合进而构建实时数仓 ,其技术方案的选择是我们在技术选型过程中最常见的问题之一。也是很多公司和业务支持过程中会实实在在遇到的问题。 很多人一提起实时数仓,就直接大谈特谈Hudi,Flink的流批

    2024年04月15日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包