2023_Spark_实验十二:Spark高级算子使用

这篇具有很好参考价值的文章主要介绍了2023_Spark_实验十二:Spark高级算子使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

掌握Spark高级算子在代码中的使用
2023_Spark_实验十二:Spark高级算子使用,Scala,Spark实验,spark,ajax,大数据
相同点分析
三个函数的共同点,都是Transformation算子。惰性的算子。
不同点分析
map函数是一条数据一条数据的处理,也就是,map的输入参数中要包含一条数据以及其他你需要传的参数。
mapPartitions函数是一个partition数据一起处理,也即是说,mapPartitions函数的输入是一个partition的所有数据构成的“迭代器”,然后函数里面可以一条一条的处理,在把所有结果,按迭代器输出。也可以结合yield使用效果更优。
rdd的mapPartitions是map的一个变种,它们都可进行分区的并行处理。
两者的主要区别是调用的粒度不一样:map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。
mapPartitionsWithIndex函数,其实和mapPartitions函数区别不大,因为mapPartitions背后调的就是mapPartitionsWithIndex函数,只是一个参数被close了。mapPartitionsWithIndex的函数可以或得partition索引号;
假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。
mapPartitionsWithIndex则是带上分区下标进行操作。
1、mapPartitionWithIndex
 

import org.apache.spark.{SparkConf,SparkContext}
object mapPartitionWithIndex {

def getPartInfo:(Int,Iterator[Int]) => Iterator[String] = (index:Int,iter:Iterator[Int]) =>{
iter.map(x =>"[ PartId " + index +", elems: " + x + " ]")
}

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RddMapPartitionsWithIndexDemo")
val sc = new SparkContext(conf)

val rdd1 = sc.parallelize(List(1,2,3,4,5,9,6,7,8),numSlices =3 )
val rdd2 = rdd1.mapPartitionsWithIndex(getPartInfo)
rdd2.collect().foreach(println)

}

}

2023_Spark_实验十二:Spark高级算子使用,Scala,Spark实验,spark,ajax,大数据

2、aggregate
首先我们来创建一个 RDD

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24


scala> rdd1.collect
warning: there was one feature warning; re-run with -feature for details
res24: Array[Int] = Array(1, 2, 3, 4, 5)
这个 RDD 仅有 1 个分片,包含 5 个数据: 1, 2, 3, 4, 5 。
 
然后我们来应用一下 aggregate 方法。
 
在使用 aggregate 之前,我们还是先定义两个要给 aggregate 当作输入参数的函数吧。
scala> 
// Entering paste mode (ctrl-D to finish)
def pfun1(p1: Int, p2: Int): Int = {
p1 * p2
}

// Exiting paste mode, now interpreting.
pfun1: (p1: Int, p2: Int)Int

scala>

 
scala> 
// Entering paste mode (ctrl-D to finish)
def pfun2(p3: Int, p4: Int): Int = {
p3 + p4
}

// Exiting paste mode, now interpreting.
pfun2: (p3: Int, p4: Int)Int

scala>
接着是第 2 个函数。就不再解释什么了。
 
然后终于可以开始应用我们的 aggregate 方法了。
scala> rdd1.aggregate(3)(pfun1, pfun2)
res25: Int = 363

scala>
输出结果是 363 !这个结果是怎么算出来的呢?
 
首先我们的 zeroValue 即初值是 3 。然后通过上面小节的介绍,我们知道首先会应用 pfun1 函数,因为我们这个 RDD 只有 1 个分片,所以整个运算过程只会有一次 pfun1 函数调用。它的计算过程如下:
 
首先用初值 3 作为 pfun1 的参数 p1 ,然后再用 RDD 中的第 1 个值,即 1 作为 pfun1 的参数 p2 。由此我们可以得到第一个计算值为 3 * 1 = 3 。接着这个结果 3 被当成 p1 参数传入,RDD 中的第 2 个值即 2 被当成 p2 传入,由此得到第二个计算结果为 3 * 2 = 6 。以此类推,整个 pfun1 函数执行完成以后,得到的结果是  3 * 1 * 2 * 3 * 4 * 5 = 360 。这个 pfun1 的应用过程有点像是 “在 RDD 中滑动计算” 。
在 aggregate 方法的第 1 个参数函数 pfun1 执行完毕以后,我们得到了结果值 360 。于是,这个时候就要开始执行第 2 个参数函数 pfun2 了。
 
pfun2 的执行过程与 pfun1 是差不多的,同样会将 zeroValue 作为第一次运算的参数传入,在这里即是将 zeroValue 即 3 当成 p3 参数传入,然后是将 pfun1 的结果 360 当成 p4 参数传入,由此得到计算结果为 363 。因为 pfun1 仅有一个结果值,所以整个 aggregate 过程就计算完毕了,最终的结果值就是 363 

import org.apache.spark.{SparkConf, SparkContext}

object RddAggregateDemo {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RddDemos")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List("12","23","345",""),numSlices = 2)
//下面代码等价与rdd1.aggregate("")(x+y) => math.min(x.length,y.length)),(x,y)=>x+y))
val result = rdd1.aggregate("")(func1,func2)
println(result)
}
//(x,y)=>math.min(x.Length,y.length)
def func1:(String,String) => String =(x:String,y:String) =>{
println("<x: " + x + ",x.len: " + x.length + ">,<y: " + y +",y.len: " + y.length + ">")
val ret =math.min(x.length,y.length).toString
println("func1 ret:" +ret)
ret
}
//(x+y) => x+y
def func2:(String,String)=>String=(x:String,y:String) =>{
println("========" +(x+y))
x+y
}
}

2023_Spark_实验十二:Spark高级算子使用,Scala,Spark实验,spark,ajax,大数据文章来源地址https://www.toymoban.com/news/detail-720677.html

3、aggregateByKey
通过scala集合以并行化方式创建一个RDD
scala> val pairRdd = sc.parallelize(List((“cat”,2),(“cat”,5),(“mouse”,4),(“cat”,12),(“dog”,12),(“mouse”,2)),2)
pairRdd 这个RDD有两个区,一个区中存放的是:
(“cat”,2),(“cat”,5),(“mouse”,4)
另一个分区中存放的是:
(“cat”,12),(“dog”,12),(“mouse”,2)
然后,执行下面的语句
scala > pairRdd.aggregateByKey(100)(math.max(_ , _), _ + _ ).collect
结果:
res0: Array[(String,Int)] = Array((dog,100),(cat,200),(mouse,200)
下面是以上语句执行的原理详解:
aggregateByKey的意思是:按照key进行聚合
第一步:将每个分区内key相同数据放到一起
分区一
(“cat”,(2,5)),(“mouse”,4)
分区二
(“cat”,12),(“dog”,12),(“mouse”,2)
第二步:局部求最大值
对每个分区应用传入的第一个函数,math.max(_ , _),这个函数的功能是求每个分区中每个key的最大值
这个时候要特别注意,aggregateByKe(100)(math.max(_ , _),_+_)里面的那个100,其实是个初始值
在分区一中求最大值的时候,100会被加到每个key的值中,这个时候每个分区就会变成下面的样子
分区一
(“cat”,(2,5,100)),(“mouse”,(4,100))
然后求最大值后变成:
(“cat”,100), (“mouse”,100)
分区二
(“cat”,(12,100)),(“dog”,(12.100)),(“mouse”,(2,100))
求最大值后变成:
(“cat”,100),(“dog”,100),(“mouse”,100)
第三步:整体聚合
将上一步的结果进一步的合成,这个时候100不会再参与进来
最后结果就是:
(dog,100),(cat,200),(mouse,200)
对PairRDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey'函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值,而aggregate函数直接返回的是非RDD的结果。


import org.apache.spark.{SparkConf, SparkContext}

object RddAggregateByKeyDemo {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setMaster("local").setAppName("RddAggregateByKeyDemo")
val sc = new SparkContext(conf)
val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),numSlices = 2)
val rdd = pairRDD.aggregateByKey(100)(func2,func2)
val resultArray = rdd.collect
resultArray.foreach(println)
sc.stop()

}
def func2(index:Int,iter:Iterator[(String,Int)]):Iterator[String] = {
iter.map(x=>"[partID:" + index + ",val: " +x +"]")
}

//(x,y)=>math.max(x+y)
def func1:(Int,Int) => Int =(x:Int,y:Int) =>{
println("<x: " + x + "," +",y:"+ y + ">")
val ret =math.max(x,y)
println("func1 max:" +ret)
ret
}
//(x+y) => x+y
def func2:(Int,Int)=>Int=(x:Int,y:Int) =>{
println("========func2 x :" + x + ",y:"+y)
println("========func2 ret =====" +(x+y))
x+y
}

}
2023_Spark_实验十二:Spark高级算子使用,Scala,Spark实验,spark,ajax,大数据

到了这里,关于2023_Spark_实验十二:Spark高级算子使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 2023_Spark_实验三:基于IDEA开发Scala例子

    一、创建一个空项目,作为整个项目的基本框架 二、创建SparkStudy模块,用于学习基本的Spark基础 三、创建项目结构 1、在SparkStudy模块下的pom.xml文件中加入对应的依赖,并等待依赖包下载完毕。 在pom.xml文件中加入对应的依赖 等待依赖包下载完毕 2、若不能自动下载依赖包,

    2024年02月10日
    浏览(27)
  • 2023_Spark_实验七:Scala函数式编程部分演示

    1、Scala中的函数 在Scala中,函数是“头等公民”,就和数字一样。可以在变量中存放函数,即:将函数作为变量的值(值函数)。 2、匿名函数 3、带函数参数的函数,即:高阶函数 示例1: (*)首先,定义一个最普通的函数 (*)再定义一个高阶函数 (*)分析这个高阶函数

    2024年02月08日
    浏览(40)
  • 2023_Spark_实验九:Scala函数式编程部分演示

    需求: 1、做某个文件的词频统计//某个单词在这个文件出现次数 步骤: 1、文件单词规律(空格分开) 2、单词切分 3、单词的统计 (k,v)-(k:单词,V:数量) 4、打印 框架: 1、单例对象,main() 2、创建CONF 3、创建SC--读取文件的方式--》RDD 4、RDD进行处理 5、关闭资源 一、新

    2024年02月08日
    浏览(28)
  • 2023_Spark_实验六:Scala面向对象部分演示(二)(IDEA开发)

    7、Scala中的apply方法() 遇到如下形式的表达式时,apply方法就会被调用: Object(参数1,参数2,......,参数N) 通常,这样一个apply方法返回的是伴生类的对象;其作用是为了省略new Object的apply方法举例: 8、Scala中的继承 Scala和Java一样,使用extends扩展类。 案例一:

    2024年02月10日
    浏览(38)
  • 2023_Spark_实验五:Scala面向对象部分演示(一)(IDEA开发)

    1、面向对象的基本概念 把数据及对数据的操作方法放在一起,作为一个相互依存的整体——对象,面向 对象的三大特征:  封装  继承  多态 2、类的定义 简单类和无参方法 如果要开发main方法,需要将main方法定义在该类的伴生对象中,即:object对 象中,(后续做详细的讨

    2024年02月10日
    浏览(30)
  • Spark算子-Scala版本 头歌答案

    第1关 Spark算子--Scala版本 编程要求 根据提示,在右侧编辑器 begin-end 处补充代码,输出每个元素及其长度并去重。 测试说明 平台会对你编写的代码进行测试: 预期输出: 开始你的任务吧,祝你成功! 第2关:转换算子之flatMap和filter算子 编程要求 根据提示,在右侧编辑器

    2023年04月15日
    浏览(35)
  • spark之action算子学习笔记(scala,pyspark双语言)

    函数签名:def collect(): Array[T] 功能说明:收集每个分区数据,以数组Array的形式封装后发给driver。设置driver内存:bin/spark-submit --driver-memory 10G(内存大小) 注意:collect会把所有分区的数据全部拉取到driver端,如果数据量过大,可能内存溢出。 图1 结果 图2 结果 返回RDD中元素的

    2024年02月04日
    浏览(33)
  • 大数据平台安装实验: ZooKeeper、Kafka、Hadoop、Hbase、Hive、Scala、Spark、Storm

    ​ 在大数据时代,存在很多开源的分布式数据采集、计算、存储技术,本实验将在熟练掌握几种常见Linux命令的基础上搭建几种常用的大数据采集、处理分析技术环境。 相关安装包下载: 链接:https://pan.baidu.com/s/1Wa2U3qstc54IAUCypcApSQ 提取码:lcd8 Hadoop大数据平台所需工具、软件

    2023年04月09日
    浏览(80)
  • spark底层为什么选择使用scala语言开发

    基于Scala的语言特性 集成性:Scala 是一种运行在 Java 虚拟机(JVM)上的静态类型编程语言,可以与 Java 代码无缝集成。由于 Spark 涉及到与大量 Java 生态系统的交互,例如 Hadoop、Hive 等,使用 Scala 可以方便地与这些组件进行集成和交互。 函数式编程支持:Scala 是一种面向函数

    2024年02月10日
    浏览(40)
  • 简单使用Spark、Scala完成对天气数据的指标统计

    目录 一、前言   什么是Spark?   什么是Scala 二、数据准备(数据类型的转换) 三、Spark部分 1、使用Spark完成数据中的“风级”,“风向”、“天气情况”相关指标统计及筛选 四、Scala部分 1、使用Scala统计某月、全年的温差、平均气温以及最值等相关的指标 五、遇到的问题

    2024年02月03日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包