Spark的一些重要概念

这篇具有很好参考价值的文章主要介绍了Spark的一些重要概念。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Shuffle的深入理解

什么是Shuffle,本意为洗牌,在数据处理领域里面,意为将数打散。
问题:shuffle一定有网络传输吗?有网络传输的一定是Shuffle吗?

Shuffle的概念

通过网络将数据传输到多台机器,数据被打散,但是有网络传输,不一定就有shuffle,Shuffle的功能是将具有相同规律的数据按照指定的分区器的分区规则,通过网络,传输到指定的机器的一个分区中,需要注意的是,不是上游的Task发送给下游的Task,而是下游的Task到上游拉取数据。

Spark的一些重要概念

reduceByKey一定会Shuffle吗

不一定,如果一个RDD事先使用了HashPartitioner分区先进行分区,然后再调用reduceByKey方法,使用的也是HashPartitioner,并且没有改变分区数量,调用redcueByKey就不shuffle
如果自定义分区器,多次使用自定义的分区器,并且没有改变分区的数量,为了减少shuffle的次数,提高计算效率,需要重新自定义分区器的equals方法
例如:

//创建RDD,并没有立即读取数据,而是触发Action才会读取数据
val lines = sc.textFile("hdfs://node-1.51doit.cn:9000/words")

val wordAndOne = lines.flatMap(_.split(" ")).map((_, 1))
//先使用HashPartitioner进行partitionBy
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val partitioned = wordAndOne.partitionBy(partitioner)
//然后再调用reduceByKey
val reduced: RDD[(String, Int)] = partitioned.reduceByKey(_ + _)

reduced.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-82")

Spark的一些重要概念

join一定会Shuffle吗

不一定,join一般情况会shuffle,但是如果两个要join的rdd实现都使用相同的分区去进行分区了,并且join时,依然使用相同类型的分区器,并且没有改变分区数据,那么不shuffle

//通过并行化的方式创建一个RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通过并行化的方式再创建一个RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//该join一定有shuffle,并且是3个Stage
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)


val rdd11 = rdd1.groupByKey()
val rdd22 = rdd2.groupByKey()
//下面的join,没有shuffle
val rdd33 = rdd11.join(rdd22)

rdd33.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-86")

Spark的一些重要概念

shuffle数据的复用

spark在shuffle时,会应用分区器,当读取达到一定大小或整个分区的数据被处理完,会将数据溢写磁盘磁盘(数据文件和索引文件),溢写持磁盘的数据,会保存在Executor所在机器的本地磁盘(默认是保存在/temp目录,也可以配置到其他目录),只要application一直运行,shuffle的中间结果数据就会被保存。如果以后再次触发Action,使用到了以前shuffle的中间结果,那么就不会从源头重新计算而是,而是复用shuffle中间结果,所有说,shuffle是一种特殊的persist,以后再次触发Action,就会跳过前面的Stage,直接读取shuffle的数据,这样可以提高程序的执行效率。

广播变量

广播变量的使用场景

在很多计算场景,经常会遇到两个RDD进行JOIN,如果一个RDD对应的数据比较大,一个RDD对应的数据比较小,如果使用JOIN,那么会shuffle,导致效率变低。广播变量就是将相对较小的数据,先收集到Driver,然后再通过网络广播到属于该Application对应的每个Executor中,以后处理大量数据对应的RDD关联数据,就不用shuffle了,而是直接在内存中关联已经广播好的数据,即通实现mapside join,可以将Driver端的数据广播到属于该application的Executor,然后通过Driver广播变量返回的引用,获取实现广播到Executor的数据

广播变量的特点:广播出去的数据就无法在改变了,在没有Executor中是只读的操作,在每个Executor中,多个Task使用一份广播变量
Spark的一些重要概念

广播变量的实现原理

广播变量是通过BT的方式广播的(TorrentBroadcast),多个Executor可以相互传递数据,可以提高效率
sc.broadcast这个方法是阻塞的(同步的)
广播变量一但广播出去就不能改变,为了以后可以定期的改变要关联的数据,可以定义一个object[单例对象],在函数内使用,并且加一个定时器,然后定期更新数据
广播到Executor的数据,可以在Driver获取到引用,然后这个引用会伴随着每一个Task发送到Executor,然后通过这个引用,获取到事先广播好的数据

序列化问题

序列化问题的场景

spark任务在执行过程中,由于编写的程序不当,任务在执行时,会出序列化问题,通常有以下两种情况,
• 封装数据的Bean没有实现序列化接口(Task已经生成了),在ShuffleWirte之前要将数据溢写磁盘,会抛出异常
• 函数闭包问题,即函数的内部,使用到了外部没有实现序列化的引用(Task没有生成)

数据Bean未实现序列化接口

spark在运算过程中,由于很多场景必须要shuffle,即向数据溢写磁盘并且在网络间进行传输,但是由于封装数据的Bean没有实现序列化接口,就会导致出现序列化的错误!


object C02_CustomSort {

  def main(args: Array[String]): Unit = {

    val sc = SparkUtil.getContext(this.getClass.getSimpleName, true)
    //使用并行化的方式创建RDD
    val lines = sc.parallelize(
      List(
        "laoduan,38,99.99",
        "nianhang,33,99.99",
        "laozhao,18,9999.99"
      )
    )
    val tfBoy: RDD[Boy] = lines.map(line => {
      val fields = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toDouble
      new Boy(name, age, fv) //将数据封装到一个普通的class中
    })

    implicit val ord = new Ordering[Boy] {
      override def compare(x: Boy, y: Boy): Int = {
        if (x.fv == y.fv) {
          x.age - y.age
        } else {
          java.lang.Double.compare(y.fv, x.fv)
        }
      }
    }
    //sortBy会产生shuffle,如果Boy没有实现序列化接口,Shuffle时会报错
    val sorted: RDD[Boy] = tfBoy.sortBy(bean => bean)

    val res = sorted.collect()

    println(res.toBuffer)
  }
}

//如果以后定义bean,建议使用case class
class Boy(val name: String, var age: Int, var fv: Double)  //extends Serializable 
{
  
  override def toString = s"Boy($name, $age, $fv)"
}

函数闭包问题

闭包的现象

在调用RDD的Transformation和Action时,可能会传入自定义的函数,如果函数内部使用到了外部未被序列化的引用,就会报Task无法序列化的错误。原因是spark的Task是在Driver端生成的,并且需要通过网络传输到Executor中,Task本身实现了序列化接口,函数也实现了序列化接口,但是函数内部使用到的外部引用不支持序列化,就会函数导致无法序列化,从而导致Task没法序列化,就无法发送到Executor中了
Spark的一些重要概念

在调用RDD的Transformation或Action是传入函数,第一步就进行检测,即调用sc的clean方法
为了避免错误,在Driver初始化的object或class必须实现序列化接口,不然会报错误

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f) //检测函数是否可以序列化,如果可以直接将函数返回,如果不可以,抛出异常
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
private def ensureSerializable(func: AnyRef): Unit = {
  try {
    if (SparkEnv.get != null) {
      //获取spark执行换的的序列化器,如果函数无法序列化,直接抛出异常,程序退出,根本就没有生成Task
      SparkEnv.get.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception => throw new SparkException("Task not serializable", ex)
  }
}

在Driver端初始化实现序列化的object

在一个Executor中,多个Task使用同一个object对象,因为在scala中,object就是单例对象,一个Executor中只有一个实例,Task会反序列化多次,但是引用的单例对象只反序列化一次

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleObjectSer是一个静态对象,实在第一次使用的时候被初始化了(实在Driver被初始化的)
val rulesObj = RuleObjectSer

//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesObj.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesObj.toString)
}

//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))

Spark的一些重要概念

在Driver端初始化实现序列化的class
在一个Executor中,每个Task都会使用自己独享的class实例,因为在scala中,class就是多例,Task会反序列化多次,每个Task引用的class实例也会被序列化

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleClassNotSer是一个类,需要new才能实现(实在Driver被初始化的)
val rulesClass = new RuleClassSer

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesClass.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

Spark的一些重要概念

在函数内部初始化未序列化的object
object没有实现序列化接口,不会出现问题,因为该object实现函数内部被初始化的,而不是在Driver初始化的

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//不再Driver端初始化RuleObjectSer或RuleClassSer
//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //在函数内部初始化没有实现序列化接口的RuleObjectNotSer
  val name = RuleObjectNotSer.rulesMap.getOrElse(code, "未知") 
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, RuleObjectNotSer.toString)
}
//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))
sc.stop()

Spark的一些重要概念

在函数内部初始化未序列化的class

这种方式非常不好,因为每来一条数据,new一个class的实例,会导致消耗更多资源,jvm会频繁GC

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //RuleClassNotSer是在Executor中被初始化的
  val rulesClass = new RuleClassNotSer
  //但是如果每来一条数据new一个RuleClassNotSer,不好,效率低,浪费资源,频繁GC
  val name = rulesClass.rulesMap.getOrElse(code, "未知") 
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

调用mapPartitions在函数内部初始化未序列化的class

一个分区使用一个class的实例,即每个Task都是自己的class实例

//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//处理数据,关联维度
val res = lines.mapPartitions(it => {
  //RuleClassNotSer是在Executor中被初始化的
  //一个分区的多条数据,使用同一个RuleClassNotSer实例
  val rulesClass = new RuleClassNotSer
  it.map(e => {
    val fields = e.split(",")
    val id = fields(0).toInt
    val code = fields(1)
    val name = rulesClass.rulesMap.getOrElse(code, "未知") 
    //获取当前线程ID
    val treadId = Thread.currentThread().getId
    //获取当前Task对应的分区编号
    val partitiondId = TaskContext.getPartitionId()
    //获取当前Task运行时的所在机器的主机名
    val host = InetAddress.getLocalHost.getHostName
    (id, code, name, treadId, partitiondId, host, rulesClass.toString)
  })
})
res.saveAsTextFile(args(2))
sc.stop()

Spark的一些重要概念

Task线程安全问题

在一个Executor可以同时运行多个Task,如果多个Task使用同一个共享的单例对象,如果对共享的数据同时进行读写操作,会导致线程不安全的问题,为了避免这个问题,可以加锁,但效率变低了,因为在一个Executor中同一个时间点只能有一个Task使用共享的数据,这样就变成了串行了,效率低!

定义一个工具类object,格式化日期,因为SimpleDateFormat线程不安全,会出现异常

val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD: RDD[Long] = lines.map(e => {
  //将字符串转成long类型时间戳
  //使用自定义的object工具类
  val time: Long = DateUtilObj.parse(e)
  time
})

val res = timeRDD.collect()
println(res.toBuffer)
object DateUtilObj {

  //多个Task使用了一个共享的SimpleDateFormat,SimpleDateFormat是线程不安全

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  //线程安全的
  //val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }

}

上面的程序会出现错误,因为多个Task同时使用一个单例对象格式化日期,报错,如果加锁,程序会变慢,改进后的代码:

val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD = lines.mapPartitions(it => {
  //一个Task使用自己单独的DateUtilClass实例,缺点是浪费内存资源
  val dataUtil = new DateUtilClass
  it.map(e => {
    dataUtil.parse(e)
  })
})

val res = timeRDD.collect()
println(res.toBuffer)
class DateUtilClass {

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }
}

改进后,一个Task使用一个DateUtilClass实例,不会出现线程安全的问题。

累加器

累加器是Spark中用来做计数功能的,在程序运行过程当中,可以做一些额外的数据指标统计

触发一次Action,并且将附带的统计指标计算出来,可以使用Accumulator进行处理,Accumulator的本质数一个实现序列化接口class,每个Task都有自己的累加器,避免累加的数据发送冲突文章来源地址https://www.toymoban.com/news/detail-522617.html

object C14_AccumulatorDemo3 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") //本地模式,开多个线程
    //1.创建SparkContext
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    //在Driver定义一个特殊的变量,即累加器
    //Accumulator可以将每个分区的计数结果,通过网络传输到Driver,然后进行全局求和
    val accumulator: LongAccumulator = sc.longAccumulator("even-acc")
    val rdd2 = rdd1.map(e => {
      if (e % 2 == 0) {
        accumulator.add(1)  //闭包,在Executor中累计的
      }
      e * 10
    })

    //就触发一次Action
    rdd2.saveAsTextFile("out/113")

    //每个Task中累计的数据会返回到Driver吗?
    println(accumulator.count) 
  }
}

到了这里,关于Spark的一些重要概念的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 深入理解 Spark(四)Spark 内存管理模型

    深入理解 Spark(四)Spark 内存管理模型

    Executor 进程作为一个 JVM 进程,其内存管理建立在 JVM 的内存管理之上,整个大致包含两种方式:堆内内存和堆外内存。 一个 Executor 当中的所有 Task 是共享堆内内存的。一个 Work 中的多个 Executor 中的多个 Task 是共享堆外内存的。 堆内内存和堆外内存 大数据领域两个比较常见

    2024年01月24日
    浏览(10)
  • 深入理解Linux权限管理:保护系统安全的重要措施

    Linux操作系统以其稳定性、可靠性和灵活性而受到广泛使用。其中一个关键特性是其强大的权限管理系统,它可以保护系统资源和用户数据的安全性。本文将深入探讨Linux权限管理的概念、原则和实践,帮助您理解如何正确配置和管理权限,以确保系统的安全性和完整性。 第

    2024年02月11日
    浏览(8)
  • 深入理解哈希表:数据结构中的重要角色

    目录 一. 哈希表的原理与结构 哈希函数 存储数组 哈希冲突与解决方法 总结 二. 哈希函数的作用与设计 哈希函数的作用: 哈希函数的设计: 常见的哈希函数设计方法包括: 三. 哈希冲突与解决方法 1. 开放寻址法(Open Addressing) 2. 链地址法(Chaining) 四. 哈希表的应用 五

    2024年02月11日
    浏览(11)
  • 【Spark精讲】Spark Shuffle详解

    【Spark精讲】Spark Shuffle详解

    目录 Shuffle概述 Shuffle执行流程 总体流程 中间文件 ShuffledRDD生成 Stage划分 Task划分 Map端写入(Shuffle Write) Reduce端读取(Shuffle Read) Spark Shuffle演变 SortShuffleManager运行机制 普通运行机制 bypass 运行机制 Tungsten Sort Shuffle 运行机制 基于Sort的Shuffle机制的优缺点 Shuffle调优 广播变量 shu

    2024年02月02日
    浏览(9)
  • 深入探讨进程间通信的重要性:理解不同的通信机制(下)

    深入探讨进程间通信的重要性:理解不同的通信机制(下)

    在上一篇文章中,我们探讨了进程间通信的三种常见机制:管道、消息队列和共享内存。我们了解到,这些机制各有其特点和适用场景,可以根据实际需求选择合适的机制进行进程间通信。然而,进程间通信并不仅限于这三种方式。 在本文中,我们将继续探索进程间通信的知

    2024年02月10日
    浏览(12)
  • 深入探讨进程间通信的重要性:理解不同的通信机制(上)

    深入探讨进程间通信的重要性:理解不同的通信机制(上)

    在操作系统中,进程间通信是指不同进程之间进行信息共享、数据传输和消息通知等交互的过程。每个进程在创建时都有自己独立的虚拟地址空间,但它们共享内核空间。因此,要实现进程间的通信,必须通过内核来进行中介,如下图所示: 在Linux系统中,提供了多种进程间

    2024年02月10日
    浏览(10)
  • Spark九:Spark调优之Shuffle调优

    map端和reduce端缓存大小设置,reduce端重试次数和等待时间间隔,以及bypass设置 学习资料:https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ 在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中

    2024年01月20日
    浏览(13)
  • 深入理解 Spark(一)spark 运行模式简介与启动流程源码分析

    深入理解 Spark(一)spark 运行模式简介与启动流程源码分析

    以 standalone-client 为例,运行过程如下: SparkContext 连接到 Master,向 Master 注册并申请资源(CPU Core 和 Memory); Master 根据 SparkContext 的资源申请要求和 Worker 心跳周期内报告的信息决定在哪个 Worker 上分配资源,然后在该 Worker 上获取资源,然后启动 StandaloneExecutorBackend; Stan

    2024年02月02日
    浏览(21)
  • Spark学习笔记【shuffle】

    Spark学习笔记【shuffle】

    本文基本上是大数据处理框架Apache Spark设计与实现的Shuffle部分的学习。以及Spark基础知识@Bambrow 上游和下游,不同stage,不同的task之间是如何传递数据的。 ShuffleManager 管理ShuffleWrite和ShuffleRead 分为两个阶段: ShuffleWrite 上游stage输出的分区问题。 ShuffleRead 下游stage从上游获取

    2024年02月03日
    浏览(11)
  • Spark Shuffle 过程

    Spark Shuffle 过程

    本篇主要阐述Spark Shuffle过程,在执行 Job 任务时,无论是 MapReduce 或者 Spark Shuffle 过程都是比较消耗性能;因为该环节包含了大量的磁盘 IO、序列化、网络数据传输等操作。因此,在这一过程中进行调参优化,就有可能让 Job 执行效率上更好。 在 Spark 1.2 以前,默认的 Shuffle

    2023年04月08日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包