Spark学习(6)-Spark SQL

这篇具有很好参考价值的文章主要介绍了Spark学习(6)-Spark SQL。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 快速入门

SparkSQL是Spark的一个模块, 用于处理海量结构化数据
SparkSQL是非常成熟的 海量结构化数据处理框架.
学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等等。
  • 企业大面积在使用SparkSQL处理业务数据。
    • 离线开发
    • 数仓搭建
    • 科学计算
    • 数据分析

特点:
sparksql,大数据,spark,学习

2 SparkSQL概述

2.1 SparkSQL和Hive的异同

sparksql,大数据,spark,学习

2.2 SparkSQL的数据抽象

sparksql,大数据,spark,学习
sparksql,大数据,spark,学习

2.3 SparkSession对象

在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
SparkSession对象可以:

  • 用于SparkSQL编程作为入口对象。
  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

所以,后续执行环境入口对象,统一变更为SparkSession对象。

sparksql,大数据,spark,学习
2.4 SparkSession对象

# coding:utf8
# SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建SparkSession对象, 这个对象是 构建器模式 通过builder方法来构建
spark = SparkSession.builder.\
appName("local[*]").\
config("spark.sql.shuffle.partitions", "4").\
getOrCreate()
# appName 设置程序名称, config设置一些常用属性
# 最后通过getOrCreate()方法 创建SparkSession对象

3 DataFrame入门和操作

3.1 DataFrame的组成

在结构层面:

  • StructType对象描述整个DataFrame的表结构。
  • StructField对象描述一个列的信息。

在数据层面:

  • Row对象记录一行数据。
  • Column对象记录一列数据并包含列的信。

StructType描述,如下图:
sparksql,大数据,spark,学习
一个StructField记录:列名、列类型、列是否运行为空。
多个StructField组成一个StructType对象。
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空。

3.2 DataFrame的代码构建 - 基于RDD方式1

DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构。

# 首先构建一个RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测
# 构建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])

通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。

3.3 DataFrame的代码构建 - 基于RDD方式2

通过StructType对象来定义DataFrame的“表结构”转换RDD

# 创建DF , 首先创建RDD 将RDD转DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
map(lambda x:x.split(',')).\
map(lambda x:(int(x[0]), x[1], int(x[2])))
# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
df = spark.createDataFrame(rdd, schema)

3.4 DataFrame的代码构建 - 基于RDD方式3

使用RDD的toDF方法转换RDD

# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
# 方式1: 只传列名, 类型靠推断, 是否允许为空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()
# 方式2: 传入完整的Schema描述对象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()

3.5 DataFrame的代码构建 - 基于Pandas的DataFrame

将Pandas的DataFrame对象,转变为分布式的SparkSQL

# 构建Pandas的DF
pdf = pd.DataFrame({
"id": [1, 2, 3],
"name": ["张大仙", '王晓晓', '王大锤'],
"age": [11, 11, 11]
})
# 将Pandas的DF对象转换成Spark的DF
df = spark.createDataFrame(pdf)

3.6 DataFrame的代码构建 - 读取外部数据

通过SparkSQL的统一API进行数据读取构建DataFrame

sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
.option("K", "V") # option可选
.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
.load("被读取文件的路径, 支持本地文件系统和HDFS")
读取text数据源

使用format(“text”)读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value。

schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text")\
.schema(schema)\
.load("../data/sql/people.txt")
读取json数据源

使用format(“json”)读取json数据

df = spark.read.format("json").\
load("../data/sql/people.json")
# JSON 类型 一般不用写.schema, json自带, json带有列名 和列类型(字符串和数字)
df.printSchema()
df.show()
读取csv数据源

使用format(“csv”)读取csv数据

df = spark.read.format("csv")\
.option("sep", ";")\ # 列分隔符
.option("header", False)\ # 是否有CSV标头
.option("encoding", "utf-8")\ # 编码
.schema("name STRING, age INT, job STRING")\ # 指定列名和类型
.load("../data/sql/people.csv") # 路径
df.printSchema()
df.show()
读取parquet数据源

使用format(“parquet”)读取parquet数据

# parquet 自带schema, 直接load啥也不需要了
df = spark.read.format("parquet").\
load("../data/sql/users.parquet")
df.printSchema()
df.show()

注意:
parquet: 是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多, 他俩都是列存储格式
parquet对比普通的文本文件的区别:

  • parquet 内置schema (列名\ 列类型\ 是否为空)。
  • 存储是以列作为存储格式。
  • 存储是序列化存储在文件中的(有压缩属性体积小)。

Parquet文件不能直接打开查看,如果想要查看内容,可以在PyCharm中安装如下插件来查看:
sparksql,大数据,spark,学习

3.7 DataFrame的入门操作

DataFrame支持两种风格进行编程,分别是:DSL风格和SQL风格。

  • DSL语法风格:
    DSL称之为:领域特定语言。其实就是指DataFrame的特有API,DSL风格意思就是以调用API的方式来处理Data。比如:df.where().limit()
  • SQL语法风格
    SQL风格就是使用SQL语句处理DataFrame的数据,比如:spark.sql(“SELECT * FROM xxx)
DSL - show 方法

功能:展示DataFrame中的数据, 默认展示20条。
语法:

df.show(参数1, 参数2)
- 参数1: 默认是20, 控制展示多少条
- 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 请填入 truncate = True
DSL - printSchema方法

功能:打印输出df的schema信息
语法:

df.printSchema()

例如:
sparksql,大数据,spark,学习

DSL - select

功能:选择DataFrame中的指定列(通过传入参数进行指定)
语法:
sparksql,大数据,spark,学习
可传递:

  • 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串
    列名来指定列。
  • List[Column]对象或者List[str]对象, 用来选择多个列。

sparksql,大数据,spark,学习

DSL - filter和where

功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
语法:

df.filter()
df.where()

where和filter功能上是等价的。
sparksql,大数据,spark,学习

DSL - groupBy 分组

功能:按照指定的列进行数据的分组, 返回值是GroupedData对象
语法:

df.groupBy()

传入参数和select一样,支持多种形式,不管怎么传意思就是告诉spark
按照哪个列分组。

sparksql,大数据,spark,学习

GroupedData对象

GroupedData对象是一个特殊的DataFrame数据集,其类全名:<class 'pyspark.sql.group.GroupedData'>,这个对象是经过groupBy后得到的返回值, 内部记录了 以分组形式存储的数据。
GroupedData对象其实也有很多API,像:min、max、avg、sum、等等许多方法都存在。

SQL风格语法 - 注册DataFrame成为表

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中,使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:
sparksql,大数据,spark,学习

SQL风格语法 - 使用SQL查询

sparksql,大数据,spark,学习

pyspark.sql.functions 包

PySpark提供了一个包: pyspark.sql.functions,这个包里面提供了 一系列的计算函数供SparkSQL使用。

导包
from pyspark.sql import functions as F

3.8 SparkSQL Shuffle 分区数目

sparksql,大数据,spark,学习

3.9 SparkSQL 数据清洗API

在大数据处理之前,首先要对数据进行清洗,有去重,删除缺值,填充缺值等等。
sparksql,大数据,spark,学习
sparksql,大数据,spark,学习
sparksql,大数据,spark,学习

3.10 DataFrame数据写出

sparksql,大数据,spark,学习
sparksql,大数据,spark,学习

3.11 DataFrame 通过JDBC读写数据库(MySQL示例)

sparksql,大数据,spark,学习
sparksql,大数据,spark,学习
sparksql,大数据,spark,学习文章来源地址https://www.toymoban.com/news/detail-576326.html

到了这里,关于Spark学习(6)-Spark SQL的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据笔记--Spark机器学习(第一篇)

    目录 一、数据挖掘与机器学习 1、概念 2、人工智能 3、数据挖掘体系 二、机器学习 1、什么是机器学习 2、机器学习的应用 3、实现机器学习算法的工具与技术框架 三、Spark MLlib介绍 1、简介 2、MLlib基本数据类型 Ⅰ、概述 Ⅱ、本地向量 Ⅲ、向量标签的使用 Ⅳ、本地矩阵 Ⅴ、

    2024年02月07日
    浏览(86)
  • 学习Spark的数据生命周期管理技术

    数据生命周期管理是数据科学家和数据工程师在处理大规模数据时面临的重要挑战。Apache Spark是一个开源的大数据处理框架,它可以处理批量数据和流式数据,并提供了一个易用的API来进行数据处理和分析。在本文中,我们将探讨如何学习Spark的数据生命周期管理技术,以便

    2024年02月22日
    浏览(35)
  • 【大数据学习篇6】 Spark操作统计分析数据操作

    通过前面的文章安装好环境下面我们就可以开始来操作 使用MySQL的root用户对数据库进行修改以下设置

    2024年02月05日
    浏览(44)
  • 大数据学习06-Spark分布式集群部署

    配置好IP vim /etc/sysconfig/network-scripts/ifcfg-ens33 修改主机名 vi /etc/hostname 做好IP映射 vim /etc/hosts 关闭防火墙 systemctl status firewalld systemctl stop firewalld systemctl disable firewalld 配置SSH免密登录 ssh-keygen -t rsa 下载Scala安装包 配置环境变量 添加如下配置 使环境生效 验证 Spark官网 解压 上

    2024年02月10日
    浏览(66)
  • 重生之从零开始学习大数据之Spark篇(一)

    什么是spark? spark是一个用来实现快速,通用的集群计算平台,它基于Hadoop的MapReduce分布式框架优化并改进缺陷而形成的。 在速度方面,Spark扩展了广泛使用的MapReduce计算模型,而且高效地支持更多计算模式,包括交互式查询和流处理。在处理大规模数据集事,速度是非常重

    2024年03月08日
    浏览(39)
  • Azure - 机器学习:使用 Apache Spark 进行交互式数据整理

    关注TechLead,分享AI全维度知识。作者拥有10+年互联网服务架构、AI产品研发经验、团队管理经验,同济本复旦硕,复旦机器人智能实验室成员,阿里云认证的资深架构师,项目管理专业人士,上亿营收AI产品研发负责人。 数据整理已经成为机器学习项目中最重要的步骤之一。

    2024年02月08日
    浏览(45)
  • Spark大数据处理学习笔记(3.1)掌握RDD的创建

    文章目录 一、准备工作 1.1 准备文件 1.1.1 准备本地系统文件 在/home目录里创建test.txt 单词用空格分隔 1.1.2 启动HDFS服务 执行命令:start-dfs.sh 1.1.3 上传文件到HDFS 将test.txt上传到HDFS的/park目录里 查看文件内容 1.2 启动Spark Shell 1.2.1 启动Spark服务 执行命令:start-all.sh 1.2.2 启动Sp

    2024年02月09日
    浏览(42)
  • Spark大数据处理学习笔记(3.2.2)掌握RDD算子

    衔接上文:http://t.csdn.cn/Z0Cfj 功能: reduce()算子按照传入的函数进行归约计算 案例: 计算1 + 2 + 3 + …+100的值 计算1 × 2 × 3 × 4 × 5 × 6 的值(阶乘 - 累乘) 计算1 2 + 2 2 + 3 2 + 4 2 + 5**2的值(先映射,后归约) 功能: collect()算子向Driver以数组形式返回数据集的所有元素。通常对

    2024年02月08日
    浏览(46)
  • Spark大数据处理学习笔记(2.4)IDEA开发词频统计项目

    该文章主要为完成实训任务,详细实现过程及结果见【http://t.csdn.cn/0qE1L】 从Scala官网下载Scala2.12.15 - https://www.scala-lang.org/download/2.12.15.html 安装在默认位置 安装完毕 在命令行窗口查看Scala版本(必须要配置环境变量) 启动HDFS服务 启动Spark集群 在master虚拟机上创建单词文件

    2024年02月08日
    浏览(56)
  • 数据采集 通过Apache Spark和Amazon SageMaker构建机器学习管道;

    作者:禅与计算机程序设计艺术 随着人们生活水平的提高,收集、整理、分析和处理海量数据已成为当今社会所需的工具。而在云计算时代,数据的价值及其价值的获取越来越重要。近年来,Apache Spark和Amazon SageMaker的结合让数据收集变得更加简单、高效、可靠,基于这些框

    2024年02月04日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包