(二)PySpark3:SparkSQL编程

这篇具有很好参考价值的文章主要介绍了(二)PySpark3:SparkSQL编程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、SparkSQL介绍

二、创建DataFrame

1、通过ToDF方法

2、通过createDataFrame方法

3、通过读取文件或数据库

三、保存DataFrame

四、DataFrame API

1、显示数据

2、统计信息

3、类RDD操作

4、类Excel操作

5、类SQL表操作

五、DataFrame+SQL

1、注册视图

2、操作Hive表

六、总结


 PySpark系列文章:

(一)PySpark3:安装教程及RDD编程

(二)PySpark3:SparkSQL编程

(三)PySpark3:SparkSQL40题

一、SparkSQL介绍

Spark SQL是Apache Spark生态系统的一个关键组件,专注于处理和分析结构化和半结构化的大规模数据。Spark SQL建立在Spark核心之上,为用户提供了高效且易用的数据处理接口,从而将关系型和非关系型数据融入到分布式计算环境中。

核心概念之一是DataFrame API,它提供了一个高级的、面向数据的抽象,允许用户以声明性的方式处理数据。DataFrame是一个分布式的、具有表格结构的数据集,类似于传统数据库中的表。通过DataFrame API,用户可以执行各种数据操作,包括过滤、转换、聚合、连接等,而无需深入了解底层的分布式计算模型。

RDD,DataFrame和DataSet对比:

RDD可以存储任何类型的数据,包括结构化数据、半结构化数据和非结构化数据,RDD的操作更接近底层,更适合对数据进行底层控制和自定义处理。

DataFrame构建在RDD之上,提供了更高级的抽象,是一个分布式的、以列为主的数据集合,类似于关系型数据库中的表。DataFrame可以通过多种数据源进行创建,包括结构化数据源(如JSON、CSV、Parquet)和Hive表,并且提供了丰富的SQL和DataFrameAPI,可以方便地进行数据处理和分析。

DataSet在DataFrame基础上进一步增加了数据类型信息,可以通过编程语言的类型系统来检查错误,并提供更好的编译时类型检查。

DataFrame和DataSet都支持SQL交互式查询,可以和 Hive无缝衔接。DataSet只有Scala语言和Java语言接口中才支持,在Python和R语言接口只支持DataFrame。

二、创建DataFrame

首先导包:

import pandas as pd 
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Row
pd.DataFrame.iteritems = pd.DataFrame.items

1、通过ToDF方法

可以将RDD用toDF方法转换成DataFrame。

rdd = sc.parallelize([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)])
df = rdd.toDF(["id","name","age","sal"])
df.show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4|  Jon| 29|1200|
+---+-----+---+----+

2、通过createDataFrame方法

可以将Pandas.DataFrame转换成pyspark中的DataFrame,也可直接对数据列表、schema进行转换。

#将pandas.DataFrame转换为pyspark.DataFrame
pdf = pd.DataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)]
                  ,columns=["id","name","age","sal"])
df = spark.createDataFrame(pdf)
print(type(df))

#将list转换为pyspark.DataFrame
df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)]
                          ,["id","name","age","sal"])
print(type(df))

#根据指定rdd和schema创建pyspark.DataFrame
schema = StructType([StructField("id", IntegerType(), nullable = False),
                     StructField("name", StringType(), nullable = True),
                     StructField("age", IntegerType(), nullable = True),
                     StructField("sal", FloatType(), nullable = True),
                    ])

rdd = sc.parallelize([Row(1,"James",27,1000),
                      Row(2,"Bob",22,500),
                      Row(3,"Alice",25,800),
                      Row(4,"Jon",29,1200),
                     ])

df = spark.createDataFrame(rdd, schema)
print(type(df))

输出结果:

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>

3、通过读取文件或数据库

可以通过读取json、csv等文件,或hive、mysql数据表得到DataFrame。

spark.read.csv(...): 从 CSV 文件中读取数据。
spark.read.json(...): 从 JSON 文件中读取数据。
spark.read.parquet(...): 从 Parquet 文件中读取数据。
spark.read.text(...): 读取文本文件。
spark.read.format(...): 使用指定的格式读取数据。

读取json文件:

df = spark.read.json("test.json")
df.show()

输出结果:test.json内容如下:

{"id":1,"name":"James","age":27,"sal":1000}
{"id":2,"name":"Bob","age":22,"sal":500}
{"id":3,"name":"Alice","age":25,"sal":800}
{"id":4,"name":"Jon","age":29,"sal":1200}

读取parquet文件:

df = spark.read.parquet("data/users.parquet")
df.show()

读取csv文件:

#方式1
df = spark.read.option("header","true") \
 .option("inferSchema","true") \
 .option("delimiter", ",") \
 .csv("test.csv")

#方式2
df = spark.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load("test.csv")

#方式3
df = spark.read.csv(path="test.csv", 
                    header=True,   #指定将第一行作为列名
                    inferSchema=True, #自动推断出每列的数据类型
                    sep=','  #分隔符
                   )
df.show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4|  Jon| 29|1200|
+---+-----+---+----+

读取Hive数据表:

spark.sql("CREATE TABLE IF NOT EXISTS test (id INT, name STRING, age INT, sal FLOAT) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'data/test.txt' INTO TABLE test")
df = spark.sql("SELECT * FROM test")

三、保存DataFrame

 通过df.write()对DataFrame进行保存。

#保存为csv文件
df.write.format("csv").option("header","true").save("data/test.csv")

#保存为json文件
df.write.json("data/test.json")

#保存成parquet文件
df.write.parquet("data/test.parquet")
df.write.partitionBy("age").format("parquet").save("data/test.parquet")

#保存成hive数据表
df.write.bucketBy(2, "name").sortBy("age").saveAsTable("test")

四、DataFrame API

1、显示数据

①df.collect()

用于将DataFrame中的所有行收集到Driver节点上,并以列表的形式返回这些行。

df = spark.createDataFrame([(1,"James",27,1000),
                            (2,"Bob",22,500),
                            (3,"Alice",25,800),
                            (4,"Jon",29,1200)]
                          ,["id","name","age","sal"])
df.collect()

输出结果:

[Row(id=1, name='James', age=27, sal=1000),
 Row(id=2, name='Bob', age=22, sal=500),
 Row(id=3, name='Alice', age=25, sal=800),
 Row(id=4, name='Jon', age=29, sal=1200)]

②df.first()

获取第一行数据。

df.first()

输出结果:

Row(id=1, name='James', age=27, sal=1000)

③df.head(n)

获取前n行数据。

df.head(2)

输出结果:

[Row(id=1, name='James', age=27, sal=1000),
 Row(id=2, name='Bob', age=22, sal=500)]

④df.show(n)

与df.head(n)类似,但是df.show(n)是打印成表格。

df.show(2)

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
+---+-----+---+----+
only showing top 2 rows

⑤df.printSchema()

用于打印DataFrame的模式schema,定义了各列的名称和类型。

df.printSchema()

输出结果:

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- sal: long (nullable = true)

2、统计信息

①df.describe()

一般与df.show()连用,用于查看DataFrame的数据分布。

df.describe().show()

输出结果:

+-------+------------------+-----+------------------+----------------+
|summary|                id| name|               age|             sal|
+-------+------------------+-----+------------------+----------------+
|  count|                 4|    4|                 4|               4|
|   mean|               2.5| null|             25.75|           875.0|
| stddev|1.2909944487358056| null|2.9860788111948193|298.607881119482|
|    min|                 1|Alice|                22|             500|
|    max|                 4|  Jon|                29|            1200|
+-------+------------------+-----+------------------+----------------+

若只想查看某一列的数据分布,如:df.describe('age').show()。

②df.count()

返回数据总行数。

df.count()

输出结果:

4

③聚合函数

一些常用的聚合函数如下sum()、mean()、min()、max()、avg()

#求最小工资
df.select(F.min(df['sal'])).show()
#输出结果:
+--------+
|min(sal)|
+--------+
|     500|
+--------+


#求最大工资
df.select(F.max(df['sal'])).show()
#输出结果:
+--------+
|max(sal)|
+--------+
|    1200|
+--------+


#求总工资
df.select(F.sum(df['sal'])).show()
#输出结果:
+--------+
|sum(sal)|
+--------+
|    3500|
+--------+

#求平均工资
df.select(F.avg(df['sal'])).show()
#输出结果:
+--------+
|avg(sal)|
+--------+
|   875.0|
+--------+

同时对多列操作:

df.agg({"name":"count","age":"max","sal":"avg"}).show()

#输出:
+-----------+--------+--------+
|count(name)|max(age)|avg(sal)|
+-----------+--------+--------+
|          4|      29|   875.0|
+-----------+--------+--------+

④df.stat.freqItems()

统计值出现的频率。

#统计age、name两列出现频率超过0.25的值
df.stat.freqItems(("age","name"),0.25).show()

输出结果:

+----------------+--------------------+
|   age_freqItems|      name_freqItems|
+----------------+--------------------+
|[29, 22, 25, 27]|[Bob, Jon, Alice,...|
+----------------+--------------------+

3、类RDD操作

可以把DataFrame当做数据类型为Row的RDD来进行操作。

部分操作需要先转换为RDD才能运行,如map、flatMap等等。

部分操作可以直接在DataFrame上进行,如filter、distinct、sample、cache、intersect等等。

①df.map()

#所有人age+1
rdd = df.rdd.map(lambda x:Row(x[2]+1))
rdd.toDF(["age"]).show()

输出结果:

+---+
|age|
+---+
| 28|
| 23|
| 26|
| 30|
+---+

②df.flatMap()

rdd = df.rdd.flatMap(lambda x:x[1].split('o')).map(lambda x:Row(x))
rdd.toDF(["name"]).show()

输出结果:

+-----+
| name|
+-----+
|James|
|    B|
|    b|
|Alice|
|    J|
|    n|
+-----+

③df.filter()

#筛选工资大于800的
df.filter(F.col("sal")>800).show()

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  4|  Jon| 29|1200|
+---+-----+---+----+

#筛选姓名为Bob,以下三种方式输出结果一致
df.filter(F.col("name")=="Bob").show()
df.filter(df["name"]=="Bob").show()
df.filter("name='Bob'").show()

+---+----+---+---+
| id|name|age|sal|
+---+----+---+---+
|  2| Bob| 22|500|
+---+----+---+---+

#筛选姓名以J开头的
df.filter(F.col("name").startswith("J")).show()

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  4|  Jon| 29|1200|
+---+-----+---+----+

#筛选除指定值外的其他数据
broads = sc.broadcast(["James","Bob"])
df.filter(~F.col("name").isin(broads.value)).show() 

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  3|Alice| 25| 800|
|  4|  Jon| 29|1200|
+---+-----+---+----+

④df.distinct()

#去重
df.distinct().show()

⑤df.cache()

#cache缓存
df.cache()

#释放缓存
df.unpersist()

⑥df.sample()

随机抽样。

#withReplacement=False表示无放回,即抽取不重复的数据
#fraction=0.5表示抽样的比例为50%
#seed为随机种子,用于复现
df_sample = df.sample(withReplacement=False, fraction=0.5, seed=2)
df_sample.show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  3|Alice| 25| 800|
+---+-----+---+----+

⑦df.intersect(df)

取两个DataFrame所有交集的行,返回结果不包含重复行

df.intersect(df_sample).show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  3|Alice| 25| 800|
|  1|James| 27|1000|
+---+-----+---+----+

⑧df.exceptAll()

求差集。

#从df中移除与df_sample相同的行,返回一个新的DataFrame
df.exceptAll(df_sample).show()

输出结果:

+---+----+---+----+
| id|name|age| sal|
+---+----+---+----+
|  2| Bob| 22| 500|
|  4| Jon| 29|1200|
+---+----+---+----+

4、类Excel操作

①df.withColumn()

增加列。

df = df.withColumn("birthyear",-df["age"]+2024)
df.show()

输出结果:

+---+-----+---+----+---------+
| id| name|age| sal|birthyear|
+---+-----+---+----+---------+
|  1|James| 27|1000|     1997|
|  2|  Bob| 22| 500|     2002|
|  3|Alice| 25| 800|     1999|
|  4|  Jon| 29|1200|     1995|
+---+-----+---+----+---------+

②df.select()

筛选列。

df.select("name","age").show()

输出结果:

+-----+---+
| name|age|
+-----+---+
|James| 27|
|  Bob| 22|
|Alice| 25|
|  Jon| 29|
+-----+---+

③df.drop()

删除列。

#删除一列
df.drop("age").show() 

#删除多列
df.drop(*["age","birthyear"]).show()

④df.withColumnRenamed()

对列进行重命名。

#对一列进行重命名
df.withColumnRenamed("sal","salary").show() 

#对多列进行重命名
df.withColumnRenamed("sal","salary").withColumnRenamed("birthyear","year").show() 

⑤df.sort()、df.orderBy()

按照某一列或某几列进行排序。

#按照某一列进行排序
df.sort(df["age"].desc()).show() #降序
df.sort(df["age"].asc()).show()  #升序

#按照某几列进行排序
df.orderBy(F.col("age").asc(), F.col("sal").desc()).show()

⑥df.na.drop()、df.na.fill()

处理带空值的行。

注意,在填充空值时,只能对相同数据类型的列的空值进行填充。

df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,None,29,None)]
                          ,["id","name","age","sal"])
df.show()

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4| null| 29|null|
+---+-----+---+----+

#删除带有nan值的行
df.na.drop().show()

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
+---+-----+---+----+

#填充nan值
df.na.fill("Jon").show()

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4|  Jon| 29|null|
+---+-----+---+----+

df.na.fill(0).show()

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4| null| 29|   0|
+---+-----+---+----+

⑦df.replace()

替换指定的值。

#注意,不能同时对不同数据类型的值进行替换
#例如,这句代码会报错:df.replace({"James": "Jim",1000: 100}).show()
df.replace({"James": "Jim", "Bob":"Bieber" }).show()
df.replace({1000: 100}).show()

⑧df.dropDuplicates()

跟distinct方法不同的是,dropDuplicates方法接收传参,可以根据指定字段去重。

df.dropDuplicates(["name"]).show()

5、类SQL表操作

类SQL表操作主要包括表查询(select,selectExpr,where),表连接(join,union,unionAll),表分组(groupby,agg,pivot)等操作。

①df.select()

df.select()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算列。

#筛选两列,并限制输出前两行
#df.limit(2)与df.take(2)类似,不过limit输出的是DataFrame,take输出的是list
df.select("age","name").limit(2).show()
+---+-----+
|age| name|
+---+-----+
| 27|James|
| 22|  Bob|
+---+-----+

#可以对列进行操作
df.select("name",df["age"] + 1).show()
+-----+---------+
| name|(age + 1)|
+-----+---------+
|James|       28|
|  Bob|       23|
|Alice|       26|
|  Jon|       30|
+-----+---------+

#通过toDF()对列进行重命名
df.select("name",-df["age"]+2024).toDF("name","birth_year").show()
+-----+----------+
| name|birth_year|
+-----+----------+
|James|      1997|
|  Bob|      2002|
|Alice|      1999|
|  Jon|      1995|
+-----+----------+

②df.selectExpr()

df.selectExpr()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算和重命名列。参数是一个字符串列表,其中每个字符串都是一个SQL表达式。

#创建一个UDF函数
spark.udf.register("getBirthYear",lambda x:2024-x)

#调用函数对列进行转换并重命名
df.selectExpr("name", 
              "getBirthYear(age) as birth_year" , 
              "UPPER(name) as NAME" ).show()
+-----+----------+-----+
| name|birth_year| NAME|
+-----+----------+-----+
|James|      1997|JAMES|
|  Bob|      2002|  BOB|
|Alice|      1999|ALICE|
|  Jon|      1995|  JON|
+-----+----------+-----+

#使用row_number()函数
df.selectExpr("name","age",
         "row_number() over (order by age desc) as order").show()
+-----+---+-----+
| name|age|order|
+-----+---+-----+
|  Jon| 29|    1|
|James| 27|    2|
|Alice| 25|    3|
|  Bob| 22|    4|
+-----+---+-----+

使用df.selectExpr()还可以将DataFrame转换为复合类型。

#array类型
dfarray = df.selectExpr("name","array(age,sal) as info")
dfarray.selectExpr("name","info[0] as age","info[1] as sal").show()

#struct类型
df_struct = df.selectExpr("name","struct(age,sal) as info")
df_struct.selectExpr("name","info.age","info.sal").show()

#map类型
df_map = df.selectExpr("name","map('age',age,'sal',sal) as info")
df_map.selectExpr("name","info['age'] as age","info['sal'] as sal").show()

#输出结果
+-----+---+----+
| name|age| sal|
+-----+---+----+
|James| 27|1000|
|  Bob| 22| 500|
|Alice| 25| 800|
|  Jon| 29|1200|
+-----+---+----+

#构造named_struct类型
df_named_struct = df.selectExpr("name","named_struct('age',age,'sal',sal) as info")
df_named_struct.show() 
+-----+----------+
| name|      info|
+-----+----------+
|James|{27, 1000}|
|  Bob| {22, 500}|
|Alice| {25, 800}|
|  Jon|{29, 1200}|
+-----+----------+

#转换为json类型
df_named_struct.selectExpr("name","to_json(info) as json_info").show()
+-----+--------------------+
| name|           json_info|
+-----+--------------------+
|James|{"age":27,"sal":1...|
|  Bob|{"age":22,"sal":500}|
|Alice|{"age":25,"sal":800}|
|  Jon|{"age":29,"sal":1...|
+-----+--------------------+

③df.where()

用法与SQL中的where一致,注意书写时,等于是一个“=”。

df.where("name='Bob' or age=27").show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
+---+-----+---+----+

④df.join(df)

df.join(df, on='列名', how='inner')。on参数可以指定连接方式为inner、left、right、outer、semi、full、leftanti、anti"等多种方式。关联的列如果有多列,则传入一个列名list。

scores = spark.createDataFrame([("James","English",90),("James","Math",60),
                                ("Bob","Math",50),("Bob","Physics",50),
                                ("Alice","Math",70),("Alice","Physics",80),
                                ("Jon","English",40),("Jon","Math",80)
                               ]) \
          .toDF("name","subject","score") 
scores.show()


df.join(scores,on="name",how="inner").show()
df.join(scores,df["name"]==scores["name"],"inner").show()
+-----+---+---+----+-------+-----+
| name| id|age| sal|subject|score|
+-----+---+---+----+-------+-----+
|Alice|  3| 25| 800|   Math|   70|
|Alice|  3| 25| 800|Physics|   80|
|  Bob|  2| 22| 500|   Math|   50|
|  Bob|  2| 22| 500|Physics|   50|
|James|  1| 27|1000|English|   90|
|James|  1| 27|1000|   Math|   60|
|  Jon|  4| 29|1200|English|   40|
|  Jon|  4| 29|1200|   Math|   80|
+-----+---+---+----+-------+-----+

⑤df.union(df)、df.unionAll(df)

df.where("name='Jon'").union(df.limit(2)).show()

df.where("name='Jon'").unionAll(df.limit(2)).show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  4|  Jon| 29|1200|
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
+---+-----+---+----+

⑥df.groupBy()

df_join = df.join(scores,on="name",how="inner")
df_join.groupBy("name").mean("score").show()
+-----+----------+
| name|avg(score)|
+-----+----------+
|Alice|      75.0|
|  Bob|      50.0|
|James|      75.0|
|  Jon|      60.0|
+-----+----------+

#多列聚合,并重命名
df_join.groupBy("name")\
        .agg(F.mean("age").alias("mean_age"),
             F.collect_list("score").alias("scores")
            ).show()
+-----+----------+--------+
| name|mean_score|  scores|
+-----+----------+--------+
|Alice|      75.0|[70, 80]|
|  Bob|      50.0|[50, 50]|
|James|      75.0|[90, 60]|
|  Jon|      60.0|[40, 80]|
+-----+----------+--------+

#与以上输出结果一致
df_join.groupBy("name").agg(F.expr("mean(score) as mean_score"),
                            F.expr("collect_list(score) as scores")
                           ).show()


#数据透视表(行转列)
df_join.groupBy("subject").pivot("name").max("score").show()
+-------+-----+----+-----+----+
|subject|Alice| Bob|James| Jon|
+-------+-----+----+-----+----+
|   Math|   70|  50|   60|  80|
|English| null|null|   90|  40|
|Physics|   80|  50| null|null|
+-------+-----+----+-----+----+

五、DataFrame+SQL

将DataFrame注册为临时表视图或者全局表视图后,可以使用SQL select语句对DataFrame进行操作,从而方便地实现对数据的查询、排序,不过由于DataFrame不可变,所以不支持delete、truncate、update等语句。不过可以通过SparkSQL对Hive表直接进行增删改查等操作。

1、注册视图

当使用createOrReplaceTempView()方法时,会创建一个临时表视图。这个视图只在当前的SparkSession中有效,当会话结束或者程序终止时,该视图也会随之消失。

如果使用createOrReplaceGlobalTempView方法,则会创建一个全局临时表视图。与临时表视图不同,全局临时表在整个Spark应用程序中都是有效的,不会因为单个SparkSession的结束而失效。

无论是临时表视图还是全局表视图,都不会占用额外的内存空间,它们实际上是对现有DataFrame的一种引用或者说是一种命名方式,方便用户通过SQL语句来进行数据操作。

①临时表视图

df.createOrReplaceTempView("test")
query='''select * from test
         where age>26
      '''
spark.sql(query).show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  4|  Jon| 29|1200|
+---+-----+---+----+

②全局表视图

df.createOrReplaceGlobalTempView("test")
query='''select * from global_temp.test
         where age>26
      '''
spark.sql(query).show()


#创建一个新Session也能使用全局表
spark.newSession().sql(query).show()

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  4|  Jon| 29|1200|
+---+-----+---+----+

2、操作Hive表

①创建表

hsql = """CREATE TABLE IF NOT EXISTS `test`(
          `name` STRING COMMENT '姓名',
          `age` INT COMMENT '年龄'
           )
           PARTITIONED BY ( `sex` STRING  COMMENT '性别')
""".replace("\n"," ")
spark.sql(hsql) 

②删除表

hsql= "DROP TABLE IF EXISTS test" 
spark.sql(hsql) 

③写入表

#动态写入数据到hive分区表
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
df = spark.createDataFrame([("James",27,"1"),
                            ("Bob",22,"1"),
                            ("Alice",25,"0"),
                            ("Jon",29,"1")]).toDF("name","age","sex")
df.write.mode("overwrite").format("hive")\
.partitionBy("sex").saveAsTable("test")

六、总结

总的来说,Spark SQL是一个功能强大的工具,适合于处理大规模数据集和进行复杂的数据分析。Spark SQL能够访问多种数据源,包括本地数据集、HDFS、Hive、HBase等,并且通过集成类RDD、类Excel、类SQL的数据处理操作,增强了数据处理的易用性和多样性。文章来源地址https://www.toymoban.com/news/detail-840759.html

到了这里,关于(二)PySpark3:SparkSQL编程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 林子雨 VirtualBox + Ubuntu[linux] 配置 java、hadoop、Spark[python]、pyspark快速配置流程

    按照步骤快速执行shell,最快速配置。 读者可以根据该篇随记快速回顾流程,以及用到的shell指令和相关配置文件。 是林老师教程的精简版,初次配置者只能作为流程参考,主要和林子雨Spark[python]版课程配套。  林老师厦大实验指南链接如下: Spark编程基础(Python版)教材官

    2024年04月12日
    浏览(34)
  • Python大数据之PySpark(二)PySpark安装

    1-明确PyPi库,Python Package Index 所有的Python包都从这里下载,包括pyspark 2-为什么PySpark逐渐成为主流? http://spark.apache.org/releases/spark-release-3-0-0.html Python is now the most widely used language on Spark. PySpark has more than 5 million monthly downloads on PyPI, the Python Package Index. 记住如果安装特定的版本

    2024年02月04日
    浏览(34)
  • Spark与PySpark(1.概述、框架、模块)

    目录 1.Spark 概念 2. Hadoop和Spark的对比 3. Spark特点 3.1 运行速度快 3.2 简单易用 3.3 通用性强 3.4 可以允许运行在很多地方 4. Spark框架模块 4.1 Spark Core 4.2 SparkSQL 4.3 SparkStreaming 4.4 MLlib 4.5 GraphX 5. Spark的运行模式 5.1 本地模式(单机) Local运行模式 5.2 Standalone模式(集群) 5.3 Hado

    2024年02月02日
    浏览(33)
  • Python大数据之PySpark

    Apache Spark是一种用于大规模数据处理的多语言分布式引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习 Spark官网:https://spark.apache.org/ 按照官网描述,Spark关键特征包括: 批/流处理 Spark支持您使用喜欢的语言:Python、SQL、Scala、Java或R,统一批量和实时流处

    2024年02月08日
    浏览(33)
  • spark、pyspark 常用的模版 demo 网址

    1、我自己有时候用百度或者其他的搜索出来的spark 常用案例,质量有的好有的差有时候就很烦。特地分享一个我常用的质量高的网站地址

    2024年02月11日
    浏览(229)
  • PySpark-Spark SQL基本介绍

    目录 Spark SQL基本介绍 Spark SQL特点 Spark SQL与Hive的异同 Spark SQL的数据结构 Spark SQL的入门 创建SparkSession对象 DataFrame详解 DataFrame基本介绍  DataFrame的构建方式 RDD构建DataFrame  内部初始化数据得到DataFrame schema总结 读取外部文件得到DataFrame Text方式读取 CSV方式读取 JSON方式读取 概

    2024年01月16日
    浏览(44)
  • 基于Headless构建高可用spark+pyspark集群

    Headless 服务类型并不分配容器云虚拟 IP,而是直接暴露所属 Pod 的 DNS 记录。没有默认负载均衡器,可直接访问 Pod IP 地址。因此,当我们需要与集群内真实的 Pod IP 地址进行直接交互时,Headless 服务就很有用。 其中Service的关键配置如下: clusterIP: None ,不让其获取clusterIP ,

    2024年02月06日
    浏览(33)
  • 10-用PySpark建立第一个Spark RDD

    PySpark实战笔记系列第一篇 Apache Spark的核心组件的基础是RDD。所谓的RDD,即 弹性分布式数据集(Resiliennt Distributed Datasets) ,基于RDD可以实现Apache Spark各个组件在多个计算机组成的集群中进行无缝集成,从而能够在一个应用程序中完成海量数据处理。 只读不能修改 :只能通过

    2024年04月08日
    浏览(36)
  • Python小案例(九)PySpark读写数据

    有些业务场景需要Python直接读写Hive集群,也需要Python对MySQL进行操作。pyspark就是为了方便python读取Hive集群数据,当然环境搭建也免不了数仓的帮忙,常见的如开发企业内部的 Jupyter Lab 。 ⚠️注意:以下需要在企业服务器上的jupyter上操作,本地jupyter是无法连接公司hive集群的

    2024年02月12日
    浏览(38)
  • Python大数据之PySpark(一)SparkBase

    Spark学习方法: 不断重复,28原则(使用80%时间完成20%重要内容) Spark风雨十年s 2012年Hadoop1.x出现,里程碑意义 2013年Hadoop2.x出现,改进HDFS,Yarn,基于Hadoop1.x框架提出基于内存迭代式计算框架Spark 1-Spark全家桶,实现离线,实时,机器学习,图计算 2-spark版本从2.x到3.x很多优化

    2024年02月08日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包