Spark的一些重要概念

这篇具有很好参考价值的文章主要介绍了Spark的一些重要概念。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Shuffle的深入理解

什么是Shuffle,本意为洗牌,在数据处理领域里面,意为将数打散。
问题:shuffle一定有网络传输吗?有网络传输的一定是Shuffle吗?

Shuffle的概念

通过网络将数据传输到多台机器,数据被打散,但是有网络传输,不一定就有shuffle,Shuffle的功能是将具有相同规律的数据按照指定的分区器的分区规则,通过网络,传输到指定的机器的一个分区中,需要注意的是,不是上游的Task发送给下游的Task,而是下游的Task到上游拉取数据。

Spark的一些重要概念

reduceByKey一定会Shuffle吗

不一定,如果一个RDD事先使用了HashPartitioner分区先进行分区,然后再调用reduceByKey方法,使用的也是HashPartitioner,并且没有改变分区数量,调用redcueByKey就不shuffle
如果自定义分区器,多次使用自定义的分区器,并且没有改变分区的数量,为了减少shuffle的次数,提高计算效率,需要重新自定义分区器的equals方法
例如:

//创建RDD,并没有立即读取数据,而是触发Action才会读取数据
val lines = sc.textFile("hdfs://node-1.51doit.cn:9000/words")

val wordAndOne = lines.flatMap(_.split(" ")).map((_, 1))
//先使用HashPartitioner进行partitionBy
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val partitioned = wordAndOne.partitionBy(partitioner)
//然后再调用reduceByKey
val reduced: RDD[(String, Int)] = partitioned.reduceByKey(_ + _)

reduced.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-82")

Spark的一些重要概念

join一定会Shuffle吗

不一定,join一般情况会shuffle,但是如果两个要join的rdd实现都使用相同的分区去进行分区了,并且join时,依然使用相同类型的分区器,并且没有改变分区数据,那么不shuffle

//通过并行化的方式创建一个RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通过并行化的方式再创建一个RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//该join一定有shuffle,并且是3个Stage
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)


val rdd11 = rdd1.groupByKey()
val rdd22 = rdd2.groupByKey()
//下面的join,没有shuffle
val rdd33 = rdd11.join(rdd22)

rdd33.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-86")

Spark的一些重要概念

shuffle数据的复用

spark在shuffle时,会应用分区器,当读取达到一定大小或整个分区的数据被处理完,会将数据溢写磁盘磁盘(数据文件和索引文件),溢写持磁盘的数据,会保存在Executor所在机器的本地磁盘(默认是保存在/temp目录,也可以配置到其他目录),只要application一直运行,shuffle的中间结果数据就会被保存。如果以后再次触发Action,使用到了以前shuffle的中间结果,那么就不会从源头重新计算而是,而是复用shuffle中间结果,所有说,shuffle是一种特殊的persist,以后再次触发Action,就会跳过前面的Stage,直接读取shuffle的数据,这样可以提高程序的执行效率。

广播变量

广播变量的使用场景

在很多计算场景,经常会遇到两个RDD进行JOIN,如果一个RDD对应的数据比较大,一个RDD对应的数据比较小,如果使用JOIN,那么会shuffle,导致效率变低。广播变量就是将相对较小的数据,先收集到Driver,然后再通过网络广播到属于该Application对应的每个Executor中,以后处理大量数据对应的RDD关联数据,就不用shuffle了,而是直接在内存中关联已经广播好的数据,即通实现mapside join,可以将Driver端的数据广播到属于该application的Executor,然后通过Driver广播变量返回的引用,获取实现广播到Executor的数据

广播变量的特点:广播出去的数据就无法在改变了,在没有Executor中是只读的操作,在每个Executor中,多个Task使用一份广播变量
Spark的一些重要概念

广播变量的实现原理

广播变量是通过BT的方式广播的(TorrentBroadcast),多个Executor可以相互传递数据,可以提高效率
sc.broadcast这个方法是阻塞的(同步的)
广播变量一但广播出去就不能改变,为了以后可以定期的改变要关联的数据,可以定义一个object[单例对象],在函数内使用,并且加一个定时器,然后定期更新数据
广播到Executor的数据,可以在Driver获取到引用,然后这个引用会伴随着每一个Task发送到Executor,然后通过这个引用,获取到事先广播好的数据

序列化问题

序列化问题的场景

spark任务在执行过程中,由于编写的程序不当,任务在执行时,会出序列化问题,通常有以下两种情况,
• 封装数据的Bean没有实现序列化接口(Task已经生成了),在ShuffleWirte之前要将数据溢写磁盘,会抛出异常
• 函数闭包问题,即函数的内部,使用到了外部没有实现序列化的引用(Task没有生成)

数据Bean未实现序列化接口

spark在运算过程中,由于很多场景必须要shuffle,即向数据溢写磁盘并且在网络间进行传输,但是由于封装数据的Bean没有实现序列化接口,就会导致出现序列化的错误!


object C02_CustomSort {

  def main(args: Array[String]): Unit = {

    val sc = SparkUtil.getContext(this.getClass.getSimpleName, true)
    //使用并行化的方式创建RDD
    val lines = sc.parallelize(
      List(
        "laoduan,38,99.99",
        "nianhang,33,99.99",
        "laozhao,18,9999.99"
      )
    )
    val tfBoy: RDD[Boy] = lines.map(line => {
      val fields = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toDouble
      new Boy(name, age, fv) //将数据封装到一个普通的class中
    })

    implicit val ord = new Ordering[Boy] {
      override def compare(x: Boy, y: Boy): Int = {
        if (x.fv == y.fv) {
          x.age - y.age
        } else {
          java.lang.Double.compare(y.fv, x.fv)
        }
      }
    }
    //sortBy会产生shuffle,如果Boy没有实现序列化接口,Shuffle时会报错
    val sorted: RDD[Boy] = tfBoy.sortBy(bean => bean)

    val res = sorted.collect()

    println(res.toBuffer)
  }
}

//如果以后定义bean,建议使用case class
class Boy(val name: String, var age: Int, var fv: Double)  //extends Serializable 
{
  
  override def toString = s"Boy($name, $age, $fv)"
}

函数闭包问题

闭包的现象

在调用RDD的Transformation和Action时,可能会传入自定义的函数,如果函数内部使用到了外部未被序列化的引用,就会报Task无法序列化的错误。原因是spark的Task是在Driver端生成的,并且需要通过网络传输到Executor中,Task本身实现了序列化接口,函数也实现了序列化接口,但是函数内部使用到的外部引用不支持序列化,就会函数导致无法序列化,从而导致Task没法序列化,就无法发送到Executor中了
Spark的一些重要概念

在调用RDD的Transformation或Action是传入函数,第一步就进行检测,即调用sc的clean方法
为了避免错误,在Driver初始化的object或class必须实现序列化接口,不然会报错误

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f) //检测函数是否可以序列化,如果可以直接将函数返回,如果不可以,抛出异常
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
private def ensureSerializable(func: AnyRef): Unit = {
  try {
    if (SparkEnv.get != null) {
      //获取spark执行换的的序列化器,如果函数无法序列化,直接抛出异常,程序退出,根本就没有生成Task
      SparkEnv.get.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception => throw new SparkException("Task not serializable", ex)
  }
}

在Driver端初始化实现序列化的object

在一个Executor中,多个Task使用同一个object对象,因为在scala中,object就是单例对象,一个Executor中只有一个实例,Task会反序列化多次,但是引用的单例对象只反序列化一次

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleObjectSer是一个静态对象,实在第一次使用的时候被初始化了(实在Driver被初始化的)
val rulesObj = RuleObjectSer

//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesObj.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesObj.toString)
}

//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))

Spark的一些重要概念

在Driver端初始化实现序列化的class
在一个Executor中,每个Task都会使用自己独享的class实例,因为在scala中,class就是多例,Task会反序列化多次,每个Task引用的class实例也会被序列化

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleClassNotSer是一个类,需要new才能实现(实在Driver被初始化的)
val rulesClass = new RuleClassSer

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesClass.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

Spark的一些重要概念

在函数内部初始化未序列化的object
object没有实现序列化接口,不会出现问题,因为该object实现函数内部被初始化的,而不是在Driver初始化的

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//不再Driver端初始化RuleObjectSer或RuleClassSer
//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //在函数内部初始化没有实现序列化接口的RuleObjectNotSer
  val name = RuleObjectNotSer.rulesMap.getOrElse(code, "未知") 
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, RuleObjectNotSer.toString)
}
//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))
sc.stop()

Spark的一些重要概念

在函数内部初始化未序列化的class

这种方式非常不好,因为每来一条数据,new一个class的实例,会导致消耗更多资源,jvm会频繁GC

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //RuleClassNotSer是在Executor中被初始化的
  val rulesClass = new RuleClassNotSer
  //但是如果每来一条数据new一个RuleClassNotSer,不好,效率低,浪费资源,频繁GC
  val name = rulesClass.rulesMap.getOrElse(code, "未知") 
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

调用mapPartitions在函数内部初始化未序列化的class

一个分区使用一个class的实例,即每个Task都是自己的class实例

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//处理数据,关联维度
val res = lines.mapPartitions(it => {
  //RuleClassNotSer是在Executor中被初始化的
  //一个分区的多条数据,使用同一个RuleClassNotSer实例
  val rulesClass = new RuleClassNotSer
  it.map(e => {
    val fields = e.split(",")
    val id = fields(0).toInt
    val code = fields(1)
    val name = rulesClass.rulesMap.getOrElse(code, "未知") 
    //获取当前线程ID
    val treadId = Thread.currentThread().getId
    //获取当前Task对应的分区编号
    val partitiondId = TaskContext.getPartitionId()
    //获取当前Task运行时的所在机器的主机名
    val host = InetAddress.getLocalHost.getHostName
    (id, code, name, treadId, partitiondId, host, rulesClass.toString)
  })
})
res.saveAsTextFile(args(2))
sc.stop()

Spark的一些重要概念

Task线程安全问题

在一个Executor可以同时运行多个Task,如果多个Task使用同一个共享的单例对象,如果对共享的数据同时进行读写操作,会导致线程不安全的问题,为了避免这个问题,可以加锁,但效率变低了,因为在一个Executor中同一个时间点只能有一个Task使用共享的数据,这样就变成了串行了,效率低!

定义一个工具类object,格式化日期,因为SimpleDateFormat线程不安全,会出现异常

val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD: RDD[Long] = lines.map(e => {
  //将字符串转成long类型时间戳
  //使用自定义的object工具类
  val time: Long = DateUtilObj.parse(e)
  time
})

val res = timeRDD.collect()
println(res.toBuffer)
object DateUtilObj {

  //多个Task使用了一个共享的SimpleDateFormat,SimpleDateFormat是线程不安全

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  //线程安全的
  //val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }

}

上面的程序会出现错误,因为多个Task同时使用一个单例对象格式化日期,报错,如果加锁,程序会变慢,改进后的代码:

val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD = lines.mapPartitions(it => {
  //一个Task使用自己单独的DateUtilClass实例,缺点是浪费内存资源
  val dataUtil = new DateUtilClass
  it.map(e => {
    dataUtil.parse(e)
  })
})

val res = timeRDD.collect()
println(res.toBuffer)
class DateUtilClass {

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }
}

改进后,一个Task使用一个DateUtilClass实例,不会出现线程安全的问题。

累加器

累加器是Spark中用来做计数功能的,在程序运行过程当中,可以做一些额外的数据指标统计

触发一次Action,并且将附带的统计指标计算出来,可以使用Accumulator进行处理,Accumulator的本质数一个实现序列化接口class,每个Task都有自己的累加器,避免累加的数据发送冲突文章来源地址https://www.toymoban.com/news/detail-522617.html

object C14_AccumulatorDemo3 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") //本地模式,开多个线程
    //1.创建SparkContext
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    //在Driver定义一个特殊的变量,即累加器
    //Accumulator可以将每个分区的计数结果,通过网络传输到Driver,然后进行全局求和
    val accumulator: LongAccumulator = sc.longAccumulator("even-acc")
    val rdd2 = rdd1.map(e => {
      if (e % 2 == 0) {
        accumulator.add(1)  //闭包,在Executor中累计的
      }
      e * 10
    })

    //就触发一次Action
    rdd2.saveAsTextFile("out/113")

    //每个Task中累计的数据会返回到Driver吗?
    println(accumulator.count) 
  }
}

到了这里,关于Spark的一些重要概念的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 深入理解 Spark(四)Spark 内存管理模型

    Executor 进程作为一个 JVM 进程,其内存管理建立在 JVM 的内存管理之上,整个大致包含两种方式:堆内内存和堆外内存。 一个 Executor 当中的所有 Task 是共享堆内内存的。一个 Work 中的多个 Executor 中的多个 Task 是共享堆外内存的。 堆内内存和堆外内存 大数据领域两个比较常见

    2024年01月24日
    浏览(35)
  • 深入理解Linux权限管理:保护系统安全的重要措施

    Linux操作系统以其稳定性、可靠性和灵活性而受到广泛使用。其中一个关键特性是其强大的权限管理系统,它可以保护系统资源和用户数据的安全性。本文将深入探讨Linux权限管理的概念、原则和实践,帮助您理解如何正确配置和管理权限,以确保系统的安全性和完整性。 第

    2024年02月11日
    浏览(37)
  • 深入理解哈希表:数据结构中的重要角色

    目录 一. 哈希表的原理与结构 哈希函数 存储数组 哈希冲突与解决方法 总结 二. 哈希函数的作用与设计 哈希函数的作用: 哈希函数的设计: 常见的哈希函数设计方法包括: 三. 哈希冲突与解决方法 1. 开放寻址法(Open Addressing) 2. 链地址法(Chaining) 四. 哈希表的应用 五

    2024年02月11日
    浏览(41)
  • 【Spark精讲】Spark Shuffle详解

    目录 Shuffle概述 Shuffle执行流程 总体流程 中间文件 ShuffledRDD生成 Stage划分 Task划分 Map端写入(Shuffle Write) Reduce端读取(Shuffle Read) Spark Shuffle演变 SortShuffleManager运行机制 普通运行机制 bypass 运行机制 Tungsten Sort Shuffle 运行机制 基于Sort的Shuffle机制的优缺点 Shuffle调优 广播变量 shu

    2024年02月02日
    浏览(37)
  • 深入探讨进程间通信的重要性:理解不同的通信机制(下)

    在上一篇文章中,我们探讨了进程间通信的三种常见机制:管道、消息队列和共享内存。我们了解到,这些机制各有其特点和适用场景,可以根据实际需求选择合适的机制进行进程间通信。然而,进程间通信并不仅限于这三种方式。 在本文中,我们将继续探索进程间通信的知

    2024年02月10日
    浏览(32)
  • 深入探讨进程间通信的重要性:理解不同的通信机制(上)

    在操作系统中,进程间通信是指不同进程之间进行信息共享、数据传输和消息通知等交互的过程。每个进程在创建时都有自己独立的虚拟地址空间,但它们共享内核空间。因此,要实现进程间的通信,必须通过内核来进行中介,如下图所示: 在Linux系统中,提供了多种进程间

    2024年02月10日
    浏览(32)
  • Spark九:Spark调优之Shuffle调优

    map端和reduce端缓存大小设置,reduce端重试次数和等待时间间隔,以及bypass设置 学习资料:https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ 在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中

    2024年01月20日
    浏览(31)
  • 深入理解 Spark(一)spark 运行模式简介与启动流程源码分析

    以 standalone-client 为例,运行过程如下: SparkContext 连接到 Master,向 Master 注册并申请资源(CPU Core 和 Memory); Master 根据 SparkContext 的资源申请要求和 Worker 心跳周期内报告的信息决定在哪个 Worker 上分配资源,然后在该 Worker 上获取资源,然后启动 StandaloneExecutorBackend; Stan

    2024年02月02日
    浏览(27)
  • Spark Shuffle 过程

    本篇主要阐述Spark Shuffle过程,在执行 Job 任务时,无论是 MapReduce 或者 Spark Shuffle 过程都是比较消耗性能;因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。因此,在这一过程中进行调参优化,就有可能让 Job 执行效率上更好。 在 Spark 1.2 以前,默认的 Shuffle

    2023年04月08日
    浏览(28)
  • Spark学习笔记【shuffle】

    本文基本上是大数据处理框架Apache Spark设计与实现的Shuffle部分的学习。以及Spark基础知识@Bambrow 上游和下游,不同stage,不同的task之间是如何传递数据的。 ShuffleManager 管理ShuffleWrite和ShuffleRead 分为两个阶段: ShuffleWrite 上游stage输出的分区问题。 ShuffleRead 下游stage从上游获取

    2024年02月03日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包