pyspark基础学习——数据处理

这篇具有很好参考价值的文章主要介绍了pyspark基础学习——数据处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

上一篇文章中讲了如何在windows下安装和检测: pyspark,同时简单介绍了运行的环境。本文想就我的一些学习经验,分享一下使用pyspark来处理csv文件上的一些常用的pyspark语法。

一、准备工作和数据的导入选择

运行python代码,第一件事当然是导入对应的包,同时我们要为spark先创建好相应的环境,并且,spark中支持SQL,而且在SQL中有众多的函数,因此我们可以创建SparkSession对象,为了后续SQL函数的调用,我们要导入functions包,以及数据类型转换的时候,我们要导入types的包。

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType 

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

1.1 导入数据

将csv文件导入为Dataframe样式:
header表示是否需要导入表头;inferSchema表示是否需要推导出数据的类型(false默认为string);delimiter表示指定分隔符进行读取。file对应文件的位置。

df1 = spark.read.options(header='True', inferSchema='True', delimiter=',').csv(file)

1.2 选择数据子集:

drop中填入不需要的列的列名。

df2 = df1.drop('列名')

1.3 列名重命名

df3=df2.withColumnRenamed("original name", "modified name")

如果有多个列的列名要进行修改,可以直接在后面再加上withColumnRenamed()进行修改

二、数据清洗

因为数据本身的问题,在处理的过程中需要我们对一些空值、异常值等进行处理。但是此次作业获取到的数据中主要是对空值的处理,因此对于异常值的处理不进行讨论

2.1 检测空值数量

df3.toPandas().isnull().sum()

2.2 删除存在空值的行

对于一些关键列的数据丢失、或是该行的缺失值占比较高的情况下,我们很难将人工将其弥补,因此直接对该行进行删除。

df_clear=df3.dropna(subset='列名')

2.3 forward,backward填充

forward: 前面一个值填充后面
backward:后面一个值填充前面

代码示例:

df = spark.createDataFrame([
    (1, 'd1',None),
    (1, 'd2',10),
    (1, 'd3',None),
    (1, 'd4',30),
    (1, 'd5',None),
    (1, 'd6',None),
],('id', 'day','temperature'))
df.show()

运行结果如下:

id day temperature
1 d1 null
1 d2 10
1 d3 null
1 d4 30
1 d5 null
1 d6 null
from pyspark.sql.window import Window

forward = Window.partitionBy('id').orderBy('day').rowsBetween(
    Window.unboundedPreceding, Window.currentRow)
backward = Window.partitionBy('id').orderBy('day').rowsBetween(
    Window.currentRow, Window.unboundedFollowing)
    
df.withColumn('forward_fill', last('temperature', ignorenulls=True).over(forward))\
  .withColumn('backward_fill', first('temperature', ignorenulls=True).over(backward))\
.show()

填充后的结果如下表所示:

id day temperature forward_fill backward_fill
1 d1 null null 10
1 d2 10 10 10
1 d3 null 10 30
1 d4 30 30 30
1 d5 null 30 null
1 d6 null 30 null

Window.unboundedPreceding:分区的开始位置
Window.currentRow:分区计算到现在的位置
Window.unboundedFollowing:分区的最后位置。
负数:表示若前面有元素,范围向前延申几个元素
0:表示当前位置,等价于Window.currentRow
正数:表示若后面有元素,范围向后延申几个元素

三、 数据处理

3.1 数据筛选

data1= df_clear.filter(df_clear['column'] == 'attribute') # 条件过滤
data2 = df_clear.select('column') # 选择某一列的数据

3.2 数据统计

# 输出树状结构(输出列名、数据类型和是否能为空值)
df_clear.printSchema() 
# 将该列数据进行汇总统计
df_clear.select('column').describe().show() 
# 求平均,按照id的方式进行统计
ave_column = df_clear.groupBy('id').agg({'column': 'mean'}) 

agg({“列名”,“函数名”})为聚合函数,其中有:

函数名 作用
avg 求均值
count 计数
max 求最大值
mean 求均值
min 求最小值
sum 求和

3.3 数据类型转换

from pyspark.sql.functions import *
# 转换为Int类型
df_clear.withColumn("column",df.age.cast('int'))

# 转换为String类型
df_clear.withColumn("column",df.age.cast('string'))

# 转换为Data类型
df_clear= df_clear.withColumn('column', to_date(df_clear['column']))

# 转换为TimestampType类型
dfTime=df_clear.withColumn('column',F.col('column').cast(TimestampType()))

3.4 采用SQL语法进行处理

df_sql_cf=df_clear.createOrReplaceTempView("carflow")
spark.sql("select * from carflow\
           where sum_Total_CF=\
          (select max(sum_Total_CF) from carflow)").show()

四、数据导出

# ascending表示是否为升序,默认为True
df_clear_asc= df_clear.orderBy("column",ascending=False)
# 将对应的数据类型转化为list,再导出为csv文件
df_asc= df_clear_asc.select(F.collect_list('column')).first()[0]
df_asc.select("col1","col2","col3").toPandas().to_csv("total.csv")

总结

由于此次学习仅用于完成课堂大作业,因此有不足之处还望各位大佬在评论区制指正,若是能够为你们提供一点小小的帮助,希望各位大佬们能动动手指,给小弟一个赞!感谢各位大佬们!
该作业的处理的源代码和相关数据已经传至github文章来源地址https://www.toymoban.com/news/detail-401028.html

到了这里,关于pyspark基础学习——数据处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • PySpark实战指南:大数据处理与分析的终极指南【上进小菜猪大数据】

    上进小菜猪,沈工大软件工程专业,爱好敲代码,持续输出干货。 大数据处理与分析是当今信息时代的核心任务之一。本文将介绍如何使用PySpark(Python的Spark API)进行大数据处理和分析的实战技术。我们将探讨PySpark的基本概念、数据准备、数据处理和分析的关键步骤,并提

    2024年02月06日
    浏览(47)
  • Python与大数据:Hadoop、Spark和Pyspark的应用和数据处理技巧

      在当今的数字时代,数据成为了无处不在的关键资源。大数据的崛起为企业提供了无限的机遇,同时也带来了前所未有的挑战。为了有效地处理和分析大规模数据集,必须依靠强大的工具和技术。在本文中,我们将探讨Python在大数据领域的应用,重点介绍Hadoop、Spark和Pysp

    2024年02月16日
    浏览(44)
  • 音频数据处理基本知识学习——降噪滤波基础知识

    滤波是一种信号处理方法,它可以通过消除或减弱信号中的某些频率分量,来实现信号的去噪、去除干扰、增强某些频率成分等目的。常见的滤波方法包括低通滤波、高通滤波、带通滤波等。 降噪是一种信号处理方法,它可以通过消除或减弱信号中的噪声成分,来提高信号的

    2024年02月15日
    浏览(52)
  • 机器学习算法基础--逻辑回归简单处理mnist数据集项目

    目录 1.项目背景介绍 2.Mnist数据导入 3.数据标签提取且划分数据集 4.数据特征标准化 5.模型建立与训练 6.后验概率判断及预测 7.处理模型阈值及准确率 8.阈值分析的可视化绘图 9.模型精确性的评价标准

    2024年02月07日
    浏览(49)
  • tableau基础学习2:时间序列数据预处理与绘图

    这一部分,我们记录一些分析时序趋势的分析步骤 原始数据是excel表格,其中包含三个Sheet页, 这里我们选择两家公司的股票,作为时序数据进行对比:恩捷股份与科大讯飞 首先打开下面的【已使用数据解释器清理】,这里可以自动剔除一部分无用行,以保留需要分析的数据

    2024年02月10日
    浏览(45)
  • 机器学习基础 数据集、特征工程、特征预处理、特征选择 7.27

    无量纲化 1.标准化 2.归一化 信息数据化 1.特征二值化 2. Ont-hot编码 3.缺失数据补全 1.方差选择法 2.相关系数法

    2024年02月14日
    浏览(55)
  • Java 学习路线:基础知识、数据类型、条件语句、函数、循环、异常处理、数据结构、面向对象编程、包、文件和 API

    Java 是一种由 Sun Microsystems 于 1995 年首次发布的编程语言和计算平台。Java 是一种通用的、基于类的、面向对象的编程语言,旨在减少实现依赖性。它是一个应用程序开发的计算平台。Java 快速、安全、可靠,因此在笔记本电脑、数据中心、游戏机、科学超级计算机、手机等领

    2024年03月24日
    浏览(91)
  • 【Flink基础】-- 延迟数据的处理

    目录 ​一、关于延迟的一些概念 1、什么是延迟? 2、什么导致互联网延迟?

    2024年02月03日
    浏览(45)
  • 【机器学习6】数据预处理(三)——处理类别数据(有序数据和标称数据)

    在【机器学习4】构建良好的训练数据集——数据预处理(一)处理缺失值及异常值这一篇文章中,主要说明热数据预处理的重要性以及如何处理缺失值及异常值这些数值特征。然而,在现实生活中遇到的数据集往往不仅仅只会包含 数值型特征 ,还会包含一个或者多个 类别特征

    2024年02月12日
    浏览(46)
  • 数据仓库和商业智能:数据处理与分析的基础

    [toc] 引言 1.1. 背景介绍 随着互联网和信息技术的快速发展,数据已经成为企业获取竞争优势的核心资产之一。然而,如何处理、存储和分析这些海量数据成为了摆在企业面前的一个严峻挑战。数据仓库和商业智能(BI)应运而生,成为了实现企业数据分析和决策的重要工具。

    2024年02月17日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包