大数据 - Spark系列《八》- 闭包引用

这篇具有很好参考价值的文章主要介绍了大数据 - Spark系列《八》- 闭包引用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 Spark系列文章:

大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客

大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客

大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客

大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客

大数据 - Spark系列《五》- Spark常用算子-CSDN博客

大数据 - Spark系列《六》- RDD详解-CSDN博客

大数据 - Spark系列《七》- 分区器详解-CSDN博客


目录

8.1.🐶闭包引用的原理

1. 闭包引用的概念

2. 闭包引用的副本

3. 🧀实例代码1

4. 🧀实例代码2

8.2 闭包引用的应用场景

🍠Source.fromFile和sc.textFile的辨析

1. 使用 Source.fromFile 读取数据:

2. 使用 sc.textFile 读取数据:

8.3 🐶闭包引用的注意事项

1.🥙序列化检查

2. 🥙“副本”数量

8.4 🐶闭包变量的问题

🥙BT传输协议:基本原理


8.1.🐶闭包引用的原理

1. 闭包引用的概念

  • 算子函数中引用了一个算子外部的变量 , 这个变量就是闭包变量 ;

  • 这些引用会随着任务的序列化而被发送到各个 Executor 上,并在 Executor 上被反序列化。

  • 闭包变量定义在Driver端 ,使用在任务实例端 , 变量需要序列化

2. 闭包引用的副本

  • 在 Executor 上执行的任务中的闭包对象是完全独立的。

  • 修改任务中的闭包对象不会影响到 Driver 端的原对象,因为它们是独立的副本。

3. 🧀实例代码1

对于在 RDD 算子函数中引用的外部对象,其修改仅影响到任务执行所在的 Executor 上的局部副本,而不会影响到 Driver 端的原对象。

package com.doit.day0217

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}

/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test06 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 1)
    var cnt=0  // cnt,在Driver端的jvm中
    rdd.foreach(x=>{
      cnt+=1  // 此处的cnt是在worker端反序列化出的Task中,与driver端已无联系
      println(x)
    })
    println(cnt) // 此处打印的是driver端的cnt,依然是0

    // 关闭SparkContext对象
    sc.stop()
  }
}

 大数据 - Spark系列《八》- 闭包引用,spark,大数据,spark,分布式 

4. 🧀实例代码2

Spark 的转换操作是惰性求值的,只有在调用结果算子(如 foreachcountcollect 等)时才会触发实际的作业执行。

因此cnt的取值为调用结果算子之前的值

package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test07 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 1)
    var cnt=0  // cnt,在Driver端的jvm中

    //算子中函数在远端执行
    //函数中引用的遍历 闭包引用,将闭包变量复制一个副本发送到远端,你对闭包变量的所有操作操作的是副本
    val rdd1 = rdd.map(x => {
      cnt += 1 // 只有在调用结果算子时,才会runjob,所以这里取进来的cnt为10
      println("-----" + cnt)
      x*10
    })

    cnt=10
    println(cnt) // 此处打印的是driver端的cnt,依然是0

    rdd1.foreach(println)
    // 关闭SparkContext对象
    sc.stop()
  }
}

 大数据 - Spark系列《八》- 闭包引用,spark,大数据,spark,分布式 

8.2 闭包引用的应用场景

可以使用闭包引用避免shuffle

在我们之前讲解join算子的案例中,需要将两个数据集进行连接,通常会触发 shuffle 操作,这会带来一定的性能开销。

 大数据 - Spark系列《八》- 闭包引用,spark,大数据,spark,分布式 

 我们可以将user数据集转换成hashmap,通过闭包引用传入进去,此时可以避免shuffle

 大数据 - Spark系列《八》- 闭包引用,spark,大数据,spark,分布式 

package com.doit.day0217

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.io.Source

/**
 * 示例代码:演示了如何使用闭包变量避免shuffle
 */
object Test08 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    // 加载用户数据,将其转换为Map
    val user = Source.fromFile("data/join/user.txt")
    val userMap: Map[String, String] = user.getLines().map(line => {
      val arr = line.split(",")
      (arr(0), arr(1))
    }).toMap // 将用户数据转换为Map,存储在内存中 一般数据集控制大小为1G以内

    // 加载订单数据
    val orders = sc.textFile("data/join/orders.txt")

    // 使用闭包变量userMap避免shuffle,将用户名直接关联到订单数据中
    val rdd2 = orders.map(iter => {
      val arr2 = iter.split(",")
      val name = userMap.getOrElse(arr2(4), "unknown") // 使用闭包变量userMap关联订单数据中的用户ID
      (arr2(0), arr2(1), arr2(2), arr2(3), arr2(4), name)
    })

    // 打印处理后的订单数据
    rdd2.foreach(println)

    // 关闭SparkContext对象
    sc.stop()
  }
}

  大数据 - Spark系列《八》- 闭包引用,spark,大数据,spark,分布式 

如果需要让每个task都用有一份“只读”的“小量”数据,比如一个字典,则可以利用闭包引用;

如果共享的这分只读数据比较大,则应该使用“广播变量”效率更高!

🍠Source.fromFilesc.textFile的辨析

1. 使用 Source.fromFile 读取数据:
  • 何时使用:

    • 当数据量较小,可以完全装载到内存中并进行处理时,适合使用 Source.fromFile 方法读取数据。例如,对于文件大小在几十兆到几百兆之间的数据集,可以考虑使用该方法。

    • 特点:

      • 将文件内容一次性读取到内存中,适合对数据进行全量处理。

      • 读取的数据直接转换为内存中的集合类型(如Map),方便进行后续处理。

    • 注意事项:

      • 对于较大的数据集,可能会导致内存溢出,因此在处理大规模数据时需要谨慎使用。

2. 使用 sc.textFile 读取数据:
  • 何时使用:

    • 当数据量较大,无法一次性装载到内存中进行处理时,应该使用 Spark 的 sc.textFile 方法读取数据。例如,对于几百兆到几十亿的大型数据集,应该使用该方法。

  • 特点:

    • Spark 的 sc.textFile 方法将文件分布式地加载到集群中的各个节点上,并返回一个分布式数据集(RDD)。

    • 可以有效地处理大规模数据,具有良好的扩展性和性能。

  • 使用方式:

    • 使用 sc.textFile 方法加载文件后,可以使用各种 Spark 的转换操作对数据进行处理,如 map、filter、reduceByKey 等。

  • 注意事项:

    • 使用 sc.textFile 方法加载的数据集是分布式的,无法直接转换为集合类型,因此需要结合 Spark 的各种操作进行处理。

8.3 🐶闭包引用的注意事项

1.🥙序列化检查

闭包引用的对象,必须实现序列化接口,否则会导致task序列化失败,从而快速报错

   class Per extends Serializable {
      val id:Int = 0
    }
    val p = new Per()
    val rd = sc.makeRDD(1 to 10, 2)
    // 闭包引用的对象,必须实现序列化接口
    rd.map((_,p)).foreach(println)

2. 🥙“副本”数量

如果闭包引用的是普通对象,则每个task中都有一份“copy”

如果闭包引用的是一个object对象(单例对象),则其实在整个executor中只有一份,如下

Yarn :resourcemanager nodemanager

Nodemanager 提供计算资源 : container对资源的隔离

    object Per extends Serializable {
      val id:Int = 0
    }
    val p = Per
    val rd = sc.makeRDD(1 to 10, 2)
    rd.map((_,p)).foreach(println)

如果使用闭包引用object对象,有可能产生线程安全问题(因为有多个task线程共享这个对象);

看似闭包,实非闭包

下面的代码,其实并不是“闭包引用”

当然,里面用到了自定义类型,也还是要注意序列化问题

    class Phone(var brand: String, var price: Double)
    val resRdd = rdd.map(tp => {
      new Phone(tp._1, tp._2) // 这里并没有引用外部的对象,所以不存在f序列化检查失败的问题    })
      //.map(phone => (phone.brand, phone.price))
      //.reduceByKey(_ + _) // 这里有shuffle,但shuffle写出的是 2元组,它能序列化,所以不会报错
      .groupBy(p=>p.brand)  // 这里有shuffle,而且shuffle写出phone对象,它不能序列化,所以报错
      .mapValues(_.size)

8.4 🐶闭包变量的问题

  • 每个任务实例中都会保存一份完整的闭包变量

  • 如果一条节点同时运行当前任务的多个任务实例 , 存储多份闭包变量数据

package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test07 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    var cnt=0  // cnt,在Driver端的jvm中

    //算子中函数在远端执行
    //函数中引用的遍历 闭包引用,将闭包变量复制一个副本发送到远端,你对闭包变量的所有操作操作的是副本
    val rdd1 = rdd.map(x => {
      cnt += 1 // 只有在调用结果算子时,才会runjob,所以这里取进来的cnt为10
      println("-----" + cnt)
      x*10
    })

    cnt=10
    println(cnt) // 此处打印的是driver端的cnt,依然是0

    rdd1.foreach(println)
    // 关闭SparkContext对象
    sc.stop()
  }
}

 大数据 - Spark系列《八》- 闭包引用,spark,大数据,spark,分布式

解决方案: 运行当前任务的节点只存一份数据 

大数据 - Spark系列《八》- 闭包引用,spark,大数据,spark,分布式

🥙BT传输协议:基本原理

BitTorrent(简称BT)是一种用于大规模文件分享的通信协议。它被广泛用于分发大型文件和数据集,例如软件、电影、音乐等。BitTorrent协议的主要原理是将文件分成小块,并且允许用户同时上传和下载这些文件块,从而实现高效的分发。

以下是BitTorrent协议的主要特点和工作原理:

  1. 分布式架构:

    1. BitTorrent是一种分布式协议,没有单一的中心服务器,所有参与者都可以直接交换文件块。

    2. 参与者之间通过Tracker服务器或者DHT网络(分布式哈希表)进行通信,用于发现其他参与者并交换文件块的信息。

  2. 分块下载:

    1. 将文件分成固定大小的块(一般为256KB或512KB)。

    2. 下载者可以选择下载文件的哪些块,而不是整个文件,从而实现灵活的下载策略。

  3. 种子文件:

    1. 种子文件(Torrent文件)包含了文件的元数据信息,包括文件名、大小、哈希值等。

    2. 种子文件可以通过文件共享网站或者其他方式进行传播,从而让其他用户获取文件的元数据信息并加入下载。

  4. 优化的上传下载策略:

    1. BitTorrent协议实现了一种基于位掩码的上传下载策略,优先下载缺失的文件块,并且优先上传稀缺的文件块,从而提高下载速度和整体的网络效率。

  5. 健壮性和自我修复:

    1. BitTorrent协议具有较强的健壮性,即使某些参与者离线或者退出下载,其他参与者仍然可以通过其他方式获取丢失的文件块。

    2. 通过校验和哈希校验值,BitTorrent协议可以检测到下载的文件块是否损坏或者被篡改,并且自动请求重新下载。

  6. Tracker服务器和DHT网络:

    1. Tracker服务器用于管理下载者和上传者的信息,帮助下载者找到可用的上传者。

    2. DHT网络是一种分布式哈希表,允许下载者通过哈希值查询其他下载者的IP地址和端口信息,从而实现去中心化的Peer发现。

大数据 - Spark系列《八》- 闭包引用,spark,大数据,spark,分布式文章来源地址https://www.toymoban.com/news/detail-832563.html

到了这里,关于大数据 - Spark系列《八》- 闭包引用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

    人生很长,不必慌张。你未长大,我要担当。 传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。 RDD是Spark提供的最重要的抽象概念

    2024年02月22日
    浏览(83)
  • 大数据课程K2——Spark的RDD弹性分布式数据集

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 了解Spark的RDD结构; ⚪ 掌握Spark的RDD操作方法; ⚪ 掌握Spark的RDD常用变换方法、常用执行方法; 初学Spark时,把RDD看做是一个集合类型(类似于Array或List),用于存储数据和操作数据,但RDD和普通集合的区别

    2024年02月12日
    浏览(50)
  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(49)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(104)
  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(54)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(64)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(58)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(76)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(65)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包