2023_Spark_实验十四:SparkSQL入门操作

这篇具有很好参考价值的文章主要介绍了2023_Spark_实验十四:SparkSQL入门操作。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、将emp.csv、dept.csv文件上传到分布式环境,再用 

hdfs  dfs -put dept.csv /input/

hdfs  dfs -put emp.csv /input/

将本地文件put到hdfs文件系统的input目录下文章来源地址https://www.toymoban.com/news/detail-719047.html

2、或者调用本地文件也可以。区别:sc.textFile("file:///D:\\temp\\emp.csv")


import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types._

import spark.implicits._



case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)



val lines =sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))



val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))





val allEmpDF = allEmp.toDF



allEmpDF.show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

  • StructType 是个case class,一般用于构建schema.

  • 因为是case class,所以使用的时候可以不用new关键字

构造函数

  • 可以传入Seq,List,Array,都是可以的~

  • 还可以用无参的构造器,因为它有一个无参的构造器.

例子

private val schema: StructType = StructType(List(

    StructField("name", DataTypes.StringType),

    StructField("age", DataTypes.IntegerType)

  ))

也可以是

private val schema: StructType = StructType(Array(

    StructField("name", DataTypes.StringType),

    StructField("age", DataTypes.IntegerType)

  ))
  • 还可以调用无参构造器,这么写

private val schema = (new StructType)

    .add(StructField("name", DataTypes.StringType))

    .add(StructField("age", DataTypes.IntegerType))
  • 这个无参的构造器,调用了一个有参构造器.this里面是个方法,这个方法的返回值是Array类型,实际上就是无参构造器调用了主构造器

def this() = this(Array.empty[StructField])

case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {}

import org.apache.spark.sql.types._

val myschema =StructType(List(

StructField("empno",DataTypes.IntegerType),

StructField("ename",DataTypes.StringType),

StructField("job",DataTypes.StringType),

StructField("mgr",DataTypes.StringType),

StructField("hiredate",DataTypes.StringType),

StructField("sal",DataTypes.IntegerType),

StructField("comm",DataTypes.StringType),

StructField("deptno",DataTypes.IntegerType)

))



val empcsvRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))

import org.apache.spark.sql.Row

val rowRDD=empcsvRDD.map(line => Row (line(0).toInt,line(1),line(2),line(3),line(4),line(5).toInt,line(6),line(7).toInt))



val df = spark.createDataFrame(rowRDD,myschema)

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

将people.json文件上传到分布式环境

hdfs  dfs -put people.json /input

hdfs  dfs -put emp.json /input

//读json文件

val df = spark.read.json("hdfs://Master:9000/input/emp.json")

df.show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.select ("ename").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.select($"ename").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.select($"ename",$"sal",$"sal"+100).show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.filter($"sal">2000).show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.groupBy($"deptno").count.show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

df.createOrReplaceTempView("emp")

spark.sql("select * from emp").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

spark.sql("select * from emp where deptno=10").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

spark.sql("select deptno,sum(sal) from emp group by deptno").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式


//1 创建一个普通的 view 和一个全局的 view

df.createOrReplaceTempView("emp1")

df.createGlobalTempView("emp2")



//2 在当前会话中执行查询,均可查询出结果

spark.sql("select * from emp1").show

spark.sql("select * from global_temp.emp2").show



//3 开启一个新的会话,执行同样的查询



spark.newSession.sql("select * from emp1").show //运行出错

spark.newSession.sql("select * from global_temp.emp2").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

//7、创建 Datasets

//创建 DataSet,方式一:使用序列

//1、定义 case class

case class MyData(a:Int,b:String)



//2、生成序列,并创建 DataSet

val ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS



//3、查看结果



ds.show



ds.collect

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式


//创建 DataSet,方式二:使用 JSON 数据



//1、定义 case class

case class Person(name: String, gender: String)



//2、通过 JSON 数据生成 DataFrame



val df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""":: Nil))



//3、将 DataFrame 转成 DataSet



df.as[Person].show

df.as[Person].collect

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式


//创建 DataSet,方式三:使用 HDFS 数据



val linesDS = spark.read.text("hdfs://Master:9000/input/word.txt").as[String]

val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)

words.show

words.collect

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式


val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).count

result.show

result.orderBy($"value").show

2023_Spark_实验十四:SparkSQL入门操作,Spark实验,spark,大数据,分布式

1、将emp.json文件上传到分布式环境,再用 

hdfs  dfs -put emp.json /input/

将本地文件put到hdfs文件系统的input目录下


//8、Datasets 的操作案例



//1.使用 emp.json 生成 DataFrame

val empDF = spark.read.json("hdfs://Master:9000/input/emp.json")

//查询工资大于 3000 的员工

empDF.where($"sal" >= 3000).show



//创建 case class

case classEmp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)



//生成 DataSets,并查询数据



val empDS = empDF.as[Emp]

//查询工资大于 3000 的员工

empDS.filter(_.sal > 3000).show

//查看 10 号部门的员工

empDS.filter(_.deptno == 10).show





//多表查询



//1、创建部门表



val deptRDD=sc.textFile("hdfs://Master:9000/input/dept.csv").map(_.split(","))

case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS

//2、创建员工表



case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)

val empRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))

val empDS = empRDD.map(x =>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS



//3、执行多表查询:等值链接



val result = deptDS.join(empDS,"deptno")

//另一种写法:注意有三个等号

val result = deptDS.joinWith(empDS,deptDS("deptno")===empDS("deptno"))



//查看执行计划:

result.explain



到了这里,关于2023_Spark_实验十四:SparkSQL入门操作的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据课程K22——Spark的SparkSQL的API调用

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 掌握Spark的通过api使用SparkSQL; 1. 打开scala IDE开发环境,创建一个scala工程。 2. 导入spark相关依赖jar包。 3. 创建包路径以object类。 4. 写代码。 5)打jar包,并上传到linux虚拟机上 6)在spark的bin目录下 执行: s

    2024年02月10日
    浏览(41)
  • 大数据编程实验一:HDFS常用操作和Spark读取文件系统数据

    这是我们大数据专业开设的第二门课程——大数据编程,使用的参考书是《Spark编程基础》,这门课跟大数据技术基础是分开学习的,但这门课是用的我们自己在电脑上搭建的虚拟环境进行实验的,不是在那个平台上,而且搭建的还是伪分布式,这门课主要偏向于有关大数据

    2024年04月10日
    浏览(54)
  • 2023_Spark_实验十二:Spark高级算子使用

    掌握Spark高级算子在代码中的使用 相同点分析 三个函数的共同点,都是Transformation算子。惰性的算子。 不同点分析 map函数是一条数据一条数据的处理,也就是,map的输入参数中要包含一条数据以及其他你需要传的参数。 mapPartitions函数是一个partition数据一起处理,也即是说,

    2024年02月08日
    浏览(34)
  • 轻大21级软工大数据实验(手把手教你入门Hadoop、hbase、spark)

    写在最前面,如果你只是来找答案的,那么很遗憾,本文尽量避免给出最后结果,本文适合Linux0基础学生,给出详细的环境配置过程,实验本身其实很简单,供大家一起学习交流。 1 .编程实现以下指定功能,并利用 Hadoop 提供的 Shell 命令完成相同任务 : 向HDFS 中上传任意文

    2024年02月05日
    浏览(88)
  • 2023_Spark_实验八:Scala高级特性实验

    1、什么是泛型类 和Java或者C++一样,类和特质可以带类型参数。在Scala中,使用方括号来定义类型 参数,如下所示: 2、什么是泛型函数 函数和方法也可以带类型参数。和泛型类一样,我们需要把类型参数放在方法名之 后。 注意:这里的ClassTag是必须的,表示运行时的一些信

    2024年02月08日
    浏览(33)
  • 2023_Spark_实验四:SCALA基础

    或者用windows徽标+R  输入cmd 进入命令提示符 输入scala直接进入编写界面 1、Scala的常用数据类型 注意:在Scala中,任何数据都是对象。例如: 1. 数值类型:Byte,Short,Int,Long,Float,Double Byte: 8位有符号数字,从-128 到 127 Short: 16位有符号数据,从-32768 到 32767 Int: 32位有符号

    2024年02月10日
    浏览(46)
  • Spark---SparkSQL介绍

    Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎,由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用。除了基于Spark的特性外,Shark是完全兼容Hive的语法,表结构以及

    2024年01月21日
    浏览(45)
  • 【spark】SparkSQL

    什么是SparkSQL SparkSQL是Spark的一个模块,用于处理海量 结构化数据 为什么学习SparkSQL SparkSQL是非常成熟的海量结构化数据处理框架: 学习SparkSQL主要在2个点: SparkSQL本身十分优秀,支持SQL语言、性能强、可以自动优化、API简单、兼容HIVE等等 企业大面积在使用SparkSQL处理业务数

    2024年01月20日
    浏览(50)
  • Spark(15):SparkSQL之DataFrame

    目录 0. 相关文章链接 1. DataFrame的作用 2. 创建DataFrame 3. SQL 语法 4. DSL 语法 5. RDD 转换为 DataFrame 6. DataFrame 转换为 RDD  Spark文章汇总          Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有

    2024年02月13日
    浏览(42)
  • Spark(16):SparkSQL之DataSet

    目录 0. 相关文章链接 1. DataSet的定义 2. 创建DataSet 2.1. 使用样例类序列创建 DataSet 2.2. 使用基本类型的序列创建 DataSet 2.3. 注意 3. RDD 转换为 DataSet 4. DataSet 转换为 RDD  Spark文章汇总  DataSet 是具有强类型的数据集合,需要提供对应的类型信息。 在实际使用的时候,很少用到

    2024年02月13日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包