Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

这篇具有很好参考价值的文章主要介绍了Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 

目录

零、本讲学习目标

一、基本操作

二、默认数据源

(一)默认数据源Parquet

(二)案例演示读取Parquet文件

1、在Spark Shell中演示

2、通过Scala程序演示

三、手动指定数据源

(一)format()与option()方法概述

(二)案例演示读取不同数据源

1、读取房源csv文件

2、读取json,保存为parquet

3、读取jdbc数据源,保存为json文件

四、数据写入模式

(一)mode()方法

(二)枚举类SaveMode

(三)案例演示不同写入模式

五、分区自动推断

(一)分区自动推断概述

(二)分区自动推断演示

1、建四个文件

2、读取表数据

3、输出Schema信息

4、显示数据帧内容

(三)分区自动推断注意事项


零、本讲学习目标

  1. 学会使用默认数据源
  2. 学会手动指定数据源
  3. 理解数据写入模式
  4. 掌握分区自动推断

Spark SQL支持通过DataFrame接口对各种数据源进行操作。DataFrame可以使用相关转换算子进行操作,也可以用于创建临时视图。将DataFrame注册为临时视图可以对其中的数据使用SQL查询。

一、基本操作

  • Spark SQL提供了两个常用的加载数据和写入数据的方法:load()方法和save()方法。load()方法可以加载外部数据源为一个DataFrame,save()方法可以将一个DataFrame写入指定的数据源。

二、默认数据源

(一)默认数据源Parquet

  • 默认情况下,load()方法和save()方法只支持Parquet格式的文件,Parquet文件是以二进制方式存储数据的,因此不可以直接读取,文件中包括该文件的实际数据和Schema信息,也可以在配置文件中通过参数spark.sql.sources.default对默认文件格式进行更改。Spark SQL可以很容易地读取Parquet文件并将其数据转为DataFrame数据集。

(二)案例演示读取Parquet文件

  • 将数据文件users.parquet上传到master虚拟机/homeSpark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 将数据文件users.parquet上传到HDFS的 /datasource/input目录Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

1、在Spark Shell中演示

  • 启动Spark Shell,执行命令:spark-shell --master spark://master:7077Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 加载parquet文件,返回数据帧
  • 执行命令:val userdf = spark.read.load("hdfs://master:9000/datasource/input/users.parquet")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:userdf.show(),查看数据帧内容Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:userdf.select("name", "favorite_color").write.save("hdfs://master:9000/datasource/output"),对数据帧指定列进行查询,查询结果依然是数据帧,然后通过save()方法写入HDFS指定目录Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 查看HDFS上的输出结果Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 除了使用select()方法查询外,也可以使用SparkSession对象的sql()方法执行SQL语句进行查询,该方法的返回结果仍然是一个DataFrame。
  • 基于数据帧创建临时视图,执行命令:userdf.createTempView("t_user")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行SQL查询,将结果写入HDFS,执行命令:spark.sql("select name, favorite_color from t_user").write.save("hdfs://master:9000/result2")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 查看HDFS上的输出结果Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

课堂练习1、将4.1节的student.txt文件转换成student.parquet,保存到HDFS的/datasource/input目录

  • 解决思路:将student.txt转成studentdf,利用数据帧的save()方法保存到/datasource/output3目录,然后将文件更名复制到/datasource/input目录

  • 得到学生数据帧Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

  • 将学生数据帧保存为parquet文件Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

  • 查看生成的parquet文件Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

  • 复制parquet文件到/datasource/input目录Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

课堂练习2、读取student.parquet文件得到学生数据帧,并显示数据帧内容

  • 执行命令:val studentDF = spark.read.load("hdfs://master:9000/datasource/input/student.parquet")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:studentDF.show
  • Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

2、通过Scala程序演示

  • 创建Maven项目 - SparkSQLDemo
  • Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 在pom.xml文件里添加依赖与插件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.huawei.sql</groupId>
    <artifactId>SparkSQLDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
    </build>
</project>
  • resources目录里添加HFDS配置文件
  • Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <description>only config in clients</description>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>
    </property>
</configuration>
  • resources目录里添加日志属性文件
  • Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
log4j.rootLogger=ERROR, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spark.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  • 创建net.hw.sparksql包,在包里创建ReadParquet对象
    Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
package net.hw.sparksql

import org.apache.spark.sql.SparkSession

/**
 * 功能:Parquet数据源
 * 作者:华卫
 * 日期:2022年05月01日
 */
object ReadParquet {
  def main(args: Array[String]): Unit = {
    // 本地调试必须设置,否则会报Permission Denied错误
    System.setProperty("HADOOP_USER_NAME", "root")
    // 创建或得到SparkSession
    val spark = SparkSession.builder()
      .appName("ReadParquet")
      .master("local[*]")
      .getOrCreate()
    // 加载parquet文件,返回数据帧
    val usersdf = spark.read.load("hdfs://master:9000/input/users.parquet")
    // 显示数据帧内容
    usersdf.show()
    // 查询DataFrame中指定列,结果写入HDFS
    usersdf.select("name","favorite_color")
      .write.save("hdfs://master:9000/result3")
  }
}
  • 运行程序,查看控制台结果Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 在HDFS查看输出结果Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

三、手动指定数据源

(一)format()与option()方法概述

  • 使用format()方法可以手动指定数据源。数据源需要使用完全限定名(例如org.apache.spark.sql.parquet),但对于Spark SQL的内置数据源,也可以使用它们的缩写名(JSON、Parquet、JDBC、ORC、Libsvm、CSV、Text)。
  • 通过手动指定数据源,可以将DataFrame数据集保存为不同的文件格式或者在不同的文件格式之间转换。
  • 在指定数据源的同时,可以使用option()方法向指定的数据源传递所需参数。例如,向JDBC数据源传递账号、密码等参数。

(二)案例演示读取不同数据源

1、读取房源csv文件

  • 查看HDFS上/input目录里的house.csv文件Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 在spark shell里,执行命令:val house_csv_df = spark.read.format("csv").load("hdfs://master:9000/input/house.csv"),读取房源csv文件,得到房源数据帧Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:house_csv_df.show(),查看房源数据帧内容Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 大家可以看到,house.csv文件第一行是字段名列表,但是转成数据帧之后,却成了第一条记录,这样显然是不合理的,怎么办呢?就需要用到option()方法来传递参数,告诉Spark第一行是表头header,而不是表记录。
  • 执行命令:val house_csv_df = spark.read.format("csv").option("header", "true").load("hdfs://master:9000/input/house.csv")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:house_csv_df.show(),查看房源数据帧内容Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

2、读取json,保存为parquet

  • people.json上传到HDFS的/input目录Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:val peopledf = spark.read.format("json").load("hdfs://master:9000/input/people.json")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:peopledf.show()Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:peopledf.select("name", "age").write.format("parquet").save("hdfs://master:9000/result4")
  • 查看生成的parquet文件Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

3、读取jdbc数据源,保存为json文件

  • 查看student数据库里的t_userSpark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令
val userdf = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/student")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "t_user")  
  .option("user", "root")  
  .option("password", "903213")
  .load()
  • 报错,找不到数据库驱动程序com.mysql.jdbc.DriverSpark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 解决问题,将数据库驱动程序拷贝到$SPARK_HOME/jars目录Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 将数据驱动程序分发到slave1和slave2虚拟机Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令
val userdf = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/student")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "t_user")  
  .option("user", "root")  
  .option("password", "903213")
  .load()
  • 加载jdbc数据源成功,但是有个警告,需要通过设置useSSL=false来消除Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令
val userdf = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://master:3306/student?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "t_user")  
  .option("user", "root")  
  .option("password", "903213")
  .load()

Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作文章来源地址https://www.toymoban.com/news/detail-491654.html

  • 执行命令:userdf.show()Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:userdf.write.format("json").save("hdfs://master:9000/result5")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 在虚拟机slave1查看生成的json文件,执行命令:hdfs dfs -cat /result5/*Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

四、数据写入模式

(一)mode()方法

  • 在写入数据时,可以使用mode()方法指定如何处理已经存在的数据,该方法的参数是一个枚举类SaveMode
  • 使用SaveMode类,需要import org.apache.spark.sql.SaveMode;

(二)枚举类SaveMode

  • SaveMode.ErrorIfExists默认值。当向数据源写入一个DataFrame时,如果数据已经存在,就会抛出异常。
  • SaveMode.Append:当向数据源写入一个DataFrame时,如果数据或表已经存在,会在原有的基础上进行追加。
  • SaveMode.Overwrite:当向数据源写入一个DataFrame时,如果数据或表已经存在,就会将其覆盖(包括数据或表的Schema)。
  • SaveMode.Ignore:当向数据源写入一个DataFrame时,如果数据或表已经存在,就不会写入内容,类似SQL中的CREATE TABLE IF NOT EXISTS

(三)案例演示不同写入模式

  • 查看数据源:people.jsonSpark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 查询该文件name里,采用覆盖模式写入/result/result目录里本来有东西的Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:val peopledf = spark.read.format("json").load("hdfs://master:9000/input/people.json")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 导入SaveMode类,执行命令:peopledf.select("name").write.mode(SaveMode.Overwrite).format("json").save("hdfs://master:9000/result")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 在slave1虚拟机上查看生成的json文件Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 查询age列,以追加模式写入HDFS的/result目录,执行命令:peopledf.select("age").write.mode(SaveMode.Append).format("json").save("hdfs://master:9000/result")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 在slave1虚拟机上查看追加生成的json文件Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

五、分区自动推断

(一)分区自动推断概述

  • 表分区是Hive等系统中常用的优化查询效率的方法(Spark SQL的表分区与Hive的表分区类似)。在分区表中,数据通常存储在不同的分区目录中,分区目录通常以“分区列名=值”的格式进行命名。
  • 以people作为表名,gender和country作为分区列,给出存储数据的目录结构Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

(二)分区自动推断演示

1、建四个文件

  • 在master虚拟机上/home里创建如下目录及文件,其中目录people代表表名,gendercountry代表分区列,people.json存储实际人口数据Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

2、读取表数据

  • 执行命令:spark-shell,启动Spark ShellSpark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 执行命令:val peopledf = spark.read.format("json").load("file:///home/people")Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

3、输出Schema信息

  • 执行命令:peopledf.printSchema()Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作

4、显示数据帧内容

  • 执行命令:peopledf.show()Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作
  • 从输出的Schema信息和表数据可以看出,Spark SQL在读取数据时,自动推断出了两个分区列gendercountry,并将这两列的值添加到了数据帧peopledf中。

(三)分区自动推断注意事项

  • 分区列的数据类型是自动推断的,目前支持数字、日期、时间戳、字符串数据类型。若不希望自动推断分区列的数据类型,则可以在配置文件中将spark.sql.sources.partitionColumnTypeInference.enabled的值设置为false(默认为true,表示启用)。当禁用自动推断时,分区列将使用字符串数据类型

到了这里,关于Spark大数据处理讲课笔记4.2 Spark SQL数据源 - 基本操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark大数据处理讲课笔记--- RDD持久化机制

    理解RDD持久化的必要性 了解RDD的存储级别 学会如何查看RDD缓存 Spark中的RDD是懒加载的,只有当遇到行动算子时才会从头计算所有RDD,而且当同一个RDD被多次使用时,每次都需要重新计算一遍,这样会严重增加消耗。为了避免重复计算同一个RDD,可以将RDD进行持久化。 Spark中

    2024年02月06日
    浏览(35)
  • 4.2 Spark SQL数据源 - 基本操作

    案例演示读取Parquet文件 查看Spark的样例数据文件users.parquet 1、在Spark Shell中演示 启动Spark Shell 查看数据帧内容 查看数据帧模式 对数据帧指定列进行查询,查询结果依然是数据帧,然后通过write成员的save()方法写入HDFS指定目录 查看HDFS上的输出结果 执行SQL查询 查看HDFS上的输

    2024年02月08日
    浏览(31)
  • 4.5 Spark SQL 处理JSON数据

    4.1 Spark SQL概述 4.2 Spark SQL DataFrame 编程操作大全 (超详细)

    2024年02月02日
    浏览(38)
  • 结构化数据处理与分析:Spark SQL 教程

    作者:禅与计算机程序设计艺术 Apache Spark 是由 Apache 基金会开发的开源分布式计算框架,最初用于对大规模数据进行快速的处理,在大数据计算领域占据重要地位。其独特的高性能处理能力及丰富的数据处理功能使得 Spark 在各个行业应用广泛。Spark SQL 是 Spark 提供的用于结构

    2024年02月06日
    浏览(33)
  • Spark大数据处理学习笔记(2.2)搭建Spark Standalone集群

    一、在master虚拟机上安装配置Spark 1.1 将spark安装包上传到master虚拟机 下载Spark:pyw2 进入/opt目录,查看上传的spark安装包 1.2 将spark安装包解压到指定目录 执行命令: tar -zxvf spark-3.3.2-bin-hadoop3.tgz 修改文件名:mv spark-3.3.2-bin-hadoop3 spark-3.3.2 1.3 配置spark环境变量 执行命令:vim

    2024年02月09日
    浏览(37)
  • Spark大数据处理学习笔记(3.1)掌握RDD的创建

    文章目录 一、准备工作 1.1 准备文件 1.1.1 准备本地系统文件 在/home目录里创建test.txt 单词用空格分隔 1.1.2 启动HDFS服务 执行命令:start-dfs.sh 1.1.3 上传文件到HDFS 将test.txt上传到HDFS的/park目录里 查看文件内容 1.2 启动Spark Shell 1.2.1 启动Spark服务 执行命令:start-all.sh 1.2.2 启动Sp

    2024年02月09日
    浏览(33)
  • Spark大数据处理学习笔记(3.2.2)掌握RDD算子

    衔接上文:http://t.csdn.cn/Z0Cfj 功能: reduce()算子按照传入的函数进行归约计算 案例: 计算1 + 2 + 3 + …+100的值 计算1 × 2 × 3 × 4 × 5 × 6 的值(阶乘 - 累乘) 计算1 2 + 2 2 + 3 2 + 4 2 + 5**2的值(先映射,后归约) 功能: collect()算子向Driver以数组形式返回数据集的所有元素。通常对

    2024年02月08日
    浏览(38)
  • Spark大数据处理学习笔记(2.4)IDEA开发词频统计项目

    该文章主要为完成实训任务,详细实现过程及结果见【http://t.csdn.cn/0qE1L】 从Scala官网下载Scala2.12.15 - https://www.scala-lang.org/download/2.12.15.html 安装在默认位置 安装完毕 在命令行窗口查看Scala版本(必须要配置环境变量) 启动HDFS服务 启动Spark集群 在master虚拟机上创建单词文件

    2024年02月08日
    浏览(48)
  • 大数据流处理与实时分析:Spark Streaming和Flink Stream SQL的对比与选择

    作者:禅与计算机程序设计艺术

    2024年02月07日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包