大数据编程实验:RDD编程

这篇具有很好参考价值的文章主要介绍了大数据编程实验:RDD编程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、目的与要求

1、熟悉Spark的RDD基本操作及键值对操作;

2、熟悉使用RDD编程解决实际具体问题的方法。

二、实验内容

1给定数据集 data1.txt,包含了某大学计算机系的成绩,数据格式如下所示:

Tom,DataBase,80

Tom,Algorithm,50

Tom,DataStructure,60

Jim,DataBase,90

Jim,Algorithm,60

Jim,DataStructure,80

……

请根据给定的实验数据,在pyspark中通过编程来计算以下内容:

(1)该系总共有多少学生;

先获取每行的姓名字段,再将其用字典统计汇总,最后统计出几个键值对即为学生数量

 >>> lines=sc.textFile("file:///home/deeszechyi/data1.txt")

>>> lines.foreach(print)

rdd编程实践,大数据,spark

>>> namecount=lines.map(lambda line:(line.split(",")[0],1))

>>> namecount.foreach(print)

rdd编程实践,大数据,spark

>>> namecount=namecount.reduceByKey(lambda x,y:(x+y))

>>> namecount.foreach(print)

rdd编程实践,大数据,spark

>>> namecount.count()

rdd编程实践,大数据,spark

(2)该系共开设了多少门课程;

可以考虑先使用map函数映射获取课程字段,再用字典统计,方法与第一小问类似

>>> coursecount=lines.map(lambda x:x.split(",")[1])

>>> coursecount.foreach(print)

rdd编程实践,大数据,spark

>>> coursecount=coursecount.map(lambda x:(x,1))

>>> coursecount.foreach(print)

>>> coursecount=coursecount.reduceByKey(lambda x,y:x+y)

>>> coursecount.count()

rdd编程实践,大数据,spark

(3)Tom同学的总成绩平均分是多少;

本题可以考虑使用filter方法过滤姓名字段为Tom的记录,再映射其课程分数

>>> filtered_rdd = score.filter(lambda x: x[0] == "Tom").map(lambda x: int(x[1]))

>>> tom_sum=filtered_rdd.reduce(lambda a,b:a+b)

>>> print(tom_sum)

rdd编程实践,大数据,spark

>>> tom_ave=tom_sum/filtered_rdd.count()

>>> print(tom_ave)

rdd编程实践,大数据,spark

(4)求每名同学的选修的课程门数;

该题可直接映射获取姓名字段,并用字典统计每个姓名出现次数,该次数即代表该同学所选修的课程数。

>>>stu=lines.map(lambda x:x.split(“,”)[0])

>>>stu.foreach(print)

rdd编程实践,大数据,spark

>>>stu=stu.map(lambda x:(x,1)).reduceByKey(lambda a,b:(a+b))

>>>stu.foreach(print)

rdd编程实践,大数据,spark

(5)该系DataBase课程共有多少人选修;

本题对数据集直接映射过滤课程为DataBase的课程

>>> course=lines.map(lambda x:x.split(",")[1])

>>> course=course.filter(lambda x:x=="DataBase")

>>> course.count()

rdd编程实践,大数据,spark

(6)各门课程的平均分是多少;

针对问题(6),考虑使用嵌套形式的数据结构来存储,从该数据集中映射出课程名称和分数,对课程出现次数用字典进行统计:(课程名称, (分数, 1))使用reduceByKey方法将分数和方法加,得到新的数据:(课程名称,(总分数,总人数))

 >>> cave=lines.map(lambda x:(x.split(",")[1],(x.split(",")[2],1)))

>>> cave.foreach(print)

rdd编程实践,大数据,spark

>>> cave=cave.reduceByKey(lambda x,y:(int(x[0])+int(y[0]),x[1]+y[1]))

>>> cave.foreach(print)

rdd编程实践,大数据,spark

>>> cave=cave.map(lambda x:(x[0],x[1][0]/x[1][1]))

>>> cave.foreach(print)

rdd编程实践,大数据,spark

7)使用累加器计算共有多少人选了DataBase这门课。

本题使用map方法映射课程字段并用字典对其进行统计,统计结果使用filter过滤即可

 >>>course=lines.map(lambda x:x.split(“,”)[1])

>>>course=course.map(lambda x:(x,1))

>>>course=course.reduceByKey(lambda x,y:(x+y))

>>>course.foreach(print)

>>>DB=course.filter(lambda x:x[0]==’DataBase’).map(lambda x:x[1])

>>>DB.foreach(print)

rdd编程实践,大数据,spark

2.编写独立应用程序实现数据去重

于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。

输入文件A的样例如下:

20170101    x

20170102    y

20170103    x

20170104    y

20170105    z

20170106    z

输入文件B的样例如下:

20170101    y

20170102    y

20170103    x

20170104    z

20170105    y

根据输入的文件A和B合并得到的输出文件C的样例如下:

20170101    x

20170101    y

20170102    y

20170103    x

20170104    y

20170104    z

20170105    y

20170105    z

20170106    z

将数据写入文件A和B

rdd编程实践,大数据,spark

创建unique.py文件,代码如下:

from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("local").setAppName("Sparkunique")
sc = SparkContext(conf=conf)

linesA = sc.textFile("file:///home/deeszechyi/A.txt")
linesB = sc.textFile("file:///home/deeszechyi/B.txt")

lines = linesA.union(linesB)
lines = lines.distinct()
lines = lines.sortBy(lambda x: x)

lines.repartition(1).saveAsTextFile("file:///home/deeszechyi/C.txt")

该段代码读取A和B文件,将两个文件内容合并去重并按照第一个字段排序,保存到C.txt中

rdd编程实践,大数据,spark

rdd编程实践,大数据,spark

3.编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm成绩:

明 92

小红 87

小新 82

小丽 90

Database成绩:

小明 95

小红 81

小新 89

小丽 85

Python成绩:

小明 82

小红 83

小新 94

小丽 91

平均成绩如下:

    (小红,83.67)

    (小新,88.33)

    (小明,89.67)

(小丽,88.67)

创建三个文本文件和一个.py文件

rdd编程实践,大数据,spark

代码如下:

from pyspark import SparkContext,SparkConf
conf=SparkConf().setMaster("local").setAppName("avescore")
sc=SparkContext(conf=conf)
linesA=sc.textFile("file:///home/deeszechyi/Algorithm.txt")
linesB=sc.textFile("file:///home/deeszechyi/Database.txt")
linesC=sc.textFile("file:///home/deeszechyi/Python.txt")
lines=linesA.union(linesB).union(linesC)
uniquelines=lines.distinct()
ave=uniquelines.sortBy(lambda x:x).filter(bool)
ave=ave.map(lambda x:x.split())
ave=ave.map(lambda x:(x[0],(int(x[1]),1)))
ave=ave.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
ave=ave.map(lambda x:(x[0],x[1][0]/x[1][1]))
ave.repartition(1).saveAsTextFile("file:///home/deeszechyi/ave.txt")

该段代码构造了一个复合型数据结构:(姓名,(成绩,1)),与第一题第(6)问相同。
rdd编程实践,大数据,spark

rdd编程实践,大数据,spark

4、运行教材P86第四节中的三个综合实例,对每个Python程序要给出适当的注释。

rdd编程实践,大数据,spark

rdd编程实践,大数据,spark

rdd编程实践,大数据,spark文章来源地址https://www.toymoban.com/news/detail-857980.html

到了这里,关于大数据编程实验:RDD编程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark 【RDD编程(一)RDD编程基础】

            在Spark中,RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写。通俗来讲,RDD是一种抽象的数据结构,用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型,可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可

    2024年02月09日
    浏览(54)
  • Spark【RDD编程(三)键值对RDD】

            键值对 RDD 就是每个RDD的元素都是 (key,value)类型的键值对,是一种常见的 RDD,可以应用于很多场景。                 因为毕竟通过我们之前Hadoop的学习中,我们就可以看到对数据的处理,基本都是以键值对的形式进行统一批处理的,因为MapReduce模型中

    2024年02月09日
    浏览(49)
  • Spark RDD编程基本操作

    RDD是Spark的核心概念,它是一个只读的、可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用。Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作,从而实现各种复杂的应用。 Spark采用textFile()方法来从文件系统中加

    2024年02月06日
    浏览(83)
  • Spark【RDD编程(四)综合案例】

    输入数据:   处理代码: 代码解析:  运行结果: 要求:输入三个文件(每行一个数字),要求输出一个文件,文件内文本格式为(序号 数值)。         我们会发现,如果我们不调用 foreach 这个行动操作而是直接在转换操作中进行输出的话,这样是输出不来结果的,

    2024年02月09日
    浏览(38)
  • spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

    目录 1. RDD队列 2 textFileStream 3 DIY采集器 4 kafka数据源【重点】        a、使用场景:测试        b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理     1. 自定义采集器     2. 什么情况下需要自定采集器呢?          比

    2024年02月07日
    浏览(49)
  • 2023_Spark_实验十:RDD基础算子操作

    Ø练习 1: Ø 练习 2: Ø 练习 3: Ø 练习 4: Ø 练习 5: groupByKey groupByKey会将RDD[key,value]按照相同的key进行分组,形成RDD[key,iterable[value]]的形式,有点类似于sql中的groupby,例如类似于mysql中的group_contact cogroup groupByKey是对单个RDD的数据进行分组,还可以使用一个叫作cogroup()的函

    2024年02月08日
    浏览(43)
  • 2023_Spark_实验十一:RDD高级算子操作

    coalesce : 总所周知,spark的rdd编程中有两个算子repartition和coalesce。公开的资料上定义为,两者都是对spark分区数进行调整的算子。         repartition会经过shuffle,其实际上就是调用的coalesce(shuffle=true)。         coalesce,默认shuffle=false,不会经过shuffle。         当

    2024年02月08日
    浏览(36)
  • Spark避坑系列二(Spark Core-RDD编程)

    大家想了解更多大数据相关内容请移驾我的课堂: 大数据相关课程 剖析及实践企业级大数据 数据架构规划设计 大厂架构师知识梳理:剖析及实践数据建模 PySpark避坑系列第二篇,该篇章主要介绍spark的编程核心RDD,RDD的概念,基础操作 RDD(Resilient Distributed Dataset)叫做弹性

    2024年02月02日
    浏览(37)
  • RDD编程初级实践

    spark入门实战系列--8MLlib spark 实战_mob6454cc68310b的技术博客_51CTO博客 https://blog.51cto.com/u_16099212/7454034 Spark和Hadoop的安装-CSDN博客 https://blog.csdn.net/weixin_64066303/article/details/138021948?spm=1001.2014.3001.5501 启动spark-shell 注:我将下载的chapter5-data1.txt文件放在“/home/hadoop/下载”目录下。 读

    2024年04月27日
    浏览(36)
  • 大数据 - Spark系列《六》- RDD详解

    Spark系列文章: 大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客 大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客 大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客 大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客 大数据

    2024年02月20日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包