DataSet
DataFrame 的出现,让 Spark 可以更好地处理结构化数据的计算,但存在一个问题:编译时的类型安全问题,为了解决它,Spark 引入了 DataSet API(DataFrame API 的扩展)。DataSet 是分布式的数据集合,它提供了强类型支持,也就是给 RDD 的每行数据都添加了类型约束。
在 Spark 2.0 中,DataFrame 和 DataSet 被合并为 DataSet 。DataSet包含 DataFrame 的功能,DataFrame 表示为 DataSet[Row] ,即DataSet 的子集。
三种 API 的选择
RDD 是DataFrame 和 DataSet 的底层,如果需要更多的控制功能(比如精确控制Spark 怎么执行一条查询),尽量使用 RDD。
如果希望在编译时获得更高的类型安全,建议使用 DataSet。
如果想统一简化 Spark 的API ,则使用 DataFrame 和 DataSet。
基于 DataFrame API 和 DataSet API 开发的程序会被自动优化,开发人员不需要操作底层的RDD API 来手动优化,大大提高了开发效率。但是RDD API 对于非结构化数据的处理有独特的优势(比如文本数据流),而且更方便我们做底层的操作。
DataSet 的创建
1、使用createDataset()方法创建
def main(args: Array[String]): Unit = {
//local代表本地单线程模式 local[*]代表本地多线程模式
val spark = SparkSession.builder()
.appName("create dataset")
.master("local[*]")
.getOrCreate()
//一定要导入它 不然无法创建DataSet对象
import spark.implicits._
val ds1 = spark.createDataset(1 to 5)
ds1.show()
val ds2 = spark.createDataset(spark.sparkContext.textFile("data/sql/people.txt"))
ds2.show()
spark.stop()
}
运行结果:
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----+
+--------+
| value|
+--------+
| Tom, 21|
|Mike, 25|
|Andy, 18|
+--------+
2、通过 toDS 方法生成
import org.apache.spark.sql.{Dataset, SparkSession}
object DataSetCreate {
//case类一定要写到main方法之外
case class Person(name:String,age:Int)
def main(args: Array[String]): Unit = {
//local代表本地单线程模式 local[*]代表本地多线程模式
val spark = SparkSession.builder()
.appName("create dataset")
.master("local[*]")
.getOrCreate()
//一定要导入 SparkSession对象下的implicits
import spark.implicits._
val data = List(Person("Tom",21),Person("Andy",22))
val ds: Dataset[Person] = data.toDS()
ds.show()
spark.stop()
}
}
运行结果:
+----+---+
|name|age|
+----+---+
| Tom| 21|
|Andy| 22|
+----+---+
3、通过DataFrame 转换生成
object DataSetCreate{
case class Person(name:String,age:Long,sex:String)
def main(args: Array[String]): Unit = {
//local代表本地单线程模式 local[*]代表本地多线程模式
val spark = SparkSession.builder()
.appName("create dataset")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val df = spark.read.json("data/sql/people.json")
val ds = df.as[Person]
ds.show()
spark.stop()
}
}
RDD、DataFrame 和 DataSet 之间的相互转换
RDD <=> DataFrame
- RDD 转 DataFrame ,也就是上一篇博客中介绍的两种方法:
- 能创建case类,就直接映射出一个RDD[Person],然后调用toDF方法利用反射机制推断RDD模式。
- 无法创建case类,就使用编程方式定义RDD模式,使用 createDataFrame(rowRDD,schema) 指定rowRDD:RDD[Row]和schema:StructType。
- 如果RDD是像:RDD[(Long, String)] 这样保存的是一个元组类型的RDD,那么也可以直接使用 toDF 方法转为 DataFrame 对象,因为元组的 k,v 数据类型是已知的,就相当于有了创建 DataFrame 的模式信息(schema)。
- DataFrame 转 RDD,直接使用 rdd() 方法。
package com.study.spark.core.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
object TransForm {
case class Person(name:String,age:Int) //txt文件age字段可以用Int,但json文件尽量用Long
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("transform")
.master("local[*]")
.getOrCreate()
import spark.implicits._
//1.RDD和DataFrame之间互相转换
//1.1 创建RDD对象
val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
.map(_.split(","))
.map(attr => Person(attr(0), attr(1).trim.toInt))
rdd.foreach(println)
/*
Person(Andy,18)
Person(Tom,21)
Person(Mike,25)
*/
//1.2 RDD转DataFrame
val df = rdd.toDF()
df.show()
/*
+----+---+
|name|age|
+----+---+
| Tom| 21|
|Mike| 25|
|Andy| 18|
+----+---+
*/
//1.3 DataFrame转RDD
val res: RDD[Row] = df.rdd
/*
[Andy,18]
[Tom,21]
[Mike,25]
*/
res.foreach(println)
spark.stop()
}
}
可以看到,RDD[Person]转为DataFrame后,再从DataFrame转回RDD就变成了RDD[Row] 类型了。
RDD <=> DataSet
RDD 和 DataSet 之间的转换比较简单:
- RDD 转 DataSet 直接使用case 类(比如Person),然后映射出 RDD[Person] ,直接调用 toDS方法。
- DataSet 转 RDD 直接调用 rdd方法即可。
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
object TransForm {
case class Person(name:String,age:Int) //txt文件age字段可以用Int,但json文件尽量用Long
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("transform")
.master("local[*]")
.getOrCreate()
import spark.implicits._
//1.RDD和DataSet之间互相转换
//1.1 创建RDD对象
val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt")
.map(_.split(","))
.map(attr => Person(attr(0), attr(1).trim.toInt))
rdd.foreach(println)
/*
Person(Andy,18)
Person(Tom,21)
Person(Mike,25)
*/
//1.2 RDD转DataSet
val ds = rdd.toDS()
ds.show()
/*
+----+---+
|name|age|
+----+---+
| Tom| 21|
|Mike| 25|
|Andy| 18|
+----+---+
*/
//1.3 DataFrame转RDD
val res: RDD[Person] = ds.rdd
res.foreach(println)
/*
Person(Andy,18)
Person(Tom,21)
Person(Mike,25)
*/
spark.stop()
}
}
可以看到,相比RDD和DataFrame互相转换,RDD和DataSet转换的过程中,不会有数据类型的变化,而DataFrame转RDD的过程就会把我们定义的case类转为Row对象。
DataFrame <=> DataSet
- DataFrame 转 DataSet 先使用case类,然后直接使用 as[case 类] 方法。
- DataSet 转 DataFrame 直接使用 toDF 方法。
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object TransForm {
case class Person(name:String,age:Long,sex:String) //txt文件age字段可以用Int,但json文件尽量用Long
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("transform")
.master("local[*]")
.getOrCreate()
import spark.implicits._
//1.DataFrame和DataSet之间互相转换
//1.1 创建DataFrame对象
val df = spark.read.json("data/sql/people.json")
df.show()
/*
+---+----------+---+
|age| name|sex|
+---+----------+---+
| 30| Michael| 男|
| 19| Andy| 女|
| 19| Justin| 男|
| 20|Bernadette| 女|
| 23| Gretchen| 女|
| 27| David| 男|
| 33| Joseph| 女|
| 27| Trish| 女|
| 33| Alex| 女|
| 25| Ben| 男|
+---+----------+---+
*/
//1.2 DataFrame转DataSet
val ds = df.as[Person]
ds.show()
/*
+---+----------+---+
|age| name|sex|
+---+----------+---+
| 30| Michael| 男|
| 19| Andy| 女|
| 19| Justin| 男|
| 20|Bernadette| 女|
| 23| Gretchen| 女|
| 27| David| 男|
| 33| Joseph| 女|
| 27| Trish| 女|
| 33| Alex| 女|
| 25| Ben| 男|
+---+----------+---+
*/
//1.3 DataSet转DataFrame
val res: DataFrame = ds.toDF()
res.show()
/*
+---+----------+---+
|age| name|sex|
+---+----------+---+
| 30| Michael| 男|
| 19| Andy| 女|
| 19| Justin| 男|
| 20|Bernadette| 女|
| 23| Gretchen| 女|
| 27| David| 男|
| 33| Joseph| 女|
| 27| Trish| 女|
| 33| Alex| 女|
| 25| Ben| 男|
+---+----------+---+
*/
spark.stop()
}
}
DataSet 实现 WordCount
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("create dataset")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val res: Dataset[(String, Long)] = spark.read.text("data/word.txt").as[String]
.flatMap(_.split(" "))
.groupByKey(_.toLowerCase)
.count()
res.show()
spark.stop()
}
运行结果:文章来源:https://www.toymoban.com/news/detail-705629.html
| key|count(1)|
+------+--------+
| fast| 1|
| is| 3|
| spark| 2|
|better| 1|
| good| 1|
|hadoop| 1|
+------+--------+
总结
剩下来就是不断练习各种DataFrame和DataSet的操作、熟悉各种转换和行动操作。文章来源地址https://www.toymoban.com/news/detail-705629.html
到了这里,关于Spark【Spark SQL(三)DataSet】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!