Spark【Spark SQL(三)DataSet】

这篇具有很好参考价值的文章主要介绍了Spark【Spark SQL(三)DataSet】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

DataSet

         DataFrame 的出现,让 Spark 可以更好地处理结构化数据的计算,但存在一个问题:编译时的类型安全问题,为了解决它,Spark 引入了 DataSet API(DataFrame API 的扩展)。DataSet 是分布式的数据集合,它提供了强类型支持,也就是给 RDD 的每行数据都添加了类型约束。

        在 Spark 2.0 中,DataFrame 和 DataSet 被合并为 DataSet 。DataSet包含 DataFrame 的功能,DataFrame 表示为 DataSet[Row] ,即DataSet 的子集。

三种 API 的选择

        RDD 是DataFrame 和 DataSet 的底层,如果需要更多的控制功能(比如精确控制Spark 怎么执行一条查询),尽量使用 RDD。

        如果希望在编译时获得更高的类型安全,建议使用 DataSet。

        如果想统一简化 Spark 的API ,则使用 DataFrame 和 DataSet。

        基于 DataFrame API 和 DataSet API 开发的程序会被自动优化,开发人员不需要操作底层的RDD API 来手动优化,大大提高了开发效率。但是RDD API 对于非结构化数据的处理有独特的优势(比如文本数据流),而且更方便我们做底层的操作。

DataSet 的创建

1、使用createDataset()方法创建

def main(args: Array[String]): Unit = {
    //local代表本地单线程模式 local[*]代表本地多线程模式
    val spark = SparkSession.builder()
      .appName("create dataset")
      .master("local[*]")
      .getOrCreate()

    //一定要导入它 不然无法创建DataSet对象
    import spark.implicits._

    val ds1 = spark.createDataset(1 to 5)
    ds1.show()
    
    val ds2 = spark.createDataset(spark.sparkContext.textFile("data/sql/people.txt"))
    ds2.show()
    spark.stop()
  }

运行结果:

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+

+--------+
|   value|
+--------+
| Tom, 21|
|Mike, 25|
|Andy, 18|
+--------+

2、通过 toDS 方法生成

import org.apache.spark.sql.{Dataset, SparkSession}

object DataSetCreate {

  //case类一定要写到main方法之外
  case class Person(name:String,age:Int)

  def main(args: Array[String]): Unit = {
    //local代表本地单线程模式 local[*]代表本地多线程模式
    val spark = SparkSession.builder()
      .appName("create dataset")
      .master("local[*]")
      .getOrCreate()

    //一定要导入 SparkSession对象下的implicits
    import spark.implicits._


    val data = List(Person("Tom",21),Person("Andy",22))
    val ds: Dataset[Person] = data.toDS()
    ds.show()

    spark.stop()
  }
}

运行结果:

+----+---+
|name|age|
+----+---+
| Tom| 21|
|Andy| 22|
+----+---+

3、通过DataFrame 转换生成

需要注意:json中的数
object DataSetCreate{
 case class Person(name:String,age:Long,sex:String)
def main(args: Array[String]): Unit = {
    
    //local代表本地单线程模式 local[*]代表本地多线程模式
    val spark = SparkSession.builder()
      .appName("create dataset")
      .master("local[*]")
      .getOrCreate()
    
    import spark.implicits._
    
    val df = spark.read.json("data/sql/people.json")
    val ds = df.as[Person]

    ds.show()
    spark.stop()
    }
}

RDD、DataFrame 和 DataSet 之间的相互转换

RDD <=> DataFrame

  1. RDD 转 DataFrame ,也就是上一篇博客中介绍的两种方法:
    1. 能创建case类,就直接映射出一个RDD[Person],然后调用toDF方法利用反射机制推断RDD模式。
    2. 无法创建case类,就使用编程方式定义RDD模式,使用 createDataFrame(rowRDD,schema) 指定rowRDD:RDD[Row]和schema:StructType。
    3. 如果RDD是像:RDD[(Long, String)] 这样保存的是一个元组类型的RDD,那么也可以直接使用 toDF 方法转为 DataFrame 对象,因为元组的 k,v 数据类型是已知的,就相当于有了创建 DataFrame 的模式信息(schema)。
  2. DataFrame 转 RDD,直接使用 rdd() 方法。
package com.study.spark.core.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object TransForm {

  case class Person(name:String,age:Int)  //txt文件age字段可以用Int,但json文件尽量用Long

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("transform")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    //1.RDD和DataFrame之间互相转换
    //1.1 创建RDD对象
    val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
      .map(_.split(","))
      .map(attr => Person(attr(0), attr(1).trim.toInt))
    rdd.foreach(println)
    /*
      Person(Andy,18)
      Person(Tom,21)
      Person(Mike,25)
     */
    //1.2 RDD转DataFrame
    val df = rdd.toDF()
    df.show()
    /*
      +----+---+
      |name|age|
      +----+---+
      | Tom| 21|
      |Mike| 25|
      |Andy| 18|
      +----+---+
  */
    //1.3 DataFrame转RDD
    val res: RDD[Row] = df.rdd
    /*
      [Andy,18]
      [Tom,21]
      [Mike,25]
     */
    
    res.foreach(println)
    spark.stop()
  }
}

可以看到,RDD[Person]转为DataFrame后,再从DataFrame转回RDD就变成了RDD[Row] 类型了。


RDD <=> DataSet

RDD 和 DataSet 之间的转换比较简单:

  1. RDD 转 DataSet 直接使用case 类(比如Person),然后映射出 RDD[Person] ,直接调用 toDS方法。
  2. DataSet 转 RDD 直接调用 rdd方法即可。

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object TransForm {

  case class Person(name:String,age:Int)  //txt文件age字段可以用Int,但json文件尽量用Long

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("transform")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    //1.RDD和DataSet之间互相转换
    //1.1 创建RDD对象
    val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
      .map(_.split(","))
      .map(attr => Person(attr(0), attr(1).trim.toInt))
    rdd.foreach(println)
    /*
      Person(Andy,18)
      Person(Tom,21)
      Person(Mike,25)
     */
    //1.2 RDD转DataSet
    val ds = rdd.toDS()
    ds.show()
    /*
      +----+---+
      |name|age|
      +----+---+
      | Tom| 21|
      |Mike| 25|
      |Andy| 18|
      +----+---+
  */
    //1.3 DataFrame转RDD
    val res: RDD[Person] = ds.rdd
    res.foreach(println)
    /*
      Person(Andy,18)
      Person(Tom,21)
      Person(Mike,25)
     */
    spark.stop()
  }
}

可以看到,相比RDD和DataFrame互相转换,RDD和DataSet转换的过程中,不会有数据类型的变化,而DataFrame转RDD的过程就会把我们定义的case类转为Row对象。

DataFrame <=> DataSet

  1. DataFrame 转 DataSet 先使用case类,然后直接使用 as[case 类] 方法。
  2. DataSet 转 DataFrame 直接使用 toDF 方法。
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object TransForm {

  case class Person(name:String,age:Long,sex:String)  //txt文件age字段可以用Int,但json文件尽量用Long

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("transform")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    //1.DataFrame和DataSet之间互相转换
    //1.1 创建DataFrame对象
    val df = spark.read.json("data/sql/people.json")
    df.show()
    /*
      +---+----------+---+
      |age|      name|sex|
      +---+----------+---+
      | 30|   Michael| 男|
      | 19|      Andy| 女|
      | 19|    Justin| 男|
      | 20|Bernadette| 女|
      | 23|  Gretchen| 女|
      | 27|     David| 男|
      | 33|    Joseph| 女|
      | 27|     Trish| 女|
      | 33|      Alex| 女|
      | 25|       Ben| 男|
      +---+----------+---+
    */
    //1.2 DataFrame转DataSet
    val ds = df.as[Person]
    ds.show()
    /*
      +---+----------+---+
      |age|      name|sex|
      +---+----------+---+
      | 30|   Michael| 男|
      | 19|      Andy| 女|
      | 19|    Justin| 男|
      | 20|Bernadette| 女|
      | 23|  Gretchen| 女|
      | 27|     David| 男|
      | 33|    Joseph| 女|
      | 27|     Trish| 女|
      | 33|      Alex| 女|
      | 25|       Ben| 男|
      +---+----------+---+
  */
    //1.3 DataSet转DataFrame
    val res: DataFrame = ds.toDF()
    res.show()
    /*
      +---+----------+---+
      |age|      name|sex|
      +---+----------+---+
      | 30|   Michael| 男|
      | 19|      Andy| 女|
      | 19|    Justin| 男|
      | 20|Bernadette| 女|
      | 23|  Gretchen| 女|
      | 27|     David| 男|
      | 33|    Joseph| 女|
      | 27|     Trish| 女|
      | 33|      Alex| 女|
      | 25|       Ben| 男|
      +---+----------+---+
    */
    spark.stop()
  }
}

DataSet 实现 WordCount

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("create dataset")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val res: Dataset[(String, Long)] = spark.read.text("data/word.txt").as[String]
      .flatMap(_.split(" "))
      .groupByKey(_.toLowerCase)
      .count()

    res.show()

    spark.stop()
  }

运行结果:

|   key|count(1)|
+------+--------+
|  fast|       1|
|    is|       3|
| spark|       2|
|better|       1|
|  good|       1|
|hadoop|       1|
+------+--------+

总结

        剩下来就是不断练习各种DataFrame和DataSet的操作、熟悉各种转换和行动操作。文章来源地址https://www.toymoban.com/news/detail-705629.html

到了这里,关于Spark【Spark SQL(三)DataSet】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

    人生很长,不必慌张。你未长大,我要担当。 传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。 RDD是Spark提供的最重要的抽象概念

    2024年02月22日
    浏览(48)
  • 大数据课程K2——Spark的RDD弹性分布式数据集

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 了解Spark的RDD结构; ⚪ 掌握Spark的RDD操作方法; ⚪ 掌握Spark的RDD常用变换方法、常用执行方法; 初学Spark时,把RDD看做是一个集合类型(类似于Array或List),用于存储数据和操作数据,但RDD和普通集合的区别

    2024年02月12日
    浏览(36)
  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(35)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(48)
  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(38)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(44)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(44)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(58)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(43)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(32)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包