【spark】SparkSQL

这篇具有很好参考价值的文章主要介绍了【spark】SparkSQL。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SparkSQL

01.快速入门

什么是SparkSQL

SparkSQL是Spark的一个模块,用于处理海量结构化数据

为什么学习SparkSQL

SparkSQL是非常成熟的海量结构化数据处理框架:
学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀,支持SQL语言、性能强、可以自动优化、API简单、兼容HIVE等等
  • 企业大面积在使用SparkSQL处理业务数据
    1、离线开发
    2、数仓搭建
    3、科学计算
    4、数据分析

SparkSQL的特点

  • 融合性:SQL可以无缝集成在代码中,随时用SQL处理数据
  • 统一数据访问:一套标准API可读写不同数据源
  • Hive兼容:可以使用SparkSQL直接计算并生成Hive数据表
  • 标准化连接:支持标准化JDBC/ODBC连接,方便和各种数据源进行数据交互

SparkSQL发展历史-前身Shark框架

【spark】SparkSQL,spark,spark,大数据,分布式

SparkSQL发展历史

【spark】SparkSQL,spark,spark,大数据,分布式

02.SparkSQL概述

SparkSQL和Hive的异同

相同点:
1、分布式SQL计算引擎
2、构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能

不同点:
【spark】SparkSQL,spark,spark,大数据,分布式

SparkSQL的数据抽象

1、SparkSQL-DataFrame

- 二维表数据结构
- 分布式结构集合(分区)

2、SparkSQL FOor JVM-DataSet【可用于Java\Scala\语言】
3、SparkSQL For Python\R-DataFrame【可用于Java\Scale\Python\R】

DataFrame概述

【spark】SparkSQL,spark,spark,大数据,分布式

【spark】SparkSQL,spark,spark,大数据,分布式
DataFrame是按照二维表格的形式存储数据
RDD则是存储对象本身

SparkSession对象

在RDD阶段,程序的执行入口对象是SparkContext
在Sparke2.0后,推出SparkSeaaion对象,作为Spark编码的统一入口对象

SparkSession对象可以:
1、用于SparkSQL编程作为入口对象
2、用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
【spark】SparkSQL,spark,spark,大数据,分布式

03.DataFrame入门和操作

DataFrame的组成

DataFrame是一个二维表结构,那么表格结构就有无法绕开的三个点:

  • 表结构表述
    比如MySQL中的一张表:

  • 由许多行组成

  • 数据也可以被分成多个列

  • 表也有表结构信息(列、列名、列类型、列约束等)
    在结构层面上:

  • StructType对象描述整个DataFrame的表结构

  • StructField对象描述一个列的信息
    在数据层面上:

  • Row对象记录一行数据

  • Column对象记录一列数据并包含列的信息

【spark】SparkSQL,spark,spark,大数据,分布式
在表结构层面,DataFrame的表结构由:
StructType描述:

struct_type = StructType().\
    add("id",IntegerType(),False).\
    add("name",StringType(),True).\
    add("age",IntegerType(),False)

一个StructField记录:列名、列类型、列是否运行为空
多个StructField组成一个StructedType对象
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空

一行数据描述为Row对象,如Row(1,张三,11)
一列数据描述为Column对象,Column对象包含一列数据和列的信息

DataFrame的代码构建-基于RDD-1
#coding:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 0.构建执行环境入口对象SparkSession
    spark = SparkSesion.builder.\
            appName("test").\
            master("local[*]").\
            getOrcreate()
    #1.基于RDD转换成DataFrame
    sc = spark.sparkContext
    rdd = sc.testFile("../data/input/sql/people.txt).map(lambda x:x.spalit(",")).map(lambda x:(x[0],int(x[1])))
     #2.构建DataFrame对象
     ## 参数一:被转换的rdd
     ## 参数二:指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可
     df = spark.createDataFrame(rdd,schema=['name','age'])
     df.printSchema()
     ## 参数一:表示 展示出来多少条数据,默认不传的话是20
     ## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替
     ## 如果给False,表示不截断全部显示,默认是True
     df.show(20,False)
    
    
         
DataFrame的代码构建-基于RDD-2
#coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType

if __name__ == '__main__':
    # 0.构建执行环境入口对象SparkSession
    spark = SparkSesion.builder.\
            appName("test").\
            master("local[*]").\
            getOrcreate()
    #1.基于RDD转换成DataFrame
    sc = spark.sparkContext
    rdd = sc.testFile("../data/input/sql/people.txt).map(lambda x:x.spalit(",")).map(lambda x:(x[0],int(x[1])))
    
    schema=StructType().add('name',StringType(),True).add('age'IntegerType(),False)
     df = spark.createDataFrame(rdd,schema)
     df.printSchema()
     ## 参数一:表示 展示出来多少条数据,默认不传的话是20
     ## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替
     ## 如果给False,表示不截断全部显示,默认是True
     df.show(20,False)
DataFrame的代码构建-基于RDD-3

该方法用于对数据类型不敏感

#coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType

if __name__ == '__main__':
    # 0.构建执行环境入口对象SparkSession
    spark = SparkSesion.builder.\
            appName("test").\
            master("local[*]").\
            getOrcreate()
    #1.基于RDD转换成DataFrame
    sc = spark.sparkContext
    rdd = sc.testFile("../data/input/sql/people.txt).map(lambda x:x.spalit(",")).map(lambda x:(x[0],int(x[1])))
    
    # toDF的方式构建DataFrame
    df1 = rdd.toDF(['name','age'])
    # 方法二
    schema=StructType().add('name',StringType(),True).add('age'IntegerType(),False)
    rdd.toDF(schema)
    df1.printSchema()
     ## 参数一:表示 展示出来多少条数据,默认不传的话是20
     ## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替
     ## 如果给False,表示不截断全部显示,默认是True
    df1.show(20,False)

DataFrame的代码构建-基于Pandas的DataFrame
#coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType

if __name__ == '__main__':
    # 0.构建执行环境入口对象SparkSession
    spark = SparkSesion.builder.\
            appName("test").\
            master("local[*]").\
            getOrcreate()
    #1.基于RDD转换成DataFrame
    sc = spark.sparkContext
    # 基于Pandas的DataFrame构建SparkSQL的DataFrame对象
    pdf = pd.DataFrame({'id':[1,2,3],'name':['张大仙','王小小','王大锤'],'age':[11,11,11]})
    # 将Pandas的DF对象转换成SparkDF
    df1 = spark.createDataFrame(pdf)
    df1.printSchema()
     ## 参数一:表示 展示出来多少条数据,默认不传的话是20
     ## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替
     ## 如果给False,表示不截断全部显示,默认是True
    df1.show(20,False)

DataFrame的代码构建-读取外部数据-text

构建StructType,text数据源,读取数据的特点是,是将一整行只作为一个列读取,默认列名是value 类型是String

spark session.read.format(“text|csv|json|parquet|orc|avro|jdbc…”)
.option(“k”,“v”)#option可选
.schema(StructType|String)#STRING的语法如。Schema(“name STRING”,“age INT” )
.load(“被读取文件的路径,支持本地文件系统和HDFS”)

#coding:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 0.构建执行环境入口对象SparkSession
    spark = SparkSesion.builder.\
            appName("test").\
            master("local[*]").\
            getOrcreate()
    #1.基于RDD转换成DataFrame
    sc = spark.sparkContext
    # 构建StructType,text数据源,读取数据的特点是,是将一整行只作为一个列读取,默认列名是value 类型是String
    schema = StructType().add('data',StirngType(),True)
    df = spark.read.format('text').schema(schema=schema).load('../data/input/sql/people.txt')

     df.printSchema()
     ## 参数一:表示 展示出来多少条数据,默认不传的话是20
     ## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替
     ## 如果给False,表示不截断全部显示,默认是True
     df.show(20,False)
    
DataFrame的代码构建-读取外部数据-json

json类型自带有Schema信息

#coding:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 0.构建执行环境入口对象SparkSession
    spark = SparkSesion.builder.\
            appName("test").\
            master("local[*]").\
            getOrcreate()
    #1.基于RDD转换成DataFrame
    sc = spark.sparkContext
    # json类型自带有Schema信息
    schema = StructType().add('data',StirngType(),True)
    df = spark.read.format('json').load('../data/input/sql/people.txt')

     df.printSchema()
     ## 参数一:表示 展示出来多少条数据,默认不传的话是20
     ## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替
     ## 如果给False,表示不截断全部显示,默认是True
     df.show(20,False)
DataFrame的代码构建-读取外部数据-csv
#coding:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 0.构建执行环境入口对象SparkSession
    spark = SparkSesion.builder.\
            appName("test").\
            master("local[*]").\
            getOrcreate()
    #1.基于RDD转换成DataFrame
    sc = spark.sparkContext
    # json类型自带有Schema信息
    schema = StructType().add('data',StirngType(),True)
    df = spark.read.format('csv').\
         option('sep',';').\
         option('header',True).\
         option('encoding','utf-8').\
         schema('name STRING age INT,job STRING').\
         load('../data/input/sql/people.txt')

     df.printSchema()
     df.show(20,False)
DataFrame的代码构建-读取外部数据-parquet

parquet:是spark中常用的一种列式存储文件格式,和Hive中ORC差不多,他俩都是列存储格式

parquet对比普通文本文件的区别

  • parquet内置schema(列名、列类型、是否为空)
  • 存储是以列作为存储格式
  • 存储时序列化存储在文件中的,有压缩属性体积小
#coding:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 0.构建执行环境入口对象SparkSession
    spark = SparkSesion.builder.\
            appName("test").\
            master("local[*]").\
            getOrcreate()
    #1.基于RDD转换成DataFrame
    sc = spark.sparkContext
    # parquet类型自带有Schema信息
    schema = StructType().add('data',StirngType(),True)
    df = spark.read.format('parquet').load('../data/input/sql/people.txt')

     df.printSchema()
     df.show(20,False)

DataFrame的入门操作

DataFrame支持两种风格进行编程,分别是:

  • DSL风格:DataFrame的特有API,调用API的方式来处理Data
#coding:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 0.构建执行环境入口对象SparkSession
    spark = SparkSesion.builder.\
            appName("test").\
            master("local[*]").\
            getOrcreate()
    #1.基于RDD转换成DataFrame
    sc = spark.sparkContext
    # parquet类型自带有Schema信息
    df = spark.read.format('csv').load('../data/input/sql/people.txt')
    # column对象的获取
    id_column = df['id']
    subject_column = df['subject']
    # DSL风格演示
    df.select(["id","subject"]).show()
    df.select ("id","subject").show()
    df.select(id_column,subject_column) 
    # filter API
    df.filter("score < 99").show()
    df.filter(df['score'] < 99).show()
    # where API
    df.where("score < 99").show()
    df.where(df['score'] < 99).show()
    # group by API
    df.groupBy("subject").count().show()
    df.groupBy(df['subject']).count().show()

  • SQL风格:spark.sql(“select * from XXX”)
    使用sparj.sql()来执行SQL语句查询,结果返回一个DataFrame
df.createTempView("score") #注册一个临时视图
df.createOrReplaceTempView("socre") #注册一个临时表,如果存在,进行替换
df.createGlobalTempView("score") # 注册一个全局表

全局表:跨sparksession对象使用,在一个程序内的多个sparkSession中均可调用,查询前带上前缀
global_temp.

SparkSQL数据清洗API

  • 去重方法:dropDuplicates
  • 缺失值处理:
    • dropna 是可以对缺失值进行删除;只要列中有null 就删除这一行数据
      参数:thread=3表示,至少满足3个有效列,不满足就删除当前数据
    • fillna(“loss”) 对缺失值的列进行填充
    • fillna(“N/A”,subset=[‘job’])指定列进行填充
    • fillna({‘name’:‘未知姓名’,‘age’:1,‘job’:‘worker’})设定一个字典,对所有的列提供填充规则

DataFrame数据写出

SparkSQL 统一API写出DataFrame数据
df.write.mode().format().option(K,V).save(PATH)

  • mode,传入模式字符串可选:append追加,overwrite覆盖,ignore忽略,error重复就报异常(默认的)
  • format,传入格式字符串,可选:text,csv,json,parquet,orc,avro,jdbc
  • save 写出的路径,支持本地文件和HDFS

04.SparkSQL函数定义

SparkSQL定义UDF

pyspark UDF

SparkSQL使用窗口函数

  • 聚合开窗函数

  • 排序开窗函数
    – ROW_NUMBER() OVER()
    –DENSE_RANK() OVER()
    –RANK() OVER()

  • NTILE分组窗口

05.SparkSQL的运行流程

SparkSQL的自动优化

RDD的运行完全是按照开发者的代码执行,如果开发者水平有限,RDD的执行效率也会收到影响
而SparkSQL会对写完的代码,执行“自动优化”,以提升代码运行效率,避免开发者水平影响到代码执行效率;依赖于:Catalyst优化器

Catalyst优化器

【spark】SparkSQL,spark,spark,大数据,分布式
STEP1:解析SQL,并生成AST(抽象语法树)
【spark】SparkSQL,spark,spark,大数据,分布式【spark】SparkSQL,spark,spark,大数据,分布式
【spark】SparkSQL,spark,spark,大数据,分布式

大方面的优化点有2个:

  • 谓词下推、断言下推:将逻辑判断提前到前面,以减少shuffle阶段的数据量
  • 列值剪裁:将加载的列进行剪裁,尽量减少被处理数据的宽度

SparkSQL的执行流程

【spark】SparkSQL,spark,spark,大数据,分布式

06.SparkSQL整合Hive

Hive执行流程

【spark】SparkSQL,spark,spark,大数据,分布式

SparkOn Hive

【spark】SparkSQL,spark,spark,大数据,分布式

Spark On Hive就是因为Spark自身没有元数据管理功能,所以使用Hive的Metastore服务做为元数据管理服务。计算有Spark执行

07.分布式SQL引擎配置

【spark】SparkSQL,spark,spark,大数据,分布式文章来源地址https://www.toymoban.com/news/detail-808280.html

到了这里,关于【spark】SparkSQL的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据课程K2——Spark的RDD弹性分布式数据集

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 了解Spark的RDD结构; ⚪ 掌握Spark的RDD操作方法; ⚪ 掌握Spark的RDD常用变换方法、常用执行方法; 初学Spark时,把RDD看做是一个集合类型(类似于Array或List),用于存储数据和操作数据,但RDD和普通集合的区别

    2024年02月12日
    浏览(50)
  • Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

    人生很长,不必慌张。你未长大,我要担当。 传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。 RDD是Spark提供的最重要的抽象概念

    2024年02月22日
    浏览(83)
  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(49)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(104)
  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(54)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(58)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(64)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(76)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(45)
  • 分布式内存计算Spark环境部署与分布式内存计算Flink环境部署

    目录 分布式内存计算Spark环境部署 1.  简介 2.  安装 2.1【node1执行】下载并解压 2.2【node1执行】修改配置文件名称 2.3【node1执行】修改配置文件,spark-env.sh 2.4 【node1执行】修改配置文件,slaves 2.5【node1执行】分发 2.6【node2、node3执行】设置软链接 2.7【node1执行】启动Spark集群

    2024年02月08日
    浏览(71)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包