PySpark-Spark SQL基本介绍

这篇具有很好参考价值的文章主要介绍了PySpark-Spark SQL基本介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

Spark SQL基本介绍

Spark SQL特点

Spark SQL与Hive的异同

Spark SQL的数据结构

Spark SQL的入门

创建SparkSession对象

DataFrame详解

DataFrame基本介绍

 DataFrame的构建方式

RDD构建DataFrame

 内部初始化数据得到DataFrame

schema总结

读取外部文件得到DataFrame

Text方式读取

CSV方式读取

JSON方式读取


Spark SQL基本介绍

概念:Spark SQL是Spark多种组件中其中一个,主要是用于处理大规模的结构化数据

结构化数据:可以转化为二维表格的数据,一份数据,每一行,每一列的了下都是一致的,我们将这样的数据称为结构化数据

例如:mysql的表数据

        1 张三 10

        2 李四 18

        3 王五 20

Spark SQL特点

1.融合性:既可以使用标准SQL语言,也可以编写代码,同时支持混合使用

2.统一的数据访问:可以通过统一的API来对接不同的数据源

3.Hive的兼容性:Spark SQL可以和Hive进行整合,整合后替换执行引擎为Spark,核心是基于Hive的metastore来管理元数据

4.标准化连接:Spark SQL也支持JDBC/ODBC的连接方式

Spark SQL与Hive的异同

相同点:

        1.都是分布式SQL计算引擎

        2.都可以处理大规模的结构化数据

        3.都可以建立YARN集群上运行

不同点:

        1.Spark SQL是基于内存计算,而Hive SQL是基于磁盘进行计算的

        2.Spark SQL没有元数据管理服务(自己维护),而Hive SQL是有metastore元数据管理服务的

        3.Spark SQL底层执行的是Spark RDD程序,而Hive SQL底层执行的是mapreduce程序

        4.Spark SQL可以编写SQL也可以编写代码,而Hive SQL仅能编写SQL语句

Spark SQL的数据结构

Pandas/SparkCore/SparkSQL数据结构对比
核心 数据结构 区别
Pandas DataFrame

二维表数据结构

处理单机(本地集合)结构数据

SparkCore RDD

无标准数据结构(任何的数据结构)

大规模的分布式结构数据(分区)

SparkSQL DataFrame

二维表格结构

大规模的分布式结构数据(分区)

PySpark-Spark SQL基本介绍,spark,大数据,分布式

以图为例:

RDD:存储直接就是对象,存储就是一个Person的对象,无法看到对象的数据内容

DataFrame:将Person中的各个字段数据,进行结构化存储,形成一个DataFrame,可以直接看到数据

Dataset:将Person对象中的数据按照结构化的方式存储,同时保留对象的类型,从而知道来源于开一个Person对象

Spark SQL的入门

创建SparkSession对象

        Spark SQL需要将顶级对象SparkContext变成SparkSesssion对象。SparkContext是RDD中的顶级对象,里面没有和SQL编程相关的API/方法。通过SparkSession对象还是可以得到SparkContext对象。

# 如何构建一个SparkSession对象呢?
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 创建SparkSQL中的顶级对象SparkSession
    """
        注意事项:
        1- SparkSession和builder都没有小括号
        2- appName():给应用程序取名词。等同于SparkCore中的setAppName()
        3- master():设置运行时集群类型。等同于SparkCore中的setMaster()
    """
    spark = SparkSession.builder\
        .appName('create_sparksession_demo')\
        .master('local[*]')\
        .getOrCreate()

    # 通过SparkSQL的顶级对象获取SparkCore中的顶级对象
    sc = spark.sparkContext

    # 释放资源
    sc.stop()
    spark.stop()

DataFrame详解

DataFrame基本介绍

PySpark-Spark SQL基本介绍,spark,大数据,分布式

DataFrame:表示的是一个二维得表,存在行,列等表结构描述信息

表结构描述信息(元数据schema):strucType对象

字段:structField对象,可以描述字段名称,字段数据类型,是否可以为空

行:Row对象

列:column对象,包括字段名称和字段值

在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息

PySpark-Spark SQL基本介绍,spark,大数据,分布式

 DataFrame的构建方式

RDD构建DataFrame

场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么我可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。

# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField

# 绑定指定的python解释器


os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('通过Rdd创建DataFrame')
    # 创建SparkSession对象
    spark = SparkSession \
        .builder.appName('rdd_to_DataFrame_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 通过SparkSession对象创建SparkContext顶级对象
    sc = spark.sparkContext
    # 数据输入
    # 构建rdd
    init_rdd = sc.parallelize(['1,张三,18', '2,李四,20', '3,王五,22'])
    # 将qrdd数据结构转成二维结构
    new_rdd = init_rdd.map(lambda line: (
        int(line.split(',')[0]),
        line.split(',')[1],
        int(line.split(',')[2], )))
    # 将RDD转成DataFrame:方式一
    # 构建schema方式一
    schema = StructType() \
        .add('id', IntegerType(), False) \
        .add('name', StringType(), False) \
        .add('age', IntegerType(), False)

    # 构建schema方式二
    schema = StructType([
        StructField('id', IntegerType(), False),
        StructField('name', StringType(), False),
        StructField('age', IntegerType(), False),
    ])

    # 构建schema方式三
    schema = "id:int,name:string,age:int"
    schema = "id int,name string,age int"

    # 构建schema方式四,不能指定字段类型
    schema = ['id', 'name', 'age']
    # 构建DataFrame
    init_df = spark.createDataFrame(data=new_rdd, schema=schema)
    # 数据输出
    init_df.show()
    # 输出schema
    init_df.printSchema()
    print('=' * 50)
    # 将RDD转成DataFrame:方式二
    """
    toDF:中的schema既可以传List,也可以传字符串形式的schema信息
    """
    # 方式一:传入列表
    init_df2 = new_rdd.toDF(schema=['id', 'name', 'age'])
    # 方式一:传入字符串
    init_df2 = new_rdd.toDF(schema="id:int,name:string,age:int")
    init_df2 = new_rdd.toDF(schema="id int,name string,age int")
    # 数据输出
    init_df2.show()
    # 输出schema
    init_df2.printSchema()


    # 释放资源
    spark.stop()
    sc.stop()

 内部初始化数据得到DataFrame

场景:一般用在开发和测试中。因为只能处理少量的数据

# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField

# 绑定指定的python解释器


os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('内部初始化数据得到DataFrame')
    # 创建SparkSession对象
    spark = SparkSession \
        .builder \
        .appName('inner_create_dataframe') \
        .master('local[*]') \
        .getOrCreate()
    # 2- 数据输入
    """
        内部初始化数据得到DataFrame
        通过createDataFrame创建DataFrame,schema数据类型可以是:DataType、字符串、List
            字符串:格式要求
                格式一 字段1 字段类型,字段2 字段类型
                格式二(推荐) 字段1:字段类型,字段2:字段类型

            List:格式要求
                ["字段1","字段2"]
    """
    # 方式一
    init_df = spark.createDataFrame(
        data=[(1, '张三', 18), (2, '李四', 20), (3, '王五', 22)],
        schema='id int,name string,age int'
    )

    # 方式二
    init_df = spark.createDataFrame(
        data=[(1, '张三', 18), (2, '李四', 20), (3, '王五', 22)],
        schema='id:int,name:string,age:int'
    )

    # 方式三,列表形式不能指定字段类型,有输入的数据自动推断字段类型
    init_df = spark.createDataFrame(
        data=[(1, '张三', 18), (2, '李四', 20), (3, '王五', 22)],
        schema=['id', 'name', 'age']
    )

    # 数据输出
    init_df.show()
    # 输出schema信息
    init_df.printSchema()

    # 是否资源
    spark.stop()

schema总结

通过createDataFrame创建DataFrame,schema数据类型可以是:DataType、字符串、List
1: 字符串
    格式一 字段1 字段类型,字段2 字段类型
    格式二(推荐) 字段1:字段类型,字段2:字段类型
    
2: List
    ["字段1","字段2"]
    
3: DataType(推荐,用的最多)
    格式一 schema = StructType()\
            .add('id',IntegerType(),False)\
            .add('name',StringType(),True)\
            .add('age',IntegerType(),False)

    格式二 schema = StructType([
            StructField('id',IntegerType(),False),
            StructField('name',StringType(),True),
            StructField('age',IntegerType(),False)
          ])

读取外部文件得到DataFrame

复杂API

统一API格式:

Sparksession.read

        .format('text | csv | json | parquet | orc |  avro | jdbc | ......')    # 读取外部文件的方式

        .option('k','v')          # 选项,可以设置相关的参数(可选)

        .schema(structType | string)  # 设置表的结构信息

        .load('加载数据路径')    # 读取外部文件的路径,支持HDFS也支持本地

 简写API

注意:所有的复杂API外部读取方式,都有简单的写法,spark内置了一些常用的读取方案的简写

格式:

           spark.read.读取方式()

例如:

        df=spark.read.csv(

                path='文件路径',

                header=True,

                sep=' ',

                inferschema=True,

                encoding='utf-8'

        )

Text方式读取

load:支持读取HDFS文件系统和本地文件系统
            HDFS文件系统:hdfs://node1:8020/文件路径
            本地文件系统:file:///文件路径
            
        text方式读取文件总结:
            1- 不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理
            2- 默认生成的列名叫value,数据类型string
            3- 我们只能够在schema中修改字段value的名称,其他任何内容不能修改

# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# 绑定指定的python解释器


os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    # text方式读取
    print('text方式读取外部文件')
    # 创建sparksession对象
    spark = SparkSession.builder.appName('text_demo').master('local[*]').getOrCreate()
    # 复杂API方式
    # 数据输入
    init_df = spark.read \
        .format('text') \
        .schema('my_file string') \
        .load('file:///export/data/pyspark_projects/02_spark_sql/data/stu.txt')
    # 数据输出
    init_df.show()
    # 输出schema
    init_df.printSchema()

    # 简写API方式
    init_df = spark.read.text(
        paths='file:///export/data/pyspark_projects/02_spark_sql/data/stu.txt'
    )
    init_df.show()
    # 输出schema
    init_df.printSchema()
CSV方式读取

csv格式读取外部文件总结:
    1- 复杂API和简写API都须掌握
    2- 相关参数作用说明:
        2.1- path:指定读取的文件路径。支持HDFS和本地文件路径
        2.2- schema:手动指定元数据信息
        2.3- sep:指定字段间的分隔符
        2.4- encoding:指定文件的编码方式
        2.5- header:指定文件中的第一行是否是字段名称
        2.6- inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确

# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# 绑定指定的python解释器


os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    # json方式读取
    print('csv方式读取外部文件')
    # 创建sparksession对象
    spark = SparkSession.builder.appName('csv_demo').master('local[*]').getOrCreate()
    # 复杂API方式
    # 数据输入
    init_df = spark.read \
        .format('csv') \
        .option('sep', ' ') \
        .option('encoding', 'utf8') \
        .option('header', 'True') \
        .schema(schema='id int,name string,address string,sex string,age int') \
        .load('file:///export/data/pyspark_projects/02_spark_sql/data/stu.txt')
    # 数据输出
    init_df.show()
    # 输出schema
    init_df.printSchema()
    print('='*50)
    #简写API方式
    init_df = spark.read.csv(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/stu.txt',
        schema='id int,name string,address string,sex string,age int',
        sep=' ',
        encoding='utf8',
        header=True
    )
    init_df.show()
    # 输出schema
    init_df.printSchema()
JSON方式读取

json读取数据总结:
     1- 需要手动指定schema信息。如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充
     2- csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔文章来源地址https://www.toymoban.com/news/detail-792126.html

# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# 绑定指定的python解释器


os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    # json方式读取
    print('json方式读取外部文件')
    # 创建sparksession对象
    spark = SparkSession.builder.appName('json_demo').master('local[*]').getOrCreate()
    # 复杂API方式
    # 数据输入
    init_df = spark.read \
        .format('json') \
        .option('sep', ':') \
        .option('header', 'True') \
        .option('encoding', 'utf8') \
        .schema(schema='id int,name string,age int,address string') \
        .load('file:///export/data/pyspark_projects/02_spark_sql/data/json.txt')
    # 数据输出
    init_df.show()
    # 输出schema
    init_df.printSchema()
    print('=' * 50)
    # # 简写API方式
    init_df = spark.read.json(
        path='file:///export/data/pyspark_projects/02_spark_sql/data/json.txt',
        schema='id int,name string,age int,address string',
        encoding='utf8'
    )
    init_df.show()
    # 输出schema
    init_df.printSchema()

    # 释放资源
    spark.stop()

到了这里,关于PySpark-Spark SQL基本介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《PySpark大数据分析实战》-12.Spark on YARN配置Spark运行在YARN上

    📋 博主简介 💖 作者简介:大家好,我是wux_labs。😜 热衷于各种主流技术,热爱数据科学、机器学习、云计算、人工智能。 通过了TiDB数据库专员(PCTA)、TiDB数据库专家(PCTP)、TiDB数据库认证SQL开发专家(PCSD)认证。 通过了微软Azure开发人员、Azure数据工程师、Azure解决

    2024年02月03日
    浏览(55)
  • 介绍 Apache Spark 的基本概念和在大数据分析中的应用

    Apache Spark是一种基于内存计算的大数据处理框架,它支持分布式计算,并且能够处理比传统处理框架更大量的数据。以下是Apache Spark的一些基本概念和在大数据分析中的应用: RDD (Resilient Distributed Dataset):RDD是Spark的核心概念,它是一个分布式的、不可变的数据集。RDD可以从

    2024年02月13日
    浏览(54)
  • 介绍 Apache Spark 的基本概念和在大数据分析中的应用。

    Apache Spark 是一个快速的开源大数据处理引擎,可以用于大数据处理、机器学习、图形计算等领域。它可以在多种计算环境中运行,包括独立模式、YARN、Mesos、Kubernetes等云计算平台。 Spark基于RDD(Resilient Distributed Datasets)模型,RDD是一个不可变的分布式对象集合,可通过并行

    2024年02月10日
    浏览(42)
  • Python与大数据:Hadoop、Spark和Pyspark的应用和数据处理技巧

      在当今的数字时代,数据成为了无处不在的关键资源。大数据的崛起为企业提供了无限的机遇,同时也带来了前所未有的挑战。为了有效地处理和分析大规模数据集,必须依靠强大的工具和技术。在本文中,我们将探讨Python在大数据领域的应用,重点介绍Hadoop、Spark和Pysp

    2024年02月16日
    浏览(44)
  • Python大数据之PySpark(三)使用Python语言开发Spark程序代码

    Spark Standalone的PySpark的搭建----bin/pyspark --master spark://node1:7077 Spark StandaloneHA的搭建—Master的单点故障(node1,node2),zk的leader选举机制,1-2min还原 【scala版本的交互式界面】bin/spark-shell --master xxx 【python版本交互式界面】bin/pyspark --master xxx 【提交任务】bin/spark-submit --master xxxx 【学

    2024年01月17日
    浏览(54)
  • Spark SQL基本操作

    Spark SQL基本操作 将下列json数据复制到你的ubuntu系统/usr/local/spark下,并保存命名为employee.json 首先为employee.json创建DataFrame,并写出Python语句完成以下操作: 创建DataFrame 查询DataFrame的所有数据 查询所有数据,并去除重复的数据 查询所有数据,打印时去除id字段 筛选age30的记录

    2024年02月05日
    浏览(61)
  • Spark On Hive配置测试及分布式SQL ThriftServer配置

    Spark本身是一个执行引擎,而没有管理metadate的能力,当我们在执行SQL的时候只能将SQL转化为RDD提交。而对于一些数据中的元数据Spark并不知道,而Spark能写SQL主要是通过DataFrame进行注册的。 这时候我们就可以借助Hive中的MetaStore进行元数据管理。也就是说把Hive中的metastore服务

    2024年01月21日
    浏览(46)
  • Spark基本介绍

    目录 Spark是什么 一、Spark与MapReduce对比区别 二、Spark的发展 三、Spark的特点 四、Spark框架模块         Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎,是一个分布式计算框架。 Spark中新的数据结构RDD(弹性分布式数据集),使得大数据分析能够

    2024年02月05日
    浏览(32)
  • 【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎

    【大家好,我是爱干饭的猿,本文重点介绍、SparkSQL的运行流程、 SparkSQL的自动优化、Catalyst优化器、SparkSQL的执行流程、Spark On Hive原理配置、分布式SQL执行引擎概念、代码JDBC连接。 后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】 上一篇

    2024年02月04日
    浏览(48)
  • 工具系列:PyCaret介绍_Fugue 集成_Spark、Dask分布式训练

    Fugue 是一个低代码的统一接口,用于不同的计算框架,如 Spark、Dask。PyCaret 使用 Fugue 来支持分布式计算场景。 让我们从最标准的例子开始,代码与本地版本完全相同,没有任何魔法。 compare_model 如果您不想使用分布式系统,也完全相同。 现在让我们将其分布式,作为一个玩

    2024年02月04日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包