2023_Spark_实验十四:SparkSQL入门操作

这篇具有很好参考价值的文章主要介绍了2023_Spark_实验十四:SparkSQL入门操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、将emp.csv、dept.csv文件上传到分布式环境,再用 

hdfs  dfs -put dept.csv /input/

hdfs  dfs -put emp.csv /input/

将本地文件put到hdfs文件系统的input目录下文章来源地址https://www.toymoban.com/news/detail-719047.html

2、或者调用本地文件也可以。区别:sc.textFile("file:///D:\\temp\\emp.csv")


import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types._

import spark.implicits._



case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)



val lines =sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))



val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))





val allEmpDF = allEmp.toDF



allEmpDF.show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

  • StructType 是个case class,一般用于构建schema.

  • 因为是case class,所以使用的时候可以不用new关键字

构造函数

  • 可以传入Seq,List,Array,都是可以的~

  • 还可以用无参的构造器,因为它有一个无参的构造器.

例子

private val schema: StructType = StructType(List(

    StructField("name", DataTypes.StringType),

    StructField("age", DataTypes.IntegerType)

  ))

也可以是

private val schema: StructType = StructType(Array(

    StructField("name", DataTypes.StringType),

    StructField("age", DataTypes.IntegerType)

  ))
  • 还可以调用无参构造器,这么写

private val schema = (new StructType)

    .add(StructField("name", DataTypes.StringType))

    .add(StructField("age", DataTypes.IntegerType))
  • 这个无参的构造器,调用了一个有参构造器.this里面是个方法,这个方法的返回值是Array类型,实际上就是无参构造器调用了主构造器

def this() = this(Array.empty[StructField])

case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {}

import org.apache.spark.sql.types._

val myschema =StructType(List(

StructField("empno",DataTypes.IntegerType),

StructField("ename",DataTypes.StringType),

StructField("job",DataTypes.StringType),

StructField("mgr",DataTypes.StringType),

StructField("hiredate",DataTypes.StringType),

StructField("sal",DataTypes.IntegerType),

StructField("comm",DataTypes.StringType),

StructField("deptno",DataTypes.IntegerType)

))



val empcsvRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))

import org.apache.spark.sql.Row

val rowRDD=empcsvRDD.map(line => Row (line(0).toInt,line(1),line(2),line(3),line(4),line(5).toInt,line(6),line(7).toInt))



val df = spark.createDataFrame(rowRDD,myschema)

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

将people.json文件上传到分布式环境

hdfs  dfs -put people.json /input

hdfs  dfs -put emp.json /input

//读json文件

val df = spark.read.json("hdfs://Master:9000/input/emp.json")

df.show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.select ("ename").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.select($"ename").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.select($"ename",$"sal",$"sal"+100).show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.filter($"sal">2000).show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.groupBy($"deptno").count.show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.createOrReplaceTempView("emp")

spark.sql("select * from emp").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

spark.sql("select * from emp where deptno=10").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

spark.sql("select deptno,sum(sal) from emp group by deptno").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式


//1 创建一个普通的 view 和一个全局的 view

df.createOrReplaceTempView("emp1")

df.createGlobalTempView("emp2")



//2 在当前会话中执行查询,均可查询出结果

spark.sql("select * from emp1").show

spark.sql("select * from global_temp.emp2").show



//3 开启一个新的会话,执行同样的查询



spark.newSession.sql("select * from emp1").show //运行出错

spark.newSession.sql("select * from global_temp.emp2").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

//7、创建 Datasets

//创建 DataSet,方式一:使用序列

//1、定义 case class

case class MyData(a:Int,b:String)



//2、生成序列,并创建 DataSet

val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS



//3、查看结果



ds.show



ds.collect

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式


//创建 DataSet,方式二:使用 JSON 数据



//1、定义 case class

case class Person(name: String, gender: String)



//2、通过 JSON 数据生成 DataFrame



val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""":: Nil))



//3、将 DataFrame 转成 DataSet



df.as[Person].show

df.as[Person].collect

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式


//创建 DataSet,方式三:使用 HDFS 数据



val linesDS = spark.read.text("hdfs://Master:9000/input/word.txt").as[String]

val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

words.show

words.collect

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式


val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count

result.show

result.orderBy($"value").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

1、将emp.json文件上传到分布式环境,再用 

hdfs  dfs -put emp.json /input/

将本地文件put到hdfs文件系统的input目录下


//8、Datasets 的操作案例



//1.使用 emp.json 生成 DataFrame

val empDF = spark.read.json("hdfs://Master:9000/input/emp.json")

//查询工资大于 3000 的员工

empDF.where($"sal" >= 3000).show



//创建 case class

case classEmp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)



//生成 DataSets,并查询数据



val empDS = empDF.as[Emp]

//查询工资大于 3000 的员工

empDS.filter(_.sal > 3000).show

//查看 10 号部门的员工

empDS.filter(_.deptno == 10).show





//多表查询



//1、创建部门表



val deptRDD=sc.textFile("hdfs://Master:9000/input/dept.csv").map(_.split(","))

case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS

//2、创建员工表



case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)

val empRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))

val empDS = empRDD.map(x =>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS



//3、执行多表查询:等值链接



val result = deptDS.join(empDS,"deptno")

//另一种写法:注意有三个等号

val result = deptDS.joinWith(empDS,deptDS("deptno")===empDS("deptno"))



//查看执行计划:

result.explain



到了这里,关于2023_Spark_实验十四:SparkSQL入门操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据课程K22——Spark的SparkSQL的API调用

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 掌握Spark的通过api使用SparkSQL; 1. 打开scala IDE开发环境,创建一个scala工程。 2. 导入spark相关依赖jar包。 3. 创建包路径以object类。 4. 写代码。 5)打jar包,并上传到linux虚拟机上 6)在spark的bin目录下 执行: s

    2024年02月10日
    浏览(40)
  • 大数据编程实验一:HDFS常用操作和Spark读取文件系统数据

    这是我们大数据专业开设的第二门课程——大数据编程,使用的参考书是《Spark编程基础》,这门课跟大数据技术基础是分开学习的,但这门课是用的我们自己在电脑上搭建的虚拟环境进行实验的,不是在那个平台上,而且搭建的还是伪分布式,这门课主要偏向于有关大数据

    2024年04月10日
    浏览(52)
  • 2023_Spark_实验十二:Spark高级算子使用

    掌握Spark高级算子在代码中的使用 相同点分析 三个函数的共同点,都是Transformation算子。惰性的算子。 不同点分析 map函数是一条数据一条数据的处理,也就是,map的输入参数中要包含一条数据以及其他你需要传的参数。 mapPartitions函数是一个partition数据一起处理,也即是说,

    2024年02月08日
    浏览(33)
  • 轻大21级软工大数据实验(手把手教你入门Hadoop、hbase、spark)

    写在最前面,如果你只是来找答案的,那么很遗憾,本文尽量避免给出最后结果,本文适合Linux0基础学生,给出详细的环境配置过程,实验本身其实很简单,供大家一起学习交流。 1 .编程实现以下指定功能,并利用 Hadoop 提供的 Shell 命令完成相同任务 : 向HDFS 中上传任意文

    2024年02月05日
    浏览(82)
  • 2023_Spark_实验八:Scala高级特性实验

    1、什么是泛型类 和Java或者C++一样,类和特质可以带类型参数。在Scala中,使用方括号来定义类型 参数,如下所示: 2、什么是泛型函数 函数和方法也可以带类型参数。和泛型类一样,我们需要把类型参数放在方法名之 后。 注意:这里的ClassTag是必须的,表示运行时的一些信

    2024年02月08日
    浏览(31)
  • 2023_Spark_实验四:SCALA基础

    或者用windows徽标+R  输入cmd 进入命令提示符 输入scala直接进入编写界面 1、Scala的常用数据类型 注意:在Scala中,任何数据都是对象。例如: 1. 数值类型:Byte,Short,Int,Long,Float,Double Byte: 8位有符号数字,从-128 到 127 Short: 16位有符号数据,从-32768 到 32767 Int: 32位有符号

    2024年02月10日
    浏览(44)
  • Spark---SparkSQL介绍

    Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎,由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用。除了基于Spark的特性外,Shark是完全兼容Hive的语法,表结构以及

    2024年01月21日
    浏览(43)
  • 【spark】SparkSQL

    什么是SparkSQL SparkSQL是Spark的一个模块,用于处理海量 结构化数据 为什么学习SparkSQL SparkSQL是非常成熟的海量结构化数据处理框架: 学习SparkSQL主要在2个点: SparkSQL本身十分优秀,支持SQL语言、性能强、可以自动优化、API简单、兼容HIVE等等 企业大面积在使用SparkSQL处理业务数

    2024年01月20日
    浏览(47)
  • Spark(16):SparkSQL之DataSet

    目录 0. 相关文章链接 1. DataSet的定义 2. 创建DataSet 2.1. 使用样例类序列创建 DataSet 2.2. 使用基本类型的序列创建 DataSet 2.3. 注意 3. RDD 转换为 DataSet 4. DataSet 转换为 RDD  Spark文章汇总  DataSet 是具有强类型的数据集合,需要提供对应的类型信息。 在实际使用的时候,很少用到

    2024年02月13日
    浏览(40)
  • Spark(15):SparkSQL之DataFrame

    目录 0. 相关文章链接 1. DataFrame的作用 2. 创建DataFrame 3. SQL 语法 4. DSL 语法 5. RDD 转换为 DataFrame 6. DataFrame 转换为 RDD  Spark文章汇总          Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有

    2024年02月13日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包