Spark基础学习笔记----RDD检查点与共享变量

这篇具有很好参考价值的文章主要介绍了Spark基础学习笔记----RDD检查点与共享变量。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

零、本讲学习目标

  1. 了解RDD容错机制
  2. 理解RDD检查点机制的特点与用处
  3. 理解共享变量的类别、特点与使用

一、RDD容错机制

  • 当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是血统(Lineage)方式设置检查点(checkpoint)方式

(一)血统方式

  • 根据RDD之间依赖关系对丢失数据的RDD进行数据恢复。若丢失数据的子RDD进行窄依赖运算,则只需要把丢失数据的父RDD的对应分区进行重新计算,不依赖其他节点,并且在计算过程中不存在冗余计算;若丢失数据的RDD进行宽依赖运算,则需要父RDD所有分区都要进行从头到尾计算,计算过程中存在冗余计算。

(二)设置检查点方式

  • 本质是将RDD写入磁盘存储。当RDD进行宽依赖运算时,只要在中间阶段设置一个检查点进行容错,即Spark中的sparkContext调用setCheckpoint()方法,设置容错文件系统目录作为检查点checkpoint,将checkpoint的数据写入之前设置的容错文件系统中进行持久化存储,若后面有节点宕机导致分区数据丢失,则以从做检查点的RDD开始重新计算,不需要从头到尾的计算,从而减少开销。

二、RDD检查点

(一)RDD检查点机制

  • RDD的检查点机制(Checkpoint)相当于对RDD数据进行快照,可以将经常使用的RDD快照到指定的文件系统中,最好是共享文件系统,例如HDFS。当机器发生故障导致内存或磁盘中的RDD数据丢失时,可以快速从快照中对指定的RDD进行恢复,而不需要根据RDD的依赖关系从头进行计算,大大提高了计算效率。

(二)与RDD持久化的区别

  • cache()或者persist()是将数据存储于机器本地的内存或磁盘,当机器发生故障时无法进行数据恢复,而检查点是将RDD数据存储于外部的共享文件系统(例如HDFS),共享文件系统的副本机制保证了数据的可靠性。
  • 在Spark应用程序执行结束后,cache()或者persist()存储的数据将被清空,而检查点存储的数据不会受影响,将永久存在,除非手动将其移除。因此,检查点数据可以被下一个Spark应用程序使用,而cache()或者persist()数据只能被当前Spark应用程序使用。

(三)RDD检查点案例演示

  • net.cl.rdd包里创建CheckpointDemo对象
package net.cl.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    // 设置系统属性(本地运行必须设置,否则无权访问HDFS)
    System.setProperty("HADOOP_USER_NAME", "root")
    // 创建SparkConf对象
    val conf = new SparkConf()
    // 设置应用程序名称,可在Spark WebUI里显示
    conf.setAppName("Spark-CheckpointDemo")
    // 设置集群Master节点访问地址
    conf.setMaster("local[2]")
    // 设置测试内存
    conf.set("spark.testing.memory", "2147480000")
    // 基于SparkConf对象创建SparkContext对象,该对象是提交Spark应用程序的入口
    val sc = new SparkContext(conf)

    // 设置检查点数据存储路径
    sc.setCheckpointDir("hdfs://master:9000/spark-ck")
    // 创建模拟数据RDD
    val rdd: RDD[Int] = sc.parallelize(List(1, 1, 2, 3, 5, 8, 13))
    // 过滤结果
    val resultRDD = rdd.filter(_ >= 5)
    // 持久化RDD到内存中
    resultRDD.cache()
    // 将resultRDD标记为检查点
    resultRDD.checkpoint()

    // 第一次行动算子计算时,将把标记为检查点的RDD数据存储到文件系统指定路径中
    val result: String = resultRDD.collect().mkString(", ")
    println(result)
    // 第二次行动算子计算时,将直接从文件系统读取resultRDD数据,而不需要从头计算
    val count = resultRDD.count()
    println(count)

    // 停止Spark容器
    sc.stop()
  }
}
  • 上述代码使用checkpoint()方法将RDD标记为检查点(只是标记,遇到行动算子才会执行)。在第一次行动计算时,被标记为检查点的RDD的数据将以文件的形式保存在setCheckpointDir()方法指定的文件系统目录中,并且该RDD的所有父RDD依赖关系将被移除,因为下一次对该RDD计算时将直接从文件系统中读取数据,而不需要根据依赖关系重新计算。
  • Spark建议,在将RDD标记为检查点之前,最好将RDD持久化到内存,因为Spark会单独启动一个任务将标记为检查点的RDD的数据写入文件系统,如果RDD的数据已经持久化到了内存,将直接从内存中读取数据,然后进行写入,提高数据写入效率,否则需要重复计算一遍RDD的数据。
  • 创建检查点保存数据的目录

Spark基础学习笔记----RDD检查点与共享变量

 文章来源地址https://www.toymoban.com/news/detail-458249.html

  • 运行程序,在控制台查看结果

Spark基础学习笔记----RDD检查点与共享变量

 

  • 查看HDFS检查点目录,执行命令:hdfs dfs -ls -R /spark-ck

Spark基础学习笔记----RDD检查点与共享变量

 

三、共享变量

  • 通常情况下,Spark应用程序运行的时候,Spark算子(例如map(func)或filter(func))中的函数func会被发送到远程的多个Worker节点上执行,如果一个算子中使用了某个外部变量,该变量就会复制到Worker节点的每一个Task任务中,各个Task任务对变量的操作相互独立。当变量所存储的数据量非常大时(例如一个大型集合)将增加网络传输及内存的开销。因此,Spark提供了两种共享变量:广播变量和累加器。

(一)广播变量

  • 广播变量是将一个变量通过广播的形式发送到每个Worker节点的缓存中,而不是发送到每个Task任务中,各个Task任务可以共享该变量的数据。因此,广播变量是只读的。

1、默认情况下变量的传递

  • map()算子传入的函数中使用外部变量arr

Spark基础学习笔记----RDD检查点与共享变量

scala> val arr = Array(1, 2, 3, 4, 5)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, arr))
scala> result.collect()

 

  • 上述代码中,传递给map()算子的函数(_, arr)会被发送到Executor端执行,而变量arr将发送到Worker节点所有Task任务中。变量arr传递的流程如下图所示

Spark基础学习笔记----RDD检查点与共享变量

  • 假设变量arr存储的数据量大小有100MB,则每一个Task任务都需要维护100MB的副本,若某一个Executor中启动了3个Task任务,则该Executor将消耗300MB内存。

2、使用广播变量时变量的传递

  • 广播变量其实是对普通变量的封装,在分布式函数中可以通过Broadcast对象的value方法访问广播变量的值

Spark基础学习笔记----RDD检查点与共享变量

 

  • 使用广播变量将数组arr传递给map()算子

Spark基础学习笔记----RDD检查点与共享变量

scala> val arr = Array(1, 2, 3, 4, 5)
scala> val broadcastVar = sc.broadcast(arr)
scala> val lines = sc.textFile("data.txt")
scala> val result = lines.map((_, broadcastVar))
scala> result.collect()

 

  • 上述代码使用broadcast()方法向集群发送(广播)了一个只读变量,该方法只发送一次,并返回一个广播变量broadcastVar,该变量是一个org.apache.spark.broadcast.Broadcast对象。Broadcast对象是只读的,缓存在集群的每个Worker节点中。使用广播变量进行变量传递的流程如下图所示。

Spark基础学习笔记----RDD检查点与共享变量

 

  • Worker节点的每个Task任务共享唯一的一份广播变量,大大减少了网络传输和内存开销。
  • 输出result的数据

Spark基础学习笔记----RDD检查点与共享变量

 

(二)累加器

1、累加器功能

  • 累加器提供了将Worker节点的值聚合到Driver的功能,可以用于实现计数和求和。

2、不使用累加器

  • 对一个整型数组求和

Spark基础学习笔记----RDD检查点与共享变量

 

  • 上述代码由于sum变量在Driver中定义,而累加操作sum = sum + x会发送到Executor中执行,因此输出结果不正确。

3、使用累加器

  • 对一个整型数组求和

Spark基础学习笔记----RDD检查点与共享变量

 

scala> val myacc = sc.longAccumulator("My Accumulator") // 声明累加器
scala> val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5))
scala> rdd.foreach(x => myacc.add(x)) // 向累加器添加值
scala> println("sum = " + myacc.value) // 在Driver端输出结果
  • 上述代码通过调用SparkContext对象的longAccumulator ()方法创建了一个Long类型的累加器,默认初始值为0。也可以使用doubleAccumulator()方法创建Double类型的累加器。
  • 累加器只能在Driver端定义,在Executor端更新。Executor端不能读取累加器的值,需要在Driver端使用value属性读取。

到了这里,关于Spark基础学习笔记----RDD检查点与共享变量的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • loadrunner入门教程(14)--检查点

    loadrunner入门教程(14)--检查点

    检查点函数原理:回放脚本时搜索特定的文本或者字符串,从而验证服务器相应的正确性;验证请求是否成功,可以添加检查点。以检查从服务器返回的内容是否正确。本任务针对脚本开发–检查点进行介绍 掌握基于loadrunner性能测试脚本开发——检查点 1.单击Design→Insert

    2024年02月05日
    浏览(12)
  • Flink状态管理与检查点机制

    Flink状态管理与检查点机制

    本专栏案例代码和数据集链接:  https://download.csdn.net/download/shangjg03/88477960 相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以将中间的计算结果进行保存,并提供给后续的计算使用: 具体而言,Flink 又将状态 (State) 分为 Keyed State 与 O

    2024年02月07日
    浏览(36)
  • 深入了解 Flink 的检查点机制

    Flink 是一个流处理框架,用于实时数据处理。检查点(checkpoint)机制是 Flink 的一个核心组件,用于保证流处理作业的可靠性和容错性。在这篇文章中,我们将深入了解 Flink 的检查点机制,涵盖其核心概念、算法原理、实例代码以及未来发展趋势。 Flink 的检查点机制是一种保存

    2024年02月20日
    浏览(5)
  • Flink流式计算状态检查点与恢复

    Flink流式计算状态检查点与恢复 Apache Flink是一个流处理框架,用于实时数据处理和分析。Flink可以处理大规模数据流,并提供一种高效、可靠的方法来处理和分析这些数据。Flink流式计算状态检查点与恢复是流处理的关键组件,它们确保Flink应用程序在故障时能够恢复并继续处

    2024年02月19日
    浏览(8)
  • 怎么理解flink的异步检查点机制

    flink的checkpoint监控页面那里有两个指标Sync Duration 和Async Duration,一个是开始进行同步checkpoint所需的时间,一个是异步checkpoint过程所需的时间,你是否也有过疑惑,是否只是同步过程中的时间才会阻塞正常的数据处理,而异步checkpoint的时间不会影响正常的数据处理流程? 这

    2024年02月09日
    浏览(5)
  • Flink系列之:背压下的检查点

    Flink系列之:背压下的检查点

    通常情况下,对齐 Checkpoint 的时长主要受 Checkpointing 过程中的同步和异步两个部分的影响。 然而,当 Flink 作业正运行在严重的背压下时,Checkpoint 端到端延迟的主要影响因子将会是传递 Checkpoint Barrier 到 所有的算子/子任务的时间。这在 checkpointing process) 的概述中有说明原因

    2024年02月04日
    浏览(7)
  • 论文阅读-多级检查点重新启动MPI应用的共同设计

    论文阅读-多级检查点重新启动MPI应用的共同设计

    论文名称: Co-Designing Multi-Level Checkpoint Restart for MPI Applications 摘要—高性能计算(HPC)系统继续通过包含更多硬件组件来支持更大的应用部署来扩展。关键是,这种扩展往往会减少故障之间的平均时间,从而使容错成为一个越来越重要的挑战。在HPC中容错的标准做法是检查点

    2024年04月09日
    浏览(8)
  • 【大数据】Flink 架构(五):检查点 Checkpoint(看完即懂)

    【大数据】Flink 架构(五):检查点 Checkpoint(看完即懂)

    《 Flink 架构 》系列(已完结),共包含以下 6 篇文章: Flink 架构(一):系统架构 Flink 架构(二):数据传输 Flink 架构(三):事件时间处理 Flink 架构(四):状态管理 Flink 架构(五):检查点 Checkpoint(看完即懂) Flink 架构(六):保存点 Savepoint 😊 如果您觉得这篇

    2024年02月19日
    浏览(7)
  • 【涨薪技术】0到1学会性能测试 —— LR录制回放&事务&检查点

    上一次推文我们分享了性能测试分类和应用领域,今天带大家学习性能测试工作原理、事务、检查点!后续文章都会系统分享干货,带大家从0到1学会性能测试,另外还有教程等同步资料,文末免费获取~ ​通常我们认为LoadRunner是由三部分组成:VuGen、Controller、Analysis VuGen:录

    2024年02月05日
    浏览(4)
  • 【Flink】Flink 记录一个 checkpoint 检查点 越来越大的问题

    【Flink】Flink 记录一个 checkpoint 检查点 越来越大的问题

    Flink SQL checkpoint越来越大咋么办,从2个G,现在4个G了,增量同步的,窗口是1小时,watermark是6小时,按道理来说,数据量不应该越来越大啊? 在窗口内执行了count(distinct )这些操作。设置了状态的ttl。后端状态存储用的rocksdb。 状态如下 设置了增量的检查点 代码设置不一定有

    2024年02月10日
    浏览(8)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包