Spark中RDD的Transformation算子

这篇具有很好参考价值的文章主要介绍了Spark中RDD的Transformation算子。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RDD的Transformation算子

map

map算子的功能为做映射,即将原来的RDD中对应的每一个元素,应用外部传入的函数进行运算,返回一个新的RDD

val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
val rdd2: RDD[Int] = rdd1.map(_ * 2)

Spark中RDD的Transformation算子

flatMap

flatMap算子的功能为扁平化映射,即将原来RDD中对应的每一个元素应用外部的运算逻辑进行运算,然后再将返回的数据进行压平,类似先map,然后再flatten的操作,最后返回一个新的RDD

val arr = Array(
  "spark hive flink",
  "hive hive flink",
  "hive spark flink",
  "hive spark flink"
)
val rdd1: RDD[String] = sc.makeRDD(arr, 2)
val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))

Spark中RDD的Transformation算子

filter

filter的功能为过滤,即将原来RDD中对应的每一个元素,应用外部传入的过滤逻辑,然后返回一个新的的RDD

val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
val rdd2: RDD[Int] = rdd1.filter(_ % 2 == 0)

Spark中RDD的Transformation算子

mapPartitions

将数据以分区为的形式返回进行map操作,一个分区对应一个迭代器,该方法和map方法类似,只不过该方法的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器,如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 2)
var r1: RDD[Int] = rdd1.mapPartitions(it => it.map(x => x * 10))

map和mapPartitions的区别,mapPartitions一定会比map效率更高吗?
不一定:如果对RDD中的数据进行简单的映射操作,例如变大写,对数据进行简单的运算,map和mapPartitions的效果是一样的,但是如果是使用到了外部共享的对象或数据库连接,mapPartitions效率会更高一些。
原因:map出入的函数是一条一条的进行处理,如果使用数据库连接,会每来一条数据创建一个连接,导致性能过低,而mapPartitions传入的函数参数是迭代器,是以分区为单位进行操作,可以事先创建好一个连接,反复使用,操作一个分区中的多条数据。
特别提醒:如果使用mapPartitions方法不当,即将迭代器中的数据toList,就是将数据都放到内存中,可能会出现内存溢出的情况。

mapPartitionsWithIndex

类似于mapPartitions, 不过函数要输入两个参数,第一个参数为分区的索引,第二个是对应分区的迭代器。函数的返回的是一个经过该函数转换的迭代器。

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
  it.map(e => s"partition: $index, val: $e")
})

keys

RDD中的数据为对偶元组类型,调用keys方法后返回一个新的的RDD,该RDD的对应的数据为原来对偶元组的全部key,该方法有隐式转换

val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通过并行化的方式创建RDD,分区数量为4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val keyRDD: RDD[String] = wordAndOne.keys

values

RDD中的数据为对偶元组类型,调用values方法后返回一个新的的RDD,该RDD的对应的数据为原来对偶元组的全部values

val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通过并行化的方式创建RDD,分区数量为4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val valueRDD: RDD[Int] = wordAndOne.values

mapValues

RDD中的数据为对偶元组类型,将value应用传入的函数进行运算后再与key组合成元组返回一个新的RDD

val lst = List(("spark", 5), ("hive", 3), ("hbase", 4), ("flink", 8))
val rdd1: RDD[(String, Int)] = sc.parallelize(lst, 2)
//将每一个元素的次数乘以10再可跟key组合在一起
//val rdd2 = rdd1.map(t => (t._1, t._2 * 10))
val rdd2 = rdd1.mapValues(_ * 10)

flatMapValues

RDD中的数据为对偶元组类型,将value应用传入的函数进行flatMap打平后再与key组合成元组返回一个新的RDD

val lst = List(("spark", "1,2,3"), ("hive", "4,5"), ("hbase", "6"), ("flink", "7,8"))
val rdd1: RDD[(String, String)] = sc.parallelize(lst, 2)
//将value打平,再将打平后的每一个元素与key组合("spark", "1,2,3") =>("spark",1),("spark",2),("spark",3)
val rdd2: RDD[(String, Int)] = rdd1.flatMapValues(_.split(",").map(_.toInt))
//    val rdd2 = rdd1.flatMap(t => {
//      t._2.split(",").map(e => (t._1, e.toInt))
//    })

uion

将两个类型一样的RDD合并到一起,返回一个新的RDD,新的RDD的分区数量是原来两个RDD的分区数量之和

//两个RDD进行union,对应的数据类型必须一样
//Union不会去重
val rdd1 = sc.parallelize(List(1,2,3,4), 2)
val rdd2 = sc.parallelize(List(5, 6, 7, 8, 9,10), 3)
val rdd3 = rdd1.union(rdd2)
println(rdd3.partitions.length)

Spark中RDD的Transformation算子

reduceByKey

将数据按照相同的key进行聚合,特点是先在每个分区中进行局部分组聚合,然后将每个分区聚合的结果从上游拉取到下游再进行全局分组聚合

val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通过并行化的方式创建RDD,分区数量为4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

Spark中RDD的Transformation算子

combineByKey

val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通过并行化的方式创建RDD,分区数量为4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//调用combineByKey传入三个函数
//val reduced = wordAndOne.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
val f1 = (x: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f1 function invoked in state: $stage, partition: $partition")
  x
}
//在每个分区内,将key相同的value进行局部聚合操作
val f2 = (a: Int, b: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f2 function invoked in state: $stage, partition: $partition")
  a + b
}
//第三个函数是在下游完成的
val f3 = (m: Int, n: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f3 function invoked in state: $stage, partition: $partition")
  m + n
}
val reduced = wordAndOne.combineByKey(f1, f2, f3)

combineByKey要传入三个函数:
第一个函数:在上游执行,该key在当前分区第一次出现时,对value处理的运算逻辑
第二个函数:在上游执行,当该key在当前分区再次出现时,将以前相同key的value进行运算的逻辑
第三个函数:在下游执行,将来自不同分区,相同key的数据通过网络拉取过来,然后进行全局聚合的逻辑

groupByKey

按照key进行分组,底层使用的是ShuffledRDD,mapSideCombine = false,传入的三个函数只有前两个被调用了,并且是在下游执行的

 val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通过并行化的方式创建RDD,分区数量为4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//按照key进行分组
val grouped: RDD[(String, Iterable[Int])] = wordAndOne.groupByKey()

Spark中RDD的Transformation算子

foldByKey

与reduceByKey类似,只不过是可以指定初始值,每个分区应用一次初始值,先在每个进行局部聚合,然后再全局聚合,局部聚合的逻辑与全局聚合的逻辑相同。

 val lst: Seq[(String, Int)] = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通过并行化的方式创建RDD,分区数量为4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)

//与reduceByKey类似,只不过是可以指定初始值,每个分区应用一次初始值
val reduced: RDD[(String, Int)] = wordAndOne.foldByKey(0)(_ + _)

aggregateByKey

与reduceByKey类似,并且可以指定初始值,每个分区应用一次初始值,传入两个函数,分别是局部聚合的计算逻辑、全局聚合的逻辑。

val lst: Seq[(String, Int)] = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通过并行化的方式创建RDD,分区数量为4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//在第一个括号中传入初始化,第二个括号中传入两个函数,分别是局部聚合的逻辑和全局聚合的逻辑
val reduced: RDD[(String, Int)] = wordAndOne.aggregateByKey(0)(_ + _, _ + _)

ShuffledRDD

reduceByKey、combineByKey、aggregateByKey、foldByKey底层都是使用的ShuffledRDD,并且mapSideCombine = true

val f1 = (x: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f1 function invoked in state: $stage, partition: $partition")
  x
}
//在每个分区内,将key相同的value进行局部聚合操作
val f2 = (a: Int, b: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f2 function invoked in state: $stage, partition: $partition")
  a + b
}
//第三个函数是在下游完成的
val f3 = (m: Int, n: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f3 function invoked in state: $stage, partition: $partition")
  m + n
}
//指定分区器为HashPartitioner
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val shuffledRDD = new ShuffledRDD[String, Int, Int](wordAndOne, partitioner)
//设置聚合亲器并关联三个函数
val aggregator = new Aggregator[String, Int, Int](f1, f2, f3)
shuffledRDD.setAggregator(aggregator) //设置聚合器
shuffledRDD.setMapSideCombine(true) //设置map端聚合

如果设置了setMapSideCombine(true),那么聚合器中的三个函数都会执行,前两个在上游执行,第三个在下游执行
如果设置了setMapSideCombine(false),那么聚合器中的三个函数只会执行前两个,并且这两个函数都是在下游执行

distinct

distinct是对RDD中的元素进行取重,底层使用的是reduceByKey实现的,先局部去重,然后再全局去重

val arr = Array(
  "spark", "hive", "spark", "flink",
  "spark", "hive", "hive", "flink",
  "flink", "flink", "flink", "spark"
)
val rdd1: RDD[String] = sc.parallelize(arr, 3)
//去重
val rdd2: RDD[String] = rdd1.distinct()
distinct的底层实现如下:
Scala
val rdd11: RDD[(String, Null)] = rdd1.map((_, null))
val rdd12: RDD[String] = rdd11.reduceByKey((a, _) => a).keys

partitionBy

按照指的的分区器进行分区,底层使用的是ShuffledRDD

val lst: Seq[(String, Int)] = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通过并行化的方式创建RDD,分区数量为4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
//按照指定的分区进行分区
val partitioned: RDD[(String, Int)] = wordAndOne.partitionBy(partitioner)

repartitionAndSortWithinPartitions

按照值的分区器进行分区,并且将数据按照指的的排序规则在分区内排序,底层使用的是ShuffledRDD,设置了指定的分区器和排序规则

val lst: Seq[(String, Int)] = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通过并行化的方式创建RDD,分区数量为4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
//按照指定的分区进行分区,并且将数据按照指定的排序规则在分区内排序
val partitioned = wordAndOne.repartitionAndSortWithinPartitions(partitioner)
repartitionAndSortWithinPartitions的底层实现:
Scala
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)

sortBy

val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
//切分压平
val words: RDD[String] = lines.flatMap(_.split(" "))
//将单词和1组合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//分组聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//按照单词出现的次数,从高到低进行排序
val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)

sortByKey

按照指的的key排序规则进行全局排序

val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
//切分压平
val words: RDD[String] = lines.flatMap(_.split(" "))
//将单词和1组合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//分组聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//按照单词出现的次数,从高到低进行排序
//val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
//val keyed: RDD[(Int, (String, Int))] = reduced.keyBy(_._2).sortByKey()
val sorted = reduced.map(t => (t._2, t)).sortByKey(false)

sortBy、sortByKey是Transformation,但是为什么会生成job?
因为sortBy、sortByKey需要实现全局排序,使用的是RangePartitioner,在构建RangePartitioner时,会对数据进行采样,所有会触发Action,根据采样的结果来构建RangePartitioner。
RangePartitioner可以保证数据按照一定的范围全局有序,同时在shuffle的同时,有设置了setKeyOrdering,这样就又可以保证数据在每个分区内有序了!

reparation

reparation的功能是重新分区,一定会shuffle,即将数据打散。reparation的功能是改变分区数量(可以增大、减少、不变)可以将数据相对均匀的重新分区,可以改善数据倾斜的问题

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
//repartition方法一定shuffle
//不论将分区数量变多、变少、或不变,都shuffle
val rdd2 = rdd1.repartition(3)

Spark中RDD的Transformation算子
reparation的底层调用的是coalesce,shuffle = true

coalesce(numPartitions, shuffle = true)

coalesce

coalesce可以shuffle,也可以不shuffle,如果将分区数量减少,并且shuffle = false,就是将分区进行合并

  • shuffle = true
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
//shuffle = true
val rdd2 = rdd1.coalesce(3, true)
//与repartition(3)功能一样
  • shuffle = false
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)
//shuffle = false
val rdd2 = rdd1.coalesce(2, false)

Spark中RDD的Transformation算子

cogroup

协同分组,即将多个RDD中对应的数据,使用相同的分区器(HashPartitioner),将来自多个RDD中的key相同的数据通过网络传入到同一台机器的同一个分区中(与groupByKey、groupBy区别是,groupByKey、groupBy只能对一个RDD进行分组)
注意:调用cogroup方法,两个RDD中对应的数据都必须是对偶元组类型,并且key类型一定相同

//通过并行化的方式创建一个RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2), ("jerry", 4)), 3)
//通过并行化的方式再创建一个RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//将两个RDD都进行分组
val grouped: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)

Spark中RDD的Transformation算子

join

两个RDD进行join,相当于SQL中的内关联join
两个RDD为什么要进行jion?想要的数据来自于两个数据集,并且两个数据集的数据存在相同的条件,必须关联起来才能得到想要的全部数据

//通过并行化的方式创建一个RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通过并行化的方式再创建一个RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Int, Double))] = rdd1.join(rdd2)

Spark中RDD的Transformation算子

leftOuterJoin

左外连接,相当于SQL中的左外关联

 //通过并行化的方式创建一个RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通过并行化的方式再创建一个RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)

Spark中RDD的Transformation算子

rightOuterJoin

右外连接,相当于SQL中的右外关联

//通过并行化的方式创建一个RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通过并行化的方式再创建一个RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)

Spark中RDD的Transformation算子

fullOuterJoin

全连接,相当于SQL中的全关联

 //通过并行化的方式创建一个RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通过并行化的方式再创建一个RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)

Spark中RDD的Transformation算子

intersection

求交集,底层使用的是cogroup实现的

val rdd1 = sc.parallelize(List(1,2,3,4,4,6), 2)
val rdd2 = sc.parallelize(List(3,4,5,6,7,8), 2)
//求交集
val rdd3: RDD[Int] = rdd1.intersection(rdd2)

//使用cogroup实现intersection的功能
val rdd11 = rdd1.map((_, null))
val rdd22 = rdd2.map((_, null))
val rdd33: RDD[(Int, (Iterable[Null], Iterable[Null]))] = rdd11.cogroup(rdd22)
val rdd44: RDD[Int] = rdd33.filter { case (_, (it1, it2)) => it1.nonEmpty && it2.nonEmpty }.keys

subtract

求两个RDD的差集,将第一个RDD中的数据,如果在第二个RDD中出现了,就从第一个RDD中移除

val rdd1 = sc.parallelize(List("A", "B", "C", "D", "E"))
val rdd2 = sc.parallelize(List("A", "B"))

val rdd3: RDD[String] = rdd1.subtract(rdd2)
//返回 C D E

cartesian

笛卡尔积文章来源地址https://www.toymoban.com/news/detail-513956.html

val rdd1 = sc.parallelize(List("tom", "jerry"), 2)
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"), 3)
val rdd3 = rdd1.cartesian(rdd2)

到了这里,关于Spark中RDD的Transformation算子的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 2023_Spark_实验十:RDD基础算子操作

    Ø练习 1: Ø 练习 2: Ø 练习 3: Ø 练习 4: Ø 练习 5: groupByKey groupByKey会将RDD[key,value]按照相同的key进行分组,形成RDD[key,iterable[value]]的形式,有点类似于sql中的groupby,例如类似于mysql中的group_contact cogroup groupByKey是对单个RDD的数据进行分组,还可以使用一个叫作cogroup()的函

    2024年02月08日
    浏览(44)
  • 2023_Spark_实验十一:RDD高级算子操作

    coalesce : 总所周知,spark的rdd编程中有两个算子repartition和coalesce。公开的资料上定义为,两者都是对spark分区数进行调整的算子。         repartition会经过shuffle,其实际上就是调用的coalesce(shuffle=true)。         coalesce,默认shuffle=false,不会经过shuffle。         当

    2024年02月08日
    浏览(36)
  • Spark大数据处理学习笔记(3.2.2)掌握RDD算子

    衔接上文:http://t.csdn.cn/Z0Cfj 功能: reduce()算子按照传入的函数进行归约计算 案例: 计算1 + 2 + 3 + …+100的值 计算1 × 2 × 3 × 4 × 5 × 6 的值(阶乘 - 累乘) 计算1 2 + 2 2 + 3 2 + 4 2 + 5**2的值(先映射,后归约) 功能: collect()算子向Driver以数组形式返回数据集的所有元素。通常对

    2024年02月08日
    浏览(48)
  • Flink之转换算子Transformation

    转换算子(Transformation)是ApacheFlink中用于对数据流进行处理和转换的操作。在Flink中,数据流被抽象为一个有向无环图(DAG),转换算子可以将数据流的每个元素进行操作,并生成新的数据流。 因此,Flink中的转换算子是指对输入数据流进行转换操作的一类算子,它是将一个

    2024年02月07日
    浏览(45)
  • 大数据学习之Flink算子、了解(Transformation)转换算子(基础篇三)

    目录 Transformation转换算子(基础篇三) 三、转换算子(Transformation) 1.基本转换算子 1.1 映射(Map) 1.2 过滤(filter) 1.3 扁平映射(flatmap) 1.4基本转换算子的例子 2.聚合算子(Aggregation) 2.1 按键分区(keyBy) 2.2 简单聚合 2.3 归约聚合(reduce) 3.用户自定义函数(UDF) 3.1 函

    2024年02月20日
    浏览(41)
  • Halcon 3D-Transformation 相关算子(一)

    (1) hom_mat3d_identity( : : : HomMat3DIdentity) 功能:生成三维齐次变换矩阵。 控制输出参数:HomMat3DIdentity:变换矩阵。 (2) create_pose( : : TransX, TransY, TransZ, RotX, RotY, RotZ, OrderOfTransform, OrderOfRotation, ViewOfTransform : Pose) 功能:创建一个3D位姿。 控制输入参数1:(TransX, TransY, TransZ):分别表示

    2024年01月20日
    浏览(42)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(2)转换算子(Transformation)【基本转换算子、聚合算子】

    数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。 map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个 “一 一映射”,消费一个元素就产出一个元素 。 我们只

    2024年01月23日
    浏览(49)
  • RDD算子操作(基本算子和常见算子)

    目录 一、基本算子         1.map算子         2.flatMap算子         3.filter算子          4.foreach算子         5.saveAsTextFile算子         6.redueceByKey算子 二、常用Transformation算子                 1.mapValues算子         2.groupBy算子         3.distinct算子        

    2024年02月08日
    浏览(39)
  • flink重温笔记(三):Flink 流批一体 API 开发——Transformation 重要算子操作

    前言:今天是学习 flink 第三天啦,学习了高级 api 开发中11 中重要算子,查找了好多资料理解其中的原理,以及敲了好几个小时代码抓紧理解原理。 Tips:虽然学习进度有点慢,希望自己继续努力,不断猜想 api 原理,通过敲代码不断印证自己的想法,转码大数据之路一定会越

    2024年02月19日
    浏览(44)
  • 【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】

    所谓 “分流” ,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream ,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。 其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选

    2024年01月24日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包