spark第四章:SparkSQL基本操作

这篇具有很好参考价值的文章主要介绍了spark第四章:SparkSQL基本操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录

spark第一章:环境安装
spark第二章:sparkcore实例
spark第三章:工程化代码
spark第四章:SparkSQL基本操作


前言

接下来我们学习SparkSQL他和Hql有些相似。Hql是将操作装换成MR,SparkSQL也是,不过是使用Spark引擎来操作,效率更高一些


一、添加pom

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.2.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.2.3</version>

以上是这次博客需要的所有依赖,一次性全加上。

二、常用操作

spark第四章:SparkSQL基本操作
一共这么多,挨个讲解一下

1.类型转换

SparkSQL中有三种常用的类型,RDD之前说过就不说了。
DataFrame
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。
DSL 语法
DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了

SparkSql_Basic.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object SparkSql_Basic {
  def main(args: Array[String]): Unit = {
    // 创建SparkSQL的运行环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

//    val df: DataFrame = spark.read.json("datas/user.json")
//    df.show()

    //DataFrame => SQL
//    df.createOrReplaceTempView("user")
//    spark.sql("select age from user").show()


    //DtaFrame => DSL
    // 在使用DataFrame时,如何涉及到转换操作,需要引入转换规则

//    df.select("age","username").show()
//    df.select($"age"+1).show()
//    df.select('age+1).show()

    // DataSet
    // DataFrame 是特定泛型的DataSet
//    val seq: Seq[Int] = Seq(1, 2, 3, 4)
//    val ds: Dataset[Int] = seq.toDS()
//    ds.show()

    // RDD <=>DataFrame
    val rdd=spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",40)))
    val df: DataFrame = rdd.toDF("id", "name", "age")
    val rowRDD: RDD[Row] = df.rdd

    // DataFrame <=> DatsSet
    val ds: Dataset[User] = df.as[User]
    val df1: DataFrame = ds.toDF()

    // RDD <=> DataSet
    val ds1: Dataset[User] = rdd.map {
      case (id, name, age) => {
        User(id, name, age)
      }
    }.toDS()

    val userRDD: RDD[User] = ds1.rdd


    // 关闭环境
    spark.close()

  }

  case class User(id:Int,name:String,age:Int)

}

2.连接mysql

SparkSQL提供了多种数据接口,我们可以通过JDBC连接Mysql数据库,我们先随便在数据库里边写点东西。
spark第四章:SparkSQL基本操作
SparkSql_JDBC.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkSql_JDBC {
  def main(args: Array[String]): Unit = {
    // 创建SparkSQL的运行环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "user")
      .option("useSSL","false")
      .load()

    df.show

    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "user1")
      .option("useSSL","false")
      .mode(SaveMode.Append)
      .save()

    // 关闭环境
    spark.close()
  }
}

spark第四章:SparkSQL基本操作

spark第四章:SparkSQL基本操作

3.UDF函数

这个函数可以对简单的数据进行处理,但是比较局限.
这次我们从json文件读取数据
spark第四章:SparkSQL基本操作

{"username": "zhangsan", "age": 20}
{"username": "lisi", "age": 30}
{"username": "wangwu", "age": 40}

SparkSql_UDF.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object SparkSql_UDF {
  def main(args: Array[String]): Unit = {
    // 创建SparkSQL的运行环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    spark.udf.register("prefixName",(name:String)=>{
      "Name:" + name
    })

    spark.sql("select age ,prefixName(username) from user").show()

    // 关闭环境
    spark.close()
  }
}

spark第四章:SparkSQL基本操作

4.UDAF函数

UDAF函数的处理能力就比UDF强大多了,可以完成一些更复杂的操作.
SparkSql_UDAF1.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row, SparkSession, functions}

object SparkSql_UDAF1 {
  def main(args: Array[String]): Unit = {
    // 创建SparkSQL的运行环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    //计算平均年龄
    spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF()))

    spark.sql("select ageAvg(age) from user").show()


    // 关闭环境
    spark.close()

  }

  case class Buff( var total:Long,var count:Long)

  class MyAvgUDAF extends Aggregator[Long,Buff,Long]{
    //初始值
    override def zero: Buff = {
      Buff(0L,0L)
    }
    //更新缓冲区
    override def reduce(buff: Buff, in: Long): Buff = {
      buff.total=buff.total+in
      buff.count=buff.count+1
      buff
    }

    //合并缓冲区
    override def merge(buff1: Buff, buff2: Buff): Buff = {
      buff1.total=buff1.total+buff2.total
      buff1.count=buff1.count+buff2.count
      buff1
    }

    //计算结果
    override def finish(buff: Buff): Long = {
      buff.total/buff.count
    }

    //缓冲区编码操作
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    //输出的编码操作
    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }

}

spark第四章:SparkSQL基本操作
还有一种方法,在Spark3已经不被官方推荐了,所以这里就不叙述了.

5.连接hive

首先我们在集群先,启动Hadoop和Hive
然后将jdbc的jar包放到hive的lib文件中
spark第四章:SparkSQL基本操作
这个jar包在安装Hive环境时,使用过.
将虚拟机中的hive配置文件,hive-site.xml导出
spark第四章:SparkSQL基本操作
放到idea的resource文件夹中,然后最好吧target文件夹删除,因为idea有可能从target中直接读取之前的数据,从而没有扫描hive-site.xml
spark第四章:SparkSQL基本操作
我们就做最简单的查询操作
SparkSql_Hive.scala

package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkSql_Hive {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
    val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

    spark.sql("show tables").show

    // 关闭环境
    spark.close()

  }

}

spark第四章:SparkSQL基本操作
如果能查询hive中的数据库,代表成功.

总结

SparkSQL的常用操作基本就这些,至于项目吗,下次专门在写一次吧文章来源地址https://www.toymoban.com/news/detail-465018.html

到了这里,关于spark第四章:SparkSQL基本操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 操作系统第四章练习题

    第三部分 填空题 将作业地址空间中的逻辑地址转换为主存中的物理地址的过程称为(      )。地址变换   分区分配中的存储保护通常采用(      )方法。界限寄存器和存储保护键 把(      )地址转换为(      )地址的工作称为地址映射。逻辑、物理 重定位的

    2024年02月11日
    浏览(23)
  • 【王道·操作系统】第四章 文件管理(下)

    用户需要通过操作系统提供的接口发出上述请求——用户接口 由于用户提供的是文件的存放路径,因此需要操作系统一层一层地查找目录,找到对应的目录项——文件目录系统 不同的用户对文件有不同的操作权限,因此为了保证安全,需要检查用户是否有访问权限——存取

    2024年02月11日
    浏览(31)
  • Linux操作系统——第四章 进程间通信

        目录 进程间通信介绍 进程间通信目的 进程间通信发展 进程间通信分类 管道 System V IPC POSIX IPC 管道 什么是管道 匿名管道 管道读写规则 管道特点 命名管道 创建一个命名管道 匿名管道与命名管道的区别 命名管道的打开规则 system V共享内存 共享内存示意图  共享内存数

    2024年02月09日
    浏览(31)
  • 【考研数学】线性代数第四章 —— 线性方程组(1,基本概念 | 基本定理 | 解的结构)

    继向量的学习后,一鼓作气,把线性方程组也解决了去。O.O 方程组 称为 n n n 元齐次线性方程组。 方程组 称为 n n n 元非齐次线性方程组。 方程组(I)又称为方程组(II)对应的齐次线性方程组或导出方程组。 方程组(I)和方程组(II)分别称为齐次线性方程组和非齐次线

    2024年02月11日
    浏览(28)
  • 【软考数据库】第四章 操作系统知识

    目录 4.1 进程管理 4.1.1 操作系统概述 4.1.2 进程组成和状态 4.1.3 前趋图 4.1.4 进程同步与互斥 4.1.5 进程调度 4.1.6 死锁 4.1.7 线程 4.2 存储管理 4.2.1 分区存储管理 4.2.3 分页存储管理 4.2.4 分段存储管理 4.2.5 段页式存储管理 4.3 设备管理 4.3.1 设备管理概述 4.3.2 I/0软件 4.3.3 设

    2024年02月06日
    浏览(40)
  • 第四章:数据操作Ⅰ 第二节:读写CSV文件

    使用read.csv函数,可以将CSV文件读入数据框,而使用write.csv()函数,则可以将数据框保存到CSV中 我们使用read.csv()的时候,其会返回一个数据框 例如: 例如:读取学分文件(以下为待读取文件)  使用head格式到R语言中取前面数据  使用tail格式到R语言中取后面数据  例如:

    2024年03月08日
    浏览(32)
  • Qt5开发及实例V2.0-第四章Qt基本对话框

    首先介绍标准文件对话框(QFileDialog)、标准颜色对话框(QColorDialog)、标准字体对话框(QFontDialog)、标准输入对话框(QInputDialog)及标准消息对话框(QMessageBox),运行效果如图4.1所示。 按如图4.1所示依次执行如下操作。 (1)单击“文件标准对话框实例”按钮,弹出“文

    2024年02月07日
    浏览(40)
  • 【操作系统复习之路】存储器管理(第四章 &超详细讲解)

    目录 一、存储器的层次结构 二、程序的装入和链接 2.1 逻辑地址和物理地址 2.2 绝对装入方式 2.3 可重定位装入方式 2.4 动态运行时装入方式 2.5 静态链接  2.6 装入时动态链接 2.7 运行时动态链接 三、连续分配存储器管理方式 3.1 单一连续分配 3.2 固定分区分配 3.3 动态分区

    2024年04月27日
    浏览(27)
  • 操作系统考试复习——第四章 存储器管理 4.1 4.2

    存储器的层次结构: 存储器的多层结构: 存储器至少分为三级:CPU寄存器,主存和辅存。 但是 一般分为6层 为寄存器,高速缓存,主存储器,磁盘缓存,固定磁盘,可移动存储介质。 这几个部分是 速度依次减小 但是 存储容量是依次增大 的。  只有固定磁盘和可移动存储

    2024年02月03日
    浏览(30)
  • 第四章 应用SysML基本特性集的汽车示例 P1|系统建模语言SysML实用指南学习

    仅供个人学习记录 主要就是应用练习建模了 Automobile Domain包 将模型组织入包的包图 汽车规范中包含系统需求的需求图 汽车域块定义图 描述车辆主要功能的用例图

    2024年02月06日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包