目录
一、SparkSQL介绍
二、创建DataFrame
1、通过ToDF方法
2、通过createDataFrame方法
3、通过读取文件或数据库
三、保存DataFrame
四、DataFrame API
1、显示数据
2、统计信息
3、类RDD操作
4、类Excel操作
5、类SQL表操作
五、DataFrame+SQL
1、注册视图
2、操作Hive表
六、总结
PySpark系列文章:
(一)PySpark3:安装教程及RDD编程
(二)PySpark3:SparkSQL编程
(三)PySpark3:SparkSQL40题
一、SparkSQL介绍
Spark SQL是Apache Spark生态系统的一个关键组件,专注于处理和分析结构化和半结构化的大规模数据。Spark SQL建立在Spark核心之上,为用户提供了高效且易用的数据处理接口,从而将关系型和非关系型数据融入到分布式计算环境中。
核心概念之一是DataFrame API,它提供了一个高级的、面向数据的抽象,允许用户以声明性的方式处理数据。DataFrame是一个分布式的、具有表格结构的数据集,类似于传统数据库中的表。通过DataFrame API,用户可以执行各种数据操作,包括过滤、转换、聚合、连接等,而无需深入了解底层的分布式计算模型。
RDD,DataFrame和DataSet对比:
RDD可以存储任何类型的数据,包括结构化数据、半结构化数据和非结构化数据,RDD的操作更接近底层,更适合对数据进行底层控制和自定义处理。
DataFrame构建在RDD之上,提供了更高级的抽象,是一个分布式的、以列为主的数据集合,类似于关系型数据库中的表。DataFrame可以通过多种数据源进行创建,包括结构化数据源(如JSON、CSV、Parquet)和Hive表,并且提供了丰富的SQL和DataFrameAPI,可以方便地进行数据处理和分析。
DataSet在DataFrame基础上进一步增加了数据类型信息,可以通过编程语言的类型系统来检查错误,并提供更好的编译时类型检查。
DataFrame和DataSet都支持SQL交互式查询,可以和 Hive无缝衔接。DataSet只有Scala语言和Java语言接口中才支持,在Python和R语言接口只支持DataFrame。
二、创建DataFrame
首先导包:
import pandas as pd
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Row
pd.DataFrame.iteritems = pd.DataFrame.items
1、通过ToDF方法
可以将RDD用toDF方法转换成DataFrame。
rdd = sc.parallelize([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)])
df = rdd.toDF(["id","name","age","sal"])
df.show()
输出结果:
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
| 4| Jon| 29|1200|
+---+-----+---+----+
2、通过createDataFrame方法
可以将Pandas.DataFrame转换成pyspark中的DataFrame,也可直接对数据列表、schema进行转换。
#将pandas.DataFrame转换为pyspark.DataFrame
pdf = pd.DataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)]
,columns=["id","name","age","sal"])
df = spark.createDataFrame(pdf)
print(type(df))
#将list转换为pyspark.DataFrame
df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)]
,["id","name","age","sal"])
print(type(df))
#根据指定rdd和schema创建pyspark.DataFrame
schema = StructType([StructField("id", IntegerType(), nullable = False),
StructField("name", StringType(), nullable = True),
StructField("age", IntegerType(), nullable = True),
StructField("sal", FloatType(), nullable = True),
])
rdd = sc.parallelize([Row(1,"James",27,1000),
Row(2,"Bob",22,500),
Row(3,"Alice",25,800),
Row(4,"Jon",29,1200),
])
df = spark.createDataFrame(rdd, schema)
print(type(df))
输出结果:
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
3、通过读取文件或数据库
可以通过读取json、csv等文件,或hive、mysql数据表得到DataFrame。
spark.read.csv(...): 从 CSV 文件中读取数据。
spark.read.json(...): 从 JSON 文件中读取数据。
spark.read.parquet(...): 从 Parquet 文件中读取数据。
spark.read.text(...): 读取文本文件。
spark.read.format(...): 使用指定的格式读取数据。
读取json文件:
df = spark.read.json("test.json")
df.show()
输出结果:test.json内容如下:
{"id":1,"name":"James","age":27,"sal":1000}
{"id":2,"name":"Bob","age":22,"sal":500}
{"id":3,"name":"Alice","age":25,"sal":800}
{"id":4,"name":"Jon","age":29,"sal":1200}
读取parquet文件:
df = spark.read.parquet("data/users.parquet")
df.show()
读取csv文件:
#方式1
df = spark.read.option("header","true") \
.option("inferSchema","true") \
.option("delimiter", ",") \
.csv("test.csv")
#方式2
df = spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.option("delimiter", ",") \
.load("test.csv")
#方式3
df = spark.read.csv(path="test.csv",
header=True, #指定将第一行作为列名
inferSchema=True, #自动推断出每列的数据类型
sep=',' #分隔符
)
df.show()
输出结果:
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
| 4| Jon| 29|1200|
+---+-----+---+----+
读取Hive数据表:
spark.sql("CREATE TABLE IF NOT EXISTS test (id INT, name STRING, age INT, sal FLOAT) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'data/test.txt' INTO TABLE test")
df = spark.sql("SELECT * FROM test")
三、保存DataFrame
通过df.write()对DataFrame进行保存。
#保存为csv文件
df.write.format("csv").option("header","true").save("data/test.csv")
#保存为json文件
df.write.json("data/test.json")
#保存成parquet文件
df.write.parquet("data/test.parquet")
df.write.partitionBy("age").format("parquet").save("data/test.parquet")
#保存成hive数据表
df.write.bucketBy(2, "name").sortBy("age").saveAsTable("test")
四、DataFrame API
1、显示数据
①df.collect()
用于将DataFrame中的所有行收集到Driver节点上,并以列表的形式返回这些行。
df = spark.createDataFrame([(1,"James",27,1000),
(2,"Bob",22,500),
(3,"Alice",25,800),
(4,"Jon",29,1200)]
,["id","name","age","sal"])
df.collect()
输出结果:
[Row(id=1, name='James', age=27, sal=1000),
Row(id=2, name='Bob', age=22, sal=500),
Row(id=3, name='Alice', age=25, sal=800),
Row(id=4, name='Jon', age=29, sal=1200)]
②df.first()
获取第一行数据。
df.first()
输出结果:
Row(id=1, name='James', age=27, sal=1000)
③df.head(n)
获取前n行数据。
df.head(2)
输出结果:
[Row(id=1, name='James', age=27, sal=1000),
Row(id=2, name='Bob', age=22, sal=500)]
④df.show(n)
与df.head(n)类似,但是df.show(n)是打印成表格。
df.show(2)
输出结果:
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
+---+-----+---+----+
only showing top 2 rows
⑤df.printSchema()
用于打印DataFrame的模式schema,定义了各列的名称和类型。
df.printSchema()
输出结果:
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- sal: long (nullable = true)
2、统计信息
①df.describe()
一般与df.show()连用,用于查看DataFrame的数据分布。
df.describe().show()
输出结果:
+-------+------------------+-----+------------------+----------------+
|summary| id| name| age| sal|
+-------+------------------+-----+------------------+----------------+
| count| 4| 4| 4| 4|
| mean| 2.5| null| 25.75| 875.0|
| stddev|1.2909944487358056| null|2.9860788111948193|298.607881119482|
| min| 1|Alice| 22| 500|
| max| 4| Jon| 29| 1200|
+-------+------------------+-----+------------------+----------------+
若只想查看某一列的数据分布,如:df.describe('age').show()。
②df.count()
返回数据总行数。
df.count()
输出结果:
4
③聚合函数
一些常用的聚合函数如下sum()、mean()、min()、max()、avg()
#求最小工资
df.select(F.min(df['sal'])).show()
#输出结果:
+--------+
|min(sal)|
+--------+
| 500|
+--------+
#求最大工资
df.select(F.max(df['sal'])).show()
#输出结果:
+--------+
|max(sal)|
+--------+
| 1200|
+--------+
#求总工资
df.select(F.sum(df['sal'])).show()
#输出结果:
+--------+
|sum(sal)|
+--------+
| 3500|
+--------+
#求平均工资
df.select(F.avg(df['sal'])).show()
#输出结果:
+--------+
|avg(sal)|
+--------+
| 875.0|
+--------+
同时对多列操作:
df.agg({"name":"count","age":"max","sal":"avg"}).show()
#输出:
+-----------+--------+--------+
|count(name)|max(age)|avg(sal)|
+-----------+--------+--------+
| 4| 29| 875.0|
+-----------+--------+--------+
④df.stat.freqItems()
统计值出现的频率。
#统计age、name两列出现频率超过0.25的值
df.stat.freqItems(("age","name"),0.25).show()
输出结果:
+----------------+--------------------+
| age_freqItems| name_freqItems|
+----------------+--------------------+
|[29, 22, 25, 27]|[Bob, Jon, Alice,...|
+----------------+--------------------+
3、类RDD操作
可以把DataFrame当做数据类型为Row的RDD来进行操作。
部分操作需要先转换为RDD才能运行,如map、flatMap等等。
部分操作可以直接在DataFrame上进行,如filter、distinct、sample、cache、intersect等等。
①df.map()
#所有人age+1
rdd = df.rdd.map(lambda x:Row(x[2]+1))
rdd.toDF(["age"]).show()
输出结果:
+---+
|age|
+---+
| 28|
| 23|
| 26|
| 30|
+---+
②df.flatMap()
rdd = df.rdd.flatMap(lambda x:x[1].split('o')).map(lambda x:Row(x))
rdd.toDF(["name"]).show()
输出结果:
+-----+
| name|
+-----+
|James|
| B|
| b|
|Alice|
| J|
| n|
+-----+
③df.filter()
#筛选工资大于800的
df.filter(F.col("sal")>800).show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 4| Jon| 29|1200|
+---+-----+---+----+
#筛选姓名为Bob,以下三种方式输出结果一致
df.filter(F.col("name")=="Bob").show()
df.filter(df["name"]=="Bob").show()
df.filter("name='Bob'").show()
+---+----+---+---+
| id|name|age|sal|
+---+----+---+---+
| 2| Bob| 22|500|
+---+----+---+---+
#筛选姓名以J开头的
df.filter(F.col("name").startswith("J")).show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 4| Jon| 29|1200|
+---+-----+---+----+
#筛选除指定值外的其他数据
broads = sc.broadcast(["James","Bob"])
df.filter(~F.col("name").isin(broads.value)).show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 3|Alice| 25| 800|
| 4| Jon| 29|1200|
+---+-----+---+----+
④df.distinct()
#去重
df.distinct().show()
⑤df.cache()
#cache缓存
df.cache()
#释放缓存
df.unpersist()
⑥df.sample()
随机抽样。
#withReplacement=False表示无放回,即抽取不重复的数据
#fraction=0.5表示抽样的比例为50%
#seed为随机种子,用于复现
df_sample = df.sample(withReplacement=False, fraction=0.5, seed=2)
df_sample.show()
输出结果:
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 3|Alice| 25| 800|
+---+-----+---+----+
⑦df.intersect(df)
取两个DataFrame所有交集的行,返回结果不包含重复行
df.intersect(df_sample).show()
输出结果:
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 3|Alice| 25| 800|
| 1|James| 27|1000|
+---+-----+---+----+
⑧df.exceptAll()
求差集。
#从df中移除与df_sample相同的行,返回一个新的DataFrame
df.exceptAll(df_sample).show()
输出结果:
+---+----+---+----+
| id|name|age| sal|
+---+----+---+----+
| 2| Bob| 22| 500|
| 4| Jon| 29|1200|
+---+----+---+----+
4、类Excel操作
①df.withColumn()
增加列。
df = df.withColumn("birthyear",-df["age"]+2024)
df.show()
输出结果:
+---+-----+---+----+---------+
| id| name|age| sal|birthyear|
+---+-----+---+----+---------+
| 1|James| 27|1000| 1997|
| 2| Bob| 22| 500| 2002|
| 3|Alice| 25| 800| 1999|
| 4| Jon| 29|1200| 1995|
+---+-----+---+----+---------+
②df.select()
筛选列。
df.select("name","age").show()
输出结果:
+-----+---+
| name|age|
+-----+---+
|James| 27|
| Bob| 22|
|Alice| 25|
| Jon| 29|
+-----+---+
③df.drop()
删除列。
#删除一列
df.drop("age").show()
#删除多列
df.drop(*["age","birthyear"]).show()
④df.withColumnRenamed()
对列进行重命名。
#对一列进行重命名
df.withColumnRenamed("sal","salary").show()
#对多列进行重命名
df.withColumnRenamed("sal","salary").withColumnRenamed("birthyear","year").show()
⑤df.sort()、df.orderBy()
按照某一列或某几列进行排序。
#按照某一列进行排序
df.sort(df["age"].desc()).show() #降序
df.sort(df["age"].asc()).show() #升序
#按照某几列进行排序
df.orderBy(F.col("age").asc(), F.col("sal").desc()).show()
⑥df.na.drop()、df.na.fill()
处理带空值的行。
注意,在填充空值时,只能对相同数据类型的列的空值进行填充。
df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,None,29,None)]
,["id","name","age","sal"])
df.show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
| 4| null| 29|null|
+---+-----+---+----+
#删除带有nan值的行
df.na.drop().show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
+---+-----+---+----+
#填充nan值
df.na.fill("Jon").show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
| 4| Jon| 29|null|
+---+-----+---+----+
df.na.fill(0).show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
| 3|Alice| 25| 800|
| 4| null| 29| 0|
+---+-----+---+----+
⑦df.replace()
替换指定的值。
#注意,不能同时对不同数据类型的值进行替换
#例如,这句代码会报错:df.replace({"James": "Jim",1000: 100}).show()
df.replace({"James": "Jim", "Bob":"Bieber" }).show()
df.replace({1000: 100}).show()
⑧df.dropDuplicates()
跟distinct方法不同的是,dropDuplicates方法接收传参,可以根据指定字段去重。
df.dropDuplicates(["name"]).show()
5、类SQL表操作
类SQL表操作主要包括表查询(select,selectExpr,where),表连接(join,union,unionAll),表分组(groupby,agg,pivot)等操作。
①df.select()
df.select()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算列。
#筛选两列,并限制输出前两行
#df.limit(2)与df.take(2)类似,不过limit输出的是DataFrame,take输出的是list
df.select("age","name").limit(2).show()
+---+-----+
|age| name|
+---+-----+
| 27|James|
| 22| Bob|
+---+-----+
#可以对列进行操作
df.select("name",df["age"] + 1).show()
+-----+---------+
| name|(age + 1)|
+-----+---------+
|James| 28|
| Bob| 23|
|Alice| 26|
| Jon| 30|
+-----+---------+
#通过toDF()对列进行重命名
df.select("name",-df["age"]+2024).toDF("name","birth_year").show()
+-----+----------+
| name|birth_year|
+-----+----------+
|James| 1997|
| Bob| 2002|
|Alice| 1999|
| Jon| 1995|
+-----+----------+
②df.selectExpr()
df.selectExpr()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算和重命名列。参数是一个字符串列表,其中每个字符串都是一个SQL表达式。
#创建一个UDF函数
spark.udf.register("getBirthYear",lambda x:2024-x)
#调用函数对列进行转换并重命名
df.selectExpr("name",
"getBirthYear(age) as birth_year" ,
"UPPER(name) as NAME" ).show()
+-----+----------+-----+
| name|birth_year| NAME|
+-----+----------+-----+
|James| 1997|JAMES|
| Bob| 2002| BOB|
|Alice| 1999|ALICE|
| Jon| 1995| JON|
+-----+----------+-----+
#使用row_number()函数
df.selectExpr("name","age",
"row_number() over (order by age desc) as order").show()
+-----+---+-----+
| name|age|order|
+-----+---+-----+
| Jon| 29| 1|
|James| 27| 2|
|Alice| 25| 3|
| Bob| 22| 4|
+-----+---+-----+
使用df.selectExpr()还可以将DataFrame转换为复合类型。
#array类型
dfarray = df.selectExpr("name","array(age,sal) as info")
dfarray.selectExpr("name","info[0] as age","info[1] as sal").show()
#struct类型
df_struct = df.selectExpr("name","struct(age,sal) as info")
df_struct.selectExpr("name","info.age","info.sal").show()
#map类型
df_map = df.selectExpr("name","map('age',age,'sal',sal) as info")
df_map.selectExpr("name","info['age'] as age","info['sal'] as sal").show()
#输出结果
+-----+---+----+
| name|age| sal|
+-----+---+----+
|James| 27|1000|
| Bob| 22| 500|
|Alice| 25| 800|
| Jon| 29|1200|
+-----+---+----+
#构造named_struct类型
df_named_struct = df.selectExpr("name","named_struct('age',age,'sal',sal) as info")
df_named_struct.show()
+-----+----------+
| name| info|
+-----+----------+
|James|{27, 1000}|
| Bob| {22, 500}|
|Alice| {25, 800}|
| Jon|{29, 1200}|
+-----+----------+
#转换为json类型
df_named_struct.selectExpr("name","to_json(info) as json_info").show()
+-----+--------------------+
| name| json_info|
+-----+--------------------+
|James|{"age":27,"sal":1...|
| Bob|{"age":22,"sal":500}|
|Alice|{"age":25,"sal":800}|
| Jon|{"age":29,"sal":1...|
+-----+--------------------+
③df.where()
用法与SQL中的where一致,注意书写时,等于是一个“=”。
df.where("name='Bob' or age=27").show()
输出结果:
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 2| Bob| 22| 500|
+---+-----+---+----+
④df.join(df)
df.join(df, on='列名', how='inner')。on参数可以指定连接方式为inner、left、right、outer、semi、full、leftanti、anti"等多种方式。关联的列如果有多列,则传入一个列名list。
scores = spark.createDataFrame([("James","English",90),("James","Math",60),
("Bob","Math",50),("Bob","Physics",50),
("Alice","Math",70),("Alice","Physics",80),
("Jon","English",40),("Jon","Math",80)
]) \
.toDF("name","subject","score")
scores.show()
df.join(scores,on="name",how="inner").show()
df.join(scores,df["name"]==scores["name"],"inner").show()
+-----+---+---+----+-------+-----+
| name| id|age| sal|subject|score|
+-----+---+---+----+-------+-----+
|Alice| 3| 25| 800| Math| 70|
|Alice| 3| 25| 800|Physics| 80|
| Bob| 2| 22| 500| Math| 50|
| Bob| 2| 22| 500|Physics| 50|
|James| 1| 27|1000|English| 90|
|James| 1| 27|1000| Math| 60|
| Jon| 4| 29|1200|English| 40|
| Jon| 4| 29|1200| Math| 80|
+-----+---+---+----+-------+-----+
⑤df.union(df)、df.unionAll(df)
df.where("name='Jon'").union(df.limit(2)).show()
df.where("name='Jon'").unionAll(df.limit(2)).show()
输出结果:
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 4| Jon| 29|1200|
| 1|James| 27|1000|
| 2| Bob| 22| 500|
+---+-----+---+----+
⑥df.groupBy()
df_join = df.join(scores,on="name",how="inner")
df_join.groupBy("name").mean("score").show()
+-----+----------+
| name|avg(score)|
+-----+----------+
|Alice| 75.0|
| Bob| 50.0|
|James| 75.0|
| Jon| 60.0|
+-----+----------+
#多列聚合,并重命名
df_join.groupBy("name")\
.agg(F.mean("age").alias("mean_age"),
F.collect_list("score").alias("scores")
).show()
+-----+----------+--------+
| name|mean_score| scores|
+-----+----------+--------+
|Alice| 75.0|[70, 80]|
| Bob| 50.0|[50, 50]|
|James| 75.0|[90, 60]|
| Jon| 60.0|[40, 80]|
+-----+----------+--------+
#与以上输出结果一致
df_join.groupBy("name").agg(F.expr("mean(score) as mean_score"),
F.expr("collect_list(score) as scores")
).show()
#数据透视表(行转列)
df_join.groupBy("subject").pivot("name").max("score").show()
+-------+-----+----+-----+----+
|subject|Alice| Bob|James| Jon|
+-------+-----+----+-----+----+
| Math| 70| 50| 60| 80|
|English| null|null| 90| 40|
|Physics| 80| 50| null|null|
+-------+-----+----+-----+----+
五、DataFrame+SQL
将DataFrame注册为临时表视图或者全局表视图后,可以使用SQL select语句对DataFrame进行操作,从而方便地实现对数据的查询、排序,不过由于DataFrame不可变,所以不支持delete、truncate、update等语句。不过可以通过SparkSQL对Hive表直接进行增删改查等操作。
1、注册视图
当使用createOrReplaceTempView()方法时,会创建一个临时表视图。这个视图只在当前的SparkSession中有效,当会话结束或者程序终止时,该视图也会随之消失。
如果使用createOrReplaceGlobalTempView方法,则会创建一个全局临时表视图。与临时表视图不同,全局临时表在整个Spark应用程序中都是有效的,不会因为单个SparkSession的结束而失效。
无论是临时表视图还是全局表视图,都不会占用额外的内存空间,它们实际上是对现有DataFrame的一种引用或者说是一种命名方式,方便用户通过SQL语句来进行数据操作。
①临时表视图
df.createOrReplaceTempView("test")
query='''select * from test
where age>26
'''
spark.sql(query).show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 4| Jon| 29|1200|
+---+-----+---+----+
②全局表视图
df.createOrReplaceGlobalTempView("test")
query='''select * from global_temp.test
where age>26
'''
spark.sql(query).show()
#创建一个新Session也能使用全局表
spark.newSession().sql(query).show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
| 1|James| 27|1000|
| 4| Jon| 29|1200|
+---+-----+---+----+
2、操作Hive表
①创建表
hsql = """CREATE TABLE IF NOT EXISTS `test`(
`name` STRING COMMENT '姓名',
`age` INT COMMENT '年龄'
)
PARTITIONED BY ( `sex` STRING COMMENT '性别')
""".replace("\n"," ")
spark.sql(hsql)
②删除表
hsql= "DROP TABLE IF EXISTS test"
spark.sql(hsql)
③写入表文章来源:https://www.toymoban.com/news/detail-840759.html
#动态写入数据到hive分区表
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
df = spark.createDataFrame([("James",27,"1"),
("Bob",22,"1"),
("Alice",25,"0"),
("Jon",29,"1")]).toDF("name","age","sex")
df.write.mode("overwrite").format("hive")\
.partitionBy("sex").saveAsTable("test")
六、总结
总的来说,Spark SQL是一个功能强大的工具,适合于处理大规模数据集和进行复杂的数据分析。Spark SQL能够访问多种数据源,包括本地数据集、HDFS、Hive、HBase等,并且通过集成类RDD、类Excel、类SQL的数据处理操作,增强了数据处理的易用性和多样性。文章来源地址https://www.toymoban.com/news/detail-840759.html
到了这里,关于(二)PySpark3:SparkSQL编程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!