RDD编程初级实践

这篇具有很好参考价值的文章主要介绍了RDD编程初级实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

参考链接

spark入门实战系列--8MLlib spark 实战_mob6454cc68310b的技术博客_51CTO博客https://blog.51cto.com/u_16099212/7454034

Spark和Hadoop的安装-CSDN博客https://blog.csdn.net/weixin_64066303/article/details/138021948?spm=1001.2014.3001.5501

1. spark-shell交互式编程

启动spark-shell

cd /usr/local/spark/
./bin/spark-shell

1.1 该系总共有多少学生

注:我将下载的chapter5-data1.txt文件放在“/home/hadoop/下载”目录下。

val lines = sc.textFile("file:///home/hadoop/下载/chapter5-data1.txt")  #读取文件
lines.map(row=>row.split(",")(0)).distinct().count  #每一行作为一个字符串,用’,’分割,取第一个元素,distinct去重,count统计有多少数据项

RDD编程初级实践,服务器,运维,spark,scala,RDD

1.2 该系共开设来多少门课程

lines.map(row=>row.split(",")(1)).distinct().count   #去第二个元素,去重,统计元素数量

RDD编程初级实践,服务器,运维,spark,scala,RDD

1.3 Tom同学的总成绩平均分是多少

lines.filter(row=>row.split(",")(0)=="Tom")    #以','作为分隔符,用filter进行过滤,筛选出第一项是“Tom”的数据项
    .map(row=>(row.split(",")(0),row.split(",")(2).toInt))    #把第一项和第三项(姓名+成绩)合在一起构成一个数据项
    .mapValues(x=>(x,1))    #去除value,把x变成(x,1),第一项是原始数据,第二项是数字1
    .reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))    #针对想对的Key(也就是姓名),来进行运行,运算规则是(x.1+y._1),表示求和,也就是对(x,1)分别进行求和
    .mapValues(x=>(x._1/x._2)).collect()  #求平均值运算,x._1是原始数据的求和,x._2是1的求和,表示数据项的个数

读取的是字符串,所以需要转Int .

RDD编程初级实践,服务器,运维,spark,scala,RDD

RDD编程初级实践,服务器,运维,spark,scala,RDD

1.4 求每名同学的选修的课程门数

lines.map(row=>(row.split(",")(0),1)).reduceByKey((x,y)=>x+y).collect 

首先是将数据变成(姓名,1)的map,然后针对相同key(姓名)的数据进行求和,也就是统计数据项的个数。 

RDD编程初级实践,服务器,运维,spark,scala,RDDRDD编程初级实践,服务器,运维,spark,scala,RDD

1.5 该系DataBase课程共有多少人选修

lines.filter(row=>row.split(",")(1)=="DataBase").count #直接是筛选第二项(课程)是DataBase的数据,然后进行统计个数

RDD编程初级实践,服务器,运维,spark,scala,RDD

1.6 各门课程的平均分是多少

lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()

 求平均分的部分和前面是保持一致的,区别就是筛选构成map的时候前面是根据“Tom”来划分,现在是根据第二项的课程来进行划分。

RDD编程初级实践,服务器,运维,spark,scala,RDD

1.7 使用累加器计算共有多少人选了DataBase这门课

val acc=sc.longAccumulator("My Accumulator")    #定义一个累加器
# #筛选第二项是DataBase的数据项,构成一个(DataBase,1)的map,用foreach,对values值来进行累加
lines.filter(row=>row.split(",")(1)=="DataBase").map(row=>(row.split(",")(1),1)).values.foreach(x=>acc.add(x))
#输出累加值
acc.value

RDD编程初级实践,服务器,运维,spark,scala,RDD

2. 编写独立应用程序实现数据去重

2.1创建相关项目

sudo mkdir -p /example/sparkapp4/src/main/scala
cd /example/sparkapp4/src/main/scala
sudo touch A.txt
sudo vim A.txt
sudo touch B.txt
sudo vim B.txt

RDD编程初级实践,服务器,运维,spark,scala,RDD

sudo vim SimpleApp.scala
import java.io.FileWriter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf


object SimpleApp {
  def main(args: Array[String]): Unit = {
    //配置
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    //读取文件A.txt
    val A = sc.textFile("file:///example/sparkapp4/src/main/scala/A.txt")
    //读取文件B.txt
    val B = sc.textFile("file:///example/sparkapp4/src//main/scala/B.txt")
    //对两个文件进行合并
    val C = A ++ B
    //1.用distinct进行去重
    //2.以空格来进行分割
    //3.根据key排序
    val distinct_lines = C.distinct().map(row => (row.split("    ")(0), row.split("    ")(1))).sortByKey()
    //将RDD类型的数据转换为数组
    val result = distinct_lines.collect()
    //将结果输出到C.txt中
    val out = new FileWriter("/example/sparkapp4/src/main/scala/C.txt", true)
    for (item <- result) {
      out.write(item + "\n")
      println(item)
    }
    out.close()
  }
}

 2.2创建.sbt文件

cd /example/sparkapp4
sudo touch build.sbt
sudo vim build.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.13.13"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"

 2.3打包执行

 出现Exception in thread "main" java.io.FileNotFoundException:/example/sparkapp4/src/main/scala/C.txt (权限不够)

切换到root用户:su root

他这个空格我还是粘贴的,如果代码只有一个空格分割他的结果第二个数据是空的。

sudo /usr/local/sbt/sbt package
su root
spark-submit --class "SimpleApp" ./target/scala-2.13/simple-project_2.13-1.0.jar

RDD编程初级实践,服务器,运维,spark,scala,RDD

RDD编程初级实践,服务器,运维,spark,scala,RDD

RDD编程初级实践,服务器,运维,spark,scala,RDD

RDD编程初级实践,服务器,运维,spark,scala,RDD

3. 编写独立应用程序实现求平均值问题

3.1创建相关文件

sudo mkdir -p /example/sparkapp5/src/main/scala
cd /example/sparkapp5/src/main/scala
sudo vim Algorithm.txt
sudo vim Database.txt
sudo vim Python.txt

RDD编程初级实践,服务器,运维,spark,scala,RDD

vim ./src/main/scala/SimpleApp.scala
import java.io.FileWriter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]): Unit = {
    //配置
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    //读取文件Algorithm.txt
    val Algorithm = sc.textFile("file:///example/sparkapp5/src//main/scala/Algorithm.txt")
    //读取文件Database.txt
    val Database = sc.textFile("file:///example/sparkapp5/src//main/scala/Database.txt")
    //读取文件Python.txt
    val Python = sc.textFile("file:///example/sparkapp5/src//main/scala/Python.txt")
    //对三个文件进行整合
    val scoreSum = Algorithm ++ Database ++ Python
    //以空格切割将名字作为key,(成绩,1)作为value
    val student_grade = scoreSum.map(row => (row.split(" ")(0), (row.split(" ")(1).toInt, 1)))
    //求平均分数
    val student_ave = student_grade.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).map(x => (x._1, 1.0 * x._2._1 / x._2._2))
    //将RDD类型的数据转化为数组
    val result = student_ave.collect()
    val out = new FileWriter("/example/sparkapp5/src/main/scala/average.txt", true)
    for (item <- result) {
      out.write(item + "\n")
      println(item)
    }
    out.close()
  }
}

2.2创建.sbt文件

如上同

2.3打包执行

如上同

题目要求要保留两位小数,我找的那个没有保留小数,我目前写的这个小数后面不止两位。

写入文件采用的是追加的方式。

RDD编程初级实践,服务器,运维,spark,scala,RDD

RDD编程初级实践,服务器,运维,spark,scala,RDD

 补:

还是解决了,先写简单的程序调试,然后直接替换。

RDD编程初级实践,服务器,运维,spark,scala,RDD

 刚开始想的不对,直接用的是Array,结果不出意外报错了。

object Test {
  def main(args: Array[String]): Unit = {
    var a = Array("feng", 12.355353)
    println(a)
    println(a(0))
    println(a(1))
    println(a(1).formatted("%.2f"))
    printf("%s %.2f\n", a(0), a(1))
  }
}

 因为需要格式化输出的是一个Map,不是Array,所以需要修改代码。

[Ljava.lang.Object;@43a25848
feng
12.355353
12.36
feng 12.36
object Test {
  def main(args: Array[String]): Unit = {
    var map = Map[String, Double]("feng" -> 12.442424, "xi" -> 13.35262, "ze" -> 23.151425)
    for (elem <- map) {
      println(elem)
    }
    for ((key, value) <- map) {
      val roundedValue = BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
      println(s"($key,$roundedValue)")
    }
  }
}
(feng,12.442424)
(xi,13.35262)
(ze,23.151425)
(feng,12.44)
(xi,13.35)
(ze,23.15)

 之后就是直接替换原始的代码就行了。

import java.io.FileWriter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]): Unit = {
    //配置
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    //读取文件Algorithm.txt
    val Algorithm = sc.textFile("file:///example/sparkapp5/src//main/scala/Algorithm.txt")
    //读取文件Database.txt
    val Database = sc.textFile("file:///example/sparkapp5/src//main/scala/Database.txt")
    //读取文件Python.txt
    val Python = sc.textFile("file:///example/sparkapp5/src//main/scala/Python.txt")
    //对三个文件进行整合
    val scoreSum = Algorithm ++ Database ++ Python
    //以空格切割将名字作为key,(成绩,1)作为value
    val student_grade = scoreSum.map(row => (row.split(" ")(0), (row.split(" ")(1).toInt, 1)))
    //求平均分数
    val student_ave = student_grade.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).map(x => (x._1, 1.0 * x._2._1 / x._2._2))
    //将RDD类型的数据转化为数组
    val result = student_ave.collect()
    val out = new FileWriter("/example/sparkapp5/src/main/scala/average.txt", true)
    /* for (item <- result) {
       out.write(item + "\n")
       println(item)
     }*/
    for ((key, value) <- result) {
      val roundedValue = BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
      out.write(s"($key,$roundedValue)\n")
      println(s"($key,$roundedValue)")
    }
    out.close()
  }
}

RDD编程初级实践,服务器,运维,spark,scala,RDD

RDD编程初级实践,服务器,运维,spark,scala,RDD文章来源地址https://www.toymoban.com/news/detail-859630.html

到了这里,关于RDD编程初级实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 实验7 Spark初级编程实践

    一、实验目的 掌握使用 Spark 访问本地文件和 HDFS 文件的方法 掌握 Spark 应用程序的编写、编译和运行方法 二、实验平台 操作系统:Ubuntu18.04(或 Ubuntu16.04) Spark 版本:2.4.0 Hadoop 版本:3.1.3 三、实验内容和要求 1. 安装 Hadoop 和 Spark 进人 Linux 操作系统,完成 Hadoop 伪分布式模

    2024年02月06日
    浏览(39)
  • 实验四 Spark Streaming编程初级实践

    数据流  :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。 1.下载安装包 https://www.apache.org/dyn/closer.lua/flume/

    2024年04月26日
    浏览(46)
  • 大数据实验 实验六:Spark初级编程实践

    实验环境:Windows 10 Oracle VM VirtualBox 虚拟机:cnetos 7 Hadoop 3.3 因为Hadoop版本为3.3所以在官网选择支持3.3的spark安装包 解压安装包到指定文件夹 配置spark-env.sh 启动成功 (1) 在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数; (2) 在spark-shell中读

    2024年02月04日
    浏览(77)
  • 大数据技术原理及应用课实验7 :Spark初级编程实践

    实验7  Spark初级编程实践 一、实验目的 1. 掌握使用Spark访问本地文件和HDFS文件的方法 2. 掌握Spark应用程序的编写、编译和运行方法 二、实验平台 1. 操作系统:Ubuntu18.04(或Ubuntu16.04); 2. Spark版本:2.4.0; 3. Hadoop版本:3.1.3。 三、实验步骤(每个步骤下均需有运行截图) 实

    2024年01月22日
    浏览(51)
  • Spark 【RDD编程(一)RDD编程基础】

            在Spark中,RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写。通俗来讲,RDD是一种抽象的数据结构,用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型,可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可

    2024年02月09日
    浏览(54)
  • Spark【RDD编程(三)键值对RDD】

            键值对 RDD 就是每个RDD的元素都是 (key,value)类型的键值对,是一种常见的 RDD,可以应用于很多场景。                 因为毕竟通过我们之前Hadoop的学习中,我们就可以看到对数据的处理,基本都是以键值对的形式进行统一批处理的,因为MapReduce模型中

    2024年02月09日
    浏览(49)
  • Spark【RDD编程(四)综合案例】

    输入数据:   处理代码: 代码解析:  运行结果: 要求:输入三个文件(每行一个数字),要求输出一个文件,文件内文本格式为(序号 数值)。         我们会发现,如果我们不调用 foreach 这个行动操作而是直接在转换操作中进行输出的话,这样是输出不来结果的,

    2024年02月09日
    浏览(38)
  • Spark RDD编程 文件数据读写

    从本地文件系统读取数据,可以采用textFile()方法,可以为textFile()方法提供一个本地文件或目录地址,如果是一个文件地址,它会加载该文件,如果是一个目录地址,它会加载该目录下的所有文件的数据。 示例:读取一个本地文件word.txt val textFile中的textFile是变量名称,sc.t

    2024年02月05日
    浏览(41)
  • Spark RDD编程基本操作

    RDD是Spark的核心概念,它是一个只读的、可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用。Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作,从而实现各种复杂的应用。 Spark采用textFile()方法来从文件系统中加

    2024年02月06日
    浏览(82)
  • Spark避坑系列二(Spark Core-RDD编程)

    大家想了解更多大数据相关内容请移驾我的课堂: 大数据相关课程 剖析及实践企业级大数据 数据架构规划设计 大厂架构师知识梳理:剖析及实践数据建模 PySpark避坑系列第二篇,该篇章主要介绍spark的编程核心RDD,RDD的概念,基础操作 RDD(Resilient Distributed Dataset)叫做弹性

    2024年02月02日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包