Spark SQL join的三种实现方式

这篇具有很好参考价值的文章主要介绍了Spark SQL join的三种实现方式。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

引言

join是SQL中的常用操作,良好的表结构能够将数据分散到不同的表中,使其符合某种规范(mysql三大范式),可以最大程度的减少数据冗余,更新容错等,而建立表和表之间关系的最佳方式就是join操作。

对于Spark来说有3种Join的实现,每种Join对应的不同的应用场景(SparkSQL自动决策使用哪种实现范式):

1.Broadcast Hash Join:适合一张很小的表和一张大表进行Join;

2.Shuffle Hash Join:适合一张小表(比上一个大一点)和一张大表进行Join;

2.Sort Merge Join:适合两张大表进行Join;

前两者都是基于Hash Join的,只不过Hash Join之前需要先shuffle还是先brocadcast。下面详细解释一下这三种Join的具体原理。

Hash Join

先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,参与join的两张表是order和item,join key分别是item.id以及order.i_id。现在假设Join采用的是hash join算法,整个过程会经历三步:

1.确定Build Table以及Probe Table:这个概念比较重要,Build Table会被构建成以join key为key的hash table,而Probe Table使用join key在这张hash table表中寻找符合条件的行,然后进行join链接。Build表和Probe表是Spark决定的。通常情况下,小表会被作为Build Table,较大的表会被作为Probe Table。

2.构建Hash Table:依次读取Build Table(item)的数据,对于每一条数据根据Join Key(item.id)进行hash,hash到对应的bucket中(类似于HashMap的原理),最后会生成一张HashTable,HashTable会缓存在内存中,如果内存放不下会dump到磁盘中。

3.匹配:生成Hash Table后,在依次扫描Probe Table(order)的数据,使用相同的hash函数(在spark中,实际上就是要使用相同的partitioner)在Hash Table中寻找hash(join key)相同的值,如果匹配成功就将两者join在一起。

这里有两个问题需要关注:

1.hash join性能如何?很显然,hash join基本都只扫描两表一次,可以认为O(a+b),较之最极端的是笛卡尔积运算O(a*b);

2.为什么Build Table选择小表?道理很简单,因为构建Hash Table时,最好可以把数据全部加载到内存中,因为这样效率才最高,这也决定了hash join只适合于较小的表,如果是两个较大的表的场景就不适用了。

上文说,hash join是传统数据库中的单机join算法,在分布式环境在需要经过一定的分布式改造,说到底就是尽可能利用分布式计算资源进行并行计算,提高总体效率,hash join分布式改造一般有以下两种方案:

1.broadcast hash join:将其中一张较小的表通过广播的方式,由driver发送到各个executor,大表正常被分成多个区,每个分区的数据和本地的广播变量进行join(相当于每个executor上都有一份小表的数据,并且这份数据是在内存中的,过来的分区中的数据和这份数据进行join)。broadcast适用于表很小,可以直接被广播的场景;

2.shuffle hash join:一旦小表比较大,此时就不适合使用broadcast hash join了。这种情况下,可以对两张表分别进行shuffle,将相同key的数据分到一个分区中,然后分区和分区之间进行join。相当于将两张表都分成了若干小份,小份和小份之间进行hash join,充分利用集群资源。

Broadcast Hash Join

大家都知道,在数据库的常见模型中(比如星型模型或者雪花模型),表一般分为两种:事实表和维度表,维度表一般指固定的、变动较少的表,例如联系人、物品种类,一般数据有限;而事实表一遍记录流水,比如销售清单等,通过随着时间的增长不断增长。

因为join操作是对两个表中key相同的记录进行连接,在SparkSQL中,对两个表做join的最直接的方式就是先根据key进行分区,再在每个分区中把key相同的记录拿出来做连接操作,但这样不可避免的涉及到shuffle,而shuffle是spark中比较耗时的操作,我们应该尽可能的设计spark应用使其避免大量的shuffle操作。

Broadcast Hash Join的条件有以下几个:

1.被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的信息,默认是10M;

2.基表不能被广播,比如left outer join时,只能广播右表。

看起来广播是一个比较理想的方案,但它有没有缺点呢?缺点也是很明显的,这个方案只能广播较小的表,否则数据的冗余传输就是远大于shuffle的开销;另外,广播时需要被广播的表collect到driver端,当频繁的广播出现时,对driver端的内存也是一个考验。

broadcast hash join可以分为两步:

1.broadcast阶段:将小表广播到所有的executor上,广播的算法有很多,最简单的是先发给driver,driver再统一分发给所有的executor,要不就是基于bittorrete的p2p思路;

2.hash join阶段:在每个executor上执行 hash join,小表构建为hash table,大表的分区数据匹配hash table中的数据;

Shuffle Hash Join

当一侧的表比较小时,我们可以选择将其广播出去以避免shuffle,提高性能。但因为被广播的表首先被collect到driver端,然后被冗余的发送给各个executor上,所以当表比较大是,采用broadcast join会对driver端和executor端造成较大的压力。

我们可以通过将大表和小表都进行shuffle分区,然后对相同节点上的数据的分区应用hash join,即先将较小的表构建为hash table,然后遍历较大的表,在hash table中寻找可以匹配的hash值,匹配成功进行join连接。这样既在一定程度上减少了driver广播表的压力,也减少了executor端读取整张广播表的内存消耗。

Sshuffle Hash Join分为两步:

1.对两张表分别按照join key进行重分区(分区函数相同的时候,相同的相同分区中的key一定是相同的),即shuffle,目的是为了让相同join key的记录分到对应的分区中;

2.对对应分区中的数据进行join,此处先将小表分区构建为一个hash表,然后根据大表中记录的join key的hash值拿来进行匹配,即每个节点山单独执行hash算法。

Shuffle Hash Join的条件有以下几个:

  1. 分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M

  2. 基表不能被广播,比如left outer join时,只能广播右表

  3. 一侧的表要明显小于另外一侧,小的一侧将被广播(明显小于的定义为3倍小,此处为经验值)

看到这里,可以初步总结出来如果两张小表join可以直接使用单机版hash join;如果一张大表join一张极小表,可以选择broadcast hash join算法;而如果是一张大表join一张小表,则可以选择shuffle hash join算法;那如果是两张大表进行join呢?

Sort Merge Join

上面介绍的方式只对于两张表有一张是小表的情况适用,而对于两张大表,但当两个表都非常大时,显然无论哪种都会对计算内存造成很大的压力。这是因为join时两者采取都是hash join,是将一侧的数据完全加载到内存中,使用hash code取join key相等的记录进行连接。

当两个表都非常大时,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join。这种方式不用将一侧数据全部加载后再进行hash join,但需要在join前将数据进行排序。

首先将两张表按照join key进行重新shuffle,保证join key值相同的记录会被分在相应的分区,分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接。可以看出,无论分区有多大,Sort Merge Join都不用把一侧的数据全部加载到内存中,而是即用即丢;因为两个序列都有有序的,从头遍历,碰到key相同的就输出,如果不同,左边小就继续取左边,反之取右边。从而大大提高了大数据量下sql join的稳定性。

SparkSQL对两张大表join采用了全新的算法-sort-merge join,整个过程分为三个步骤:

  1. shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;

  2. sort阶段:对单个分区节点的两表数据,分别进行排序;

  3. merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边文章来源地址https://www.toymoban.com/news/detail-676662.html

到了这里,关于Spark SQL join的三种实现方式的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Spark基础】-- RDD 转 Dataframe 的三种方式

    目录 一、环境说明 二、RDD 转 Dataframe 的方法 1、通过 StructType 创建 Dataframe(强烈推荐使用这种方法)

    2024年01月19日
    浏览(27)
  • PostgreSQL数据库命令行执行SQL脚本的三种方式

    生成环境中,出于安全性等原因,往往不提供数据库连接工具,所以对数据库的更新和升级就得通过命令行来实现。本文总结了三种命令行执行sql脚本的方式。 命令格式: psql [option…] [dbname] [username] 常用参数介绍: -h:指定IP地址或主机名。 -p:指定端口,默认为5432。 -U:

    2024年02月11日
    浏览(48)
  • Hudi Spark-SQL增量查询数据几种方式

    由于项目上主要用Hive查询Hudi,所以之前总结过一篇:Hive增量查询Hudi表。最近可能会有Spark SQL增量查询Hudi表的需求,并且我发现目前用纯Spark SQL的形式还不能直接增量查询Hudi表,于是进行学习总结一下。 先看一下官方文档上Spark SQL增量查询的方式,地址:https://hudi.apache.or

    2024年02月11日
    浏览(28)
  • 还原Sql Server数据库BAK备份文件的三种方式及常见错误

    这是演示的是Sql Server 2008R2版本,不同版本可能有细微差别 右键点击数据库→还原数据库    在还原的源中选择源设备→点击选择框  在指定备份中点击添加→选择具体文件→确定→确定  勾选用于还原的备份集→这时目标数据库中会自动生成目标数据库名,在此选择即可→

    2023年04月08日
    浏览(46)
  • Hudi-集成Spark之spark-sql方式

    启动spark-sql 创建表 建表参数: 参数名 默认值 说明 primaryKey uuid 表的主键名,多个字段用逗号分隔。同 hoodie.datasource.write.recordkey.field preCombineField 表的预合并字段。同 hoodie.datasource.write.precombine.field type cow 创建的表类型: type = ‘cow’ type = \\\'mor’同 hoodie.datasource.write.table.ty

    2024年02月05日
    浏览(33)
  • mybatisplus开启sql打印的三种方式

            使用mybatisplus自带的log-impl配置,可以在控制台打印出sql语句、执行结果的数据集、数据结果条数等详细信息,这种方法适合再调试的时候使用,因为这个展示的信息详细,更便于调试,查找问题进行优化。缺点就是如果执行的sql语句过多,则输出的日志就会很多,

    2024年02月05日
    浏览(45)
  • Hudi(7):Hudi集成Spark之spark-sql方式

    目录 0. 相关文章链接 1. 创建表 1.1. 启动spark-sql 1.2. 建表参数 1.3. 创建非分区表 1.4. 创建分区表 1.5. 在已有的hudi表上创建新表 1.6. 通过CTAS (Create Table As Select)建表 2. 插入数据 2.1. 向非分区表插入数据 2.2. 向分区表动态分区插入数据 2.3. 向分区表静态分区插入数据 2.4

    2024年02月06日
    浏览(35)
  • Spark-SQL连接JDBC的方式及代码写法

    提示:文章内容仅供参考! 目录 一、数据加载与保存 通用方式: 加载数据: 保存数据: 二、Parquet 加载数据: 保存数据: 三、JSON 四、CSV  五、MySQL SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的

    2024年02月13日
    浏览(23)
  • Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】

    Saprk 提供了两种方法来实现从 RDD 转换得到 DataFrame: 利用反射机制推断 RDD 模式 使用编程方式定义 RDD 模式 下面使用到的数据 people.txt :         在利用反射机制推断 RDD 模式的过程时,需要先定义一个 case 类,因为只有 case 类才能被 Spark 隐式地转换为DataFrame对象。 注意

    2024年02月09日
    浏览(44)
  • 解决执行 spark.sql 时版本不兼容的一种方式

    场景描述 hive 数据表的导入导出功能部分代码如下所示,使用 assemble 将 Java 程序和 spark 相关依赖一起打成 jar 包,最后 spark-submit 提交 jar 到集群执行。 在CDH6.3.2 集群(后面称CDH),当程序执行 spark.sql 导入本地磁盘 csv 数据到 hive 表时出现异常(如下),但导出表数据到本地

    2024年02月12日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包