Spark SQL实战(07)-Data Sources

这篇具有很好参考价值的文章主要介绍了Spark SQL实战(07)-Data Sources。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 概述

Spark SQL通过DataFrame接口支持对多种数据源进行操作。

DataFrame可使用关系型变换进行操作,也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。

本节介绍使用Spark数据源加载和保存数据的一般方法,并进一步介绍可用于内置数据源的特定选项。

数据源关键操作:

  • load
  • save

2 大数据作业基本流程

input 业务逻辑 output
不管是使用MR/Hive/Spark/Flink/Storm。

Spark能处理多种数据源的数据,而且这些数据源可以是在不同地方:

  • file/HDFS/S3/OSS/COS/RDBMS
  • json/ORC/Parquet/JDBC
object DataSourceApp {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local").getOrCreate()
    
    text(spark)
    // json(spark)
    // common(spark)
    // parquet(spark)

    // convert(spark)

    // jdbc(spark)
    jdbc2(spark)
    spark.stop()
  }
}

3 text数据源读写

读取文本文件的 API,SparkSession.read.text()

参数:

  • path:读取文本文件的路径。可以是单个文件、文件夹或者包含通配符的文件路径。
  • wholetext:如果为 True,则将整个文件读取为一条记录;否则将每行读取为一条记录。
  • lineSep:如果指定,则使用指定的字符串作为行分隔符。
  • pathGlobFilter:用于筛选文件的通配符模式。
  • recursiveFileLookup:是否递归查找子目录中的文件。
  • allowNonExistingFiles:是否允许读取不存在的文件。
  • allowEmptyFiles:是否允许读取空文件。

返回一个 DataFrame 对象,其中每行是文本文件中的一条记录。

def text(spark: SparkSession): Unit = {
  import spark.implicits._

  val textDF: DataFrame = spark.read.text(
    "/Users/javaedge/Downloads/sparksql-train/data/people.txt")

  val result: Dataset[(String, String)] = textDF.map(x => {
    val splits: Array[String] = x.getString(0).split(",")
    (splits(0).trim, splits(1).trim)
  })

编译无问题,运行时报错:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;

思考下,如何使用text方式,输出多列的值?

修正后
val result: Dataset[String] = textDF.map(x => {
  val splits: Array[String] = x.getString(0).split(",")
  splits(0).trim
})

result.write.text("out")

继续报错:

Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/Users/javaedge/Downloads/sparksql-train/out already exists.;

回想Hadoop中MapReduce的输出:

  • 第一次0K
  • 第二次也会报错输出目录已存在

这关系到 Spark 中的 mode

SaveMode

Spark SQL中,使用DataFrame或Dataset的write方法将数据写入外部存储系统时,使用“SaveMode”参数指定如何处理已存在的数据。

SaveMode有四种取值:

  1. SaveMode.ErrorIfExists:如果目标路径已经存在,则会引发异常
  2. SaveMode.Append:将数据追加到现有数据
  3. SaveMode.Overwrite:覆盖现有数据
  4. SaveMode.Ignore:若目标路径已经存在,则不执行任何操作

所以,修正如下:

result.write.mode(SaveMode.overwrite).text("out")

4 JSON 数据源

// JSON
def json(spark: SparkSession): Unit = {
  import spark.implicits._

  val jsonDF: DataFrame = spark.read.json(
    "/Users/javaedge/Downloads/sparksql-train/data/people.json")

  jsonDF.show()

  // 只要age>20的数据
  jsonDF.filter("age > 20")
    .select("name")
    .write.mode(SaveMode.Overwrite).json("out")
  
output:
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

嵌套 JSON

// 嵌套 JSON
val jsonDF2: DataFrame = spark.read.json(
  "/Users/javaedge/Downloads/sparksql-train/data/people2.json")
jsonDF2.show()

jsonDF2.select($"name",
  $"age",
  $"info.work".as("work"),
  $"info.home".as("home"))
  .write.mode("overwrite")
  .json("out")

output:
+---+-------------------+----+
|age|               info|name|
+---+-------------------+----+
| 30|[shenzhen, beijing]|  PK|
+---+-------------------+----+

5 标准写法

// 标准API写法
private def common(spark: SparkSession): Unit = {
  import spark.implicits._

  val textDF: DataFrame = spark.read.format("text").load(
    "/Users/javaedge/Downloads/sparksql-train/data/people.txt")
  val jsonDF: DataFrame = spark.read.format("json").load(
    "/Users/javaedge/Downloads/sparksql-train/data/people.json")
  textDF.show()
  println("~~~~~~~~")
  jsonDF.show()

  jsonDF.write.format("json").mode("overwrite").save("out")

}

output:
+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+

~~~~~~~~
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

6 Parquet数据源

6.1 简介

一种列式存储格式,在大数据环境中高效地存储和处理数据。由Hadoop生态系统中的Apache Parquet项目开发的。

6.2 设计目标

支持高效的列式存储和压缩,并提供高性能的读/写能力,以便处理大规模结构化数据。

Parquet可以与许多不同的计算框架一起使用,如Apache Hadoop、Apache Spark、Apache Hive等,因此广泛用于各种大数据应用程序中。

6.3 优点

高性能、节省存储空间、支持多种编程语言和数据类型、易于集成和扩展等。

private def parquet(spark: SparkSession): Unit = {
  import spark.implicits._

  val parquetDF: DataFrame = spark.read.parquet(
    "/Users/javaedge/Downloads/sparksql-train/data/users.parquet")
  parquetDF.printSchema()
  parquetDF.show()

  parquetDF.select("name", "favorite_numbers")
    .write.mode("overwrite")
    .option("compression", "none")
    .parquet("out")
  
output:
root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

7convert

方便从一种数据源写到另一种数据源。

存储类型转换:JSON==>Parquet

def convert(spark: SparkSession): Unit = {
  import spark.implicits._

  val jsonDF: DataFrame = spark.read.format("json")
    .load("/Users/javaedge/Downloads/sparksql-train/data/people.json")
  jsonDF.show()

  jsonDF.filter("age>20")
    .write.format("parquet").mode(SaveMode.Overwrite).save("out")

Spark SQL实战(07)-Data Sources

8 JDBC

有些数据是在MySQL,使用Spark处理,肯定要通过Spark读出MySQL的数据。
数据源是text/json,通过Spark处理完后,要将统计结果写入MySQL。

查 DB

写法一
def jdbc(spark: SparkSession): Unit = {
  import spark.implicits._

  val jdbcDF = spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306")
    .option("dbtable", "smartrm_monolith.order")
    .option("user", "root")
    .option("password", "root")
    .load()

  jdbcDF.filter($"order_id" > 150).show(100)
}

Spark SQL实战(07)-Data Sources

写法二
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")

val jdbcDF2: DataFrame = spark.read
  .jdbc(url, srcTable, connectionProperties)

jdbcDF2.filter($"order_id" > 100)

写 DB

val connProps = new Properties()
connProps.put("user", "root")
connProps.put("password", "root")

val jdbcDF: DataFrame = spark.read.jdbc(url, srcTable, connProps)

jdbcDF.filter($"order_id" > 100)
  .write.jdbc(url, "smartrm_monolith.order_bak", connProps)

若 目标表不存在,会自动帮你创建:

Spark SQL实战(07)-Data Sources

统一配置管理

如何将那么多数据源配置参数统一管理呢?

先引入依赖:

<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>config</artifactId>
    <version>1.3.3</version>
</dependency>

配置文件:

Spark SQL实战(07)-Data Sources

读配置的程序:

package com.javaedge.bigdata.chapter05

import com.typesafe.config.{Config, ConfigFactory}

object ParamsApp {

  def main(args: Array[String]): Unit = {

    val config: Config = ConfigFactory.load()
    val url: String = config.getString("db.default.url")
    println(url)

  }

}
private def jdbcConfig(spark: SparkSession): Unit = {
  import spark.implicits._

  val config = ConfigFactory.load()
  val url = config.getString("db.default.url")
  val user = config.getString("db.default.user")
  val password = config.getString("db.default.password")
  val driver = config.getString("db.default.driver")
  val database = config.getString("db.default.database")
  val table = config.getString("db.default.table")
  val sinkTable = config.getString("db.default.sink.table")

  val connectionProperties = new Properties()
  connectionProperties.put("user", user)
  connectionProperties.put("password", password)

  val jdbcDF: DataFrame = spark.read.jdbc(url, s"$database.$table", connectionProperties)

  jdbcDF.filter($"order_id" > 100).show()

写到新表:

jdbcDF.filter($"order_id" > 158)
.write.jdbc(url, s"$database.$sinkTable", connectionProperties)

Spark SQL实战(07)-Data Sources文章来源地址https://www.toymoban.com/news/detail-403044.html

到了这里,关于Spark SQL实战(07)-Data Sources的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark SQL实战(08)-整合Hive

    Apache Spark 是一个快速、可扩展的分布式计算引擎,而 Hive 则是一个数据仓库工具,它提供了数据存储和查询功能。在 Spark 中使用 Hive 可以提高数据处理和查询的效率。 场景 历史原因积累下来的,很多数据原先是采用Hive来进行处理的,现想改用Spark操作数据,须要求Spark能够

    2023年04月15日
    浏览(92)
  • Spark SQL实战(04)-API编程之DataFrame

    Spark Core: SparkContext Spark SQL: 难道就没有SparkContext? 2.x之后统一的 1.x的Spark SQL编程入口点 SQLContext HiveContext Spark SQL中,SQLContext、HiveContext都是用来创建DataFrame和Dataset主要入口点,二者区别如下: 数据源支持:SQLContext支持的数据源包括JSON、Parquet、JDBC等等,而HiveContext除了支持

    2023年04月09日
    浏览(44)
  • 32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月05日
    浏览(46)
  • 【SQL开发实战技巧】系列(二十七):数仓报表场景☞通过对移动范围进行聚集来详解分析函数开窗原理以及如何一个SQL打印九九乘法表

    【SQL开发实战技巧】系列(一):关于SQL不得不说的那些事 【SQL开发实战技巧】系列(二):简单单表查询 【SQL开发实战技巧】系列(三):SQL排序的那些事 【SQL开发实战技巧】系列(四):从执行计划讨论UNION ALL与空字符串UNION与OR的使用注意事项 【SQL开发实战技巧】系列

    2023年04月09日
    浏览(53)
  • 大数据技术之Spark(一)——Spark概述

    大数据技术之Spark(一)——Spark概述 Apache Spark是一个开源的、强大的分布式 查询和处理引擎 ,它提供MapReduce的灵活性和可扩展性,但速度明显要快上很多;拿数据存储在内存中的时候来说,它比Apache Hadoop 快100倍,访问磁盘时也要快上10倍。 Spark 是一种由 Scala 语言开发的快

    2024年02月14日
    浏览(35)
  • 【Spark基础】Spark核心模块组成与功能概述

    Spark基于Spark Core开发了多种组件。开发人员可以基于这些组件,轻松完成多种不同场景的计算任务。   Spark Core是Spark的核心,各类核心组件都依赖于Spark Core。如下图所示,Spark Core核心组件包括基础设施、存储系统、调度系统、计算引擎四个部分。 Spark基础设施为其他组件提

    2024年02月08日
    浏览(48)
  • Learning Spark: LightningFast Big Data Analysis

    作者:禅与计算机程序设计艺术 Spark是一种开源快速通用大数据分析框架。它能够在超高速的数据处理能力下,轻松完成海量数据处理任务。相比于其他大数据处理系统(如Hadoop)来说,Spark具有如下优点: 更快的速度:Spark可以更快地处理超高速的数据,特别是在内存计算时,

    2024年02月08日
    浏览(57)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(67)
  • spark概述与scala的安装

    1. Spark是什么 Spark  基于内存 式计算的 分布式 的 统一化 的数据分析引擎 2. Spark 模块 Spark 框架模块包含:Spark Core、Spark SQL、Spark Streaming、Spark GraphX、 Spark MLlib,而后四项的能力都是建立在核心引擎之上。 3.Spark 四大特点 Spark使用 Scala 语言进行实现,它是一种面向对象、函

    2024年03月10日
    浏览(49)
  • [机器学习、Spark]Spark机器学习库MLlib的概述与数据类型

    👨‍🎓👨‍🎓博主:发量不足 📑📑本期更新内容: Spark机器学习库MLlib的概述与数据类型 📑📑下篇文章预告:Spark MLlib基本统计 💨💨简介:分享的是一个当代疫情在校封校的大学生学习笔记 目录 Spark机器学习库MLlib的概述 一.MLib的简介 二.Spark机器学习工作流程 数

    2023年04月09日
    浏览(86)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包