05 Flink 的 WordCount

这篇具有很好参考价值的文章主要介绍了05 Flink 的 WordCount。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

本文对应于 spark 系列的 Spark 的 WordCount

这里主要是 从宏观上面来看一下 flink 这边的几个角色, 以及其调度的整个流程 

一个宏观 大局上的任务的处理, 执行 

基于 一个本地的 flink 集群 

 文章来源地址https://www.toymoban.com/news/detail-835447.html

 

测试用例

/**
 * com.hx.test.Test01WordCount
 *
 * @author Jerry.X.He
 * @version 1.0
 * @date 2021/4/12 10:14
 */
public class Test01WordCount {

    // com.hx.test.Test01WordCount
    // -Xmx100M -XX:+UseSerialGC -XX:+TraceClassUnloading
    public static void main(String[] args) throws Exception {

//        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String jarPath = "D:\\IdeaWorkStations\\HelloFlink\\target\\HelloFlink-0.0.1.jar";
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081, jarPath);
        env.setParallelism(1);

        String inputPath = "D:\\IdeaWorkStations\\HelloFlink\\src\\main\\resources\\Test01WordCount.txt";
        DataSource<String> inputDs = env.readTextFile(inputPath);

        DataSet<Tuple2<String, Integer>> result = inputDs
                .flatMap(new MyFlatMapMapper())
                .map(new MyMapMapper())
                .groupBy(0)
                .sum(1);
        result.print();

        System.gc();
        System.in.read();
        System.out.println(" end ");

    }

    /**
     * MyFlatMapMapper
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2021/4/12 10:24
     */
    private static class MyFlatMapMapper implements FlatMapFunction<String, String> {
        private static List<byte[]> dummyBytes = new ArrayList<>();

        static {
            try {
                for (int i = 0; i < 10; i++) {
                    byte[] tmpBytes = FileUtils.readAllBytes(Paths.get("D:\\IdeaWorkStations\\HelloFlink\\target\\logs\\ROOT.2021-12-27-9.log"));
                    dummyBytes.add(tmpBytes);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                new Thread(new MyRunnable()).start();
            }
        }

        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            String[] splits = line.split("\\s+");
            for (String split : splits) {
                out.collect(split);
            }
        }
    }

    /**
     * MyRunnable
     *
     * @author Jerry.X.He
     * @return
     * @date 2021/12/27 16:16
     */
    public static class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.err.println(" MyRunnable.run before ");
            IoUtils.sleep(1000_000);
            System.err.println(" MyRunnable.run after ");
        }
    }

    /**
     * MyMapMapper
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2021/4/12 10:29
     */
    private static class MyMapMapper implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String word) throws Exception {
            return new Tuple2<>(word, 1);
        }
    }

}

 

Test01WordCount.txt 内容如下 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

 

整体交互流程 

Driver 提交 Job 到 JobManager, JobManager 分配任务到 TaskManager

05 Flink 的 WordCount,19 flink,flink,debug,topology

然后 TaskManager 和 JobManager 这边交互如下 

05 Flink 的 WordCount,19 flink,flink,debug,topology 

 

Driver 这边的处理  

这里是 driver 这边的根据 DataSet阶段 转换为 Plan阶段

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

这里是 Plan阶段 转换为 OptimizedPlan阶段 

05 Flink 的 WordCount,19 flink,flink,debug,topology 

这里是 OptimizedPlan阶段 转换为 JobGraph阶段 

05 Flink 的 WordCount,19 flink,flink,debug,topology 

然后 提交的就是 JobGraph, 然后 等待集群响应结果信息  

05 Flink 的 WordCount,19 flink,flink,debug,topology 

这里是将 JobGraph 序列化为 为一个临时文件, 然后提交给 flink 集群 

然后 另外就是 job 这边需要使用的 jar 列表, 也需要提交给集群 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后更详细的提交的请求内容如下, 合计传送了 47kb

然后传递的主要内容为 三部分 

第一部分为元数据, 内容为 “{"jobGraphFileName": "flink-jobgraph126256148600228610.bin", "jobJarFileNames": ["HelloFlink-0.0.1.jar"], "jobArtifactFileNames": []}” 

第二部分为 jobGraph 序列化之后的临时文件, “flink-jobgraph126256148600228610.bin”

第三部分为 job 执行需要的 jar 包, “HelloFlink-0.0.1.jar”

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

 

JobManager 这边的处理 

JobManager 这边拿到如上 driver 这边提交的 HttpRequest 之后, 处理如下 

根据 jobGraphFileName 反序列化 JobGraph, 上传 job 所需要的 jar 到 BlobServer

然后就是向 Dispatcher 提交 jobGraph

返回当前提交的 job 的相关信息, 主要是 jobId

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后是 Dispatcher 这边 persistAndRunJob, 创建 JobManagerRunner, JobMaster

05 Flink 的 WordCount,19 flink,flink,debug,topology 

然后是 JobManagerRunner 启动 JobMaster

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

 然后是 JobMaster 这边基于内部的 scheudlerNG 来开始调度任务

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后是根据 ExecutionVertex 封装 TaskDeploymentDescriptor, 然后 向 TaskManager 执行 job 拆解之后的任务 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

 

 TaskManager 这边的处理

TaskExecutor 这边收到了 TaskDeploymentDescriptor 之后, 反序列化 jobInformation, taskInformation, 创建 Task, 然后执行 Task 

这里可以从 taskInformation 的上下文信息, 可以看到当前 Task 属于哪一个 JobVertex, 以及改 JobVertex 总共有一个 SubTask, 以及当前 Task 是属于第几个 SubTask 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后就是 Task 这边的执行 

05 Flink 的 WordCount,19 flink,flink,debug,topology 

这边执行的就是基于 jar 包 和 DataSourceTask, ChainnedFlatMapDriver, ChainnedMapDriver 等等 来执行具体的业务处理

业务这边执行流程如下

DataSource 逐行读取 Test01WordCount.txt 中的字符串信息, 这里读取的是第一行 “123 234 456”

ChainedFlatMapDriver 这边是将一行的字符串转换为多行字符串, “123 234 456” 转换为 “123”, “234”, “456”

ChainedMapDriver 这边是将一个输入进行映射, 这里的是 “123” 转换为 (“123”, 1)

SynchronousChainedCombineDriver 这边是将到目前为止的结果, hash partition 之后输出到下游的 SubTask 

SynchronousChainedCombineDriver 这边先是将记录写入到 sorter, 然后 close的时候, 在迭代记录将记录输出到下游 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

输出当前 Task 的各个记录的地方 

05 Flink 的 WordCount,19 flink,flink,debug,topology 

repartition 的地方 

05 Flink 的 WordCount,19 flink,flink,debug,topology 

每一个 partition 的数据是写出到 RecordWriter 下面的 ReleaseOnSonsumptionResultPartition 下面的 subpartitions[partitionIndex]

然后这部分 ResultPartition 数据是维护在 ResultPartitionManager, 这个是每一个 TaskExecutor 维护一个 ResultPartitionManager 用于相同的任务之间的不同的 SubTask 的数据交互 

所以说一个 SubTask 维护了一个 ReleaseOnSonsumptionResultPartition, 然后维护了 parallelism 个 subpartitions

然后下游的 SubTask 依次来遍历上游的 SubTask, 获取当前 SubTask 需要读取的 subpartitions[index] 来作为输入 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

输出一条记录之后信息如下, bufferBuilder 中输出了 12 字节, 前面四个字节为长度标记 0x04 长度为 4

然后接下来 8个字节为 ”0431323300000001”, 表示的是 “123” + 1, 最前面的 0x04 表示 0x03 + 1[参见 StringValue.writeString]

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后下游的 SubTask 这边读取的是上游的 N 个 SubTask 中的当前 subpartitionIndex 部分的输出, 这里的 partitionId 为上游输入 SubTask 的 partitionId

这里当前 SubTask 会对应 parallelism 个 InputChannel, 每一个分别关联上上游 SubTask 的输出 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

 

各个 SubTask 的数据交互

上游任务启动的时候, 会向 PartitionManager 注册输出的 ResultPartition 的信息 

然后这里的 partition.getPartitionId 是来自于当前 SubTask 的 partition, 每一个 SubTask 单独生成一个 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

在 TaskManager 这边传递过程如下, TaskDeploymentDescriptor.producedPartitions.shuffleDescriptor.resultPartitionID -> ReleaseOnConsumptionResultPartition -> ResultPartition

然后再到 后面的 ResultPartition.setup 的向 PartitionManager 注册 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后 JobManager 这边是生成这个 shuffleDescriptor 相关, 传递流程如下 

IntermediateResultPartition -> Vertex.resultPartitions -> Partition -> PartitionDescriptor -> NettyShuffleDescriptor

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后 Vertex.resultPartitions 这边初始化如下, 里面的组合了一个 IntermediateResultPartition, 它的 partitionId 是传递到后面 NettyShuffleDescriptor 的 partitionID, 然后这个 partitionId 是随机生成的

根据上下文来看, 就是每一个 SubTask 一个 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后我们看一下 下游的 SubTask 这边来消费上游的 SubTask 的处理, 这里获取的是 InputChannel 的 partitionId

05 Flink 的 WordCount,19 flink,flink,debug,topology 

InputChannel 的 partitionId 是来自于上游的 inputChannelDescriptor.partitionId, 这样就把整个流程串联起来了 

下游的 SubTask 可以读取到所有上游SubTask 的结果信息 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

 

各个 Task 执行的通知 

Task 和 Task 之间是有依赖关系的, 下游的 Task 相对于 上游的Task 称之为 consumers

当上游的 Task 有数据提交到之后, 这里会通知到 JobMaster 通知 partitionId 已经在产出数据了 

然后 jobMaster 通知该 Task 的下游的 Task 开始执行 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后 JobMaster 这边收到 scheduleOrUpdateConsumers 之后的处理如下 

开始 调度下游的 consumers, 即下游的 SubTask 开始申请资源, 然后 执行 等等

05 Flink 的 WordCount,19 flink,flink,debug,topology

05 Flink 的 WordCount,19 flink,flink,debug,topology 

 

计算结果的交互 

首先是 driver 这边 

Job 提交了之后, 会执行 requestJobResult, 这里面是向 JobManager 这边发送 http 请求, 获取 给定的 Job 的执行结果 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

发送的 http 请求如下, 请求的是 “/v1/jobs/682debfc2ac22e73847c23b1953343e1/execution-result”

05 Flink 的 WordCount,19 flink,flink,debug,topology 

然后 JobMaster 这边的处理如下, 我们这里 需要关注的是 这个 accumulatorResults, 这里面暂存的我们计算的结果 

05 Flink 的 WordCount,19 flink,flink,debug,topology 

然后这个 accumulatorResults 的数据来自于各个 Task 执行完成之后通知到 JobMaster 这边的 accumulators 

如下图 这里是 Task 执行完成之后提交更新任务执行状态的请求到 JobMaster, state 中携带了 accumulators

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

接下来是封装 ArchivedExecutionGraph, 这里封装的 accumulators 是使用的各个 Task 执行完成之后响应的 accumulators

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

然后 executionGraph.getAccumulatorsSerialized 遍历的基础 accumulatos 是来自于如下, 可以看到的是遍历的当前执行计划的所有的 Vertex 的 accumulators

然后 结合上上一张图可以看到的是 这里是从 vertex.attempt 中获取的数据, 然后 vertex.attempt 的数据是来自于 ExecutionGraph.updateStateInternal

然后外层 JobMaster.jobStatusChanged 将这上面生成的 ArchivedExecutionGraph 设置到了 JobManagerRunner.resultFuture 中 

05 Flink 的 WordCount,19 flink,flink,debug,topology

 

Task 这边, 任务执行完成之后, 将 accumulators 封装到 TaskExecutionState, 然后响应给 JobMaster

05 Flink 的 WordCount,19 flink,flink,debug,topology 

 

完 

 

 

 

到了这里,关于05 Flink 的 WordCount的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink本地Debug调试的方法和注意点

    初学flink可能会疑惑flink项目如何本地调试,毕竟和后端项目不同。本文介绍flink项目本地debug方法和注意点。 以IDEA Maven项目(java开发语言)为例,步骤如下: 一、设置执行环境为Local 公司封装后如下: 注意,调试完后需要修改回集群环境: ** 二、设置参数 注意: 1、如果

    2024年03月22日
    浏览(88)
  • 轻松通关Flink第19讲:Flink 如何做维表关联

    在实际生产中,我们经常会有这样的需求,需要以原始数据流作为基础,然后关联大量的外部表来补充一些属性。例如,我们在订单数据中,希望能得到订单收货人所在省的名称,一般来说订单中会记录一个省的 ID,那么需要根据 ID 去查询外部的维度表补充省名称属性。 在

    2024年02月13日
    浏览(45)
  • Flink1.19版本生产环境应用解读!

    300万字!全网最全大数据学习面试社区等你来! Flink1.19版本更新了,我们按例对最新版本的Flink中的核心能力进行一下解读。 我们的重点还是生产环境应用和需要注意的问题,以及对未来的一些判断。 本次更新涉及到SQL/Runtime/CheckPoint这三个方面的改进,这也是目前整个引擎

    2024年04月17日
    浏览(31)
  • Hudi(19):Hudi集成Flink之索引和Catalog

    目录 0. 相关文章链接 1. Bucket索引(从 0.11 开始支持) 1.1. WITH参数 1.2. 和 state 索引的对比 2. Hudi Catalog(从 0.12.0 开始支持) 2.1. 概述 2.2. WITH 参数 2.3. 使用dfs方式  Hudi文章汇总          默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当

    2024年02月05日
    浏览(40)
  • 【flink番外篇】19、Datastream数据类型到Table schema映射示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月21日
    浏览(53)
  • Flink流批一体计算(19):PyFlink DataStream API之State

    目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state,首先需要为DataStream指定 key(主键)

    2024年02月10日
    浏览(38)
  • 【天衍系列 05】Flink集成KafkaSink组件:实现流式数据的可靠传输 & 高效协同

    Flink版本: 本文主要是基于Flink1.14.4 版本 导言: Apache Flink 作为流式处理领域的先锋,为实时数据处理提供了强大而灵活的解决方案。其中,KafkaSink 是 Flink 生态系统中的关键组件之一,扮演着将 Flink 处理的数据可靠地发送到 Kafka 主题的角色。本文将深入探讨 KafkaSink 的工作

    2024年02月20日
    浏览(63)
  • Doris-05-集成Spark、Flink、Datax,以及数据湖分析(JDBC、ODBC、ES、Hive、多源数据目录Catalog)

    准备表和数据: Spark 读写 Doris Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。 代码库地址:https://github.com/apache/incubator-doris-spark-connector 支持从 Doris 中读取数据 支持 Spark DataFrame 批量/流式 写入 Doris 可以将 Doris 表映射为 DataFra

    2024年02月06日
    浏览(62)
  • 互联网摸鱼日报(2023-05-19)

    InfoQ 热门话题 张勇发布全员信:阿里云将分拆上市,二次创业打造世界级科技公司 面临强制退市后,图森未来又现高管人事地震,技术副总裁王磊将离职 工行与中国外汇交易中心合作打造“智能交易助手”;农业银行金融创新赋能“智慧雄安”建设;广发信用卡基于敏捷理

    2024年02月05日
    浏览(65)
  • WuThreat身份安全云-TVD每日漏洞情报-2023-05-19

    漏洞名称:Weaver OA jx2_config.ini 文件访问漏洞 漏洞级别:中危 漏洞编号:CVE-2023-2766,CNNVD-202305-1694 相关涉及:Sourcecodester Auto Dealer Management System 1.0 漏洞状态:POC 参考链接:https://tvd.wuthreat.com/#/listDetail?TVD_ID=TVD-2023-12447 漏洞名称:cnoa OA硬编码密码 漏洞级别:中危 漏洞编号:CVE-2023-2799 相关

    2024年02月08日
    浏览(63)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包