大数据技术之Spark——Spark SQL

这篇具有很好参考价值的文章主要介绍了大数据技术之Spark——Spark SQL。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、SparkSQL 概述

1.1  SparkSQL是什么

        Spark SQL是Spark用于结构化数据处理的Spark模块。

1.2 Hive and SparkSQL

        我们之前学习过hive,hive是一个基于hadoop的SQL引擎工具,目的是为了简化mapreduce的开发。由于mapreduce开发效率不高,且学习较为困难,为了提高mapreduce的开发效率,出现了hive,用SQL的方式来简化mapreduce:hive提供了一个框架,将SQL转换成mapreduce来执行。执行的效率不会因此提升,但开发效率会大大提高。

        同样的,sparkCore的代码能不能转换成SQL语句呢?sparkSQL的前身是Shark,是一个将spark和hive结合的框架,利用hive SQL简化的思想,将RDD进行简化。Shark的出现,是SQL-on-Hadoop的性能比Hive有了10-100倍的提高。

        随着spark的发展,shark的发展受制于hive,在此基础上发展出sparkSQL和hive on spark,SparkSQL 作为 Spark 生态的一员继续发展,而不再受限于 Hive,只是兼容 Hive。

        sparkSQL可以用于简化RDD的开发,提高开发效率,且执行效率飞铲快。所以实际工作中,基本上采用的都是sparkSQL。sparkSQL为了简化RDD的开发,提高开发效率,提供了2个编程抽象,类似于spark Core的RDD。

DataSet

DataFrame

        

1.3 SparkSQL特点

1.3.1 易整合

无缝的整合了SQL查询和Spark编程

1.3.2 统一的数据访问

使用相同的方式连接不同的数据源

1.3.3 兼容Hive

在已有的仓库上直接运行SQL或者HiveQL

1.3.4 标准数据连接

通过JDBC或者ODBC来连接

1.4 DataFrame是什么

        在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,RDD只关心数据,如传入一数组(1,2,3,4),RDD不关心数组的意思。而DataFrame关心数据结构,如主键。

sparksql,# spark,大数据,spark,hive

 RDD

        虽然以Person为参数类型,但Spark框架本身不了解Person类的内部结构。

DataFrame

        提供了详细的结构信息,使得SparkSQL可以清楚的知道该数据集中包含哪些列,每列的名称和类型各是什么。

        DataFrame为数据提供了Schema视图,可以把它当作数据库中的一张表来对待。

1.5 DataSet是什么

        DataSet是分布式数据集合,是DataFrame的一个扩展。提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点

sparksql,# spark,大数据,spark,hive

        DataFrame是一个特定泛型的DataSet。

二、SparkSQL核心编程

2.1 新的起点

        SparkCore中,如果想要执行应用程序,首先需要构建上下文环境对象SparkContext。SparkSQL可以理解为是对SparkCore的封装。不仅是在模型上进行了封装,上下文环境对象也进行了封装。

        在老的版本中,SparkSQL提供了两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。

        SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。Spark Session 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。当我们使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark 的SparkSession 对 象, 就像我们以前可以自动获取到一个 sc 来表示 SparkContext 对象一样。

2.2 DataFrame

        在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:通过 Spark 的数据源进行创建;从一个存在的 RDD 进行转换;还可以从 Hive Table 进行查询返回。

2.2.1 创建DataFrame

Spark支持创建文件的数据源格式:spark.read.

csv        format        jdbc        json        load        option        options        orc        
parquet         schema         table        text        textFile
sparksql,# spark,大数据,spark,hive

sparksql,# spark,大数据,spark,hive

object DataFrameTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("detaSetDemo").getOrCreate()

    val dataFrame = spark.read.json("in/user.json")

    dataFrame.printSchema()
    dataFrame.show()

    spark.stop()
  }

}
注意:
        如果从内存中获取数据,spark 可以知道数据类型具体是什么。
        如果是数字,默认作 为 Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用 bigint 接收,可以和 Long 类型转换,但是和 Int 不能进行转换。

结果展示: 

sparksql,# spark,大数据,spark,hive

 2.2.2 SQL语法

1)读取JSON文件创建DataFrame

val dataFrame = spark.read.json("in/user.json")

2)对DataFrame创建一个临时表

dataFrame.createTempView("user")
// 或
dataFrame.createOrReplaceTempView("user")

3)通过SQL语句实现查询全表

val frame = spark.sql("select * from user")

4)结果展示

frame.show()

 sparksql,# spark,大数据,spark,hive

 注意:

普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people

2.2.3 DSL语法

        DataFrame提供一个特定领域语言(domain-specific kanguage, DSL)去管理结构化的数据。可以在Sacla, Java, Python和 R 中使用DSL,使用DSL语法风格不必去创建临时试图了。

1)创建一个DataFrame

val df = spark.read.json("in/user.json")

2)查看DataFrame的Schema信息

df.printSchema()

3)只查看列数据的6种方式

注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
//    输出的6种方式
    import spark.implicits._
    userDF.select('name,'age).show()
    userDF.select("name","age").show()
    userDF.select($"name",$"age").show()

    userDF.select(userDF("name"),userDF("age")).show()

    userDF.select(col("name"),col("age")).show()
    userDF.select(column("name"),column("age")).show()

    val idColumn = df("id")

4)查看“age”大于“22”的数据

条件过滤可以使用filter,也可以使用where,where的底层调用的也是filter方法。

df.select(userDF("name"),userDF("age"),(userDF("age")+1).as("ageinc"))
      .where($"name"=!="zhangsan").show()   // where底层也是filter
//        .filter($"ageinc">22).show()

5)按照“age”分区,查看数据条数

val countDF = df.groupBy("age").count()
countDF.printSchema()

6)增加列withColumn

val frame = countDF.withColumn("number",$"count".cast(StringType))

7)修改列名withColumnRenamed

 val frame2 = countDF.withColumnRenamed("count","number")

2.2.4 RDD转换为DataFrame

        在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入import spark.implicits._

        这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必 须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。

rdd =>DataFrame: rdd.toDF

DataFrame => rdd: df.rdd

sparksql,# spark,大数据,spark,hive

2.3 DataSet

        DataSet是具有强类型的数据集合,需要提供对于的类型信息。

2.3.1 创建DataSet

object DataSetDemo {

// 1)使用样例类来创建DataSet
  case class Point(label:String,x:Double,y:Double)
  case class Category(id:Long,name:String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("dataSet").getOrCreate()
    val sc = spark.sparkContext

//    重点记忆
    import spark.implicits._


// 2)使用基本类型的序列创建DataSet
    val points: Seq[Point] = Seq(Point("nj", 23.43, 57.12), Point("bj", 18.21, 199.43), Point("sh", 16.11, 18.3))
    val pointDS = points.toDS()

    val categories: Seq[Category] = Seq(Category(1, "nj"), Category(2, "bj"))
    val categoryDS = categories.toDS()
    categoryDS.printSchema()
    categoryDS.show()


}
注意
        在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet。

2.3.2 RDD转换为DataSet

        SparkSQL能够自动将包含case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射编程了表的列名。case类可以包含如Seq或者Array等复杂的结构。

case class User(name: String, age: Int)

sc.makeRDD(Seq(("zhangsan",18), ("zhaosi",20))).toDS

2.3.3 DataSet转换为RDD

        DataSet也是对RDD的封装,所以可以直接获得内部的RDD。

case class User(name: String, age: Int)
val res1 = sc.makeRDD(Seq(("zhangsan",18), ("zhaosi",20))).toDS

val rdd = res1.rdd

2.3.4 DataFrame和DataSet转换

DataFrame => DataSet:as[样例类]

DataSet => DataFrame:toDF

case class User(name: String, age: Int)
val userDF = sc.makeRDD(Seq(("zhangsan",18), ("zhaosi",20))).toDF("name", "age")

val userDS = userDF.as[User]

2.4 RDD、DataFrame、DataSet 三者的关系

2.4.1 相互转化

sparksql,# spark,大数据,spark,hive

// RDD <=> DataFrame
val rdd = spark.sparkContext.makeRDD(List(1,"zhangsan",30),(2,"lisi",40))
val df: DataFrame = rdd.toDF("id","name","age")
val rowRDD:RDD[Row] = df.rdd

// DataFrame <=> DataFrame
val ds:Dataset[User] = df.as[User]
val df1:DataFrame = ds.toDF()

// RDD <=> DataSet
rdd.map {
    case (id, name, age) =>{
        User(id, name, age)
    }
}
val userRDD:RAA[User] = ds1.rdd

 

 2.4.2 三者的共性

1. 都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利

2. 都有惰性机制,在创建、转换时,不会立即执行。只有在遇到行动算子时,才会开始运行

3. 有很多共同的函数

4. DataFrame 和 DataSet 许多操作都需要导入包:import spark.implicits._

5. 都会根据Spark的内存情况自动缓存运算,即使数据量很大,也不用担心内存溢出

6. 都有partition的概念

7.  DataFrame 和 DataSet 都可以使用匹配模式获取各个字段的值和类型

2.4.3 三者的区别

 1)RDD

  • RDD一般和spark mllib同时使用
  • RDD不支持sparkSQL操作

 2)DataFrame

  • RDD和DataFrame不同,DataFrame每一行的类型固定为Row,每一列的值无法直接访问,只有通过解析才能获取各个字段的值
  • DataFrame 和 DataSet 一般捕鱼spark mllib 同时使用
  • DataFrame 和 DataSet 都支持SparkSQL操作,如select,groupby等。同事也能注册临时表/视窗,进行sql语句操作。
  • DataFrame 和 DataSet支持一些方便的保存方式,比如保存成csv,可以带上表头

 3)DataSet文章来源地址https://www.toymoban.com/news/detail-663735.html

  • DataFrame 和 DataSet拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame其实就是DataSet的一个特例
  • DataFrame 也可以叫 DataSet[Row],每一行的类型是Row

三、SparkSQL连接Hive

object SparkHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sparkHive")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://192.168.153.139:9083")
      .enableHiveSupport()
      .getOrCreate()
    spark.sql("show databases").show()

    val ecd = spark.table("shopping.ext_customer_details")
    ecd.printSchema()
    ecd.show()

    println("============")

//    每个国家分别有多少员工
    import spark.implicits._
    import org.apache.spark.sql.functions._
    val usernumdf = ecd
//    这里输出之后会自带表头。这是由于我们在hive中设置了删除表头'skip.header.line.count'='1'
//    所以需要过滤表头文件。
      .where($"customer_id"=!="customer_id")
//      .filter($"customer_id"=!="customer_id")
      .groupBy("country")
      .agg(count("customer_id").as("usernum"))
    usernumdf.printSchema()
    usernumdf.show(3)

    usernumdf.write.mode("append").saveAsTable("shopping.usernum")


    spark.close()
  }
}

到了这里,关于大数据技术之Spark——Spark SQL的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hive & Spark & Flink 数据倾斜

    绝大部分任务都很快完成,只有一个或者少数几个任务执行的很慢甚至最终执行失败, 这样的现象为数据倾斜现象。 任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 redu

    2024年02月07日
    浏览(30)
  • spark读取数据写入hive数据表

    目录 spark 读取数据 spark从某hive表选取数据写入另一个表的一个模板 概述: create_tabel建表函数,定义日期分区 删除原有分区drop_partition函数 generate_data 数据处理函数,将相关数据写入定义的表中  注: 关于 insert overwrite/into 中partition时容易出的分区报错问题:  添加分区函数

    2024年01月19日
    浏览(41)
  • hive/spark数据倾斜解决方案

    数据倾斜主要表现在,mapreduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条Key所在的reduce节点所处理的数据量比其他节点就大很多,

    2024年02月11日
    浏览(33)
  • Spark SQL数据源:Hive表

    Spark SQL还支持读取和写入存储在Apache Hive中的数据。然而,由于Hive有大量依赖项,这些依赖项不包括在默认的Spark发行版中,如果在classpath上配置了这些Hive依赖项,Spark就会自动加载它们。需要注意的是,这些Hive依赖项必须出现在所有Worker节点上,因为它们需要访问Hive序列化

    2024年02月11日
    浏览(29)
  • 使用spark将MongoDB数据导入hive

    使用spark将MongoDB数据导入hive 一、pyspark 1.1 pymongo+spark 代码 spark-submit 1.2 mongo-spark-connector 生产环境不方便使用,亲测各种报错 二、Scala 2.1 pom.xml 2.2 代码

    2024年01月22日
    浏览(28)
  • 万字解决Flink|Spark|Hive 数据倾斜

    此篇主要总结到Hive,Flink,Spark出现数据倾斜的表现,原因和解决办法。首先会让大家认识到不同框架或者计算引擎处理倾斜的方案。最后你会发现计算框架只是“异曲”,文末总结才是“同工之妙”。点击收藏与分享,工作和涨薪用得到!!! 数据倾斜最笼统概念就是数据的

    2024年02月03日
    浏览(28)
  • hive on spark hql 插入数据报错 Failed to create Spark client for Spark session Error code 30041

    离线数仓 hive on spark 模式,hive 客户端 sql 插入数据报错 Failed to execute spark task, with exception \\\'org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create Spark client for Spark session 50cec71c-2636-4d99-8de2-a580ae3f1c58)\\\' FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed t

    2024年02月14日
    浏览(41)
  • 大数据:Hadoop基础常识hive,hbase,MapReduce,Spark

    Hadoop是根据Google三大论文为基础研发的,Google 三大论文分别是: MapReduce、 GFS和BigTable。 Hadoop的核心是两个部分: 一、分布式存储(HDFS,Hadoop Distributed File System)。 二、分布式计算(MapReduce)。 MapReduce MapReduce是“ 任务的分解与结果的汇总”。 Map把数据切分——分布式存放

    2024年04月25日
    浏览(40)
  • 爱奇艺大数据加速:从Hive到Spark SQL

    01 爱奇艺自2012年开展大数据业务以来,基于大数据开源生态服务建设了一系列平台,涵盖了数据采集、数据处理、数据分析、数据应用等整个大数据流程,为公司的运营决策和各种数据智能业务提供了强有力的支持。随着数据规模的不断增长和计算复杂度的增加,如何快速挖

    2024年02月08日
    浏览(31)
  • 大数据技术之Spark(一)——Spark概述

    大数据技术之Spark(一)——Spark概述 Apache Spark是一个开源的、强大的分布式 查询和处理引擎 ,它提供MapReduce的灵活性和可扩展性,但速度明显要快上很多;拿数据存储在内存中的时候来说,它比Apache Hadoop 快100倍,访问磁盘时也要快上10倍。 Spark 是一种由 Scala 语言开发的快

    2024年02月14日
    浏览(24)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包