spark12-13-14

这篇具有很好参考价值的文章主要介绍了spark12-13-14。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

12. Task线程安全问题

12.1 现象和原理

在一个Executor可以同时运行多个Task,如果多个Task使用同一个共享的单例对象,如果对共享的数据同时进行读写操作,会导致线程不安全的问题,为了避免这个问题,可以加锁,但效率变低了,因为在一个Executor中同一个时间点只能有一个Task使用共享的数据,这样就变成了串行了,效率低!

12.2 案例

定义一个工具类object,格式化日期,因为SimpleDateFormat线程不安全,会出现异常

Scala
val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD: RDD[Long] = lines.map(e => {
  //将字符串转成long类型时间戳
  //使用自定义的object工具类
  val time: Long = DateUtilObj.parse(e)
  time
})

val res = timeRDD.collect()
println(res.toBuffer)

Scala
object DateUtilObj {

  //多个Task使用了一个共享的SimpleDateFormat,SimpleDateFormat是线程不安全

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  //线程安全的
  //val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }

}

上面的程序会出现错误,因为多个Task同时使用一个单例对象格式化日期,报错,如果加锁,程序会变慢,改进后的代码:

Scala
val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD = lines.mapPartitions(it => {
  //一个Task使用自己单独的DateUtilClass实例,缺点是浪费内存资源
  val dataUtil = new DateUtilClass
  it.map(e => {
    dataUtil.parse(e)
  })
})

val res = timeRDD.collect()
println(res.toBuffer)

Scala
class DateUtilClass {

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }
}

改进后,一个Task使用一个DateUtilClass实例,不会出现线程安全的问题。

13. 累加器

累加器是Spark中用来做计数功能的,在程序运行过程当中,可以做一些额外的数据指标统计

需求:在处理数据的同时,统计一下指标数据,具体的需求为:将RDD中对应的每个元素乘以10,同时在统计每个分区中偶数的数据

13.1 不使用累加器的方案

需要多次触发Action,效率低,数据会被重复计算

Scala
/**
 * 不使用累加器,而是触发两次Action
 */
object C12_AccumulatorDemo1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") //本地模式,开多个线程
    //1.创建SparkContext
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    //对数据进行转换操作(将每个元素乘以10),同时还要统计每个分区的偶数的数量
    val rdd2 = rdd1.map(_ * 10)
    //第一次触发Action
    rdd2.saveAsTextFile("out/111")

    //附加的指标统计
    val rdd3 = rdd1.filter(_ % 2 == 0)
    //第二个触发Action
    val c = rdd3.count()
    println(c)
  }
}

13.2 使用累加器的方法

触发一次Action,并且将附带的统计指标计算出来,可以使用Accumulator进行处理,Accumulator的本质数一个实现序列化接口class,每个Task都有自己的累加器,避免累加的数据发送冲突

Scala
object C14_AccumulatorDemo3 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") //本地模式,开多个线程
    //1.创建SparkContext
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    //在Driver定义一个特殊的变量,即累加器
    //Accumulator可以将每个分区的计数结果,通过网络传输到Driver,然后进行全局求和
    val accumulator: LongAccumulator = sc.longAccumulator("even-acc")
    val rdd2 = rdd1.map(e => {
      if (e % 2 == 0) {
        accumulator.add(1)  //闭包,在Executor中累计的
      }
      e * 10
    })

    //就触发一次Action
    rdd2.saveAsTextFile("out/113")

    //每个Task中累计的数据会返回到Driver吗?
    println(accumulator.count)
  }
}

14. StandAlone的两种执行模式

spark自动的StandAlone集群有两种运行方式,分别是client模式和cluster模式,默认使用的是client模式。两种运行模式的本质区别是,Driver运行在哪里了

14.1 什么是Driver

Driver本意是驱动的意思(类似叫法的有MySQL的连接驱动),在就是与集群中的服务建立连接,执行一些命令和请求的。但是在Spark的Driver指定就是SparkContext和里面创建的一些对象,所有可以总结为,SparkContext在哪里创建,Driver就在哪里。Driver中包含很多的对象实例,有SparkContext,DAGScheduler、TaskScheduler、ShuffleManager、BroadCastManager等,Driver是对这些对象的统称。

14.2 client模式

Driver运行在用来提交任务的SparkSubmit进程中,在Spark的stand alone集群中,提交spark任务时,可以使用cluster模式即--deploy-mode client (默认的)

spark12-13-14

 

spark12-13-14

 

注意:spark-shell只能以client模式运行,不能以cluster模式运行,因为提交任务的命令行客户端和SparkContext必须在同一个进程中。

spark12-13-14

 

14.3 cluster模式

Driver运行在Worker启动的一个进程中,这个进程叫DriverWapper,在Spark的stand alone集群中,提交spark任务时,可以使用cluster模式即--deploy-mode cluster

特点:Driver运行在集群中,不在SparkSubmit进程中,需要将jar包上传到hdfs中

Shell
spark-submit --master spark://node-1.51doit.cn:7077 --class cn._51doit.spark.day01.WordCount --deploy-mode cluster hdfs://node-1.51doit.cn:9000/jars/spark10-1.0-SNAPSHOT.jar hdfs://node-1.51doit.cn:9000/wc hdfs://node-1.51doit.cn:9000/out002

spark12-13-14

 

cluster模式的特点:可以给Driver更灵活的指定一些参数,可以给Driver指定内存大小,cores的数量

如果一些运算要在Driver进行计算,或者将数据收集到Driver端,这样就必须指定Driver的内存和cores更大一些

Shell
# 指定Driver的内存,默认是1g
--driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
# 指定Driver的cores,默认是1
--driver-cores NUM          Number of cores used by the driver, only in cluster mode (Default: 1).文章来源地址https://www.toymoban.com/news/detail-501788.html

到了这里,关于spark12-13-14的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 如何在 iPhone 15/14/13/12/11/XS/XR 上恢复误删除的短信?

    无论你的iPhone功能多么强大,数据丢失的情况仍然时有发生,所以当你发现一些重要的消息有一天丢失了。别担心,让自己冷静下来,然后按照本页的方法轻松从 iPhone 中检索已删除的短信。 在这里,您需要奇客数据恢复iPhone版 的帮助。该工具是一款针对 iPhone、iPad 或 iPod

    2024年04月09日
    浏览(95)
  • Upgrading the Qlik Sense Repository Database from 9.6 to 12/13/14

     Upgrading Qlik Sense Repository Database using the Qlik PostgreSQL Installer In this article, we walk you through the requirements and process of how to upgrade an existing Qlik Sense Repository Database (see supported scenarios) as well as how to install a brand new Repository based on PostgreSQL. We will use the  Qlik PostgreSQL Installer  (QPI). For

    2024年02月09日
    浏览(73)
  • Unc0ver 8.0.0 更新:支持 iOS 14.6-14.8、A12-A13 iPhone 越狱

    今日,越狱工具 unc0ver 发布了全新的 8.0.0 版本。现在支持 iOS 14.6-14.8 版本越狱,需要搭载 A12-A13 芯片的 iPhone 手机。 本次越狱支持的系统版本 : iOS 14.6 、 iOS 14.7 、 iOS 14.7.1 、 iOS 14.8 这4种版本, iOS 15.0 以上系统请等待官方更新,出的时候我也会立马发文章。 支持的设备型

    2024年02月05日
    浏览(52)
  • XCode14线程警告问题

    最近收到了一些iOS16系统下的报错,DSYM之后发现是阿里云日志上传的地方出现了警告,调试的时候给出了警告,且这个问题为非必现,占比大概10%~20%,问题如下: 这个问题意思大概就是,我们运行在 QOS_CLASS_USER_INITIATED的线程,前方正在运行着一个QOS_CLASS_DEFAULT的低QoS线程,

    2024年02月11日
    浏览(28)
  • ESXI安装MacOS系统,VMware安装MacOs系统,MacOs10.14.iso下载,MacOS10.13下载,MacOS10.12下载,unlocker下载

    ESXI 6.5.0 VMware Workstation 15 Pro 15.5.0 自己本机装的一个,可以\\\"连接远程服务器\\\"方便远程操控ESXI MacOs镜像 要ISO格式的镜像,下面提供三个ISO镜像,随意使用。 macOS.Mojave10.14 版本(本人使用的就是当前版本) 链接:https://pan.baidu.com/s/1PC98hV-urGjZnwT3v8pr-A 提取码:vepy macOS High Sierra 10.13

    2024年02月13日
    浏览(35)
  • Pytorch学习第二周--Day 12-13: 构建你的第一个神经网络

    Day 12-13: 构建你的第一个神经网络 在这两天里,我动手实践构建了我的第一个神经网络,目的是解决一个基本的分类问题。使用了两个主流的深度学习框架:PyTorch和TensorFlow,以对比和理解它们在神经网络构建方面的不同。 目标:构建一个全连接的神经网络来处理分类问题。

    2024年01月20日
    浏览(30)
  • 解决高通 Android 12/13 ota升级失败问题

    1、 Android adb push ota全量包 如下图所示 2、当前设备是a分区 如下图所示  3、adb root -adb enable-verity-adb reboot  如下图所示  4、adb ota包升级成功 升级完成之后记得 reboot 重启一下 如下图所示 5、当前设备成功切换b分区 如下图所示  6、到这里基本就结束了, ota 升级相关 Andr

    2024年02月06日
    浏览(52)
  • JAVA数据结构篇--13线程安全的Set 集合

    前言:java 中用于存放不重复元素的set 集合,其中无序的HashSet,以及有序的LinkedHashSet和TreeSet 都是非线程安全的,那么多线程环境下,我们要存放不重复的元素,需要使用哪种集合进行数据存取; 1 使用: 2 过程: 2.1 放入获取元素: Collections.synchronizedSet:通过使用synchron

    2024年02月16日
    浏览(31)
  • 潇洒郎: 小白一次性成功——红米 Note 12 5G Android12 系统13.0.16/14.0.9 小米红米手机解BL锁+ROOT-刷面具—官方ROM下载-线刷降级—解锁system系统分区

    下载工具 申请解锁小米手机 (miui.com) 驱动安装进入Fastboot模式后,会自动识别已连接, 否则显示未连接

    2024年01月25日
    浏览(87)
  • 关于Android 11、12和13服务保活问题

    物联网环境,为了解决不同厂商、不同设备、不同网络情况下使用顺畅,同时也考虑到节约成本,缩小应用体积的好处,我们需要一个服务应用一直存在系统中,保活它以提供服务给其他客户端调用。 开机自启动,通过广播通信, 必要权限 开机自启动Service相关代码 注意

    2023年04月08日
    浏览(73)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包