Spark---累加器和广播变量

这篇具有很好参考价值的文章主要介绍了Spark---累加器和广播变量。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.累加器实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)
    var sum=0
    dataRdd.foreach(num=>sum+=num)

    println(sum)
    context.stop()

运行结果:
Spark---累加器和广播变量,大数据,scala,spark,spark,大数据,scala
我们预期是想要实现数据的累加,开始数据从Driver被传输到了Executor中进行计算,但是每个分区在累加数据完成之后并没有将计算结果返回到Driver端,所以导致最后的结果与预期的不一致。
Spark---累加器和广播变量,大数据,scala,spark,spark,大数据,scala
对上述代码使用累加器

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))
    val sum = context.longAccumulator("sum")
    dataRdd.foreach(num=>{
      //使用累加器
      sum.add(num)
    })

    //获取累加器的值
    println(sum.value)

运行结果:
Spark---累加器和广播变量,大数据,scala,spark,spark,大数据,scala
由此可见,在使用了累加器之后,每个Executor在开始都会获得这个累加器变量,每个Executor在执行完成后,累加器会将每个Executor中累加器变量的值聚合到Driver端。
Spark---累加器和广播变量,大数据,scala,spark,spark,大数据,scala

Spark提供了多种类型的累加器,以下是其中的一些:
Spark---累加器和广播变量,大数据,scala,spark,spark,大数据,scala

2.自定义累加器

用户可以通过继承AccumulatorV2来自定义累加器。需求:自定义累加器实现WordCount案例。

AccumulatorV2[IN,OUT]中:
IN:输入数据的类型
OUT:输出数据类型

Spark---累加器和广播变量,大数据,scala,spark,spark,大数据,scala
WordCount案例实现完整代码:

package bigdata.wordcount.leijiaqi

import bigdata.wordcount.leijiaqi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * 使用累加器完成WordCount案例
 */
object Spark_addDemo {
  def main(args: Array[String]): Unit = {
    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val dataRDD: RDD[String] = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")

    //创建累加器对象
    val wordCountAccumulator = new WordCountAccumulator
    //向Spark中进行注册
    context.register(wordCountAccumulator,"wordCountAccumulator")

    //实现累加
    dataRDD.foreach(word => {
      wordCountAccumulator.add(word)
    })
    //获取累加结果,打印在控制台上
    println(wordCountAccumulator.value)

    //关闭链接
    context.stop()
  }

}

class WordCountAccumulator extends  AccumulatorV2[String,mutable.Map[String,Long]]
{

  //定义一个map用于存储累加后的结果
  var map: mutable.Map[String, Long] =mutable.Map[String,Long]()

  //累加器是否为初始状态
  override def isZero: Boolean = {
    map.isEmpty
  }

  //复制累加器
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    new WordCountAccumulator()
  }

  //重置累加器
  override def reset(): Unit = {
    map.clear()
  }

  //向累加器添加数据IN
  override def add(word: String): Unit = {
    // 查询 map 中是否存在相同的单词
    // 如果有相同的单词,那么单词的数量加 1
    // 如果没有相同的单词,那么在 map 中增加这个单词
    val newValue = map.getOrElse(word, 0L) + 1
    map.update(word,newValue)
  }

  //合并累加器
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    var map1=this.map
    var map2=other.value

    //合并两个map
    map2.foreach({
      case (word,count)=>{
        val newValue = map1.getOrElse(word,0L)+count
        map1.update(word,newValue)
      }
    })
  }

  //返回累加器的结果(OUT)
  override def value: mutable.Map[String, Long] = this.map
}
}

运行结果:
Spark---累加器和广播变量,大数据,scala,spark,spark,大数据,scala

3.广播变量

在Apache Spark中,广播变量(Broadcast Variables)是一种特殊类型的变量,用于优化数据共享。当Spark应用程序在集群中的多个节点上运行时,每个节点都需要访问相同的数据集。如果数据集很大,那么将这些数据发送到每个节点可能会非常耗时和低效。 广播变量提供了一种方法,可以在集群中的所有节点上共享数据集的一个只读副本 ,从而避免了在每个节点上重复发送数据。

Spark---累加器和广播变量,大数据,scala,spark,spark,大数据,scala
广播变量也叫分布式只读变量,它可以将闭包数据发送到每个Executor的内存中来达到共享的目的,Executor其实就相当于一个JVM,在启动的时候会自动分配内存。

    //建立与Spark框架的连接
    val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件
    val context = new SparkContext(wordCount) //读取配置文件

    val rdd1 = context.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 4)
    val list = List(("a", 4), ("b", 5), ("c", 6), ("d", 7))
    // 声明广播变量
    val broadcast: Broadcast[List[(String, Int)]] = context.broadcast(list)
    val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
      case (key, num) => {
        var num2 = 0
        // 使用广播变量
        for ((k, v) <- broadcast.value) {
          if (k == key) {
            num2 = v
          }
        }
        (key, (num, num2))
      }
    }

    resultRDD.collect().foreach(print)
    
    context.stop()

Spark---累加器和广播变量,大数据,scala,spark,spark,大数据,scala文章来源地址https://www.toymoban.com/news/detail-800032.html

到了这里,关于Spark---累加器和广播变量的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark---累加器

    累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。 运行结果: 我们预期是想要实现数据的累加,开始数据从Driver被传输到了Execut

    2024年02月02日
    浏览(41)
  • Spark累加器LongAccumulator

    1.Accumulator是由Driver端总体进行维护的,读取当前值也是在Driver端,各个Task在其所在的Executor上也维护了Accumulator变量,但只是局部性累加操作,运行完成后会到Driver端去合并累加结果。Accumulator有两个性质: 1、只会累加,合并即累加; 2、不改变Spark作业懒执行的特点,即没

    2024年01月25日
    浏览(51)
  • 【Spark原理系列】Accumulator累加器原理用法示例源码详解

    源自专栏《SparkML:Spark ML系列专栏目录》 Accumulator是Spark中的一种分布式变量,用于在并行计算中进行累加操作。它是由MapReduce模型中的“全局计数器”概念演化而来的。 Accumulator提供了一个可写的分布式变量,可以在并行计算中进行累加操作。在Spark中,当一个任务对Accum

    2024年03月14日
    浏览(60)
  • Flink 源码剖析|累加器

    累加器是实现了 加法运算 功能和 合并运算 (合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。 Flink 的累加器均实现了 Accumulator 接口,包括如下 2 个方法用于支持加法运算和合

    2024年02月21日
    浏览(49)
  • 计算机组成原理 累加器实验

    计算机组成原理实验环境 理解累加器的概念和作用。 连接运算器、存储器和累加器,熟悉计算机的数据通路。 掌握使用微命令执行各种操作的方法。 做好实验预习,读懂实验电路图,熟悉实验元器件的功能特性和使用方法。在实验之前设计好要使用的微命令,填入表 6-2 、

    2024年02月06日
    浏览(40)
  • Flink 源码剖析|4. 累加器与相关工具方法

    累加器是实现了 加法运算 功能和 合并运算 (合并多个累加器的结果)功能的一种数据结构,在作业结束后,可以获取所有部分(各个 operator 的各个 subtask)合并后的最终结果并发送到客户端。 Flink 的累加器均实现了 Accumulator 接口,包括如下 2 个方法用于支持加法运算和合

    2024年03月15日
    浏览(40)
  • 【数字IC/FPGA】百度昆仑芯手撕代码--累加器

    已知一个加法器IP,其功能是计算两个数的和,但这个和延迟两个周期才会输出。现在有一串连续的数据输入,每个周期都不间断,试问最少需要例化几个上述的加法器IP,才可以实现累加的功能。 由于加法器两个周期后才能得到结果(再将该结果作为加法器的输入进行累加

    2024年02月09日
    浏览(38)
  • 《JUC并发编程 - 高级篇》05 -共享模型之无锁 (CAS | 原子整数 | 原子引用 | 原子数组 | 字段更新器 | 原子累加器 | Unsafe类 )

    有如下需求,保证 account.withdraw 取款方法的线程安全 原有实现并不是线程安全的 测试代码 执行测试代码,某次执行结果 5.1.1 为么不安全 withdraw 方法是临界区,会存在线程安全问题 查看下字节码 多线程在执行过程中可能会出现指令的交错,从而结果错误! 5.1.2 解决思路1

    2023年04月12日
    浏览(44)
  • spark广播变量

    2024-1-24 广播变量特点 Broadcast Variable会将使用到的变量,只会为每个节点拷贝一份,不会为每个task进行拷贝,能够优化性能(在task数量比较大体现更明显),减少网络传输及内存消耗 通过SparkContext的broadcast()方法,针对某个变量创建广播变量,可以通过广播变量的value()方法

    2024年01月25日
    浏览(59)
  • Apache Spark中的广播变量分发机制

    Apache Spark中的广播变量提供了一种机制,允许用户在集群中共享只读变量,并且每个任务都可以访问这个变量,而不需要在每次任务之间重新发送该变量。这种机制特别适用于在所有节点上都需要访问同一份只读数据集的情况,因为它可以显著减少网络通信的开销。 以下是广

    2024年01月24日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包