spark作为大数据组件中不可或缺的一大部分 是我们学习和了解大数据的过程中必须要经历和学习的部分 本人将自己当初学习大数据的一点点心得和体会作为笔记 希望可以给同样在学习大数据同学提供一点点的帮助 同时也希望可以得到大家的指正
spark的特点
-- 基于内存 -- 集群 --快:与 Hadoop 的 MapReduce 相比,Spark 基于内存的运算要快 100 倍以上,基于硬盘的运算也要快 10 倍以上。Spark 实现了高效的 DAG 执行引擎,可以通过基于内存来高效处理数据流。 --易用:Spark 支持 Java、Python、R 和 Scala 的 API,还支持超过 80 种高级算法,使用户可以快速构建不同的应用。而且 Spark 支持交互式的 Python 和 Scala 的 shell,可以非常方便地在这些 shell 中使用 Spark 集群来验证解决问题的方法。 --通用:Spark 提供了统一的解决方案。Spark 可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX),这些不同类型的处理都可以在同一个应用中无缝使用。 --兼容性:Spark 可以非常方便地与其他的开源产品进行融合。比如,Spark 可以使用 Hadoop 的 YARN 和 Apache Mesos 作为它的资源管理和调度器,并且可以处理所有 Hadoop 支持的数据,包括 HDFS、HBase 和 Cassandra 等。这对于已经部署 Hadoop 集群的用户特别重要,因为不需要做任何数据迁移就可以使用 Spark 的强大处理能力。
spark和mr的比较
1.MR为什么这么慢? --额外的复制 --序列化 -- 需要将数据转换成 0 1 0 1 1 1 --磁盘IO开销 --细粒度资源调度: 任务执行之前,不会申请所有的资源,task执行时,自己申请资源,自己释放资源,任务执行就慢了,但是集群资源可以充分利用 shuffle过程会产生网络IO -- 网络IO相对于磁盘IO对于时间消耗大 2.Spark为什么比MR快? --基于内存 --DAG有向无环图 --粗粒度资源调度 任务执行之前,先将所有的资源申请到,task执行的时候不需要自己申请资源,加快了执行速度。如果多数task执行完成,只有一个task没有执行完,那么这批申请到的资源不会被释放,只有所有的task执行完成之后才会释放所有资源。会有集群资源不能充分利用的情况。 什么是进程? -- 是一个具有一定独立功能的程序在一个数据集合上依次动态执行的过程,可以表示为一个正在执行的程序实例 同一个进程下面的的所有线程可以共享一个内存空间
spark的组件
1.Spark Core 核心底层部分 -- 基于RDD (数据类型,数据结构) -- 支持多种语言(r,sql,python) 2.Spark SQL -- 基于DataFrame -- 结构化数据查询 3.Spark Streming 流处理 -- Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的 API。 4.Spark MLLib 机器学习 5.Spark GraphX 图计算
Spark VS Hadoop
尽管 Spark 相对于 Hadoop 而言具有较大优势,但 Spark 并不能完全替代 Hadoop,Spark 主要用于替代Hadoop中的 MapReduce 计算模型。存储依然可以使用 HDFS,但是中间结果可以存放在内存中;调度可以使用 Spark 内置的,也可以使用更成熟的调度系统 YARN 等。 Hadoop Spark 类型 分布式基础平台, 包含计算, 存储, 调度 分布式计算工具 场景 大规模数据集上的批处理 迭代计算, 交互式计算, 流计算 价格 对机器要求低, 便宜 对内存有要求, 相对较贵 编程范式 Map+Reduce, API 较为底层, 算法适应性差 RDD 组成 DAG 有向无环图, API 较为顶层, 方便使用 数据存储结构 MapReduce 中间计算结果存在 HDFS 磁盘上, 延迟大 RDD 中间运算结果存在内存中 , 延迟小 运行方式 Task 以进程方式维护, 任务启动慢 Task 以线程方式维护, 任务启动快
端口号:
Spark查看当前Spark-shell运行任务情况端口号:4040(计算) Spark Master内部通信服务端口号:7077 Standalone模式下,Spark Master Web端口号:8080(资源) Spark历史服务器端口号:18080 Hadoop YARN任务运行情况查看端口号:8088
spark运行架构
1.Driver -- 是一个 JVM 进程, 负责执行 Spark 任务的 main 方法 执行用户提交的代码,创建 SparkContext 或者 SparkSession 将用户代码转化为 Spark 任务(Jobs)创建血缘(Lineage),逻辑计划(Logical Plan)和物理计划(Physical Plan) 在 Cluster Manager 的辅助下,把 task 任务分发调度出去 跟踪任务的执行情况,收集日志 2.Spark Context/Session 它是由 Spark driver 创建,每个 Spark 应用对应一个 程序和集群交互的入口 可以连接到 Cluster Manager 3.Cluster Manager 负责部署整个 Spark 集群 包括上面提到的 driver 和 executors 具有以下几种常见的部署模式: Standalone YARN Mesos Kubernetes 4.Executor 一个创建在 worker 节点的进程 一个 Executor 有多个 slots(线程) 一个 slot 就是一个线程,对应了一个 task 可以并发执行多个 tasks 负责执行 Spark 任务,把结果返回给 Driver 可以将数据缓存到 worker 节点的内存
spark运行模式
1. Standalone模式
还是要将应用提交到对应的集群中去执行,这里来看看只使用Spark自身节点运行的集群模式,也就是我们所谓的独立部署(Standalone)模式。Spark的Standalone模式体现了经典的master-slave模式。 独立运行模式 自带完整的资源管理服务 可单独部署到一个集群中 无需依赖任何其他资源管理系统 主从架构:Master/Worker 支持两种任务提交方式 Cluster:适用于生产环境 Client:适用于交互、调试 集群规划: hadoop101 hadoop102 hadoop103 Spark Worker Master Worker Worker ================hadoop101================ 3330 Jps 3238 Worker 3163 Master ================hadoop102================ 2966 Jps 2908 Worker ================hadoop103================ 2978 Worker 3036 Jps
RDD
RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在Spark 中,对数据的所有操作不外乎创建RDD、转化已有RDD 以及调用RDD 操作进行求值。每个RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含Python、Java、Scala 中任意类型的对象, 甚至可以包含用户自定义的对象。RDD 具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD 允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。RDD 支持两种操作:transformation操作和action操作。RDD 的转化操作是返回一个新的RDD 的操作,比如map()和filter(),而action操作则是向驱动器程序返回结果或把结果写入外部系统的操作。比如count() 和first()。 Spark 采用惰性计算模式,RDD 只有第一次在一个行动操作中用到时,才会真正计算。Spark 可以优化整个计算过程。默认情况下,Spark 的RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个RDD , 可以使用RDD.persist() 让Spark 把这个RDD 缓存下来。
RDD的描述 * ① 通过读取文件可以返回一个 RDD 该RDD是一个抽象类 * ② RDD中的泛型对应是该RDD中操作数据的数据类型 * ③ RDD中并不存储具体的数据,存储是数据处理的逻辑 (DAG有向无环图) * ④ RDD中封装的处理逻辑不可变,如果需要变更,那么需要产生新的RDD RDD=> RDD又称为弹性分布式数据集 * 弹性: * 1.可以存储数据到内存,也可以存储到磁盘 * 2.具有一定的容错性,数据发生错误,会自动重试 * 3.数据可以按需求进行做分片(Split) * 分布式: 数据存储在不同节点上,计算也可以在不同节点上
RDD的五大特性:(面试)
RDD的五大特性: 1. A list of partitions 由一系列分区组成 分区就是一个并发 分区数或并发数,其默认值受哪些因素影响 1.修改读取文件的数量 可以修改分区数(并发数量) -- 对应mr中的切片 FileInputFormat => 可以获取数据的切片 => 1个切片对应一个分区 => 切片数是由文件数和文件大小决定的 2.设定最小分区数 (当前任务运行时最小的分区数) 3.在Spark提交时设置executor的数量 2. A function for computing each split 函数是作用在每一个分区上的 当数据被切分成多个切片时,每个切片默认对应一个分区,每个分区中的数据都会执行当前stage中的每一个函数 3. A list of dependencies on other RDDs RDD之间是有一系列的依赖关系 3.1一系列的依赖关系,形成了数据的处理逻辑(DAG有向无环图) 3.2 并不是所有的算子都会返回一个RDD 能返回RDD的算子 称为 转换算子 返回值为RDD 不能返回RDD的称为行动算子 行动算子的源码中对当前的Job进行了提交,所以可以形成一个Job -- 一个行动算子形成一个job 4. Optionally, a Partitioner for key-value RDDs 可选项,分区器是作用在KV形式的RDD上 KV形式RDD表示为RDD中存储数据的类型为 tuple2(key,value) 5.Optionally, a list of preferred locations to compute each split on Spark会给每个执行任务提供最佳的计算位置 移动计算不移动数据 如果当前数据存储在HDFS中的node1节点,那么Spark在计算时,会将executor分配到node1节点上运行
Yarn-client任务执行流程
1.SparkSubmi提交任务
2.当前客户端启动Drive
3.通过集群管理器注册应用程序,启动Excutor
4.Excutor反向注册Driver端,告知任务执行,
5.执行main函数
6.获取对应的算子,创建执行逻辑(DAG图)
7.执行行动算子 -- 触发job
8.对stage进行划分 (stage划分的依据 -- 有没有shuffle过程)
7.创建Task集合
8.将Task分发給Excutor进行任务的执行
-- Client模式和Cluster模式执行的区别就是Cluster模式的Drive会在nodemanger执行
在Driver端内部会对RDD进行划分形成DAG,RDD的内部时由以系列分区组成,每一个分区对应一个并发,每一个并发对应一个Task任务,Task在执行时,会分发到对应的Excutor中执行,Excutor是进程,独享一个内存空间Task是进程下面的线程,共享进程的内存空间
shuffle
shuffle,直面翻译就是混洗,它是连接map和reduce的桥梁。
在DAG阶段以shuffle为界,划分stage,上游stage做map task,每个map task将计算结果数据分成多份,每一份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write;下游stage做reduce task,每个reduce task通过网络拉取上游stage中所有map task的指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。举个例子,假如上游stage有100个map task,下游stage有1000个reduce task,那么这100个map task中每个map task都会得到1000份数据,而1000个reduce task中的每个reduce task都会拉取上游100个map task对应的那份数据,即第一个reduce task会拉取所有map task结果数据的第一份,以此类推。 在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及到序列化、磁盘IO等耗时操作;在reduce阶段,除了reduce的业务逻辑外,还有前面shuffle read过程,这个过程涉及到网络IO、反序列化等耗时操作。所以整个shuffle过程是极其昂贵的,spark在shuffle的实现上也做了很多优化改进,随着版本的迭代发布,spark shuffle的实现也逐步得到改进。
RDD的五大特性? Spark中的stage时如果划分的? -- 看一个Rdd之间是否有shuffle过程,有shuffle过会被切分成一个stage RDDsaunzi
reduceBykey,aggregateByKey,reduce,groupBy,groupByKey
1.//使用reduceBykey求平均年龄 平均年龄 = 总年龄 / 总人数 1500100009,沈德昌,21,男,理科一班 val sparkConf: SparkConf = new SparkConf().setAppName("createRDD").setMaster("local") val s, = new SparkContext(sparkConf) val stuRDD: RDD[String] = sc.textFile("data/students.txt") val value = stuRDD.map { case oneLine: String => val splitRes: Array[String] = oneLine.split(",") (splitRes(4), (splitRes(2).toInt, 1)) -- reduceByKey与key无关,对value中的数据做累加据聚合,以班级分组,对年龄和人数进行累加计算 (sumTuple: (Int,Int) ,valueTuple : (Int,Int)) }.reduceByKey{ case (sumTuple: (Int,Int) ,valueTuple : (Int,Int) )=> { ((22,1),(24,1)) ((46,2),(23,1)) ((22,1),(22,1)) ((44,2),(23,1)) ((24,1),(21,1)) -- (sumTuple,valueTuple) val agesum: Int = sumTuple._1 + valueTuple._1 val peoplesum :Int= sumTuple._2 + valueTuple._2 (agesum,peoplesum) } -- za这里要把求总年龄和总人数分开来看,是互相独立的部分,定义了两个Tuple -- 在这一步中,使用reduceByKey操作根据键对值进行聚合。对具有相同键的元素进行分组,并对每组的值进行自定义的聚合操作。在这里,对每组的值进行匹配,使用模式匹配来提取出元组的四个整数值。然后,将第一个整数值和第三个整数值相加,将第二个整数值和第四个整数值相加,最后返回一个新的元组。这个操作的结果是得到一个新的RDD,其中键是原始键,而值是聚合后的元组(Int, Int) }.map{ case (clazz: String, (ageSum: Int, peoplesum: Int)) => (clazz,ageSum/peoplesum) }.foreach(println) 2.使用reduceByKey求平均年龄 val Stureduce: RDD[(String, (Int, Int))] = stuRDD.map { oneline => { val splitRes: Array[String] = oneline.split(",") (splitRes(4),(splitRes(2).toInt,1) ) } } Stureduce.reduceByKey{ case ((age1: Int, age2: Int), (age3: Int, age4: Int)) => { (age1 + age3, age2 + age4) } -- (age1: Int, age2: Int)表示第一个值元组,其中age1表示学生年龄的累加和,age2表示计数的累加和。 -- 同样地,(age3: Int, age4: Int)表示第二个值元组,其中age3表示当前要合并的学生年龄,age4表示当前要合并的计数。 -- 最终,通过(age1 + age3, age2 + age4)将新的学生年龄的累加和和计数的累加和构成一个新的值元组,用于下一步的聚合操作。 2.使用aggregateByKey求平均年龄 -- 1.默认值 2.分区内进行操作(累加。。) 3.对不同分区间的分区内的数据进行操作 stuRDD.map { case oneLine: String => val splitRes: Array[String] = oneLine.split(",") (splitRes(4), (splitRes(2).toInt, 1)) }.aggregateByKey( (0,0) )( // 第一个匿名函数:在分区内进行做数据计算 (sumTup: (Int, Int), valueTup: (Int, Int)) => { val ageSum = sumTup._1 + valueTup._1 val stuSum = sumTup._2 + valueTup._2 // 将结果返回作为下一次计算的第一个参数列表值 (ageSum, stuSum) } , // 第二个匿名函数: 在分区间做数据计算 (sumTup: (Int, Int), valueTup: (Int, Int)) => { val ageSum = sumTup._1 + valueTup._1 val stuSum = sumTup._2 + valueTup._2 // 将结果返回作为下一次计算的第一个参数列表值 (ageSum, stuSum) } ).map { case (clazz: String, (ageSum: Int, stuSum: Int)) => (clazz, ageSum / stuSum.toDouble) }.foreach(println) 例: val groupByKeyRes3 = stuRDD .map { case oneLine: String => val splitRes: Array[String] = oneLine.split(",") // 以班级作为Key 其他信息做为Value (splitRes(4), (1)) }.aggregateByKey(0)( (acc: Int, value: Int) => acc + value, // 分区内的累加操作 (acc1: Int, acc2: Int) => acc1 + acc2 // 分区间的操作 /** * 在分区内部,使用(acc: Int, value: Int) => acc + value * 的累加操作将学生人数累加到累加器acc上。在分区间, * 使用(acc1: Int, acc2: Int) => acc1 + acc2 * 的累加操作将不同分区内的累加结果进行合并。 */ ).foreach(println) def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] zeroValue 是初始值,用于每个键的初始聚合结果。 seqOp 是在每个分区内进行聚合的函数,它将初始值 zeroValue 和每个键的值进行聚合操作,返回一个新的聚合结果。 combOp 是在不同分区之间进行聚合的函数,它将两个聚合结果进行合并操作,返回一个新的聚合结果。 现在我们来详细解释一下 aggregateByKey 的工作流程: 1.对每个分区内的数据进行聚合: 2.对于每个键值对 (key, value),将 value 和初始值 zeroValue 作为参数传递给 seqOp 函数,计算该分区内每个键的局部聚合结果。 3.seqOp 函数可以自定义,可以根据需要进行求和、计数、最大值等操作。它返回的是该键的局部聚合结果。 在不同分区之间进行聚合: 4.将每个分区的聚合结果进行合并,将两个聚合结果作为参数传递给 combOp 函数,计算不同分区之间每个键的全局聚合结果。 5.combOp 函数也可以自定义,它可以根据需要将两个聚合结果进行合并操作,返回的是该键的全局聚合结果。 返回一个新的RDD,其中每个键都关联着最终的聚合结果。 2.2aggregateByKey例2 -- 有一批数据 需要再分区内做累计 分区间做最大值 val sparkConf: SparkConf = new SparkConf().setAppName("createRDD").setMaster("local") val sc = new SparkContext(sparkConf) val rdd = sc.parallelize( List( ("a", 1), ("a", 2), ("b", 3) // 第一个分区 , ("b", 4), ("c", 5), ("c", 6) // 第二个分区 ), 2 //表示有两个分区 ) // 有一批数据 需要再分区内做累计 分区间做最大值 rdd.foreach(println) rdd.aggregateByKey(0)( (sum,value) => { sum+value }, (sumPartition1:Int,sumPartition2:Int ) => { math.max(sumPartition1,sumPartition2) } ).foreach(println) 3.reduce reduce只会对单个元素进行累加计算 - reduce(_ + _):对上一步生成的RDD中的元素进行累加操作。_ + _表示对两个元素进行相加操作。reduce函数会将RDD中的元素逐个进行相加,得到最终的累加结果。 - 例如,假设studentPairs RDD中的元素为(班级1, 10)、(班级2, 20)、(班级3, 30),则经过上述操作后,studentPairs.map(_._2).reduce(_ + _)的计算过程如下: - studentPairs.map(_._2)将每个键值对的值提取出来,生成一个包含学生数量的RDD:[10, 20, 30]。 reduce(_ + _)对上述RDD中的元素进行累加操作:10 + 20 + 30 = 60。 val groupByKeyRes4 = stuRDD .map { case oneLine: String => val splitRes: Array[String] = oneLine.split(",") // 以班级作为Key 其他信息做为Value (splitRes(4), (1)) }.groupBy(_._1) .map{ case(clazz,ite)=>{ (clazz,ite.map(_._2).reduce(_+_)) } }.foreach(println)
RDD分区数
Task是作用在每个分区上的,每个分区至少需要一个Task去处理
改变分区数可间接改变任务的并行度,类似手动指定Reduce数量
第一个RDD的分区数由切片的数量决定默认情况下子RDD的分区数等于父RDD的分区数
Shuflle类算子可手动指定RDD分区数
设置spark.default.parallelism参数可改变Shuffle类算子默认分区数通过repartition/coalesce操作改变RDD分区数repartition:
通过shuffle的方式增加或减少分区数coalesce:默认不通过shuffle的方式改变分区数,只能减少分区
sparkConf.set("spark.default.parallelism", "2") /** * 在Apache Spark中,"spark.default.parallelism"属性确定在执行操作 * (如parallelize、parallelizePairs)和默认并行度操作 * (如reduceByKey、groupByKey和join)时使用的默认分区数。 * * 这个属性可以用来控制Spark作业中的并行度。 * 设置较高的并行度可以提高作业的执行速度,但也会增加资源的消耗。 * 根据你的需求和集群的资源情况,你可以根据需要调整这个值。 * * 需要注意的是,设置默认并行度只会影响那些没有明确指定分区数的操作。 * 对于那些显式指定分区数的操作,将会使用指定的分区数而不受"spark.default.parallelism"的影响。 */ 1.val StuRDD = sc.textFile("data",minPartitions = 2) 使用minPartitions = 2设定分区数 默认分区数 是根据读取文件的数量以及文件的大小决定的 受minPartitions影响 2. /** *手动设置分区数量 * 方式1:通过repartition对当前分区进行重新设置,其过程会产生Shuffle * 可以动态增加对应的分区数量 也可以减少对应的分区数 * 对应源码中实际上调用的也是coalesce coalesce(numPartitions, shuffle = true) * * 方式2:通过 coalesce 对当前的分区数进行调整 默认不会产生Shuffle过程 * coalesce 默认情况下 不能增加分区数 可以实现减小分区数 该过程中没有产生Shuffle * coalesce 可以通过调整shuffle参数来实现增加分区数 shuffle = true or false * * * 注意:对于减小分区来说,尽量不去使用repartition 而使用 coalesce * */ -- val coalesceRdd = stuInfo.coalesce( 1, shuffle = true ) println(coalesceRdd.getNumPartitions) coalesceRdd.take(10) while (true){ -- val repartitionRDD: RDD[(String, (String, String, String, String))] = stuInfo.repartition(1) repartitionRDD.take(10)
Cache缓存·
为什么要使用Cache缓存? -- Spark中对每个RDD执行一个算子操作时,都会重新从源头处计算一遍 如果该RDD被多次使用,则会导致该RDD被重复计算 重复计算,浪费资源,消耗时间,影响整体性能 如何使用Cache缓存,怎么使用,在什么情况下去使用? -- 缓存策略考虑因素 放内存、磁盘、堆外内存? 是否需要序列化? 需要几个副本? -- 如何选择合适的缓存策略? 基于尽可能放入内存原则 一般不会使用多副本 内存充足:MEMORY_ONLY -直接放内存,不进行序列化,性能最高 内存不够:MEMORY_AND_DISK_SER -序列化后可减少空间占用 -反序列化所消耗的时间远小于从磁盘读取的时间 -性能会优于MEMORY_AND_DISK 其他策略基本不考虑 * 对于每个RDD中 如果存在有对一个父RDD存在有依赖关系,那么默认情况下 不会对其RDD的数据进行缓存,而是会重新计算 * 如下过程中:textFile => map 会被执行3次,会消耗过多的资源,无用功,可以对map设置一个Cache缓存,后,textFile => map 只会被执行1次 -- stuInfo.persist(StorageLevel.MEMORY_AND_DISK) * * 方式1: * cache 默认是将数据进行缓存到内存当中 persist(StorageLevel.MEMORY_ONLY) * * 方式2: * persist函数 该函数中可以对缓存的级别进行设置 StorageLevel中共有12中缓存方式 分别包括:内存 磁盘 序列化 副本 四个方面 * 注意: * 对缓存的数据可以通过 unpersist 对其进行清除缓存 手动清除缓存有助于节约资源 * 对于SPark中的缓存方式,如果当前应用程序结束,那么默认会对其缓存进行清空 */ -- 对多次使用的RDD可以通过cache/persist操作进行缓存 1. repeatRDD.cache() 默认以仅内存策略对RDD进行缓存 2. 相当于repeatRDD.persist(StorageLevel.MEMORY_ONLY)
val stuInfo: RDD[(String, (String, String, String, String))] = stuRDD.map { case oneLine: String => // println("当前map执行了1次") //不做任何操作的情况下 map执行了3000次 println("当前map执行了1次") val splitRes: Array[String] = oneLine.split(",") (splitRes(0), (splitRes(1), splitRes(2), splitRes(3), splitRes(4))) } stuInfo.persist(StorageLevel.MEMORY_AND_DISK) -- 设置缓存模式 --用完记得释放缓存:repeatRDD.unpersist()
Cache和Checkpoint的不同点
例: 当将车辆信息存入SparkStreaming中时,如果SparkStreaming发生宕机,使用Cache缓存的数据执行完job完之后就自动删除释放掉,宕机无法恢复,而使用Checkpoint 缓存的数据,会自动保存在啊磁盘中,宕机可恢复. -- checkpoint:会对RDD构建检查点,会单独形成一个job,在该JOB中会将之前checkpoint的父RDD流程重新执行一次,保存到磁盘中 -- 一般情况下,如果要对数据进行checkpoint会在之前对其进行做cache缓存,将父RDD中的数据缓存后,在checkpoint中不需要再重复计算
广播变量
-
从Driver端把一个RDD以为的数据发送到一个RDD内部使用
广播变量 -- 从Driver端把一个RDD以为的数据发送到一个RDD内部使用 算子内部的代码最终会被封装到Task并发送到Executor中执行 例如Driver端的map集合,会把数据发送給每一个task中,如果分区数很多,就会占用较高的资源 把Executor看成进程,其中的一个一个的task看作线程,由于进程和线程共享一个内存空间,可以之间将map集合中的数据直接发送到公共的内存空间中,不在需要一个一个的发送到task中。 未使用广播变量: -- 每一个task都要发送一份到Executor中执行 使用了广播变量: -- 算子内部的代码最终会被封装发送到一个Executor的共享内存中执行,节省内存空间,只在Executor中存储一份执行文件 例: def main(args: Array[String]): Unit = { /** * 以MapJoin为例使用 BroadCast * */ val sparkConf = new SparkConf().setAppName("createRDD").setMaster("local") sparkConf.set("spark.default.parallelism", "2") val sc = new SparkContext(sparkConf) val scoRDD = sc.textFile("data/score.txt") val groupByRes: RDD[(String, Iterable[(String, String)])] = scoRDD.map { case oneLine: String => val splitRes: Array[String] = oneLine.split(",") (splitRes(0), (splitRes(1), splitRes(2))) }.groupByKey() val scoreInfo = groupByRes.map { case (id, ite) => { (id, ite.map(_._2.toInt).sum) } } // 将所有的RDD数据收集到Driver端,之后再将其进行广播变量 -- RDD不能直接进行广播,需要将数据拉去到driver端,变成一个map集合才能进行广播!! val broadCast = sc.broadcast(scoreInfo.collect().toMap) // 不能通过一个RDD在另外一个RDD内部进行调用,因为RDD不能进行做序列操作 // val totalScore: Int = scoreInfo.collect().toMap.getOrElse(splitRes(0), 0) val stuRDD: RDD[String] = sc.textFile("data/students.txt") println("分区数:", stuRDD.getNumPartitions) stuRDD.map { case oneLine => val splitRes = oneLine.split(",") val totalScore = broadCast.value.getOrElse(splitRes(0), 0) (splitRes(0), splitRes(1), splitRes(2), splitRes(3), splitRes(4), totalScore) }.foreach(println)
广播变量获取流程如下:
算子内部使用了广播变量 Task会向Executor申请获取广播变量,若Executor暂无数据,则 Executor首先会向同机架的其他Executor获取 若获取不到再向跨机架的Executor获取 如果还是获取不到,则向Driver端获取数据 广播变量获取后会优先放入内存中,由BlockManager管理维护 后续Task可直接从MemoryStore中获取使用 -- 减少了Driver端的网络IO
累加器
1.在Spark应用程序中,经常会有如下需求 例如如异常监控,调试,记录符合某特性的数据的数目 这类需求都需要用到计数器 2.如果变量不被声明为累加器,那么它被改变时不会在Driver端进行全局汇总 即在分布式运行时每个task运行的只是原始变量的一个副本 并不能改变原始变量的值 但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能 -- 累加器定义: 在Driver端定义一个变量num = 0 ,然后在RDD中进行累加计算,由于在rdd中使用了RDD以外的变量法,把num分发到task中,但是num无法返回到Driver端。使用累加器,变量num在rdd中计算执行完之后,会把执行结果拉取到Driver端继续执行,得到最终的结果。
执行流程: 1.在Driver端定义 sc.longAccumulator 2.在算子内部累加 accCount.add 3.在Driver端汇总 accCount.value 1.val stuNum: LongAccumulator = sc.longAccumulator("stuNum") 2. stuRDD.map{ case oneStudent => num += 1 stuNum.add(1) println("学习当前人数: " ,num) val splitRes: Array[String] = oneStudent.split(",") (splitRes(0), (splitRes(1), splitRes(2), splitRes(3), splitRes(4))) }.take(10) 3. println("等于" + stuNum.value)
BlockManager(是Spark的分布式存储系统)
主从结构:BlockManagerMaster/BlockManager(Slave) -- BlockManagerMaster 在Driver端启动 负责接受Executor上的BlockManager的注册 管理BlockManager的元数据信息 -- BlockManager 在每个Executor中启动 负责管理所在节点上的数据 组成: 主要由四部分构成 --MemoryStore:负责对内存上的数据进行存储和读写 --DiskStore:负责对磁盘上的数据进行存储和读写 --BlockTransferService:负责建立网络连接 --BlockManagerWorker:负责对其他的BlockManager的数据进行读写 -- 当Executor 的BlockManager 执行了增删改操作,那就必须将 block 的 blockStatus 上报给Driver端的BlockManagerMaster -- BlockManagerMaster 内部的BlockManagerMasterEndPoint 内维护了元数据信息的映射,其内部通过Map、Set等数据结构实现,易于维护增加、更新、删除元数---- ------- BlockManager主要维护以下三类数据: Cache缓存的数据 广播变量和累加器 Shuffle产生的数据
Shuffle文件获取过程
数据本地化级别
Spark任务主要有5个数据本地化级别: 1.PROCESS_LOCAL进程本地化 -- Task要计算的数据在本进程(Executor)的内存中 2.NODE_LOCAL节点本地化 -- Task所计算的数据在本节点所在的磁盘上 -- Task所计算的数据在本节点其他Executor进程的内存中 3.NO_PREF无最佳位置 -- Task所计算的数据在外部系统,例如MySQL,故无最佳计算位置 4.RACK_LOCAL机架本地化 -- Task所计算的数据在同机架的不同节点的磁盘或者Executor进程的内存中 5.ANY跨机架调用 -- Task所计算的数据在不同机架的不同节点的磁盘或者Executor进程的内存中 . 为什么Driver端会知道Executor端的数据所在的哪个节点中(具体信息)? -- 因为BlockManagerMaster 内部的BlockManagerMasterEndPoint 内维护了元数据信息的映射,其内部通过Map、Set等数据结构实现,易于维护增加、更新、删除元数据 节点->Executor->task/Data Driver端发送Task到Executor执行 TaskScheduler会根据所在位置发送对应的Task本地化级别进程本地化
资源申请和任务调度(面试经常问道)
1.stage是由是否有shuffle产生而划分的
2.分配资源就是分配Executor,
3.任务调度的过程在Task中完成
4.资源时申请可以看作是在Driver端执行的吗?
资源申请
基于不同的资源管理框架,Spark支持多种部署方式 On Yarn是业内常用的一种方式 On Yarn又可以分为两种运行模式: Client客户端模式 常用于上线前的测试 Cluster集群模式 任务真正上线时运行的模式 当然Standalone模式也支持两种运行模式,暂不做讨论
Yarn Client模式(执行流程) -- 面试必问
-- 资源申请 1.Driver端向ResourceManger申请启动ApplicationMaster(AM) 2.ResourceManger随机向NodeManger中启动一个Container 3.之后NodeManger的ApplicationMaster(AM) 向ResourceManger申请Executor 4.在NodeManeger中启动对应的Executor 5.Executor向Driver端反向注册 Driver端中,客户端中 DAGSchedule中 -- 任务调度 -- 都在Driver端执行 6.遇到一个行动算子开始任务调度,执行sc.runjob 7.构建DAG图 8.根据宽窄依赖切分stage (是否有shuffle产生) 9.将stage以taskset形式发送給TaskSchedlue TaskSchedule中 -- 任务调度 10.TaskSchedlue根据TaskSet中的任务,根据本地化策略将Task发送給对应的Executor执行
任务调度——术语解释
DAGSchedule: 基于DAG及宽窄依赖切分Stage,决定每个任务的最佳位置 记录哪个RDD或者Stage输出被物化 将Taskset传给底层调度器TaskScheduler 重新提交shuffle输出丢失的stage TaskSchedule: 提交Taskset(一组并行task)到集群运行并汇报结果 出现Shuffle输出lost要报告fetchfailed错误 碰到straggle任务需要放到别的节点上重试 为每一一个TaskSet维护一一个TaskSetManager(追踪本地性及错误信息) -- 一个Spark Application包含多个Job -- 每个Action算子对应一个Job -- 一个Job又可以根据有宽窄依赖划分成多个Stage -- Stage是一组可以并行计算的Task,即一个Stage中包含了很多个Task (TASK指分区,一个分区对应一个Task) -- Task是Spark任务执行调度的最小单元 -- Task最终会被Driver端发送到Executor中执行 -- 当资源申请过程完成后,Executor启动成功,即可进行任务调度 RDD之间存在着依赖关系,基于这些依赖关系可以形成DAG有向无环图 DAGScheduler会对形成DAG图根据宽窄依赖进行Stage划分 划分的规则很简单,从后往前回溯 遇到窄依赖加入本stage 遇见宽依赖进行Stage切分 DAGScheduler会将每个Stage以TaskSet的形式提交给TaskScheduler TaskScheduler 负责具体的Task调度,将Task发送到Worker节点执行 当Task失败时会由TaskScheduler进行重试,默认重试3次,三次之后如果还是失败则认定该Task所在的job失败 若遇到Shuffle File Not Found问题,则DAGSchedule会重新提交Shuffle输出丢失的Stage,默认重试4次,四次之后还是失败则认定Application失败 推测执行:碰到计算缓慢Task,会在其他节点的Executor上启动一个相同的Task,哪个Task先完成,就以最先完成的Task其计算结果作为最终结果
client端和cluster端的区别 -- 面试问到
Client模式 Driver端在任务提交所在节点本地创建 主要用于Job的调试,上线前的测试 便于在本地查看日志 当多用户同时提交多个任务时,Driver 会与 Executor 进行大量的通信,会占用大量IO,导致网卡流量激增而被SA警告 ApplicationMaster的作用-- 为当前的Application申请资源 给NodeManager发送消息启动Executor。 Cluster模式 -- 生产环境必须使用!!!! AM兼顾Driver端的作用,在某个NM中创建 适用于任务真正上线 由于AM(Driver)端是在任意某个NM中创建,故不会造成单节点流量激增,也不会导致网卡风暴 无法直接查看日志,需要通过命令或者在WEB界面查看 -- master:8088 -> running ->history ->标准输出和标准错误输出 ApplicationMaster的作用: 当前的Application申请资源: -- 给nodemanager发送消息 启动Excutor。 任务调度。(这里和client模式的区别是AM具有调度能力,因为其就是Driver端,包含Driver进程)
PageRank
初始值——PR值 每个页面设置相同的PR值 默认初始PR值为1 迭代递归计算(直到收敛) 将PR值平均分给每个出链 将每个页面所有入链值加起来作为该页面新的PR值 不断的重复计算每个页面的PR值,最终会趋向于稳定,也就是收敛的状态 如何确定收敛标准? 每个页面的PR值和上一次计算的PR相等 设定一个阈值,比如0.0001,当所有页面和上次计算的PR平均差值小于该标准时则收敛 设定一个百分比(99%),当99%的页面和上一次计算的PR相等,则收敛
PageRank执行步骤
PageRank执行步骤 -- 1.设置文件格式PageRank.txt,C->A,B表示页面的关系 c入链给A,B,也就是c給A,B都投了一票,自身的初始权重1.0 -> A:0.5 B:0.5 C->A,B D->C,B B->C A->B,D -- 2.切分文件为当前页面,下一页面(currentPage, nextPage) -- 3.设置页面初始的权重为1.0 -- (currentPage, nextPage, 1.0) (C,List(A, B),1.0) (D,List(C, B),1.0) (currentPage, nextPage, pr) -- 4.计算每个页面的平均权重值PR = 1.0,(PageRank的目的就是一直计算权重值直到收敛即这一次的权重值和上一次的权重值接近) (D,List(C, B),1.0) -> (D, C 0.5,B 0.5) -- 5.输出的权重值做.reduceByKey(_+_) ,得到一次PageRank的结果,输出当前迭代次数和对应结果 (B,1.5) (A,0.5) (C,1.5) (D,0.5) -- 6.设计循环,将计算结果与原始RDD关联,得到新的prPageRank val joinRes: RDD[(String, (Double, List[String]))] =pageRank.join(SourceRDD) prPageRank = joinRes.map { case (currentPage, (pr, nextPages)) => (currentPage, nextPages, pr) } package com.shujia import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object PageRank02 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("pagerank").setMaster("local") val sc = new SparkContext(conf) // 读取文本文件,每行表示页面关系,形如:A->B,C val pageRank = sc.textFile("data/pageRank.txt") /** * C->A,B * D->C,B * B->C * A->B,D */ //切分页面,把文件划分为当前页面和下一页 val SourceRDD = pageRank.map { case line => val splitRes = line.split("->") val currentPage = splitRes(0) val nextPage = splitRes(1).split(",").toList (currentPage, nextPage) } // 初始化PageRank RDD,其中每个元素包含当前页面、投票页面列表和初始权重值1.0 var prPageRank = SourceRDD.map { case (currentPage, nextPage) => (currentPage, nextPage, 1.0) } /** * (currentPage, nextPage, pr) * (C,List(A, B),1.0) * (D,List(C, B),1.0) * (B,List(C),1.0) * (A,List(B, D),1.0) */ val stop = 10 // 迭代停止的次数 var cnt = 0 // 当前迭代次数 while (cnt < stop) { // 计算每个页面的平均权重值 val pageRank = prPageRank.map { case (currentPage, nextPage, pr) => val pageNum = nextPage.size val avgPr = pr / pageNum // 平均权重值 // 将平均权重值赋予给投票的每一个页面 val avgPrNextPage: List[(String, Double)] = nextPage.map { case page => (page, avgPr) } avgPrNextPage }.flatMap{ case avgPrNextPage => avgPrNextPage } .reduceByKey(_+_) // 输出当前迭代次数和对应结果 pageRank.foreach(println) /** * (B,1.5) * (A,0.5) * (C,1.5) * (D,0.5) */ // 将计算结果与原始RDD关联,得到新的prPageRank val joinRes: RDD[(String, (Double, List[String]))] =pageRank.join(SourceRDD) prPageRank = joinRes.map { case (currentPage, (pr, nextPages)) => (currentPage, nextPages, pr) } cnt += 1 println(s"这是第${cnt}次循环得到的结果") } } }
PageRank高阶
object PageRank06 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("chunge").setMaster("local") val sc = new SparkContext(conf) // 读取包含页面链接关系的文件,并将数据转换为 RDD val pageRankRDD = sc.textFile("data/pageRank.txt") // 将页面链接关系的数据转换为 (currentPage, nextPage) 的键值对形式,并保存为 SourceRDD val SourceRDD = pageRankRDD.map { oneline => { val splitRes = oneline.split("->") val currentPage = splitRes(0) val nextPage = splitRes(1).split(",").toList (currentPage, nextPage) } } // 将 SourceRDD 映射为带初始 PageRank 值的 RDD,保存为 prPageRank var prPageRank = SourceRDD.map { case (currentPage, nextPage) => (currentPage, nextPage, 1.0) } val q = 0.85 // 阻尼系数 val N: Long = prPageRank.count() // 页面总数 val p = 0.0001 // 收敛系数 var flag = true // 控制循环退出的标志 while (flag) { // 计算下一轮的 PageRank 值,并保存为 pageRankRes val pageRankRes: RDD[(String, Double)] = prPageRank .map { case (currentPage, nextPage, pr) => val avgNum = pr / nextPage.size val avgNextPages = nextPage.map { case nextPage => (nextPage, avgNum) } avgNextPages } .flatMap { case avgNextPages => avgNextPages } .reduceByKey(_ + _) .map { case (page: String, pr: Double) => // (1-q) / N + q * 求和(pr/L) (page, (1 - q) / N + q * pr) } // 在进行下一轮之前需要比较前后两次得分统计的结果 val sumRes: Double = prPageRank .map { case (currentPage, nextPages, pr) => (currentPage, pr) // 将 (currentPage, nextPages, pr) 转换为 (currentPage, pr) } .join(pageRankRes) // 将 prPageRank 和 pageRankRes 进行关联,共享相同的键 .map { // 对每个网页前后两次得到的 PR 值进行比较差额 case (currentPage, (pr1, pr2)) => (currentPage, math.abs(pr1 - pr2)) // 计算 PR 值的差异,并取绝对值 } // 对当前所有页面的 PR 差额进行统计求和 .map(_._2).sum() // 提取差额值,并求和 // 当平均差额小于阈值,认为前后两次计算得到的 PageRank 结果变化不大,退出循环得到最终结果 if (sumRes / N < p) { flag = false // 设置循环标志为 false,退出循环 println("得到最终的结果为:") pageRankRes.foreach(println) // 打印最终的 PageRank 结果 } // 将 RDD 进行关联,准备进行下一次循环 val joinRes: RDD[(String, (List[String], Double))] = SourceRDD.join(pageRankRes) // 将 SourceRDD 和 pageRankRes 进行关联 // 得到下一轮的 RDD prPageRank = joinRes.map { case (currentPage, (nextPages, pr)) => (currentPage, nextPages, pr) // 更新 prPageRank 为下一轮的 RDD } } } }
spark任务执行流程
,以reducebykey为例:
将处理流程分为map端和reduce端,在这里可以和Flink做比较
spark的好处是可以在map端对数据进行预聚合(以mapReduce为基本底层原理),先执行map再执行reduce端文章来源:https://www.toymoban.com/news/detail-753668.html
预聚合 -- 减少shuffle中产生的数据量(shuffle是大数据中最慢的处理过程,同时需要网络IO和磁盘IO)文章来源地址https://www.toymoban.com/news/detail-753668.html
到了这里,关于spark -- 数据计算框架的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!