【Spark练习】RDD分区操作

这篇具有很好参考价值的文章主要介绍了【Spark练习】RDD分区操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

练习1:行动(Action)操作算子方法

任务1: reduce

// 1. 数组
val x = sc.parallelize(List(1,2,3,4))
val y = x.reduce( (a,b) => a + b)
// 2. 列表
val rdd = sc.parallelize(List(1,2,3,4))
// 求和,将各个数累加,依次合并  下面两种方式相同
val y = rdd.reduce( (x,y) => x + y)
val y = rdd.reduce(_+_)

【Spark练习】RDD分区操作

任务2: saveAsTextFile

val x = sc.parallelize(Array(2,4,1))
x.saveAsTextFile("file:///F:/04Spark/dataset/hello_new.txt")
// 再将文件中内容读出来
val y = sc.textFile("file:///F:/04Spark/dataset/hello_new.txt")
y.collect.mkString(",")

【Spark练习】RDD分区操作

练习2:RDD的分区操作、分区个数查看

任务1: textFile、parallelize

2.1 textFile
  • 对于textFile而言,如果没有在方法中指定分区数,则sc.defaultMinPartitions默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism,如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)
  • rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
// 1. 查看默认分区 (为4)
sc.defaultParallelism
// 2. 查看默认最小分区 (为2)
sc.defaultMinPartitions
//3.  将rdd存为文件
val rdd1 = sc.parallelize(Array(2,4))
rdd1.saveAsTextFile("file:///F:/04Spark/dataset/hello_new2.txt")
// 4. 读取
val rdd2 = sc.textFile("file:///F:/04Spark/dataset/hello_new2.txt")
// 5.查看分区的数量 (为4)
rdd2.partitions.size

【Spark练习】RDD分区操作

  • 由于rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions),rdd的分区数为4,sc.defaultMinPartitions为2,所以本地file的分片数为4,检查发现确实分为4个part。
2.2 parallelize
  • 这种方式下,如果在parallelize操作时没有指定分区数,则rdd的分区数 = sc.defaultParallelism
// 1. 没有设置分区数量
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array) #没有设置分区数量
// 分区数量为默认分区数量(4)
rdd.partitions.size
// 2. 设置分区数量
val rdd = sc.parallelize(array,2) // 分区数量为2
// 结果分区数量变为2
rdd.partitions.size

【Spark练习】RDD分区操作

任务2: repartition

  • coalesce方法默认是不触发shuffle的,而repartition方法一定会触发shuffle,他们都可以重新进行分区
  • repartition方法不会改变原来rdd分区数量,而是使返回新的rdd分区数量改变
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array,2) // 此时rdd分区数量为2
// repartition不会改变原来rdd,它会返回一个新的rdd
rdd.repartition(1)  // 此时rdd分区数量仍为2
rdd.partitions.size
// 返回一个新的rdd,新的rdd分区数量为1
val rdd1 = rdd.repartition(1) // 此时新的rdd1分区数量为1,rdd分区数量为2
rdd1.partitions.size

【Spark练习】RDD分区操作

补充

1. countByKey()

  • action算子;根据key的次数来做统计
val x = sc.parallelize(Array(('J',"James"),('F',"Fred"),('A',"Anna"),('J',"John")))
val y = x.countByKey()
println(y) 

【Spark练习】RDD分区操作

2. foreachPartition

foreachPartition方法是迭代器被传入了我们的方法(每个分区执行一次函数,我们获取迭代器后需要自行进行迭代处理)

//   1. 分区 2个分区
    val rdd = sc.parallelize(1 to 6,2)
    rdd.foreachPartition(x =>{
      println("data")
      println(x)
      while(x.hasNext){println(x.next())}
    })

【Spark练习】RDD分区操作

3. aggregate 方法

3.1 方法说明

首先对每个分区内的数据基于初始值进行一个首次聚合,然后将每个分区聚合的结果,通过使用给定的聚合函数,再次基于初始值进行分区之间的聚合,并且最终返回结果。该算子为action算子。

3.2 操作步骤
  • 定义两个要给 aggregate 当作输入参数的函数,给初值3
// 乘积
def pfun1(p1: Int, p2: Int): Int = {
    p1 * p2
}
// 和
def pfun2(p3: Int, p4: Int): Int = {
    p3 + p4
}
// 
val array = Array(1,2,3,4,5)
// 指明分区数量为1,否则默认分区数量为4
 val rdd1 = sc.parallelize(array,1)
// 给定初值3,先进行相乘,再将结果进行相加
rdd1.aggregate(3)(pfun1, pfun2)

【Spark练习】RDD分区操作

3.2 分析
  • 首先用初值 3 作为 pfun1 的参数 p1 ,用 RDD 中的第 1 个值,即 1 作为 pfun1 的参数 p2 。由此我们可以得到第一个计算值为 3 * 1 = 3 。接着这个结果 3 被当成 p1 参数传入,RDD 中的第 2 个值即 2 被当成 p2 传入,由此得到第二个计算结果为 3 * 2 = 6 。以此类推,整个 pfun1 函数执行完成以后3 * 1 * 2 * 3 * 4 * 5 = 360

  • 在 aggregate 方法的第 1 个参数函数 pfun1 执行完毕以后,我们得到了结果值 360 。于是,这个时候就要开始执行第 2 个参数函数 pfun2 了。

  • pfun2 的执行过程与 pfun1 是差不多的,同样会将 zeroValue 作为第一次运算的参数传入,在这里即是将 zeroValue 即 3 当成 p3 参数传入,然后是将 pfun1 的结果 360 当成 p4 参数传入,由此得到计算结果为 363 。因为 pfun1 仅有一个结果值,所以整个 aggregate 过程就计算完毕了,最终的结果值就是 363 。
    注意分区数量的不同导致最后运算的结果也会不同。

3.3 多个分片RDD
val array = Array(1,2,3,4,5,6,7,8,9,10)
val rdd2 = sc.parallelize(array,3)
rdd2.getNumPartitions

rdd2.aggregate(2)(pfun1, pfun2)

【Spark练习】RDD分区操作文章来源地址https://www.toymoban.com/news/detail-434077.html

  • 分析:
    2 * 1 * 2 * 3 = 12
    2 * 4 * 5 * 6 = 240
    2 * 7 * 8 * 9 * 10 = 10080
    2 + 12 + 240 + 10080 = 10334

参考链接

  1. aggregate参考
  2. 默认分区数参考

到了这里,关于【Spark练习】RDD分区操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 2023_Spark_实验十:RDD基础算子操作

    Ø练习 1: Ø 练习 2: Ø 练习 3: Ø 练习 4: Ø 练习 5: groupByKey groupByKey会将RDD[key,value]按照相同的key进行分组,形成RDD[key,iterable[value]]的形式,有点类似于sql中的groupby,例如类似于mysql中的group_contact cogroup groupByKey是对单个RDD的数据进行分组,还可以使用一个叫作cogroup()的函

    2024年02月08日
    浏览(34)
  • 2023_Spark_实验十一:RDD高级算子操作

    coalesce : 总所周知,spark的rdd编程中有两个算子repartition和coalesce。公开的资料上定义为,两者都是对spark分区数进行调整的算子。         repartition会经过shuffle,其实际上就是调用的coalesce(shuffle=true)。         coalesce,默认shuffle=false,不会经过shuffle。         当

    2024年02月08日
    浏览(25)
  • 横扫Spark之 - 9个常见的行动算子

    水善利万物而不争,处众人之所恶,故几于道💦 1. collect()   收集RDD每个分区的数据以数组封装之后发给Driver   如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,工作中一般需要将Driver内存设置为5-10G。可以通过 bin/spark-submit --driver-memory 10G 这样设置 结果

    2024年02月20日
    浏览(27)
  • Spark 【RDD编程(一)RDD编程基础】

            在Spark中,RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写。通俗来讲,RDD是一种抽象的数据结构,用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型,可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可

    2024年02月09日
    浏览(41)
  • Spark【RDD编程(三)键值对RDD】

            键值对 RDD 就是每个RDD的元素都是 (key,value)类型的键值对,是一种常见的 RDD,可以应用于很多场景。                 因为毕竟通过我们之前Hadoop的学习中,我们就可以看到对数据的处理,基本都是以键值对的形式进行统一批处理的,因为MapReduce模型中

    2024年02月09日
    浏览(37)
  • Spark---RDD依赖关系

    1.1 RDD依赖关系 在Spark中,一个RDD的形成依赖于另一个RDD,则称这两个RDD具有依赖关系(一般指相邻的两个RDD之间的关系) ,RDD的依赖关系对于优化Spark应用程序的性能和可靠性非常重要。通过合理地设计RDD的转换和动作操作,可以避免不必要的Shuffle操作,提高计算效率。 words的

    2024年01月19日
    浏览(40)
  • Spark核心--RDD介绍

    rdd  弹性分布式数据集  是spark框架自己封装的数据类型,用来管理内存数据 数据集: rdd数据的格式  类似Python中 []     。 hive中的 该结构[] 叫 数组 rdd提供算子(方法)  方便开发人员进行调用计算数据 在pysaprk中本质是定义一个rdd类型用来管理和计算内存数据 分布式 : r

    2024年01月16日
    浏览(29)
  • SPARK-RDD

    分区列表 RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。 分区计算函数 Spark 在计算时,是使用分区函数对每一个分区进行计算 RDD之间的依赖关系 RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建

    2024年02月04日
    浏览(30)
  • Spark RDD 缓存机制

    Spark RDD 缓存是在内存存储RDD计算结果的一种优化技术。把中间结果缓存起来以便在需要的时候重复使用,这样才能有效减轻计算压力,提升运算性能。 当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接

    2024年03月25日
    浏览(43)
  • 【Spark】RDD转换算子

    目录 map mapPartitions mapPartitionsWithIndex flatMap glom groupBy shuffle filter sample distinct coalesce repartition sortBy ByKey intersection union subtract zip partitionBy reduceByKey groupByKey reduceByKey 和 groupByKey 的区别 aggregateByKey foldByKey combineByKey reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别 join leftOuterJoin

    2024年02月12日
    浏览(60)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包