Spark重温笔记(四):秒级处理庞大数据量的 SparkSQL 操作大全,能否成为你的工作备忘指南?

这篇具有很好参考价值的文章主要介绍了Spark重温笔记(四):秒级处理庞大数据量的 SparkSQL 操作大全,能否成为你的工作备忘指南?。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark学习笔记

前言:今天是温习 Spark 的第 4 天啦!主要梳理了 SparkSQL 工作中常用的操作大全,以及演示了几个企业级案例,希望对大家有帮助!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


(本节的所有数据集放在我的资源下载区哦,感兴趣的小伙伴可以自行下载:最全面的SparkSQL系列案例数据集

二、SparkSQL基本入门

1.sparkSQL简介

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

sparkSQL的四个特性

  • 1-易整合:将sql查询与spark程序无缝混合,可以使用java、scala、python、R等语言的API操作。
  • 2-统一的数据访问:以相同的方式连接到任何数据源。
  • 3-兼容hive: 支持Hive HQL的语法,兼容hive(元数据库、SQL语法、UDF、序列化、反序列化机制)。
  • 4-标准的数据连接:可以使用行业标准的JDBC或ODBC连接。

2. 创建sparksession

  • 1-spark.read.text():读取文件
  • 2-fileDF.printSchema():字段的名称和类型
  • 3-fileDF.show(truncate=False):显示的字符串将不会被截断
  • 4-fileDF.rdd.collect():以rdd的形式出现
_01SparkSessionCreate.py

# -*- coding: utf-8 -*-
# Program function:学会创建SparkSession

from pyspark.sql import SparkSession
from pyspark import SparkConf

if __name__ == '__main__':
    # TODO 1-引入SparkSession的环境
    conf = SparkConf().setAppName("sparksession").setMaster("local[*]")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    # TODO 2-利用Spark环境变量生成SparkContext
    sc = spark.sparkContext
    # TODO 3-读取一个文件
    fileDF = spark.read.text("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/words.txt")
    # TODO 4-查看数据有多少行
    print("fileDF counts value is:{}".format(fileDF.count()))  # fileDF counts value is:2
    fileDF.printSchema()  # 字段的名称和字段的类型
    # root
    # |-- value: string (nullable = true)
    fileDF.show(truncate=False)
    # +------------------------+
    # |value                   |
    # +------------------------+
    # |hello you Spark Flink   |
    # |hello me hello she Spark|
    # +------------------------+
    print(fileDF.rdd.collect())
    #[Row(value='hello you Spark Flink'), Row(value='hello me hello she Spark')]
    
    # TODO 5-查看数据有多少行
    spark.stop()

fileDF counts value is:2
root
 |-- value: string (nullable = true)

+------------------------+
|value                   |
+------------------------+
|hello you Spark Flink   |
|hello me hello she Spark|
+------------------------+

[Row(value='hello you Spark Flink'), Row(value='hello me hello she Spark')]

3. RDD 转化为 DataFrame 的两种方式

(1) toDF()

1-无参数:

  • alias(‘age’):取别名操作
  • Row(name=zhangsan, age=18):行操作
  • dsl操作:df.select(df语句).show()
  • sql操作:spark.sql(sql语句).show(),记得提前创建临时视图:df.createOrReplaceTempView(“t_table”)
_01rddToDataFrameWay1.py

# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用Row对象对每行数据进行操作 Row(name=zhangsan,age=18)
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

if __name__ == '__main__':
    # 1 - 准备好上下文环境SparkSession
    spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 2 - 读取数据,sc.textFile()
    rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
    file_map_rdd = rdd_file.map(lambda record: record.split(","))
    # print(file__map.collect())
    # 3 - 使用Row对象对每行数据进行操作Row(name=zhangsan, age=18)
    df = file_map_rdd.map(lambda line: Row(name=line[0], age=int(line[1]))).toDF()

    # 4 - SparkSQL提供了两种风格查询数据
    # 4-1第一种风格DSL 领域查询语言df.select.filter
    print("===================df.select DSL-=========================")
    df.select("name").show()
    df.select(["name", "age"]).show()
    df.select(df.name, (df.age + 10).alias('age')).show()

    # 4-2第二种风格SQL 写SQL实现
    print("===================spark.sql- SQL=========================")
    df.createOrReplaceTempView("t_table")
    spark.sql("select * from t_table").show()
    spark.sql("select name from t_table").show()
    spark.sql("select name,age from t_table").show()
    spark.sql("select name,age + 10 from t_table").show()

    # 5 - 直接使用printSchema查看Scheme
    df.printSchema()
    spark.sql("desc t_table").show()
    # 6 - 使用show展示数据
    df.show()
    sc.stop()
===================df.select DSL-=========================
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

+-------+---+
|   name|age|
+-------+---+
|Michael| 39|
|   Andy| 40|
| Justin| 29|
+-------+---+

===================spark.sql- SQL=========================
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

+-------+--------------------------+
|   name|(age + CAST(10 AS BIGINT))|
+-------+--------------------------+
|Michael|                        39|
|   Andy|                        40|
| Justin|                        29|
+-------+--------------------------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|    name|   string|   null|
|     age|   bigint|   null|
+--------+---------+-------+

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+


  

2-有参数:

_05rddToDataFrameWay4.py

# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用StructType和StructFiled创建Schema
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    # 1 - 准备好上下文环境SparkSession
    spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 2 - 读取数据,sc.textFile()
    rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
    file_map_rdd = rdd_file.map(lambda record: record.split(","))
    #List[str]
    df = file_map_rdd.toDF(["name", "age"])
    df.printSchema()
    df.show()
    spark.stop()
(2)spark.createDataFrame(schema)
  • scheme_people :进行Row行操作了
_02rddToDataFrameWay1.py

# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用Row对象对每行数据进行操作 Row(name=zhangsan,age=18)
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from numpy.distutils.system_info import dfftw_info
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

if __name__ == '__main__':
    # 1 - 准备好上下文环境SparkSession
    spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 2 - 读取数据,sc.textFile()
    rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
    file_map_rdd = rdd_file.map(lambda record: record.split(","))
    # print(file__map.collect())
    # 3 - 使用Row对象对每行数据进行操作Row(name=zhangsan, age=18)
    scheme_people = file_map_rdd.map(lambda line: Row(name=line[0], age=int(line[1])))
    # 4 - 使用spark.createDataFrame(schema)创建DataFrame
    df = spark.createDataFrame(scheme_people)
    # 5 - 直接使用printSchema查看Scheme
    df.printSchema()
    # 6 - 使用show展示数据
    df.show()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+


  

(3) spark.createDataFrame(rdd,schema)

1-不同字段类型时:

  • peoplerdd:只是取出数据
  • schema:使用StructType和StructFiled创建Schema
_03rddToDataFrameWay3.py

# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用StructType和StructFiled创建Schema
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    # 1 - 准备好上下文环境SparkSession
    spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 2 - 读取数据,sc.textFile()
    rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
    file_map_rdd = rdd_file.map(lambda record: record.split(","))
    # print(file__map.collect())
    # 3 - 使用Row对象对每行数据进行操作Row(name=zhangsan, age=18)
    peoplerdd = file_map_rdd.map(lambda line: (line[0], int(line[1].strip())))
    # 使用StructType和StructFiled创建Schema
    schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
    # 它的默认值为 True,即表示该字段可以为空。
    
    # 4 - 使用spark.createDataFrame(schema)创建DataFrame
    df = spark.createDataFrame(peoplerdd, schema)
    # 5 - 直接使用printSchema查看Scheme
    df.printSchema()
    # root
    # | -- name: string(nullable=true)
    # | -- age: integer(nullable=true)
    # 6 - 使用show展示数据
    df.show()

    spark.stop()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+


  

1-相同字段类型时:文章来源地址https://www.toymoban.com/news/detail-848180.html

_04rddToDataFrameWay31.py

# -*- coding: utf-8 -*-
# Program function:第一种方式处理rdd转化为df
'''
1-准备好上下文环境SparkSession
2-读取数据,sc.textFile()
3-使用StructType和StructFiled创建Schema
4-使用spark.createDataFrame(schema)创建DataFrame
5-直接使用printSchema查看Scheme
6-使用show展示数据
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *

if __name__ == '__main__':
    # 1 - 准备好上下文环境SparkSession
    spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 2 - 读取数据,sc.textFile()
    rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
    file_map_rdd = rdd_file.map(lambda record: record.split(","))
    # print(file__map.collect())
    # 3 - 使用Row对象对每行数据进行操作Row(name=zhangsan, age=18)
    peoplerdd = file_map_rdd.map(lambda line: (line[0], int(line[1].strip())))
    
    # 使用StructType和StructFiled创建Schema
    schemaName = "name,age"
    split_ = [StructField(scheme, StringType(), True) for scheme in schemaName.split(",")]
    schema = StructType(split_)
    
    # 4 - 使用spark.createDataFrame(schema)创建DataFrame
    df = spark.createDataFrame(peoplerdd, schema)
    # 5 - 直接使用printSchema查看Scheme
    df.printSchema()
    # root
    # | -- name: string(nullable=true)
    # | -- age: integer(nullable=true)
    # 6 - 使用show展示数据
    df.show()

    spark.stop()

4. Pandas to DF

  • spark.createDataFrame(pandas_df)
pandasToDF.py

# -*- coding: utf-8 -*-
# Program function:pandas转化为DF
import pandas as pd
from pyspark.sql import SparkSession
from datetime import datetime, date

if __name__ == '__main__':
    spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    pandas_df = pd.DataFrame({
        'a': [1, 2, 3],
        'b': [2., 3., 4.],
        'c': ['string1', 'string2', 'string3'],
        'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
        'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
    })
    print(pandas_df)
    # a    b        c           d                   e
    #  0  1  2.0  string1  2000-01-01 2000-01-01 12:00:00
    #  1  2  3.0  string2  2000-02-01 2000-01-02 12:00:00
    #  2  3  4.0  string3  2000-03-01 2000-01-03 12:00:00
    print(pandas_df.shape)  # (3, 5)
    # print(pandas_df.values)
    # from an :class:`RDD`, a list or a :class:`pandas.DataFrame`.
    df_pandas = spark.createDataFrame(pandas_df)
    df_pandas.printSchema()
    # root
    # | -- a: long(nullable=true)
    # | -- b: double(nullable=true)
    # | -- c: string(nullable=true)
    # | -- d: date(nullable=true)
    # | -- e: timestamp(nullable=true)
    df_pandas.show()
    print(df_pandas.rdd.collect())

    spark.stop()

5. 常见spark读数据的三种方式

(1) spark.read.text()
(2) spark.read.format()
  • 1-读csv格式数据
    • option(“header”,True):保留文件头
    • option(“sep”,“;”):分隔符是;
    • option(“inferSchema”,True):自动推断每列的数据类型
    • load(路径地址)
  • 2-读取json数据
  • 3-读取Parquet数据
_01_readCsvData.py

# -*- coding: utf-8 -*-
# Program function:读取csv数据
# csv 以逗号作为分隔符的文本

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 读取csv数据
    csv_data=spark.read.format("csv")\
        .option("header",True)\
        .option("sep",";")\
        .option("inferSchema",True)\
        .load("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.csv")
    csv_data.printSchema()
    csv_data.show()
    print(type(csv_data))#<class 'pyspark.sql.dataframe.DataFrame'>
    # 读取Json数据
    json__load = spark.read.format("json").load("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.json")
    json__load.printSchema()
    json__load.show()
    # 读取Parquet数据
    parquet__load = spark.read.format("parquet").load("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/users.parquet")
    parquet__load.printSchema()
    parquet__load.show()
(3) DSL 和 SQL
  • 1-DSL
    • 查看数据:show,printSchema
    • 查看字段数据:col,column,pandas语法
    • 统计:groupby,count,orderby,agg
    • 重命名:withColumnRenamed
  • 2-SQL
    • 创建临时视图:createOrReplaceTempView
    • spark.sql(SQL语法)
# -*- coding: utf-8 -*-
# Program function:DSL & SQL
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 1-准备环境变量
    spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 2-读取数据
    dataDF = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", True) \
        .option("sep", ";") \
        .load("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.csv")
    # 3-查看数据
    dataDF.show(2, truncate=False)
    dataDF.printSchema()
    # 4-执行DSL的操作
    from pyspark.sql.functions import col, column

    # 查看name字段的数据
    dataDF.select("name").show()
    dataDF.select(col("name")).show()
    # dataDF.select(column("name")).show()
    dataDF.select(dataDF.name).show()
    dataDF.select(dataDF["name"]).show()
    # 查看name,age字段的数据
    dataDF.select(["name", "age"]).show()
    dataDF.select(col("name"), col("age")).show()
    dataDF.select(dataDF["name"], col("age")).show()
    dataDF.select(dataDF.name, col("age")).show()
    # 过滤personDF的年龄大于21岁的信息
    dataDF.filter("age >30").show()
    dataDF.filter(dataDF["age"] > 30).show()
    dataDF.filter(col("age") > 30).show()
    # groupBy统计
    dataDF.groupby("age").count().orderBy("count").withColumnRenamed("count", "countBig").show()
    from pyspark.sql import functions as F

    dataDF.groupby("age").agg(F.count(dataDF.age)).show()
    dataDF.groupby("age").agg({"age": "count"}).show()

    # SQL
    dataDF.createOrReplaceTempView("t_table")
    spark.sql("select name from t_table").show()
    spark.sql("select name,age from t_table").printSchema()
    spark.sql("select Name,age from t_table").printSchema()
    # root
    # | -- Name: string(nullable=true)
    # | -- age: integer(nullable=true)
    spark.sql("select name ,age from t_table where age>30").show()
    spark.sql("select name ,age from t_table order by age limit 2").show()

    spark.stop()

6. WordCount 的两种做法

(1) DSL 做法
  • 1-explode爆炸函数:扁平化操作
  • 2-withColumn:有则改之,无则添之
# -*- coding: utf-8 -*-
# Program function:DSL wordcount
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 1-准备环境变量
    spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 2-读取数据
    dataDF = spark.read.text("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/words.txt")
    # 3-查看数据
    dataDF.printSchema()
    # root
    # | -- value: string(nullable=true)
    # 4-wordcount
    from pyspark.sql import functions as F

    # 这里使用explode爆炸函数将文本数据扁平化处理
    # withColumn,如果有相同列调换掉,否则增加列
    dataExplodeDF = dataDF.withColumn("words", F.explode(F.split(F.col("value"), " ")))
    dataExplodeDF.show(truncate=False)
    # +------------------------+-----+
    # |value                   |words|
    # +------------------------+-----+
    # |hello you Spark Flink   |hello|
    # |hello you Spark Flink   |you  |
    # |hello you Spark Flink   |Spark|
    # |hello you Spark Flink   |Flink|
    # |hello me hello she Spark|hello|
    # |hello me hello she Spark|me   |
    # |hello me hello she Spark|hello|
    # |hello me hello she Spark|she  |
    # |hello me hello she Spark|Spark|
    # +------------------------+-----+
    dataExplodeDF.groupby("words").count().orderBy("count", ascending=False).show()
    # +-----+-----+
    # | words | count |
    # +-----+-----+
    # | hello | 3 |
    # | Spark | 2 |
    # | me | 1 |
    # | Flink | 1 |
    # | you | 1 |
    # | she | 1 |
    # +-----+-----+

(2) SQL 做法
  • 1-split(value,’ '):切割函数
  • 2-explode:爆炸函数可以结合切割函数
# -*- coding: utf-8 -*-
# Program function:DSL wordcount
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 1-准备环境变量
    spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 2-读取数据
    dataDF = spark.read.text("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/words.txt")
    # 3-查看数据
    dataDF.printSchema()
    # root
    # | -- value: string(nullable=true)
    # 4-wordcount
    dataDF.createOrReplaceTempView("t_table")
    spark.sql("select split(value,' ') from t_table").show(truncate=False)

    # +------------------------------+
    # |split(value,  , -1)           |
    # +------------------------------+
    # |[hello, you, Spark, Flink]    |
    # |[hello, me, hello, she, Spark]|
    # +------------------------------+
    
    spark.sql("select explode(split(value,' ')) as words from t_table").show(truncate=False)
    # +-----+
    # |words|
    # +-----+
    # |hello|
    # |you  |
    # |Spark|
    # |Flink|
    # |hello|
    # |me   |
    # |hello|
    # |she  |
    # |Spark|
    # +-----+
    
    spark.sql("""
        select words,count(1) as count from 
        (select explode(split(value,' ')) as words from t_table) w
        group by words
        order by count desc
    """).show(truncate=False)
    # +-----+-----+
    # | words | count |
    # +-----+-----+
    # | hello | 3 |
    # | Spark | 2 |
    # | you | 1 |
    # | me | 1 |
    # | Flink | 1 |
    # | she | 1 |
    # +-----+-----+

7. 电影案例解析

(1) 预先清洗数据
  • 1-读取后转化为to_df
    • 过滤:每一行不为空,且切割后不能缺字段
    • 映射:正式切割
    • 映射:逐一对应字段
    • toDF:分配字段
# -*- coding: utf-8 -*-
# Program function:对电影数据集的数据接入演示
'''
* 之前做数据的接入:
* 1-查看数据的格式,根据几种方式读取即可
* 2-准备环境
* 3-读取数据文件、
* 4-进行转化DF
* 5-后续统计
'''
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re

if __name__ == '__main__':
    # TODO *之前做数据的接入:
    # TODO *1 - 查看数据的格式,根据几种方式读取即可--结论:通过rdd方式读取并过滤的操作,之后sc.textFile
    # TODO *2 - 准备环境
    conf = SparkConf().setAppName("testIris").setMaster("local[*]")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # TODO *3 - 读取数据文件
    movies_rdd = sc.textFile("/export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/ml-100k/u.data")
    print("movies count is:" + str(movies_rdd.count()))
    # TODO 数据清洗
    moviesDF = movies_rdd \
        .filter(lambda line: (len(line.strip()) > 0 and (len(re.split("\s+", line.strip())) == 4))) \
        .map(lambda line: re.split("\s+", line.strip())) \
        .map(lambda line: (int(line[0]), int(line[1]), float(line[2]), int(line[3]))) \
        .toDF(["userid", "itemid", "rating", "timestamp"])
    moviesDF.show()
    moviesDF.printSchema()
    # root
    # |-- userid: long (nullable = true)
    # |-- itemid: long (nullable = true)
    # |-- rating: double (nullable = true)
    # |-- timestamp: long (nullable = true)
    # TODO *4 - 进行转化DF
    # TODO *5 - 后续统计

  • 2-读取后自定义schema
    • StructType,StructField定义字段
    • createDataFrame(data,schema)
# -*- coding: utf-8 -*-
# Program function:对电影数据集的数据接入演示
'''
* 之前做数据的接入:
* 1-查看数据的格式,根据几种方式读取即可
* 2-准备环境
* 3-读取数据文件、
* 4-进行转化DF
* 5-后续统计
'''
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re

if __name__ == '__main__':
    # TODO *之前做数据的接入:
    # TODO *1 - 查看数据的格式,根据几种方式读取即可--结论:通过rdd方式读取并过滤的操作,之后sc.textFile
    # TODO *2 - 准备环境
    conf = SparkConf().setAppName("testIris").setMaster("local[*]")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # TODO *3 - 读取数据文件
    movies_rdd = sc.textFile("/export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/ml-100k/u.data")
    print("movies count is:" + str(movies_rdd.count()))
    # TODO 数据清洗
    moviesData = movies_rdd \
        .filter(lambda line: (len(line.strip()) > 0 and (len(re.split("\s+", line.strip())) == 4))) \
        .map(lambda line: re.split("\s+", line.strip())) \
        .map(lambda line: (int(line[0]), int(line[1]), float(line[2]), int(line[3])))

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

    movies_schehma = StructType([StructField("userid", StringType(), True),
                                 StructField("itemid", IntegerType(), False),
                                 StructField("rating", FloatType(), False),
                                 StructField("timestamp", IntegerType(), False),
                                 ])

    moviesDF = spark.createDataFrame(moviesData, movies_schehma)
    moviesDF.show()
    moviesDF.printSchema()
    # root
    # |-- userid: long (nullable = true)
    # |-- itemid: long (nullable = true)
    # |-- rating: double (nullable = true)
    # |-- timestamp: long (nullable = true)
    # TODO *4 - 进行转化DF
    # TODO *5 - 后续统计

(2) 实战流程
  • 1-设置shuffle做法:这种设置的好处是可以减少 Shuffle 过程中的数据传输和磁盘 IO,从而提高性能。然而,设置过小的分区数可能会导致数据不均匀的问题,从而影响性能。因此,在设置 shuffle 分区数时,需要综合考虑数据量、集群资源和任务需求等因素。
  • 2-写入数据存储
    • coalesce:减少分区操作
    • write:写入操作
    • csv:数据格式地址注意是文件夹,且不能私自创建
    • mysql:一个format和mode,五个option,一个save
# -*- coding: utf-8 -*-
# Program function:对电影数据集进行按照需求统计分析
'''
* 1-准备SparkSession
* 2-读取ratings.dat数据文件
* 3-转化为DF
* 4-使用DSL实战
* 5-使用SQL实战
* 6-将结果输出
* 7-spark.stop
'''
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
import os

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3//bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # *1 - 准备SparkSession
    spark = SparkSession \
        .builder \
        .appName("testRatings") \
        .master("local[3]") \
        .config("spark.sql.shuffle.partitions", "2") \
        .getOrCreate()

    sc = spark.sparkContext
    sc.setLogLevel("WARN")  # 默认用INfo打印很多内容,4040WebUI端口
    # *2 - 读取ratings.dat数据文件
    ratingsRDD = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/ml-1m/ratings.dat")
    # 数据基础预处理
    # *3 - 转化为DF
    ratingsDF = ratingsRDD \
        .filter(lambda line: len(line.strip()) > 0 and len(line.strip().split("::")) == 4) \
        .map(lambda line: line.strip().split("::")) \
        .map(lambda p: Row(userid=int(p[0]), moviesId=int(p[1]), ratings=float(p[2]), timestamp=int(p[3]))) \
        .toDF()
    # *5 - 使用SQL实战
    # 对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。
    # ratingsDF.printSchema()
    # ratingsDF.show(2)
    ratingsDF.createOrReplaceTempView("table_view")
    spark.sql("""
        select moviesId,round(avg(ratings),2) as cnt_rating,count(moviesId) as cnt_movies
        from table_view
        group by moviesId
        having cnt_movies>2000
        order by cnt_rating desc,cnt_movies desc
        limit 10
    """).show()
    # +--------+----------+----------+
    # |moviesId|cnt_rating|cnt_movies|
    # +--------+----------+----------+
    # |     318|      4.55|      2227|
    # |     858|      4.52|      2223|
    # |     527|      4.51|      2304|
    # |    1198|      4.48|      2514|
    # *4 - 使用DSL实战
    import pyspark.sql.functions as F

    # resultDF = ratingsDF \
    #     .select("moviesId", "ratings") \
    #     .groupby("moviesId") \
    #     .agg(F.round(F.avg("ratings"), 2).alias("cnt_rating"), F.count("moviesId").alias("cnt_movies")) \
    #     .filter("cnt_movies >2000") \
    #     .orderBy(["cnt_rating", "cnt_movies"], ascending=[0, 0]) \
    #     .limit(10)
    # resultDF.show()
    # +--------+----------+----------+
    # |moviesId|cnt_rating|cnt_movies|
    # +--------+----------+----------+
    # |     318|      4.55|      2227|
    # |     858|      4.52|      2223|
    # |     527|      4.51|      2304|
    # *6 - 将结果输出到csv
    # resultDF\
    #     .coalesce(1)\
    #     .write\
    #     .csv("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/output1")
    # *6 - 将结果输出到mysql
    # 如果没有bigdata数库可以创建一个database,create database bigdata;
    # resultDF\
    #     .coalesce(1)\
    #     .write\
    #     .format("jdbc") \
    #     .mode("overwrite") \
    #     .option("driver", "com.mysql.jdbc.Driver") \
    #     .option("url", "jdbc:mysql://node1:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
    #     .option("dbtable", "bigdata.tb_top10_movies") \
    #     .option("user", "root") \
    #     .option("password", "123456") \
    #     .save()
    print("写入MySQL停止")
    # *7 - spark.stop
    time.sleep(600)
    spark.stop()

8. 数据清洗流程

(1)数据去重
  • 1-createDataFrame:(数据集,字段列表)
  • 2-dropDuplicates:删除所有字段重复记录,subset参数可以指定字段
  • 3-countDistinct:去重计数
  • 4-count:普通计数
  • 5-monotonically_increasing_id:自增的id列
01dataClean.py
# -*- coding: utf-8 -*-
# Program function:数据清洗的操作
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re

if __name__ == '__main__':
    # TODO *之前做数据的接入:
    # TODO *1 - 准备环境
    conf = SparkConf().setAppName("testIris").setMaster("local[*]")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # TODO 准备好数据集
    df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M'),
    ], ['id', 'weight', 'height', 'age', 'gender'])
    # 无意义重复数据去重:数据中行与行完全重复
    # 1.首先删除完全一样的记录
    print("=================1-删除所有字段重复记录================")
    df1 = df.dropDuplicates()
    df1.show()
    
    # 2.其次,关键字段值完全一模一样的记录(在这个例子中,是指除了id之外的列一模一样)
    print("=================2-删除除了id字段重复记录================")
    # print(df1.columns)#['id', 'weight', 'height', 'age', 'gender']
    df2 = df1.dropDuplicates(subset=[c for c in df1.columns if c != 'id'])
    df2.show()
    # 3- 查看某一列是否有重复值,统计id列,及去重id列后去重
    print("=================3-查看某一列是否有重复值================")
    import pyspark.sql.functions as F
    df3 = df2.agg(F.count("id").alias("id_Count"), F.countDistinct("id").alias("id_distinct_count"))
    df3.show()
    print("=================4-自增的id列================")
    df4 = df2.withColumn("new_id", F.monotonically_increasing_id())
    df4.show()
    #+---+------+------+---+------+-------------+
    # | id|weight|height|age|gender|       new_id|
    # +---+------+------+---+------+-------------+
    # |  5| 133.2|   5.7| 54|     F|  25769803776|
    # |  1| 144.5|   5.9| 33|     M| 171798691840|
    # |  2| 167.2|   5.4| 45|     M| 592705486848|
    # |  3| 124.1|   5.2| 23|     F|1236950581248|
    # |  5| 129.2|   5.3| 42|     M|1365799600128|
    # +---+------+------+---+------+-------------+
=================2-删除除了id字段重复记录================
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+

=================3-查看某一列是否有重复值================
+--------+-----------------+
|id_Count|id_distinct_count|
+--------+-----------------+
|       5|                4|
+--------+-----------------+

=================4-自增的id列================
+---+------+------+---+------+-------------+
| id|weight|height|age|gender|       new_id|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F|  25769803776|
|  1| 144.5|   5.9| 33|     M| 171798691840|
|  2| 167.2|   5.4| 45|     M| 592705486848|
|  3| 124.1|   5.2| 23|     F|1236950581248|
|  5| 129.2|   5.3| 42|     M|1365799600128|
+---+------+------+---+------+-------------+


  

(2) 数据空值处理
  • 1-row和列表推导式结合sum函数:计算每条记录的缺失值情况
  • 2-count:计算各列的非缺失值个数
  • 3-*:全部字段操作
  • 4-字段的缺失值比例:缺失的个数/总个数
  • 5-拆分技巧:agg聚合函数
  • 6-删除非空值少于thresh的缺失行:dropna(),thresh参数
  • 7-删除缺失值过于严重的列:其实是先建一个DF,不要缺失值的列
# -*- coding: utf-8 -*-
# Program function:数据清洗的操作
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re

if __name__ == '__main__':
    # TODO *之前做数据的接入:
    # TODO *1 - 准备环境
    conf = SparkConf().setAppName("testIris").setMaster("local[*]")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # TODO 准备好数据集
    df_miss = spark.createDataFrame([
        (1, 143.5, 5.6, 28, 'M', 100000),
        (2, 167.2, 5.4, 45, 'M', None),
        (3, None, 5.2, None, None, None),
        (4, 144.5, 5.9, 33, 'M', None),
        (5, 133.2, 5.7, 54, 'F', None),
        (6, 124.1, 5.2, None, 'F', None),
        (7, 129.2, 5.3, 42, 'M', 76000), ],
        ['id', 'weight', 'height', 'age', 'gender', 'income'])
    # TODO 1:使用列表推导式结合sum函数--计算每条记录的缺失值情况
    dfmiss_sum = df_miss.rdd.map(lambda row: (row['id'], sum([c == None for c in row])))
    print("diss_Sum value is:", dfmiss_sum.collect())
    # diss_Sum value is: [(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]
    # 2.计算各列的非缺失值个数
    import pyspark.sql.functions as F

    df_miss.agg(F.count("id").alias("id_Count"),
                F.count("income").alias("income_count"),
                F.count("*").alias("all")).show()
    # +--------+------------+
    # |id_Count|income_count|
    # +--------+------------+
    # |       7|           2|
    # +--------+------------+
    # 因为income初始值有5,5/7=70%,业务上规定达到60%以上空值去掉该列
    df_miss.agg(*[F.count(t).alias(t + "dismissing") for t in df_miss.columns if t != 'income']).show()
    # +------------+----------------+----------------+-------------+----------------+
    # |iddismissing|weightdismissing|heightdismissing|agedismissing|genderdismissing|
    # +------------+----------------+----------------+-------------+----------------+
    # |           7|               6|               7|            5|               6|
    # +------------+----------------+----------------+-------------+----------------+
    # 如何统计出缺失值的比例
    # 方法:1- 非缺失值个数/总数据个数=缺失值比例
    df_miss.agg(*[(1 - (F.count(c) / F.count('*'))).alias(c + '_missing') for c in df_miss.columns]).show()
    # +----------+------------------+--------------+------------------+------------------+------------------+
    # |id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
    # +----------+------------------+--------------+------------------+------------------+------------------+
    # |       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
    # +----------+------------------+--------------+------------------+------------------+------------------+
    # 拆分
    df_miss.agg((F.count("weight") / F.count("*")).alias("w_missing"),
                (F.count("height") / F.count("*")).alias("w_missing")).show()
    
    # +------------------+---------+
    # |         w_missing|w_missing|
    # +------------------+---------+
    # |0.8571428571428571|      1.0|
    # +------------------+---------+

    # 3、删除缺失值过于严重的列
    # 其实是先建一个DF,不要缺失值的列
    df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
    df_miss_no_income.show()
    # +---+------+------+----+------+
    # | id|weight|height| age|gender|
    # +---+------+------+----+------+
    # |  1| 143.5|   5.6|  28|     M|
    # |  2| 167.2|   5.4|  45|     M|
    # |  3|  null|   5.2|null|  null|
    # |  4| 144.5|   5.9|  33|     M|
    # |  5| 133.2|   5.7|  54|     F|
    # |  6| 124.1|   5.2|null|     F|
    # |  7| 129.2|   5.3|  42|     M|
    # +---+------+------+----+------+
    # drop rows that have less than `thresh` non-null values.
    # 删除少于thread的非空行
    df_miss_no_income.dropna(thresh=3).show()
    
    # +---+------+------+----+------+
    # | id|weight|height| age|gender|
    # +---+------+------+----+------+
    # |  1| 143.5|   5.6|  28|     M|
    # |  2| 167.2|   5.4|  45|     M|
    # |  4| 144.5|   5.9|  33|     M|
    # |  5| 133.2|   5.7|  54|     F|
    # |  6| 124.1|   5.2|null|     F|
    # |  7| 129.2|   5.3|  42|     M|
    # +---+------+------+----+------+
    
    # 5、填充缺失值,可以用fillna来填充缺失值,
    # 对于bool类型、或者分类类型,可以为缺失值单独设置一个类型,missing
    # 对于数值类型,可以用均值或者中位数等填充

    # fillna可以接收两种类型的参数:
    # 一个数字、字符串,这时整个DataSet中所有的缺失值都会被填充为相同的值。
    # 也可以接收一个字典{列名:值}这样
    # 比如age,可以计算除了null之外的平均值在填充age的缺失值
    # means = df_miss_no_income.agg(*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('records')[0]
    # 案列求解均值
    df_miss_no_income.agg(*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).show()
    # +---+------------------+-----------------+----+
    # | id|            weight|           height| age|
    # +---+------------------+-----------------+----+
    # |4.0|140.28333333333333|5.471428571428571|40.4|
    # +---+------------------+-----------------+----+
    # 按照数值型的数据求解的均值按列填入缺失值
    print(df_miss_no_income.agg(*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas())
    #    id      weight    height   age
    # 0  4.0  140.283333  5.471429  40.4
    means = df_miss_no_income.agg(
        *[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict(orient="records")[
        0]
    print(means)  # [{'id': 4.0, 'weight': 140.283333, 'height': 5.471429, 'age': 40.4}]
    # 对于gender需要填入mising
    # {'id': 4.0, 'weight': 140.28333333333333, 'height': 5.471428571428571, 'age': 40.4, 'gender': 'missing'}
    means["gender"] = "missing"
    print(means)
    # `value` must be a mapping from column name (string)
    df_miss_no_income.fillna(means).show()
    
    # +---+------------------+------+---+-------+
    # | id|            weight|height|age| gender|
    # +---+------------------+------+---+-------+
    # |  1|             143.5|   5.6| 28|      M|
    # |  2|             167.2|   5.4| 45|      M|
    # |  3|140.28333333333333|   5.2| 40|missing|
    # |  4|             144.5|   5.9| 33|      M|
    # |  5|             133.2|   5.7| 54|      F|
    # |  6|             124.1|   5.2| 40|      F|
    # |  7|             129.2|   5.3| 42|      M|
    # +---+------------------+------+---+-------+
    
    

9. 挑选花案例

  • 1-启动sparkSession
  • 2-读取数据源
  • 3-转化为df
  • 4-执行花式查询
# -*- coding: utf-8 -*-
# Program function:
'''
1-如果数据文件没有header,直接使用sc.textFile读取
    1-1 row对象
    1-2 structType方法
2-如果数据文件有header,使用spark.read.format("csv").option("sep")
'''
from pyspark.sql import SparkSession
from pyspark import SparkConf

if __name__ == '__main__':
    # 1-启动sparksession
    conf = SparkConf().setAppName("testIris").setMaster("local[*]")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # 2-读取数据源
    file_rdd = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/iris.data")
    print("数量:",file_rdd.count())
    # 数量: 150

    # 3- 转化为df
    from pyspark.sql.types import Row

    irisDF = file_rdd \
    .filter((lambda line: len(line.strip()>0))  and (lambda line : len(line.strip().split(","))==5) ) \
    .map(lambda line: line.strip().split(",")) \
    .map(lambda x : Row(sepal_length = float(x[0]),sepal_width=float(x[1]), petal_length=float(x[2]),
                           petal_width=float(x[3]), irisclass=x[4])) \
    .toDF()

    # 4-执行花式查询操作
    irisDF.printSchema()
    irisDF.show()
    # root
    #  |-- sepal_length: double (nullable = true)
    #  |-- sepal_width: double (nullable = true)
    #  |-- petal_length: double (nullable = true)
    #  |-- petal_width: double (nullable = true)
    #  |-- irisclass: string (nullable = true)
    # 
    # +------------+-----------+------------+-----------+-----------+
    # |sepal_length|sepal_width|petal_length|petal_width|  irisclass|
    # +------------+-----------+------------+-----------+-----------+
    # |         5.1|        3.5|         1.4|        0.2|Iris-setosa|
    # |         4.9|        3.0|         1.4|        0.2|Iris-setosa|
    # |         4.7|        3.2|         1.3|        0.2|Iris-setosa|
    # |         4.6|        3.1|         1.5|        0.2|Iris-setosa|
    # |         5.0|        3.6|         1.4|        0.2|Iris-setosa|
    # |         5.4|        3.9|         1.7|        0.4|Iris-setosa|
    # |         4.6|        3.4|         1.4|        0.3|Iris-setosa|
    # |         5.0|        3.4|         1.5|        0.2|Iris-setosa|
    # |         4.4|        2.9|         1.4|        0.2|Iris-setosa|
    # |         4.9|        3.1|         1.5|        0.1|Iris-setosa|
    # |         5.4|        3.7|         1.5|        0.2|Iris-setosa|
    # |         4.8|        3.4|         1.6|        0.2|Iris-setosa|
    # |         4.8|        3.0|         1.4|        0.1|Iris-setosa|
    # |         4.3|        3.0|         1.1|        0.1|Iris-setosa|
    # |         5.8|        4.0|         1.2|        0.2|Iris-setosa|
    # |         5.7|        4.4|         1.5|        0.4|Iris-setosa|
    # |         5.4|        3.9|         1.3|        0.4|Iris-setosa|
    # |         5.1|        3.5|         1.4|        0.3|Iris-setosa|
    # |         5.7|        3.8|         1.7|        0.3|Iris-setosa|
    # |         5.1|        3.8|         1.5|        0.3|Iris-setosa|
    # +------------+-----------+------------+-----------+-----------+
    # only showing top 20 rows

    irisDF.select("sepal_length").show()
    irisDF.select(irisDF.sepal_length).show()
    irisDF.select("sepal_length", "sepal_width").show()
    
    # +------------+
    # |sepal_length|
    # +------------+
    # |         5.1|
    # |         4.9|
    # |         4.7|
    # |         4.6|
    # |         5.0|
    # |         5.4|
    # |         4.6|
    # |         5.0|
    # |         4.4|
    # |         4.9|
    # |         5.4|
    # |         4.8|
    # |         4.8|
    # |         4.3|
    # |         5.8|
    # |         5.7|
    # |         5.4|
    # |         5.1|
    # |         5.7|
    # |         5.1|
    # +------------+
    # only showing top 20 rows
    # 
    # +------------+
    # |sepal_length|
    # +------------+
    # |         5.1|
    # |         4.9|
    # |         4.7|
    # |         4.6|
    # |         5.0|
    # |         5.4|
    # |         4.6|
    # |         5.0|
    # |         4.4|
    # |         4.9|
    # |         5.4|
    # |         4.8|
    # |         4.8|
    # |         4.3|
    # |         5.8|
    # |         5.7|
    # |         5.4|
    # |         5.1|
    # |         5.7|
    # |         5.1|
    # +------------+
    # only showing top 20 rows
    # 
    # +------------+-----------+
    # |sepal_length|sepal_width|
    # +------------+-----------+
    # |         5.1|        3.5|
    # |         4.9|        3.0|
    # |         4.7|        3.2|
    # |         4.6|        3.1|
    # |         5.0|        3.6|
    # |         5.4|        3.9|
    # |         4.6|        3.4|
    # |         5.0|        3.4|
    # |         4.4|        2.9|
    # |         4.9|        3.1|
    # |         5.4|        3.7|
    # |         4.8|        3.4|
    # |         4.8|        3.0|
    # |         4.3|        3.0|
    # |         5.8|        4.0|
    # |         5.7|        4.4|
    # |         5.4|        3.9|
    # |         5.1|        3.5|
    # |         5.7|        3.8|
    # |         5.1|        3.8|
    # +------------+-----------+
    # only showing top 20 rows
    
    # 基于统计操作
    irisDF.groupby("irisclass").count().show()
    
    # +---------------+-----+
    # |      irisclass|count|
    # +---------------+-----+
    # | Iris-virginica|   50|
    # |    Iris-setosa|   50|
    # |Iris-versicolor|   50|
    # +---------------+-----+

    import pyspark.sql.functions as F

    irisDF.groupby("irisclass").agg(F.count(F.col("irisclass")).alias("variable")).show()
    
    # +---------------+--------+
    # |      irisclass|variable|
    # +---------------+--------+
    # | Iris-virginica|      50|
    # |    Iris-setosa|      50|
    # |Iris-versicolor|      50|
    # +---------------+--------+
    
    # SQL风格
    irisDF.createOrReplaceTempView("table_view")
    spark.sql("""
            select irisclass,count(1) as count
            from table_view
            group by irisclass
        """).show()
    
    # +---------------+-----+
    # |      irisclass|count|
    # +---------------+-----+
    # | Iris-virginica|   50|
    # |    Iris-setosa|   50|
    # |Iris-versicolor|   50|
    # +---------------+-----+
    

到了这里,关于Spark重温笔记(四):秒级处理庞大数据量的 SparkSQL 操作大全,能否成为你的工作备忘指南?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据课程K22——Spark的SparkSQL的API调用

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 掌握Spark的通过api使用SparkSQL; 1. 打开scala IDE开发环境,创建一个scala工程。 2. 导入spark相关依赖jar包。 3. 创建包路径以object类。 4. 写代码。 5)打jar包,并上传到linux虚拟机上 6)在spark的bin目录下 执行: s

    2024年02月10日
    浏览(40)
  • Spark大数据处理讲课笔记3.7 Spark任务调度

    理解DAG概念 了解Stage划分 了解RDD在Spark中的运行流程 DAG(Directed Acyclic Graph) 叫做 有向无环图 ,Spark中的RDD通过一系列的转换算子操作和行动算子操作形成了一个DAG。DAG是一种非常重要的图论数据结构。如果一个有向图无法从任意顶点出发经过若干条边回到该点,则这个图就

    2024年02月09日
    浏览(51)
  • Spark大数据处理讲课笔记---Spark RDD典型案例

    利用RDD计算总分与平均分 利用RDD统计每日新增用户 利用RDD实现分组排行榜 针对成绩表,计算每个学生总分和平均分   读取成绩文件,生成lines;定义二元组成绩列表;遍历lines,填充二元组成绩列表;基于二元组成绩列表创建RDD;对rdd按键归约得到rdd1,计算总分;将rdd1映射

    2024年02月06日
    浏览(50)
  • Spark大数据处理学习笔记(2.2)搭建Spark Standalone集群

    一、在master虚拟机上安装配置Spark 1.1 将spark安装包上传到master虚拟机 下载Spark:pyw2 进入/opt目录,查看上传的spark安装包 1.2 将spark安装包解压到指定目录 执行命令: tar -zxvf spark-3.3.2-bin-hadoop3.tgz 修改文件名:mv spark-3.3.2-bin-hadoop3 spark-3.3.2 1.3 配置spark环境变量 执行命令:vim

    2024年02月09日
    浏览(51)
  • Spark大数据处理讲课笔记4.1 Spark SQL概述、数据帧与数据集

      目录 零、本讲学习目标 一、Spark SQL (一)Spark SQL概述 (二)Spark SQL功能 (三)Spark SQL结构 1、Spark SQL架构图 2、Spark SQL三大过程 3、Spark SQL内部五大组件 (四)Spark SQL工作流程 (五)Spark SQL主要特点 1、将SQL查询与Spark应用程序无缝组合 2、Spark SQL以相同方式连接多种数据

    2024年02月09日
    浏览(64)
  • Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

      目录 零、本讲学习目标 一、基本操作 二、默认数据源 (一)默认数据源Parquet (二)案例演示读取Parquet文件 1、在Spark Shell中演示 2、通过Scala程序演示 三、手动指定数据源 (一)format()与option()方法概述 (二)案例演示读取不同数据源 1、读取房源csv文件 2、读取json,保

    2024年02月09日
    浏览(44)
  • sparkSql数据离线处理--整理记录

    前言:本文作为本人学习sparkSql离线数据抽取,离线数据处理的学习整理记录,文中参考博客均附上原文链接。 1、配置文件准备: /opt/hive/conf/ hive-site.xml :(2021/12/31修改,添加了useSSL=falseuseUnicode=truecharacterEncoding=utf8支持中文编码) 若要在idea环境下运行要把 hdfs-site.xml core-site

    2023年04月09日
    浏览(41)
  • Spark大数据处理讲课笔记--- RDD持久化机制

    理解RDD持久化的必要性 了解RDD的存储级别 学会如何查看RDD缓存 Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。 Spark中

    2024年02月06日
    浏览(44)
  • SparkSQL-对数据缺失和异常值进行处理

    缺失数据 准备数据  处理方式 丢弃规则 填充规则 异常数据  方法1:丢弃处理  方法2:替换处理 准备数据  处理方式 1、丢弃/过滤:DataFrame.na.drop() 2、填充:DataFrame.na.fill() 3、替换:DataFrame.na.replace() 丢弃规则 1、any 一行中有任何一个是NaN/null就丢弃 2、all 只有一行中所有

    2024年02月05日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包