1. Creating a DataFrame
This article builds a DataFrame by reading a JSON file; the code is as follows:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() #create a SparkSession
peopleDF = spark.read.format("json").load("people.json")
"""
spark支持读取多种文件格式:text/json/parquet
path:文件路径 type:格式类型 返回DataFrame类型
spark.read.text(path)
spark.read.json(path)
spark.read.parquet(path)
spark.read.format("text").load(path)
spark.read.format("json").load(path)
spark.read.format("parquet").load(path)
此外spark读取mysql等数据
spark.read.format("jdbc")\
.option("driver","com.mysql.jdbc.Driver")\
.option("url","jdbc:mysql://localhost:3306/spark")\
.option("dbtable","people")\
.option("user","root")\
.option("password","root")\
.load()
"""
"""
相对于读取文件,保存文件类似于读取文件,如下:
peopleDF.write.text(path)
peopleDF.write.json(path)
peopleDF.write.parquet(path)
peopleDF.write.format("text").save(path)
peopleDF.write.format("json").save(path)
peopleDF.write.format("parquet").save(path)
此外spark写入mysql数据
PROP = {}
PROP['driver'] = 'com.mysql.jdbc.Driver'
PROP['user'] = 'root'
PROP['password'] = 'root'
peopleDF.write.jdbc(url="jdbc:mysql://localhost:3306/spark",table="people",mode="append",properties=PROP)
mode主要有两种模式:append:元数据保存,插入新数据
overwrite:删除原有数据,插入新数据
"""
2. Common DataFrame operations
2.1 printSchema
#printSchema: print the schema of the DataFrame
peopleDF.printSchema()
Output:
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
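Besides printing it, the schema can also be inspected programmatically:
peopleDF.columns  # ['age', 'name']
peopleDF.dtypes   # [('age', 'bigint'), ('name', 'string')]
peopleDF.schema   # the full StructType object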
2.2 show
#show: display the DataFrame
"""
show(n)                 #limit how many rows are printed
show(truncate=False)    #print full column values; with truncate=True, long values are cut to 20 characters
show(n, truncate=True)  #limit the rows and truncate long values; truncate defaults to True
"""
peopleDF.show()
Output:
+---+----+
|age|name|
+---+----+
| 12| tom|
| 22|jack|
| 33| Bob|
+---+----+
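Passing n limits how many rows are printed; on this data, show(2) would produce roughly:
peopleDF.show(2)
+---+----+
|age|name|
+---+----+
| 12| tom|
| 22|jack|
+---+----+
only showing top 2 rows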
2.3 select
#select: choose a subset of columns from the DataFrame
peopleDF.select(['name','age']).show()
peopleDF.select(peopleDF['name'],peopleDF['age']).show()
Output:
+----+---+
|name|age|
+----+---+
| tom| 12|
|jack| 22|
| Bob| 33|
+----+---+
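select also accepts column expressions, not only column names; for example (age_next is just an illustrative alias):
#derive a column inside select; alias names the result
peopleDF.select(peopleDF['name'],(peopleDF['age']+1).alias('age_next')).show()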
2.4 groupBy
#groupBy: group the rows; typically followed by an aggregate such as count() or sum()
peopleDF.groupBy('name').count().show()
Output:
+----+-----+
|name|count|
+----+-----+
|jack| 1|
| Bob| 1|
| tom| 1|
+----+-----+
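For aggregates other than count, pair groupBy with agg() and pyspark.sql.functions; a minimal sketch (max_age is just an illustrative alias):
from pyspark.sql import functions as F
#one aggregate per group; alias names the output column
peopleDF.groupBy('name').agg(F.max('age').alias('max_age')).show()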
2.5 filter
#filter: keep only the rows that satisfy a condition
peopleDF.filter(peopleDF['age']>20).show()
Output:
+---+----+
|age|name|
+---+----+
| 22|jack|
| 33| Bob|
+---+----+
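Conditions can be combined with & (and) and | (or); each condition needs its own parentheses, since Python's and/or do not work on columns:
peopleDF.filter((peopleDF['age']>20) & (peopleDF['name']!='Bob')).show()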
2.6 sort
#sort: order the rows
peopleDF.sort(peopleDF['name'].asc(),peopleDF['age'].desc()).show()
#sort by name ascending; ties on name are broken by age descending
Output:
+---+----+
|age|name|
+---+----+
| 33| Bob|
| 22|jack|
| 12| tom|
+---+----+
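orderBy is an alias of sort, and both also accept an ascending flag instead of asc()/desc():
#equivalent spelling of a descending sort on age
peopleDF.orderBy('age', ascending=False).show()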
2.7 replace
#replace: substitute values in a column
peopleDF.replace("Bob","Jane",subset=['name']).show()
"""
将name字段为Bob改为Jane,如果不设置subset,则针对于整个DataFrame进行修改,注意此处不是substr()相等,而是需要字段等于Bob才进行replace
"""
输出如下:
+---+----+
|age|name|
+---+----+
| 12| tom|
| 22|jack|
| 33|Jane|
+---+----+
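replace also accepts lists (or a dict) to substitute several values in one call; a small sketch (the new names are illustrative):
#lists are matched position by position: tom->Tom, jack->Jack
peopleDF.replace(['tom','jack'],['Tom','Jack'],subset=['name']).show()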
2.8 alias
#alias: give a column an alternative name
peopleDF.select(peopleDF['name'].alias('username')).show() #rename name to username
Output:
+--------+
|username|
+--------+
| tom|
| jack|
| Bob|
+--------+
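To rename a column while keeping all the others, withColumnRenamed is usually more convenient than select with alias:
#renames name to username and keeps age untouched
peopleDF.withColumnRenamed('name','username').show()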
2.9 withColumn
#withColumn: add a new column
peopleDF_new = peopleDF.withColumn("test",peopleDF['age']/2)
peopleDF_new.show()
Output:
+---+----+-----------------+
|age|name| test|
+---+----+-----------------+
| 12| tom| 4.0|
| 22|jack|7.333333333333333|
| 33| Bob| 11.0|
+---+----+-----------------+
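withColumn can also attach a constant column via pyspark.sql.functions.lit; a minimal sketch (the column name and value are illustrative):
from pyspark.sql import functions as F
#adds the same literal value to every row
peopleDF.withColumn('source', F.lit('people.json')).show()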
2.10 foreach
#foreach: apply a function to every row
peopleDF.foreach(print)
Output:
Row(age=12, name='tom')
Row(age=22, name='jack')
Row(age=33, name='Bob')
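Note that foreach runs on the executors, so on a cluster the print output goes to the executor logs rather than the driver console (the output above assumes local mode). To iterate rows on the driver instead, collect them first:
#collect() pulls all rows to the driver, so only use it on small data
for row in peopleDF.collect():
    print(row['name'], row['age'])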
That wraps up this walkthrough of common PySpark DataFrame operations.