每日一句正能量
人生很长,不必慌张。你未长大,我要担当。
第3章 Spark RDD弹性分布式数据集
章节概要
传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。
RDD是Spark提供的最重要的抽象概念,我们可以将RDD理解为一个分布式存储在集群中的大型数据集合,不同RDD之间可以通过转换操作形成依赖关系实现管道化,从而避免了中间结果的I/O操作,提高数据处理的速度和性能。接下来,本章将针对RDD进行详细讲解。
3.3 RDD的处理过程
Spark用Scala语言实现了RDD的API,程序开发者可以通过调用API对RDD进行操作处理。RDD经过一系列的“转换”操作,每一次转换都会产生不同的RDD,以供给下一次“转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理,并输出到外部数据源中,若是中间的数据结果需要复用,则可以进行缓存处理,将数据缓存到内存中。
Spark用Scala语言实现了RDD的API,程序开发者可以通过调用API对RDD进行操作处理。下面,通过一张图来描述RDD的处理过程。
RDD经过一系列的"转换”操作,每一次转换都会产生不同的RDD,以供给下一次转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理。
需要注意的是,RDD采用了惰性调用,即在RDD的处理过程中,真正的计算发生在RDD的"行动”操作,对于"行动"之前的所有"转换"操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD相互之间的依赖关系,而不会触发真正的计算处理。
3.3.1 转换算子
RDD处理过程中的“转换”操作主要用于根据已有RDD创建新的RDD,每一次通过Transformation算子计算后都会返回一个新RDD,供给下一个转换算子使用。下面,通过一张表来列举一些常用转换算子操作的API,具体如下。
下面,我们通过结合具体的示例对这些转换算子API进行详细讲解。
- filter(func)
filter(func)操作会筛选出满足函数func的元素,并返回一个新的数据集。假设,有一个文件test.txt(内容如前面所示),下面,通过一张图来描述如何通过filter算子操作,筛选出包含单词“spark”的元素。
通过从test.txt文件中加载数据的方式创建RDD,然后通过map操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24
scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25
具体步骤如下:
1.进入到hadoop01,进入/export/data目录,命令如下cd /export/data
2.修改test.txt文件的内容与源数据保持一致(vi test.txt
)。
hadoop spark
itcast heima
scala spark
spark itcast
iscast hadoop
3.进入到spark shell(参考之前的启动)。
cd export/servers/spark/
bin /spark-shell --master local[2]
4.加载文件并产生RDD,代码如下。
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[6] at textFile at <console>:24
scala> val linesWithSpark=lines.filter(line=>line.contains("spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:25
结果如下图所示
- map(func)
map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集。假设,有一个文件test.txt,接下来,通过一张图来描述如何通过map算子操作把文件内容拆分成一个个的单词并封装在数组对象中,具体过程如下图所示。
通过从test.txt文件中加载数据的方式创建RDD,然后通过map操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[9] at textFile at <console>:24
scala> var words=lines.map(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:25
结果如下所示:
- flatMap(func)
flatMap(func)与map(func)相似,但是每个输入的元素都可以映射到0或者多个输出的结果。有一个文件test.txt,接下来,通过一张图来描述如何通过flatMap算子操作,把文件内容拆分成一个个的单词。具体过程如下图所示。
通过从test.txt文件中加载数据的方式创建RDD,然后通过flatMap操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> val words=lines.flatMap(line=>line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:25
结果如下所示:
- groupByKey()
groupByKey()主要用于(Key,Value)键值对的数据集,将具有相同Key的Value进行分组,会返回一个新的(Key ,lterable)形式的数据集。同样以文件test.txt为例,接下来,通过一张图来描述如何通过groupByKey算子操作,将文件内容中的所有单词进行分组。具体过程如下图所示。
通过groupByKey操作把(Key,Value))键值对类型的RDD,按单词将单词出现的次数进行分组,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:25
scala> val groupWords=words.groupByKey()
groupWords: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[18] at groupByKey at <console>:25
结果如下所示:
- reduceByKey(func)
reduceByKey()主要用于(Key,Value)键值对的数据集,返回的是一个新的(Key,Iterable)形式的数据集,该数据集是每个Key传递给函数func进行聚合运算后得到的结果。同样以文件test.txt,接下来,通过一张图来描述如何通过reduceByKey算子操作统计单词出现的次数。具体过程如下图所示。
通过groupByKey操作把(Key,Value)键值对类型的RDD,按单词将单词出现的次数进行分组,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:
scala> val lines=sc.textFile("file:///export/data/test.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///export/data/test.txt MapPartitionsRDD[20] at textFile at <console>:24
scala> val words=lines.flatMap(line=>line.split(" ")).map(word=>(word,1))
words: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:25
scala> var reduceWords=words.reduceByKey((a,b)=>a+b)
reduceWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:25
结果如下所示:
3.3.2 行动算子
行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发真正的计算。下面,通过一张表来列举一些常用行动算子操作的API,具体如下。
下面,结合具体的示例对这些行动算子API进行详细讲解。
- count ()
count()主要用于返回数据集中的元素个数。假设,现有一个arrRdd,如果要统计arrRdd元素的个数,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.count()
res0: Long = 5
- first()
first()主要用于返回教组的第一个元素。现有一个arrRdd,如果要获取arrRdd中第一个元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.first()
res1: Int = 1
从上述结果可以看出,当执行arrRdd.first()操作后返回的结果是1,说明成功获取到了第1个元素。
- take()
take()主要用于以数组的形式返回数组集中的前n个元素。现有一个arrRdd,如果要获取arrRdd中的前三个元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.take(3)
res2: Array[Int] = Array(1, 2, 3)
从上述代码可以看出,执行arrRdd.take(3)操作后返回的结果是Array(1,2,3),说明成功获取到了RDD数据集的前3个元素。
- reduce(func)
reduce()主要用于通过函数func(输入两个参数并返回一个值)聚合数据集中的元素。现有一个arrRdd,如果要对arrRdd中的元素进行聚合,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.reduce((a,b)=>a+b)
res3: Int = 15
- collect()
collect()主要用于以数组的形式返回数据集中的所有元素。现有一个rdd,如果希望rdd中的元素以数组的形式输出,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5)
- foreach(func)
foreach()主要用于将数据集中的每个元素传递到函数func中运行。现有一个arrRdd,如果希望遍历输出arrRdd中的元素,示例代码如下:
scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> arrRdd.foreach(x=>println(x))
1
2
3
4
5
3.3.3 编写WordCount词频统计案例
在Linux本地系统的/export/data目录下,有一个test.txt文件,文件里有多行文本,每行文本都是由2个单词构成,且单词之间都是用空格分隔。现在,我们需要通过RDD统计每个单词出现的次数(即词频),具体操作过程如下。
具体参见书本内容文章来源:https://www.toymoban.com/news/detail-835565.html
转载自:https://blog.csdn.net/u014727709/article/details/136032993
欢迎 👍点赞✍评论⭐收藏,欢迎指正文章来源地址https://www.toymoban.com/news/detail-835565.html
到了这里,关于Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!