基于Spark的数据清洗与转换

这篇具有很好参考价值的文章主要介绍了基于Spark的数据清洗与转换。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


未经许可,禁止以任何形式转载,若要引用,请标注链接地址
全文共计7326字,阅读大概需要3分钟

一、实验目的

掌握数据整合、数据清洗和数据转换方法。

二、实验内容

1、整合来自不同数据源的数据。
  2、对数据进行清洗。
  3、对数据进行转换。

三、实验原理

数据质量一直是业界普遍存在的问题。不正确或不一致的数据的存在可能会对分析产生误导。90%的时间,数据科学家们并非花时间在建立炫酷的模型上,而是花在数据准备上。做任何分析,最难也最花时间的部分都在数据准备。有一个行业术语叫做“数据工程”,指的是数据的来源和准备。
  数据准备阶段是一个非常重要的阶段,不仅对于算法来说是正确的,而且还可以让我们更好地理解我们的数据,这样我们就可以在实现算法的同时采取正确的方法。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器
  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3,zeppelin-0.8.1

五、实验步骤

5.1 启动HDFS集群、Spark集群和Zeppelin服务器

在终端窗口下,输入以下命令,分别启动HDFS集群、Spark集群和Zeppelin服务器:

1.	$ start-dfs.sh
2.	$ cd /opt/spark
3.	$ ./sbin/start-all.sh
4.	$ zeppelin-daemon.sh start

然后使用jps命令查看进程,确保已经正确地启动了HDFS集群、Spark集群和Zeppelin服务器。

5.2 准备实验数据

将本地数据上传至HDFS上。在终端窗口中,分别执行以下命令上传数据:

1.	$ hdfs dfs -mkdir -p /data/dataset
2.	$ hdfs dfs -put /data/dataset/batch/salary.json /data/dataset/
3.	$ hdfs dfs -put /data/dataset/batch/designation.json /data/dataset

执行以下命令,查看数据文件是否已经上传到HDFS中:

1.	$ hdfs dfs -ls /data/dataset/

5.3 数据整合

假设这样的场景:员工数据分散存储在本地的RDDs、JSON文件和SQL数据库中。我们的任务是将这些数据加载到DataFrame中。一旦数据从不同的来源获得,接下来就是将它们全部合并,以便将数据作为一个整体进行清理、格式化,并转换为分析所需的格式。
  1、创建notebook。启动浏览器,访问”http://localhost:9090“, 打开zeppelin notebook首页。点击”Create new note”链接,创建一个新的笔记本,命名为”analy_demo”,如下图所示:
基于Spark的数据清洗与转换

2、构造代表员工信息的DataFrame。在zeppelin中输入如下代码:

1.	// 创建一个RDD并转换为DataFrame
2.	val employeesDF = sc.parallelize(List((1, "陈柯宇", 25), (2, "陶心瑶", 35),(3, "楼一萱", 24), 
3.	                                      (4, "张希", 28), (5, "王心凌", 26), (6, "庄妮", 35), 
4.	                                      (7,"何洁", 38), (8, "成方圆", 32), (9, "孙玉", 29), 
5.	                                      (10, "刘珂矣", 29),(11, "林忆莲", 28), (12, "蓝琪儿", 25), 
6.	                                      (13, "白安",31))).toDF("emp_id","name","age")
7.	employeesDF.show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

3、加载存储有薪资数据的json文件创建DataFrame。在zeppelin中执行如下代码:

1.	val salaryFilePath = "hdfs://localhost:9000/data/dataset/salary.json"
2.	val salaryDF = spark.read.json(salaryFilePath)
3.	salaryDF.show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

4、加载存储有职务数据的json文件创建DataFrame。在zeppelin中执行如下代码:

1.	val designationFilePath = "hdfs://localhost:9000/data/dataset/designation.json"
2.	val designationDF = spark.read.json(designationFilePath)
3.	designationDF.show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

5、数据整合:组合从各种数据源获得的数据。在zeppelin中执行如下代码:

1.	val final_data = employeesDF.join(salaryDF,$"emp_id"===$"e_id").
2.	                             join(designationDF,$"emp_id"===$"id").
3.	                             select("emp_id","name","age","role","salary")
4.	final_data.cache
5.	final_data.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

5.4 数据清洗

一旦把数据整合到一起,就必须花足够的时间和精力去整理它,然后再分析它。这是一个迭代的过程,因为我们必须验证对数据所采取的操作,并一直持续到我们对数据质量满意为止。
  数据中某些不洁度通常存在于任何数据集中。数据可能有各种各样的问题,这里我们将处理一些常见的情况,比如缺失值、重复值、转换或格式化(从数字中添加或删除数字,将一个列分割成两个,合并两个列到一个)。
  1、缺失值处理。在zeppelin中执行如下代码:

1.	// 可以删除带有缺失值的行
2.	var clean_data = final_data.na.drop()
3.	clean_data.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

可以看出,带有缺失值的”emp_id”为4的数据已经被删除了。
  也可以使用平均值替换缺失值。在zeppelin中执行如下代码:

1.	// 也可以使用平均值替换缺失值
2.	val mean_salary = final_data.select(floor(avg("salary"))).first()(0).toString.toDouble
3.	clean_data = final_data.na.fill(Map("salary" -> mean_salary))
4.	clean_data.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

2、异常值处理:删除包含离群值的行,或者均值替代方法。在这里我们识别异常值并用均值替代。在zeppelin中执行如下代码:

1.	// 计算每一行的偏差
2.	val devs = clean_data.select((($"salary" - mean_salary) * ($"salary" - mean_salary)).alias("deviation"))
3.	     
4.	// 计算标准偏差
5.	val stddev = devs.select(sqrt(avg("deviation"))).first().getDouble(0)
6.	     
7.	// 用平均工资替换超过2个标准差范围内的异常值(UDF)
8.	val outlierfunc = udf((value: Long, mean: Double) => {
9.	    if (value > mean+(2*stddev) || value < mean-(2*stddev)) 
10.	        mean 
11.	    else 
12.	        value
13.	  })  
14.	val no_outlier = clean_data.withColumn("updated_salary",outlierfunc(col("salary"),lit(mean_salary)))
15.	     
16.	// 观察修改后的值
17.	no_outlier.filter($"salary" =!= $"updated_salary").show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

可以看出,”emp_id”为13的记录中,薪资列属于异常值,我们用平均工资来代替它。
  3、重复值处理:在数据集中处理重复记录有不同的方法,既可以删除重复的行,也可以基于某列的子集删除重复的行。在zeppelin中执行如下代码:

1.	// 删除重复的行
2.	// val no_outlier_no_duplicates = no_outlier.dropDuplicates()
3.	// no_outlier_no_duplicates.show
4.	     
5.	// 也可基于某列的子集删除重复的行
6.	val test_df = no_outlier.dropDuplicates("role").show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

可以看出,当我们基于”role”列删除重复的行时,每种role值只保留一个。

5.5 数据转换

存在有各种各样的数据转换需求,这里我们将讨论一些基本类型的转换。
  1、将两列合并成一列。在zeppelin中执行如下代码:

1.	// 创建一个udf来连接两个列值
2.	val concatfunc = udf((name: String, age: Integer) => {name + "_" + age})
3.	     
4.	// 应用该udf来创建合并的列
5.	val concat_df = final_data.withColumn("name_age",concatfunc($"name", $"age"))
6.	     
7.	// 显示 
8.	concat_df.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

可以看出,在上面的转换中,我们将”name”列和”age”列合并成一个”name_age”列。
  2、将字符/数字添加到现有的字符/数字。在zeppelin中执行如下代码:

1.	// 向数据添加常量
2.	val addconst = udf((age:Integer) => {age + 10})
3.	val data_new = concat_df.withColumn("age_incremented",addconst(col("age")))
4.	data_new.show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

当基于一个列中已经存在的值追加新的列时,如果新列与旧列同名,则会覆盖旧列。在zeppelin中执行如下代码:

1.	concat_df.withColumn("age",addconst(col("age"))).show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

3、从现有的字符/数字中删除或替换字符/数字。在zeppelin中执行如下代码:

1.	// 替换一个列中的值
2.	final_data.na.replace("role",Map("合伙人" -> "同事")).show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

注:如果在replace中列名参数是”*”,那么替换应用到所有的列。
  4、更改日期的格式。在zeppelin中执行如下代码:

1.	import org.apache.spark.sql.functions.udf
2.	// 日期转换
3.	// 构造数据集
4.	case class Book (title: String, author: String, pubtime: String)
5.	val books = Seq(Book("浮生六记","[清]沈复","2018/7/1"),
6.	                Book("云边有个小卖部","张嘉佳","2018/07/12"),
7.	                Book("菊与刀","[美]本尼迪克特",null),
8.	                Book("苏菲的世界","乔斯坦·贾德","2017,10 12"),
9.	                Book("罗生门",null,null)
10.	            )
11.	     
12.	val ds1 = sc.parallelize(books).toDS()
13.	     
14.	// 定义udf: 将传入的字符串转换成YYYY-MM-DD格式
15.	def toDateUDF = udf((s: String) => {
16.	    var (year, month, day) = ("","","")
17.	     
18.	    // 格式化日期
19.	    if(s != null) {
20.	        var x = s.split(" |/|,")    // 拆分
21.	        year = "%04d".format(x(0).toInt)
22.	        month = "%02d".format(x(1).toInt)
23.	        day = "%02d".format(x(2).toInt)
24.	     
25.	        year + "-" + month + "-" + day
26.	    } else {
27.	        null
28.	    }
29.	})
30.	     
31.	// 应用udf并将日期字符串转换为标准的形式YYYY-MM-DD
32.	ds1.withColumn("pubtime",toDateUDF(ds1("pubtime"))).show

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

在这里,我们看到的是日期列有许多不同日期格式的数据点的情况。我们需要将所有不同的日期格式标准化成一种格式。要做到这一点,我们首先必须创建一个用户定义的函数(udf),它可以处理不同的格式,并将不同的日期格式转换为一种通用格式。

六、 实验知识测试

七、实验拓展

下面是一个简单的数据清洗的示例。
  1. 构造一个带缺失值的Dataset。在zeppelin中执行如下代码:

1.	// 构造一个带缺失值的Dataset
2.	case class Author (name: String, dynasty: String, dob: String)
3.	
4.	val authors = Seq(Author("曹雪芹","清代","1724年"),
5.	                  Author("施耐庵","元末明初","1296年"),
6.	                  Author("罗贯中","元末明初",null),
7.	                  Author("吴承恩",null,null)
8.	               )
9.	     
10.	val ds1 = sc.parallelize(authors).toDS()
11.	ds1.show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

2. 删除带有缺失值的行。在zeppelin中执行如下代码:

1.	ds1.na.drop().show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

3. 删除带有至少两个缺失值的行。在zeppelin中执行如下代码:

1.	ds1.na.drop(minNonNulls = ds1.columns.length - 1).show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

4. 使用一个给定的字符串填充所有缺失的值。在zeppelin中执行如下代码:

1.	ds1.na.fill("不详").show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

5. 在不同列使用不同的字符串填充缺失值。在zeppelin中执行如下代码:

1.	ds1.na.fill(Map("dynasty"->"--", "dob"->"不详")).show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

6. 删除重复的值。在zeppelin中执行如下代码:

1.	// 删除重复的行
2.	val authors = Seq(Author("曹雪芹","清代","1724年"),
3.	                  Author("曹雪芹","清代","1724年"),
4.	                  Author("施耐庵","元末明初","1296年"),
5.	                  Author("曹雪芹","清朝","1724年"),
6.	                  Author("罗贯中","元末明初",null),
7.	                  Author("吴承恩",null,null)
8.	                )
9.	     
10.	val ds1 = sc.parallelize(authors).toDS()
11.	ds1.show()
12.	     
13.	// 删除重复的行
14.	ds1.dropDuplicates().show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换
基于Spark的数据清洗与转换

7. 基于一个列的子集删除重复。在zeppelin中执行如下代码:

1.	// 基于一个列的子集删除重复
2.	ds1.dropDuplicates("name").show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

8. 基于一个子集删除重复。在zeppelin中执行如下代码:

1.	// 基于一个子集删除重复
2.	ds1.dropDuplicates(Array("dynasty","dob")).show()

同时按下”【Shift + Enter】”键,执行以上代码。可以看到如下的输出内容:
基于Spark的数据清洗与转换

— end —文章来源地址https://www.toymoban.com/news/detail-483927.html

到了这里,关于基于Spark的数据清洗与转换的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【数据预处理】基于Kettle的字符串数据清洗、Kettle的字段清洗、Kettle的使用参照表集成数据

    🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏

    2024年02月03日
    浏览(56)
  • 【机器学习】数据清洗——基于Pandas库的方法删除重复点

    🎈个人主页:豌豆射手^ 🎉欢迎 👍点赞✍评论⭐收藏 🤗收录专栏:机器学习 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共同学习、交流进步! 引言 在机器学习领域,高质量的数据是构建强大模型的基石。而数据清洗作为数据预处理的关键

    2024年02月20日
    浏览(41)
  • Spark学习——DataFrame清洗HDFS日志并存入Hive中

    目录 1.开启Hadoop集群和Hive元数据、Hive远程连接 2.配置 3.读取日志文件并清洗 4.单独处理第四列的数据——方法一: 5.单独处理第四列的数据——方法二:  6.单独处理第四列的数据——方法三:  7.数据清洗结果展示 8.存入Hive中 9.DataGrip中的代码 HDFS日志文件内容: 我们要将

    2023年04月12日
    浏览(36)
  • Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】

    Saprk 提供了两种方法来实现从 RDD 转换得到 DataFrame: 利用反射机制推断 RDD 模式 使用编程方式定义 RDD 模式 下面使用到的数据 people.txt :         在利用反射机制推断 RDD 模式的过程时,需要先定义一个 case 类,因为只有 case 类才能被 Spark 隐式地转换为DataFrame对象。 注意

    2024年02月09日
    浏览(55)
  • 基于Hadoop的豆瓣电影的数据抓取、数据清洗、大数据分析(hdfs、flume、hive、mysql等)、大屏可视化

    项目介绍 有需要整个项目的可以私信博主,提供部署和讲解,对相关案例进行分析和深入剖析 环境点击顶部下载 = 本研究旨在利用Python的网络爬虫技术对豆瓣电影网站进行数据抓取,并通过合理的数据分析和清洗,将非结构化的数据转化为结构化的数据,以便于后续的大数

    2024年02月11日
    浏览(50)
  • 如何在不使用任何软件的情况下将 PDF 转换为 Excel

    通常,您可能会遇到这样的情况:您需要的数据不在 Excel 工作表中,而是以数据表形式出现在 PDF 文件中。为了将此数据放入 Excel 工作表中,如果您尝试将数字复制并粘贴到电子表格中,则列/行将无法正确复制和对齐。因此,如果您想使用该表进行任何类型的分析,则无需

    2024年02月11日
    浏览(43)
  • 数据中台系统是一个重要的数字化转型方式之一,它基于现代的大数据处理技术,通过构建统一的数据仓库,将不同来源、格式的数据进行整合、清洗、融合,并提供给业务人员进行分析挖掘的数据集合

    作者:禅与计算机程序设计艺术 数据中台系统是一个重要的数字化转型方式之一,它基于现代的大数据处理技术,通过构建统一的数据仓库,将不同来源、格式的数据进行整合、清洗、融合,并提供给业务人员进行分析挖掘的数据集合。其目标就是为了实现数字化进程中的各

    2024年02月11日
    浏览(48)
  • 基于Spark的气象数据分析

    研究背景与方案 1.1.研究背景 在大数据时代背景下,各行业数据的规模大幅度增加,数据类别日益复杂,给数据分析工作带来极大挑战。 气象行业和人们 的生活息息相关,随着信息时代的发展,大数据技术的出现为气象数据的发展带来机遇。基于此,本项目使用 Spark 等大

    2024年02月09日
    浏览(55)
  • 数据清洗是什么?如何进行数据清洗?

    数据清洗是数据治理过程中非常重要的一环, 它指的是对数据进行清理、筛选、去重、格式化等操作,以确保数据质量和数据准确性。 。在本文中,我们将围绕数据清洗展开讨论,并介绍一些数据清洗 相关 技术。 一、 数据清洗的概念 数据清洗是指对数据进行处理和加工,

    2024年02月03日
    浏览(44)
  • Spark内容分享(二十七):阿里云基于 Spark 的云原生数据湖分析实践

    目录 Spark 与云原生的结合 1. 传统 Spark 集群的痛点 2. Spark 与云原生结合的优势 Spark on K8s 原理介绍 1. Spark 的集群部署模式 2. Spark on K8s 的部署架构 3. Spark on K8s 部署架构——对比 4. Spark on K8s 社区进展 5. Spark 3.3 新特性介绍 Spark on K8s 在阿里云 EMR 上的实践 1. EMR Spark on ACK 2. 充分

    2024年01月15日
    浏览(94)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包