大数据 - Spark系列《六》- RDD详解

这篇具有很好参考价值的文章主要介绍了大数据 - Spark系列《六》- RDD详解。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark系列文章:

大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客

大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客

大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客

大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客

大数据 - Spark系列《五》- Spark常用算子-CSDN博客

目录

🐶6.1 RDD 属性

1.🥙分区列表

2.🥙分区计算逻辑

3.🥙RDD 之间的依赖关系

4.🥙可选的分区器

5.🥙计算任务的首选地址

🐶6.2 RDD 分区

6.2.1 原始RDD

1.🥙集合

2.🥙文件

🍠textFile的切片原理和设置切片大小的规则

3.🥙数据库

6.2.2 由转换算子转换的RDD

🧀实例1:(distinct算子可以指定分区个数)

🧀实例2:(union操作后的分区个数)

🧀实例3:(join操作之后的分区个数)

🧀实例4:(交集差集笛卡尔积操作之后的分区个数)

🍠repartition和coalesce

🍠groupBy不一定会Shuffle


RDD 分布式弹性数据集 , 是spark编程的核心抽象对象 ,在rdd上调用算子编程

  • 分布式RDD被设计为分布在集群的多个节点上。它们内部包含有关数据分区的信息,允许Spark在多个节点上并行计算。这使得Spark能够有效地处理大规模数据集。

  • 弹性:可恢复的 ,可复原, 安全可靠 , rdd调用转换算子返回新的rdd

可以通过RDD上下游的关系进行推导出

  • 数据集:本身是不存储数据的 底层是迭代器 ,迭代自的数据大数据 - Spark系列《六》- RDD详解,spark,大数据,spark,分布式

🐶6.1 RDD 属性

abstract class RDD 是一个抽象类 , 一切皆对象, 类中包含的信息:

*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
*    an HDFS file)
1.🥙分区列表

RDD 包含一个分区列表,每个分区都是数据的一个子集。这些分区是并行计算的基本单位,允许 Spark 在集群中的多个节点上同时处理数据。

2.🥙分区计算逻辑

对于每个分区,RDD 包含一个用于计算该分区的函数。这意味着在 RDD 上应用转换算子时,该函数将在每个分区上执行相同的计算逻辑。

3.🥙RDD 之间的依赖关系

RDD 可以依赖于其他 RDD,形成一个有向无环图(DAG)。这些依赖关系允许 Spark 追踪数据的来源,从而实现容错和数据恢复,即使在节点故障时也能重新计算丢失的数据

4.🥙可选的分区器

某些类型的 RDD,特别是键值对 RDD(例如通过 groupBy 或 reduceByKey 生成的 RDD),可能具有可选的分区器。这些分区器指定了如何重新分区数据,以便在后续计算中更有效地进行并行处理。

大数据 - Spark系列《六》- RDD详解,spark,大数据,spark,分布式

5.🥙计算任务的首选地址

对于每个分区,RDD 可以指定首选的计算位置列表。这些位置可能是数据存储的位置,如 HDFS 文件的块位置,或者其他 Spark 节点的位置。指定首选位置可以提高计算效率,尤其是在节点之间的数据传输成本较高的情况下。

大数据 - Spark系列《六》- RDD详解,spark,大数据,spark,分布式

🐶6.2 RDD 分区

RDD的分区个数?

6.2.1 原始RDD

1.🥙集合

如果RDD是由集合创建的,默认情况下,其分区数由 Spark 集群的总核心数(totalCores)确定。这是通过spark.default.parallelism参数设置的。如果未设置该参数,则默认使用 Spark 集群的总核心数。 除此之外,也可以通过在创建RDD时手动指定分区数来覆盖默认值。

package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/17
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test01 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd1 = sc.makeRDD(List(1, 2, 3))
    val rdd2 = sc.parallelize(Array(1, 2, 3),4)

    println(rdd1.getNumPartitions) //16 默认集群总核数
    println(rdd2.getNumPartitions) //4  指定分区数

    sc.stop()
  }
}
2.🥙文件

当从文件中创建RDD时,Spark会根据计算任务的切片大小和输入路径下的文件大小来确定分区数。通常情况下,至少会有两个分区。这样做是为了确保数据可以被并行处理。

🍠textFile的切片原理和设置切片大小的规则
  • 切片规则:

  1. 对于作业(job)的输入路径中的每个文件,单独切片。

  2. 判断每个文件是否支持切片:

    • 如果支持切片,根据指定的切片大小对文件进行切片。

    • 如果不支持切片,则将整个文件作为一个切片。

  • 切片大小计算

// 切片大小计算规则
splitSize = Math.max(minSize, Math.min(goalSize, blockSize))

参数说明

1)minSize:指定最小的切片大小。可以通过设置参数 mapreduce.input.fileinputformat.split.minsizemapred.min.split.size 来指定。默认值为 1L

2)goalSize:根据需求设定的期望切片大小。

goalSize=所有文件大小总和/指定的切片个数(可以通过参数传递)textFile(path,2)

3)blockSize:本地目录的切片大小为 32M,HDFS目录的切片大小为 128M256M,具体取决于 HDFS 文件块的配置。

大数据 - Spark系列《六》- RDD详解,spark,大数据,spark,分布式

3.🥙数据库

当从数据库等外部数据源创建RDD时,通常可以在创建RDD时通过参数指定分区数。这可以根据数据规模和处理需求进行调整,以实现更好的并行处理效果。

6.2.2 由转换算子转换的RDD

当对原始RDD进行转换操作时,可以通过设置参数来指定新RDD的分区数,从而控制转换后RDD的分区个数。例如,在进行 mapflatMapfilter 等转换操作时,可以通过传递参数来指定分区数。通常情况下,Spark 会尽可能地保留原始RDD的分区数,但也可以手动指定分区数来满足特定的计算需求。

转换算子产生的RDD的分区数
1) 原则上分区个数是不变的
2) 有些算子可以调用的时候指定分区个数   distinct   join  groupBy  groupByKey

3) 特殊的算子 有特殊规定   
join 1)spark.default.parallelism  2)分区数多的哪个rdd的分区数
val rdd3 = rdd1.intersection(rdd2)  // 取大的
val rdd3 = rdd1.union(rdd2)  //两个rdd分区之和
val rdd4 = rdd1.subtract(rdd2) // 前面的RDD分区数
println(rdd1.cartesian(rdd2).getNumPartitions) // 两个分区个数乘积
注意: 可能产生Shuffle的算子可以指定分区个数的
distinct(p)     减少
groupBy(_._1 , p)    Shuffle 
groupByKey( p)       Shuffle 
groupByKey(_+_, p)   Shuffle 
join( , p)
🧀实例1:(distinct算子可以指定分区个数)
package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/17
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test01 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式
      .set("spark.default.parallelism","8")

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(Array(1, 2, 3, 1,4,5,6,7,8,9,1,1),4)

    println(rdd1.getNumPartitions) //4  指定分区数

    val rdd2 = rdd1.map(_ * 10)
    println(rdd2.getNumPartitions) //4

    val rdd3 = rdd2.distinct(2)
    println(rdd3.getNumPartitions) //2
    sc.stop()
  }
}
🧀实例2:(union操作后的分区个数)
package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/17
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test01 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式
      .set("spark.default.parallelism","8")

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(Array(1, 2, 3,3,4,5),2)
    val rdd2 = sc.parallelize(Array(1,2,3,4,4,5),3)
    val rdd3 = rdd1.union(rdd2)
    rdd3.foreach(println)
    println(rdd3.getNumPartitions)   //5
    sc.stop()
  }
}

大数据 - Spark系列《六》- RDD详解,spark,大数据,spark,分布式

🧀实例3:(join操作之后的分区个数)
package com.doit.day0217

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}

/**
 * @日期: 2024/2/17
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test02 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式
     // .set("spark.default.parallelism","8")  //如果这里设置了,那么join后的rdd分区数为8
    
    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd1 = sc.makeRDD(List(("aaa", 1), ("bbb", 2), ("ccc", 3)),2)
    val rdd2 = sc.makeRDD(List(("aaa", "F"), ("bbb", "M"), ("ccc", "M")),3)

    val rdd3 = rdd1.join(rdd2)
    rdd3.foreach(println)
    println(rdd3.getNumPartitions)   //3

    sc.stop()
  }
}
🧀实例4:(交集差集笛卡尔积操作之后的分区个数)
package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/17
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */


object Test01 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式
     // .set("spark.default.parallelism","8")

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(Array(1, 2, 3,3,4,5),2)
    val rdd2 = sc.parallelize(Array(1,2,3,4,4,5),3)
    val rdd3 = rdd1.union(rdd2)
    val rdd4 = rdd1.intersection(rdd2)
    val rdd5 = rdd1.subtract(rdd2)
    val rdd6 = rdd1.cartesian(rdd2)

    println(rdd3.getNumPartitions)   //5 之和
    println(rdd4.getNumPartitions)   //3 取大的
    println(rdd5.getNumPartitions)   //2 取前面的
    println(rdd6.getNumPartitions)   //6 乘积
    sc.stop()
  }
}
🍠repartition和coalesce

repartition 是一个广泛使用的方法,它用于增加 RDD 的分区数量。它的工作原理是通过随机洗牌(shuffle)将数据重新分配到新的分区中。

val rdd = sc.parallelize(1 to 100, 3) // 创建一个初始分区数为 3 的 RDD
val repartitionedRDD = rdd.repartition(5) // 增加分区数为 5

coalesce 方法用于减少 RDD 的分区数量,它的工作原理是在不触发 shuffle 的情况下,将多个分区合并为较少的分区。因为不涉及 shuffle 操作,所以相比于 repartition 方法,coalesce 方法的开销更小。

println(rdd1.coalesce(1, true).getNumPartitions)  //减少
println(rdd1.coalesce(3, true).getNumPartitions)  //增加
// 不允许Shuffle就不能增加分区
println(rdd1.coalesce(3, false).getNumPartitions)  //增加失败
println(rdd1.coalesce(1, false).getNumPartitions)  //减少  不会Shuffle

总结

  • repartition 用于增加 RDD 的分区数量,会触发 shuffle 操作,开销较大。

  • coalesce 用于减少 RDD 的分区数量,可以选择是否触发 shuffle 操作,开销较小。

  • 如果需要增加分区数或者进行重分区时,通常首选 repartition 方法;如果需要减少分区数且不触发 shuffle 操作,通常首选 coalesce 方法。

🍠groupBy不一定会Shuffle

Shuffle:上游一个分区的数据可能被下游所有分区引用

大数据 - Spark系列《六》- RDD详解,spark,大数据,spark,分布式

package com.doit.com.doit.day0128

import org.apache.spark.SparkContext.jarOfObject
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 * @日期: 2024/1/29
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 我是技术大牛
 * @Description:
 */


object Test03 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("doe").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1 = sc.makeRDD(List("a b c d e f g"), 2)

    val rdd2: RDD[String] = rdd1.flatMap(_.split("\\s+"))

    val wordOne = rdd2.map(line=>{
      println("aaaaaa")
      (line,1)
    })   //2

    //对数据使用HashPartitioner在分区 2
    val rdd3 = wordOne.partitionBy(new HashPartitioner(3))

    rdd3.mapPartitionsWithIndex((p,iter)=>{
      iter.map(e=>(p,e))
    }).foreach(println)

    //底层默认是HashPartition分区 2
    val rdd4: RDD[(String, Iterable[(String, Int)])] = rdd3.groupBy(_._1, 3)

    val rdd5: RDD[(Int, (String, Iterable[(String, Int)]))] = rdd4.mapPartitionsWithIndex((p, iter) => {
      iter.map(e => (p, e))
    })

    rdd5.foreach(println)
    
    sc.stop()
  }
}

结果

大数据 - Spark系列《六》- RDD详解,spark,大数据,spark,分布式

在 Spark 中,groupBy 操作并不一定会触发 Shuffle,具体是否触发 Shuffle 取决于操作前后的分区情况。在上面的示例中,通过使用 HashPartitioner 对数据进行重新分区,避免了不必要的 Shuffle 操作,从而提高了计算效率。文章来源地址https://www.toymoban.com/news/detail-829324.html

到了这里,关于大数据 - Spark系列《六》- RDD详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 工具系列:PyCaret介绍_Fugue 集成_Spark、Dask分布式训练

    Fugue 是一个低代码的统一接口,用于不同的计算框架,如 Spark、Dask。PyCaret 使用 Fugue 来支持分布式计算场景。 让我们从最标准的例子开始,代码与本地版本完全相同,没有任何魔法。 compare_model 如果您不想使用分布式系统,也完全相同。 现在让我们将其分布式,作为一个玩

    2024年02月04日
    浏览(51)
  • 大数据学习06-Spark分布式集群部署

    配置好IP vim /etc/sysconfig/network-scripts/ifcfg-ens33 修改主机名 vi /etc/hostname 做好IP映射 vim /etc/hosts 关闭防火墙 systemctl status firewalld systemctl stop firewalld systemctl disable firewalld 配置SSH免密登录 ssh-keygen -t rsa 下载Scala安装包 配置环境变量 添加如下配置 使环境生效 验证 Spark官网 解压 上

    2024年02月10日
    浏览(70)
  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(54)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(119)
  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(57)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(70)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(62)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(83)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(67)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包