import org.apache.spark.{SparkConf, SparkContext}

object mapPartitionWithIndex {
  // Builds the (partitionIndex, iterator) => iterator function passed to mapPartitionsWithIndex:
  // it tags every element with the id of the partition it lives in.
  def getPartInfo: (Int, Iterator[Int]) => Iterator[String] = (index: Int, iter: Iterator[Int]) => {
    iter.map(x => "[ PartId " + index + ", elems: " + x + " ]")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RddMapPartitionsWithIndexDemo")
    val sc = new SparkContext(conf)
    // nine elements spread over three partitions
    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 9, 6, 7, 8), numSlices = 3)
    val rdd2 = rdd1.mapPartitionsWithIndex(getPartInfo)
    rdd2.collect().foreach(println)
    sc.stop()
  }
}
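parallelize splits the list evenly by position, so the three partitions should hold [1,2,3], [4,5,9] and [6,7,8]; under that assumption the program prints something close to the following (the exact layout is Spark's choice, so treat this as a sketch):

[ PartId 0, elems: 1 ]
[ PartId 0, elems: 2 ]
[ PartId 0, elems: 3 ]
[ PartId 1, elems: 4 ]
[ PartId 1, elems: 5 ]
[ PartId 1, elems: 9 ]
[ PartId 2, elems: 6 ]
[ PartId 2, elems: 7 ]
[ PartId 2, elems: 8 ]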
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd1.collect
warning: there was one feature warning; re-run with -feature for details
res24: Array[Int] = Array(1, 2, 3, 4, 5)
scala> :paste
// Entering paste mode (ctrl-D to finish)
def pfun1(p1: Int, p2: Int): Int = {
  p1 * p2
}
// Exiting paste mode, now interpreting.
pfun1: (p1: Int, p2: Int)Int
scala>
scala> :paste
// Entering paste mode (ctrl-D to finish)
def pfun2(p3: Int, p4: Int): Int = {
  p3 + p4
}
// Exiting paste mode, now interpreting.
pfun2: (p3: Int, p4: Int)Int
scala>
scala> rdd1.aggregate(3)(pfun1, pfun2)
res25: Int = 363
scala>
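The value 363 depends on how 1 to 5 is partitioned; it matches the case where all five elements land in a single partition (for example, a spark-shell running with one local core). Under that assumption the computation is:

seqOp  (pfun1, multiplication): 3 * 1 * 2 * 3 * 4 * 5 = 360
combOp (pfun2, addition):       3 + 360 = 363

The zero value 3 is folded in twice: once inside the partition by pfun1 and once more by pfun2 when the partition result is merged. With more partitions the result would differ.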
import org.apache.spark.{SparkConf, SparkContext}

object RddAggregateDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RddDemos")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(List("12", "23", "345", ""), numSlices = 2)
    // Equivalent to rdd1.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
    val result = rdd1.aggregate("")(func1, func2)
    println(result)
    sc.stop()
  }

  // seqOp: (x, y) => math.min(x.length, y.length), returned as a String
  def func1: (String, String) => String = (x: String, y: String) => {
    println("<x: " + x + ", x.len: " + x.length + ">, <y: " + y + ", y.len: " + y.length + ">")
    val ret = math.min(x.length, y.length).toString
    println("func1 ret: " + ret)
    ret
  }

  // combOp: (x, y) => x + y (string concatenation)
  def func2: (String, String) => String = (x: String, y: String) => {
    println("========" + (x + y))
    x + y
  }
}
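For reference, parallelize should split the four strings over the two partitions as ["12","23"] and ["345",""]; assuming that layout, aggregate("")(func1, func2) proceeds as follows (partition results can reach the driver in either order, so the printed result is typically "10" or "01"):

Partition 0: min("".length, "12".length) = 0 -> "0", then min("0".length, "23".length) = 1 -> "1"
Partition 1: min("".length, "345".length) = 0 -> "0", then min("0".length, "".length) = 0 -> "0"
combOp concatenates the zero value with the partition results: "" + "1" + "0" = "10" (or "01")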
scala> val pairRdd = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)

The two partitions contain:
Partition 0: ("cat",2), ("cat",5), ("mouse",4)
Partition 1: ("cat",12), ("dog",12), ("mouse",2)

scala> pairRdd.aggregateByKey(100)(math.max(_, _), _ + _).collect
res0: Array[(String,Int)] = Array((dog,100), (cat,200), (mouse,200))

Step by step:

Grouped by key within each partition:
Partition 0: ("cat",(2,5)), ("mouse",(4))
Partition 1: ("cat",(12)), ("dog",(12)), ("mouse",(2))

The seqOp math.max folds the zero value 100 into each key's values inside a partition:
Partition 0: ("cat",(2,5,100)) => ("cat",100), ("mouse",(4,100)) => ("mouse",100)
Partition 1: ("cat",(12,100)) => ("cat",100), ("dog",(12,100)) => ("dog",100), ("mouse",(2,100)) => ("mouse",100)

The combOp _ + _ then sums the per-partition results for each key; the zero value is not applied again at this stage:
(dog,100), (cat,200), (mouse,200)
import org.apache.spark.{SparkConf, SparkContext}

object RddAggregateByKeyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RddAggregateByKeyDemo")
    val sc = new SparkContext(conf)
    val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)), numSlices = 2)
    // Show how the pairs are distributed across the two partitions
    pairRDD.mapPartitionsWithIndex(getPartInfo).collect.foreach(println)
    // seqOp = func1 (max within a partition), combOp = func2 (sum across partitions)
    val rdd = pairRDD.aggregateByKey(100)(func1, func2)
    val resultArray = rdd.collect
    resultArray.foreach(println)
    sc.stop()
  }

  // Tags each (key, value) pair with the id of the partition it lives in
  def getPartInfo(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
    iter.map(x => "[partID:" + index + ", val: " + x + "]")
  }

  // seqOp: (x, y) => math.max(x, y)
  def func1: (Int, Int) => Int = (x: Int, y: Int) => {
    println("<x: " + x + ", y: " + y + ">")
    val ret = math.max(x, y)
    println("func1 max: " + ret)
    ret
  }

  // combOp: (x, y) => x + y
  def func2: (Int, Int) => Int = (x: Int, y: Int) => {
    println("========func2 x: " + x + ", y: " + y)
    println("========func2 ret =====" + (x + y))
    x + y
  }
}
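Assuming the same two-partition layout as in the REPL session above, running this program locally should first print the partition contents via getPartInfo and then the aggregated pairs, roughly like this (the func1/func2 tracing output is omitted, and ordering may vary):

[partID:0, val: (cat,2)]
[partID:0, val: (cat,5)]
[partID:0, val: (mouse,4)]
[partID:1, val: (cat,12)]
[partID:1, val: (dog,12)]
[partID:1, val: (mouse,2)]
(dog,100)
(cat,200)
(mouse,200)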
This concludes 2023_Spark_Experiment 12: using Spark's advanced operators.