Spark 【RDD编程(一)RDD编程基础】

这篇具有很好参考价值的文章主要介绍了Spark 【RDD编程(一)RDD编程基础】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

RDD

简介

        在Spark中,RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写。通俗来讲,RDD是一种抽象的数据结构,用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型,可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可以缓存在内存中,可在多次计算中重用。
        RDD是由一系列的记录(或元素)组成的,这些记录可以分散存储在集群的多个节点上,每个节点上的数据可以被并行处理。RDD提供了一系列的操作函数,例如map、reduce、filter等,可以对数据进行转换和计算。RDD的特点是具有容错性和弹性,即使在节点故障的情况下,也能自动恢复数据和计算过程。 

RDD编程基础

1、RDD 创建

Spark 通过 textFile() 从文件系统(本地系统、HDFS、集合)中加载数据来创建RDD。

1.1、从文件系统中加载数据创建 RDD

import org.apache.spark.{SparkConf, SparkContext}

object CreateRddByFileScala {

  def main(args: Array[String]): Unit = {
    //创建SparkContext对象
    val conf = new SparkConf()
    conf.setAppName("CreateRddByFileScala")
      .setMaster("local")

    val sc = new SparkContext(conf)

//windows
    val path = "D:\\test\\data"

//linux
//    val path = "file:///usr/local/test/data/"

    //读取文件数据,可以在textFile中生成的RDD分区数量
    val rdd = sc.textFile(path,2)
    //获取每一行数据的长度,计算文件内数据的总长度
    val length = rdd.map(_.length).reduce(_+_)

    println(length)

    //关闭SparkContext
    sc.stop()
  }

}

1.2、从HDFS中加载数据

只需要修改路径如下:

    val path = "hadoop101:9000/test/"
    //读取文件数据,可以在textFile中生成的RDD分区数量
    val rdd = sc.textFile(path,2)

1.3、通过并行集合(数组)创建RDD

调用 SparkContext 的 parallelize() 方法,通过一个已经存在的集合(数组)来创建RDD。

//创建SparkContext
   val conf = new SparkConf()

   conf.setAppName("CreateRddByArrayScala")
     .setMaster("local")  //local表示在本地执行
    
val sc = new SparkContext(conf)
    //创建集合
    val arr = Array(1,2,3,4,5)
    //基于集合创建RDD
    val rdd = sc.parallelize(arr)

2、RDD 操作

        RDD 的操作包括两种类型:转换操作和行动操作。其中,转换操作主要有map()、filter()、groupBy()、join()等,对RDD而言,每次转换都会产生一个新的RDD,供下一次操作使用。而行动操作(如count()、collect()等)返回的一般都是一个值。

2.1、转换操作

        RDD 的真个转换过程是采用惰性机制的,也就是说,整个转换过程只记录了转换的轨迹,并不会真正的运算,只有遇到行动操作才会触发从头到尾的真正计算。

1、filter(f: String => Boolean)

用法和Scala中的filter一致。

输入文档:

Hadoop is good
Spark is better
Spark is fast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDAction {
  def main(args: Array[String]): Unit = {
    // 创建 SparkContext 对象
    val conf = new SparkConf()
    conf.setAppName("filter-test").setMaster("local")
    val sc = new SparkContext(conf)

    // 通过加载数据创建RDD对象
    val rdd: RDD[String] = sc.textFile("data/word.txt")
    //filter 的参数是一个匿名函数 要求返回一个Boolean 类型的值 true-留下 false-过滤
    val lineWithSpark: RDD[String] = rdd.filter(line => {
      line.contains("Spark")
    })

    lineWithSpark.foreach(println)

    // 关闭sc对象
    sc.stop()
  }
}

 运行结果:
 

Spark is better
Spark is fast
2、map()

同样和Scala中的map()用法一致。

//省略创建AparkContext对象的代码...

// 使用并行集合创建 RDD
    val arr = Array(1,2,3,4,5)
    val rdd1: RDD[Int] = sc.parallelize(arr)

    //转换操作
    val rdd2 = rdd1.map(num => num*2)

    rdd2.foreach(println)

运行结果:

2
4
6
8
10
//使用本地文件作为数据加载创建RDD 对象
    val rdd1: RDD[String] = sc.textFile("data/word.txt")
    val rdd2: RDD[Array[String]] = rdd1.map(line => {
      line.split(" ")
    })

解析:

输入:

Hadoop is good 
Spark is better 
Spark is fast 

Spark 读取进来后,就变成了 RDD("Hadoop is good","Spark is better","Spark is fast"),我们知道,Scala中要进行扁平化操作的话,对象必须是一个多维数组,所以我们要通过 map() 对读取进来的格式进行处理,处理后的格式:RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))

RDD("Hadoop is good","Spark is better","Spark is fast") => RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))
3、flatMap()

和Scala中用法基本一样。

//使用本地文件作为数据加载创建RDD 对象
    val rdd1: RDD[String] = sc.textFile("data/word.txt")

    val rdd2: RDD[String] = rdd1.flatMap(line => line.split(" "))

flatMap 的过程:

RDD("Hadoop is good","Spark is better","Spark is fast")
先进行 map() => 
RDD(Array("Hadoop is good"),Array("Spark is better"),Array("Spark is fast"))
在进行 flatten =>
RDD("Hadoop","is",good","Spark","is","better","Spark","is","fast"))

扁平化后我们的数据又变为了一维集合的数据结构(RDD)了。

4、groupByKey()

        这个函数十分重要,上面我们得到了关于每次单词的一个RDD集合,现在我们要进行wordcount 的话肯定还需要对相同的键进行一个分类,这样会生成一个RDD集合(key:String,valut_list:Interable[Int])。

我们同样基于上面的结果进行操作:

val rdd3: RDD[(String, Int)] = rdd2.map(word => {
      (word, 1)
    })
    //RDD(("Hadoop",1),("is",1),("good",1),("Spark",1),("is",1),("better",1),("Spark",1),("is",1),("fast",1)))

    val rdd4: RDD[(String, Iterable[Int])] = rdd3.groupByKey()
    //RDD(("Hadoop",1),("is",1,1,1),("good",1),("Spark",1,1),("better",1),("fast",1)))
5、reduceByKey()

需要注意的是,reduceByKey是对(key:String,value:Int)这种相同键值对元素的合并,而不是对上面groupByKey()的结果(key:String,value_list:Interable[Int])进行操作,这个粗心让我找了半天。

//rdd5和6效果都一样
    val rdd5: RDD[(String,Int)] = rdd4.map(t => {
      (t._1, t._2.size)
    })
    //RDD(("Hadoop",1),("is",3),("good",1),("Spark",2),("better",1),("fast",1)))

//    rdd3.reduceByKey((v1,v2)=>v1+v2)  //v1 v2代表发现key相同的键值对的值 参数按照顺序在函数体中只出现了一次 那么可以用下划线代替
    val rdd6: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)
    //RDD(("Hadoop",1),("is",3),("good",1),("Spark",2),("better",1),("fast",1)))

 //打印结果
 rdd6.foreach(println)

运行结果:
 

(Spark,2)
(is,3)
(fast,1)
(good,1)
(better,1)
(Hadoop,1)

Process finished with exit code 0

总结

剩下的RDD转换操作下午再新开一篇,以及RDD的行动操作篇、持久化、分区和综合实例后续更新。文章来源地址https://www.toymoban.com/news/detail-706346.html

到了这里,关于Spark 【RDD编程(一)RDD编程基础】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark弹性分布式数据集

    1. Spark RDD是什么 RDD(Resilient Distributed Dataset,弹性分布式数据集)是一个不可变的分布式对象集合,是Spark中最基本的数据抽象。在代码中RDD是一个抽象类,代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 每个RDD都被分为多个分区,这些分区运行在集群中

    2024年02月13日
    浏览(60)
  • 大数据学习06-Spark分布式集群部署

    配置好IP vim /etc/sysconfig/network-scripts/ifcfg-ens33 修改主机名 vi /etc/hostname 做好IP映射 vim /etc/hosts 关闭防火墙 systemctl status firewalld systemctl stop firewalld systemctl disable firewalld 配置SSH免密登录 ssh-keygen -t rsa 下载Scala安装包 配置环境变量 添加如下配置 使环境生效 验证 Spark官网 解压 上

    2024年02月10日
    浏览(70)
  • Spark 【RDD编程(一)RDD编程基础】

            在Spark中,RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写。通俗来讲,RDD是一种抽象的数据结构,用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型,可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可

    2024年02月09日
    浏览(55)
  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(54)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(119)
  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(57)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(70)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(62)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(83)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包