【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎

这篇具有很好参考价值的文章主要介绍了【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

【大家好,我是爱干饭的猿,本文重点介绍、SparkSQL的运行流程、 SparkSQL的自动优化、Catalyst优化器、SparkSQL的执行流程、Spark On Hive原理配置、分布式SQL执行引擎概念、代码JDBC连接。

后续会继续分享其他重要知识点总结,如果喜欢这篇文章,点个赞👍,关注一下吧】

上一篇文章:《【SparkSQL】SparkSQL函数定义(重点:定义UDF函数、使用窗口函数)》

5. SparkSQL的运行流程

5.1 SparkRDD的执行流程回顾

【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive
代码->DAG调度器逻辑任务->Task调度器任务分配和管理监控-> Worker干活

5.2 SparkSQL的自动优化

RDD的运行会完全按照开发者的代码执行, 如果开发者水平有限,RDD的执行效率也会受到影响。

而SparkSQL会对写完的代码,执行“自动优化”, 以提升代码运行效率,避免开发者水平影响到代码执行效率。

问:为什么SparkSQL可以自动优化而RDD不可以?

RDD:内含数据类型不限格式和结构
DataFrame:100% 是二维表结构,可以被针对SparkSQL的自动优化,依赖于:Catalyst优化器

5.3 Catalyst优化器

为了解决过多依赖Hive 的问题, SparkSQL使用了一个新的SQL优化器替代 Hive 中的优化器,这个优化器就是Catalyst,整个SparkSQL的架构大致如下:
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive

  1. API层简单的说就是Spark会通过一些API接受SQL语句
  2. 收到SQL语句以后,将其交给Catalyst, Catalyst负责解析SQL,生成执行计划等
  3. Catalyst的输出应该是RDD的执行计划
  4. 最终交由集群运行

具体流程:
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive
step1:解析SQL,并且生成AST(抽象语法树)
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive
Step 2:在AST中加入元数据信息做这一步主要是为了一些优化。例如 col = col这样的条件,下图是一个简略图,便于理解
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive

  • score.id →id#1#L为score.id生成id为1,类型是Long
  • score.math_score → math_score#2#L为score.math_score 生成id为2,类型为Long
  • people.id → id#3#L为people.id生成 id为3,类型为Long
  • people.age →age#4#L为people.age 生成 id为4,类型为Long

Step 3:对已经加入元数据的AST,输入优化器,进行优化,从两种常见的优化开始,简单介绍:
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive

  • 断言下推 Predicate Pushdown,将Filter这种可以减小数据集的操作下推,放在Scan 的位置,这样可以减少操作时候的数据量。断言下推后,会先过滤age,然后在JOIN,减少JOIN的数据量提高性能.
  • 列值裁剪Column Pruning,在断言下推后执行裁剪,由于people表之上的操作只用到了 id 列,所以可以把其它列裁剪掉,这样可以减少处理的数据量,从而优化处理速度

Step 4:上面的过程生成的AST其实最终还没办法直接运行,这个AST叫做逻辑计划,结束后,需要生成.物理计划,从而生成RDD来运行

  • 在生成物理计划的时候,会经过成本模型对整棵树再次执行优化,选择一个更好的计划
  • 在生成物理计划以后,因为考虑到性能,所以会使用代码生成,在机器中运行

可以使用queryExecution方法查看逻辑执行计划,使用explain方法查看物理执行计划:
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive
小结:

catalyst的各种优化细节非常多,大方面的优化点有2个:

  • 谓词下推(Predicate Pushdown)\断言下推:将逻辑判断提前到前面,以减少shuffle阶段的数据量
  • 列值裁剪(Column Pruning):将加载的列进行裁剪,尽量减少被处理数据的|宽度

大白话:

  • 行过滤,提前执行where
  • 列过滤,提前规划select的字段数量

思考:列值裁剪,有一种非常合适的存储系统: parquet

5.4 SparkSQL的执行流程

【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive

  1. 提交SparkSQL代码
  2. catalyst优化
    a. 生成原始AST语法数
    b.标记AST元数据
    c.进行断言下推和列值裁剪以及其它方面的优化作用在AST上
    d.将最终AST得到,生成执行计划
    e.将执行计划翻译为RDD代码
  3. Driver执行环境入口构建(SparkSession)
  4. DAG调度器规划逻辑任务
  5. TASK调度区分配逻辑任务到具体Executor上工作并监控管理任务
  6. Worker干活.

5.5 总结

  1. DataFrame因为存储的是二维表数据结构,可以被针对,所以可以
    自动优化执行流程。
  2. 自动优化依赖Catalyst优化器
  3. 自动优化2个大的优化项是:1. 断言(谓词)下推(行过滤) 2. 列
    值裁剪(列过滤)
  4. DataFrame代码在被优化有,最终还是被转换成RDD去执行

6 Spark On Hive

6.1 原理

  • 回顾Hive的组件

对于Hive来说,就2东西:

  1. SQL优化翻译器(执行引擎),翻译SQL到MapReduce并提交到YARN执行
  2. MetaStore元数据管理中心
  • Spark On Hive

对于Spark来说,自身是一个执行引擎,但是 Spark自己没有元数据管理功能,当我们执行:
SELECT * FROM person WHERE age > 10的时候,Spark完全有能力将SQL变成RDD提交
但是问题是, Person的数据在哪? Person有哪些字段?字段啥类型? Spark完全不知道了
不知道这些东西,如何翻译RDD运行

在SparkSQL代码中可以写SQL那是因为,表是来自DataFrame注册的,DataFrame中有数据,有字段,有类型,足够Spark用来翻译RDD用
如果以不写代码的角度来看,SELECT * FROM person WHERE age > 10 spark无法翻译,因为没有元数据

  • 解决方案

Spark提供执行引擎能力
Hive的MetaStore 提供元数据管理功能.
让Spark和Metastore连接起来,那么:

Spark On Hive 就有了:

  1. 引擎: spark
  2. 元数据管理: metastore

总结:
Spark On Hive 就是把Hive的MetaStore服务拿过来给Spark做元数据管理用而已.
市面上元数据管理的框架很多,为什么非要用Hive内置的MetaStore

6.2 配置

根据原理,就是Spark能够连接上Hive的MetaStore就可以了.
所以:

  1. MetaStore需要存在并开机
  2. Spark知道MetaStore在哪里( IP端口号)

步骤1:
在spark的conf目录中,创建hive-site.xml
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive
步骤2:

将mysql的驱动jar包放入spark的jars目录
因为要连接元数据,会有部分功能连接到mysql库,需要mysql驱动包

步骤3:

确保Hive 配置了MetaStore相关的服务
检查hive配置文件目录内的: hive-site.xml
确保有如下配置:
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive
步骤4:
启动hive的MetaStore服务:
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive
nohup :后台启动程序的命令,使用
nohup xxx命令&将命令后台执行,日志输出到当前目录的nohup.out中
nohup xxx命令 2>&1>>某路径下的日志文件&,将命令后台执行,将日志输出到你指定的路径中

测试:
bin/pyspark:在里面直接写spark.sql(“sql语句”).show()即可

或者:
bin/spark-sql:可以直接写sql语句

6.3 在代码中集成

# coding:utf8
import string
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse").\
        config("hive.metastore.uris", "thrift://node3:9083").\
        enableHiveSupport().\
        getOrCreate()
    sc = spark.sparkContext

    spark.sql("SELECT * FROM student").show()

6.4 总结

Spark On Hive 就是因为Spark自身没有元数据管理功能, 所以使用
Hive的Metastore服务作为元数据管理服务。计算由Spark执行。

7. 分布式SQL执行引擎

7.1 概念

Spark中有一个服务叫做: ThriftServer服务,可以启动并监听在10000端口
这个服务对外提供功能,我们可以用数据库工具或者代码连接上来直接写SQL即可操作spark
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive

当使用ThriftServer后,相当于是一个持续性的Spark On Hive集成模式.它提供10000端口,持续对外提供服务,外部可以通过这个端口连接上来, 写SQL, 让Spark运行

7.2 客户端工具连接

1.确保已经配置好了Spark On Hive
2.启动ThriftServer即可
【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive

7.3 代码JDBC连接

【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎,spark,分布式,spark,hive

# coding:utf8

from pyhive import hive

if __name__ == '__main__':
    # 获取到Hive(Spark ThriftServer的链接)
    conn = hive.Connection(host="node1", port=10000, username="hadoop")

    # 获取一个游标对象
    cursor = conn.cursor()

    # 执行SQL
    cursor.execute("SELECT * FROM student")

    # 通过fetchall API 获得返回值
    result = cursor.fetchall()

    print(result)

7.4 总结

分布式SQL执行引擎就是使用Spark提供的ThriftServer服务,以“后台进程”的模式持续运行,对外提供端口。

可以通过客户端工具或者代码,以JDBC协议连接使用。

SQL提交后,底层运行的就是Spark任务。相当于构建了一个以MetaStore服务为元数据,Spark为执行引擎的数据库服务,像操作数据库那样方便的操作SparkSQL进行分布式的SQL
计算。文章来源地址https://www.toymoban.com/news/detail-767278.html

到了这里,关于【SparkSQL】SparkSQL的运行流程 & Spark On Hive & 分布式SQL执行引擎的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SparkSQL与Hive整合(Spark On Hive)

    hive metastore元数据服务用来存储元数据,所谓元数据,即hive中库、表、字段、字段所属表、表所属库、表的数据所在目录及数据分区信息。元数据默认存储在hive自带的Derby数据库。在内嵌模式和本地模式下,metastore嵌入在主hive server进程中。但在远程模式下,metastore 和 hive

    2024年02月12日
    浏览(64)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆级超详细含图文)

    说明: 本篇将详细介绍用二进制安装包部署hadoop等组件,注意事项,各组件的使用,常用的一些命令,以及在部署中遇到的问题解决思路等等,都将详细介绍。 ip hostname 192.168.1.11 node1 192.168.1.12 node2 192.168.1.13 node3 1.2.1系统版本 1.2.2内存建议最少4g、2cpu、50G以上的磁盘容量 本次

    2024年02月12日
    浏览(37)
  • Spark on Hive及 Spark SQL的运行机制

    代码中集成Hive: Spark SQL底层依然运行的是Spark RDD的程序,所以说Spark RDD程序的运行的流程,在Spark SQL中依然是存在的,只不过在这个流程的基础上增加了从SQL翻译为RDD的过程 Spark SQL的运行机制,其实就是在描述如何将Spark SQL翻译为RDD程序 Catalyst内部具体的执行流程: 专业术

    2024年01月23日
    浏览(37)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(44)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(32)
  • 分布式内存计算Spark环境部署与分布式内存计算Flink环境部署

    目录 分布式内存计算Spark环境部署 1.  简介 2.  安装 2.1【node1执行】下载并解压 2.2【node1执行】修改配置文件名称 2.3【node1执行】修改配置文件,spark-env.sh 2.4 【node1执行】修改配置文件,slaves 2.5【node1执行】分发 2.6【node2、node3执行】设置软链接 2.7【node1执行】启动Spark集群

    2024年02月08日
    浏览(52)
  • Spark弹性分布式数据集

    1. Spark RDD是什么 RDD(Resilient Distributed Dataset,弹性分布式数据集)是一个不可变的分布式对象集合,是Spark中最基本的数据抽象。在代码中RDD是一个抽象类,代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 每个RDD都被分为多个分区,这些分区运行在集群中

    2024年02月13日
    浏览(39)
  • Spark分布式内存计算框架

    目录 一、Spark简介 (一)定义 (二)Spark和MapReduce区别 (三)Spark历史 (四)Spark特点 二、Spark生态系统 三、Spark运行架构 (一)基本概念 (二)架构设计 (三)Spark运行基本流程 四、Spark编程模型 (一)核心数据结构RDD (二)RDD上的操作 (三)RDD的特性 (四)RDD 的持

    2024年02月04日
    浏览(45)
  • 分布式计算MapReduce | Spark实验

    题目1 输入文件为学生成绩信息,包含了必修课与选修课成绩,格式如下: 班级1, 姓名1, 科目1, 必修, 成绩1 br (注: br 为换行符) 班级2, 姓名2, 科目1, 必修, 成绩2 br 班级1, 姓名1, 科目2, 选修, 成绩3 br ………., ………, ………, ………, ……… br 编写两个Hadoop平台上的MapRed

    2024年02月08日
    浏览(40)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包