尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】

这篇具有很好参考价值的文章主要介绍了尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili

  1. 尚硅谷大数据技术Spark教程-笔记01【SparkCore(概述、快速上手、运行环境、运行架构)】
  2. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,RDD-核心属性-执行原理-基础编程-并行度与分区-转换算子)】
  3. 尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,RDD-转换算子-案例实操)】
  4. 尚硅谷大数据技术Spark教程-笔记04【SparkCore(核心编程,RDD-行动算子-序列化-依赖关系-持久化-分区器-文件读取与保存)】
  5. 尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】
  6. 尚硅谷大数据技术Spark教程-笔记06【SparkCore(案例实操,电商网站)】

目录

01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P105【105.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 原理及简单演示】15:49

P106【106.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 问题】03:39

P107【107.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现】10:55

P108【108.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现 - 1】07:14

P109【109.尚硅谷_SparkCore - 核心编程 - 数据结构 - 广播变量】17:16


01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P105【105.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 原理及简单演示】15:49

尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】

5.2 累加器

5.2.1 实现原理

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】 ​​​​​​​尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_Acc {

  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    //reduce:分区内计算,分区间计算
    //val i: Int = rdd.reduce(_+_)
    //println(i)
    var sum = 0
    rdd.foreach(
      num => {
        sum += num
      }
    )
    println("sum = " + sum) // sum = 0

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_Acc {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // 获取系统累加器
    // Spark默认就提供了简单数据聚合的累加器
    val sumAcc = sc.longAccumulator("sum")

    //sc.doubleAccumulator
    //sc.collectionAccumulator

    rdd.foreach(
      num => {
        // 使用累加器
        sumAcc.add(num)
      }
    )

    // 获取累加器的值
    println(sumAcc.value) // 10

    sc.stop()
  }
}

P106【106.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 问题】03:39

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_Acc {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // 获取系统累加器
    // Spark默认就提供了简单数据聚合的累加器
    val sumAcc = sc.longAccumulator("sum")

    //sc.doubleAccumulator
    //sc.collectionAccumulator

    val mapRDD = rdd.map(
      num => {
        // 使用累加器
        sumAcc.add(num)
        num
      }
    )

    // 获取累加器的值
    // 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    // 多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    // 一般情况下,累加器会放置在行动算子进行操作
    mapRDD.collect()
    mapRDD.collect()
    println(sumAcc.value) // 20

    sc.stop()
  }
}

P107【107.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现】10:55

尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark04_Acc_WordCount {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List("hello", "spark", "hello"))

    // 累加器 : WordCount
    // 创建累加器对象
    val wcAcc = new MyAccumulator()
    // 向Spark进行注册
    sc.register(wcAcc, "wordCountAcc")

    rdd.foreach(
      word => {
        // 数据的累加(使用累加器)
        wcAcc.add(word)
      }
    )

    // 获取累加器累加的结果
    println(wcAcc.value)

    sc.stop()
  }

  /*
    自定义数据累加器:WordCount

    1. 继承AccumulatorV2, 定义泛型
       IN : 累加器输入的数据类型 String
       OUT : 累加器返回的数据类型 mutable.Map[String, Long]

    2. 重写方法(6)
   */
  class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  }
}

P108【108.尚硅谷_SparkCore - 核心编程 - 数据结构 -累加器 - 自定义实现 - 1】07:14

尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark04_Acc_WordCount {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd = sc.makeRDD(List("hello", "spark", "hello"))

    // 累加器 : WordCount
    // 创建累加器对象
    val wcAcc = new MyAccumulator()
    // 向Spark进行注册
    sc.register(wcAcc, "wordCountAcc")

    rdd.foreach(
      word => {
        // 数据的累加(使用累加器)
        wcAcc.add(word)
      }
    )

    // 获取累加器累加的结果
    println(wcAcc.value)

    sc.stop()
  }

  /*
    自定义数据累加器:WordCount

    1. 继承AccumulatorV2, 定义泛型
       IN : 累加器输入的数据类型 String
       OUT : 累加器返回的数据类型 mutable.Map[String, Long]

    2. 重写方法(6)
   */
  class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
    private var wcMap = mutable.Map[String, Long]()

    // 判断是否初始状态
    override def isZero: Boolean = {
      wcMap.isEmpty
    }

    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      new MyAccumulator()
    }

    override def reset(): Unit = {
      wcMap.clear()
    }

    // 获取累加器需要计算的值
    override def add(word: String): Unit = {
      val newCnt = wcMap.getOrElse(word, 0L) + 1
      wcMap.update(word, newCnt)
    }

    // Driver合并多个累加器
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {

      val map1 = this.wcMap
      val map2 = other.value

      map2.foreach {
        case (word, count) => {
          val newCount = map1.getOrElse(word, 0L) + count
          map1.update(word, newCount)
        }
      }
    }

    // 累加器结果
    override def value: mutable.Map[String, Long] = {
      wcMap
    }
  }
}

P109【109.尚硅谷_SparkCore - 核心编程 - 数据结构 - 广播变量】17:16

5.3 广播变量

5.3.1 实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。

尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】

尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】文章来源地址https://www.toymoban.com/news/detail-429874.html

package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark05_Bc {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ))
    //        val rdd2 = sc.makeRDD(List(
    //            ("a", 4),("b", 5),("c", 6)
    //        ))
    val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))

    // join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用
    //val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    //joinRDD.collect().foreach(println)
    // (a, 1),    (b, 2),    (c, 3)
    // (a, (1,4)),(b, (2,5)),(c, (3,6))
    rdd1.map {
      case (w, c) => {
        val l: Int = map.getOrElse(w, 0)
        (w, (c, l))
      }
    }.collect().foreach(println)
    //(a,(1,4))
    //(b,(2,5))
    //(c,(3,6))

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.acc

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark06_Bc {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local").setAppName("Acc")
    val sc = new SparkContext(sparConf)

    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2), ("c", 3)
    ))
    val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))

    // 封装广播变量
    val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)

    rdd1.map {
      case (w, c) => {
        // 访问广播变量
        val l: Int = bc.value.getOrElse(w, 0)
        (w, (c, l))
      }
    }.collect().foreach(println)
    //(a,(1,4))
    //(b,(2,5))
    //(c,(3,6))

    sc.stop()
  }
}

到了这里,关于尚硅谷大数据技术Spark教程-笔记05【SparkCore(核心编程,累加器、广播变量)】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 尚硅谷Docker实战教程-笔记05【本地镜像发布到阿里云与私有库】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷Docker实战教程(docker教程天花板)_哔哩哔哩_bilibili 尚硅谷Docker实战教程-笔记01【理念简介、官网介绍、平台入门图解、平台架构图解】 尚硅谷Docker实战教程-笔记02【安装docker、镜像加速器配置

    2024年02月12日
    浏览(39)
  • Spark重温笔记(二):快如闪电的大数据计算框架——你真的了解SparkCore的 RDD 吗?(包含企业级搜狗案例和网站点击案例)

    前言:今天是温习 Spark 的第 2 天啦!主要梳理了 Spark 核心数据结构:RDD(弹性分布式数据集),其中包括基于内存计算的 SparkCore 各类技术知识点希望对大家有帮助! Tips:\\\"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起

    2024年03月25日
    浏览(27)
  • Spark大数据分析与实战笔记(第二章 Spark基础-05)

    成长是一条必走的路路上我们伤痛在所难免。 在大数据处理和分析领域,Spark被广泛应用于解决海量数据处理和实时计算的挑战。作为一个快速、可扩展且易于使用的分布式计算框架,Spark为开发人员提供了丰富的API和工具来处理和分析大规模数据集。 其中,Spark-Shell是Spar

    2024年02月03日
    浏览(92)
  • 大数据技术之SparkCore

            RDD( Resilient Distributed Dataset )叫做弹性分布式数据集 ,是Spark中最基本的数据抽象。         代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 RDD代表的是 弹性、可分区、不可变、元素可并行计算 的计算。 1. 弹性

    2024年02月01日
    浏览(33)
  • 尚硅谷Docker实战教程-笔记06【Docker容器数据卷】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷Docker实战教程(docker教程天花板)_哔哩哔哩_bilibili 尚硅谷Docker实战教程-笔记01【理念简介、官网介绍、平台入门图解、平台架构图解】 尚硅谷Docker实战教程-笔记02【安装docker、镜像加速器配置

    2024年02月16日
    浏览(33)
  • 尚硅谷大数据Flink1.17实战教程-笔记02【部署】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】 尚硅谷大数据Flink1.17实

    2024年02月09日
    浏览(33)
  • 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】 尚硅谷大数据Flink1.17实

    2024年02月11日
    浏览(28)
  • 尚硅谷大数据Flink1.17实战教程-笔记03【Flink运行时架构】

    尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】 尚硅谷大数据Flink1.17实

    2024年02月16日
    浏览(37)
  • Spark---SparkCore(一)

    1、Master(standalone):资源管理的主节点(进程) 2、Cluster Manager:在集群上获取资源的外部服务(例如:standalone,Mesos,Yarn) 3、Worker Node(standalone):资源管理的从节点(进程)或者说管理本机资源的进程 4、Driver Program:用于连接工作进程(Worker)的程序 5、Executor:是一个worker进程所管理的节点

    2024年02月03日
    浏览(25)
  • 【Spark】SparkCore

    三大数据结构分别是: ➢ RDD : 弹性分布式数据集 ➢ 累加器:分布式共享只写变量 ➢ 广播变量:分布式共享只读变量 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的 数据处理模型 。 代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面

    2024年02月12日
    浏览(71)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包