大数据编程实验三:SparkSQL编程

这篇具有很好参考价值的文章主要介绍了大数据编程实验三:SparkSQL编程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

大数据编程实验三:SparkSQL编程

一、前言

二、实验目的与要求

  1. 通过实验掌握Spark SQL的基本编程方法
  2. 熟悉RDD到DataFrame的转化方法
  3. 熟悉利用Spark SQL管理来自不同数据源的数据

三、实验内容

  1. Spark SQL基本操作

    将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

    { "id":1 , "name":" Ella" , "age":36 }
    { "id":2, "name":"Bob","age":29 }
    { "id":3 , "name":"Jack","age":29 }
    { "id":4 , "name":"Jim","age":28 }
    { "id":4 , "name":"Jim","age":28 }
    { "id":5 , "name":"Damon" }
    { "id":5 , "name":"Damon" }
    

    为employee.json创建DataFrame,并写出Python语句完成下列操作:

    (1) 查询所有数据;

    (2) 查询所有数据,并去除重复的数据;

    (3) 查询所有数据,打印时去除id字段;

    (4) 筛选出age>30的记录;

    (5) 将数据按age分组;

    (6) 将数据按name升序排列;

    (7) 取出前3行数据;

    (8) 查询所有记录的name列,并为其取别名为username;

    (9) 查询年龄age的平均值;

    (10) 查询年龄age的最小值。

  2. 编程实现将RDD转换为DataFrame

    源文件内容如下(包含id,name,age):

    1,Ella,36
    2,Bob,29
    3,Jack,29
    

    请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

  3. 编程实现利用DataFrame读写MySQL的数据

    (1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如下表所示的两行数据。

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表5-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

四、实验步骤

1、Spark SQL基本操作

我们在之前创建的sparkdata目录下创建该json文件并将上面信息复制进去并保存命名为employee.json:

cd /usr/local/spark/sparkdata
vim employee.json

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

然后我们进入到pyspark中,开始做题。

首先我们创建一个DataFrame:

>>> sp=SparkSession.builder.getOrCreate()
>>> df=sp.read.json("file:///usr/local/spark/sparkdata/employee.json")

(1)查询DataFrame的所有数据

>>> df.show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(2)查询所有数据,并去除重复的数据

>>> df.distinct().show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(3)查询所有数据,打印时去除id字段

>>> df.drop("id").show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(4)筛选age>30的记录

 df.filter(df.age>30).show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(5) 将数据按age分组

>>> df.groupBy("age").count().show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(6) 将数据按name升序排列

>>> df.sort(df.name.asc()).show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(7) 取出前3行数据

>>> df.take(3)

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(8) 查询所有记录的name列,并为其取别名为username

>>> df.select(df.name.alias("username")).show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(9) 查询年龄age的平均值

>>> df.agg({"age":"mean"}).show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(10) 查询年龄age的最小值

>>> df.agg({"age":"min"}).show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

2、编程实现将RDD转换为DataFrame

首先我们仍然在sparkdata目录下创建我们需要的文件并命令为employee.txt,然后写入信息:

vim employee.txt

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

然后我们还是在该目录下新建一个py文件命名为rddTodf.py,然后写入如下py程序:

from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":
        sc = SparkContext("local","Simple App")
        spark=SparkSession(sc)
        peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/sparkdata/employee.txt")
        rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
        rowRDD.createOrReplaceTempView("employee")
        personsDF = spark.sql("select * from employee")
        personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

然后我们运行该程序:

python3 rddTodf.py

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

出现这个结果证明成功。

3、编程实现利用DataFrame读写MySQL的数据

我们首先启动mysql服务并进入到mysql数据库中:

systemctl start mysqld.service
mysql -u root -p

然后开始接下来的操作。

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,并写入题目中的原始数据

mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4),name char(20),gender char(4),age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入下列数据到MySQL,最后打印出age的最大值和age的总和

我们仍然在sparkdata目录下面新建一个py程序并命名为mysqlTest.py

cd /usr/local/spark/sparkdata
vim mysqlTest.py

然后写入如下py程序:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
if __name__ == "__main__":

    sc = SparkContext( 'local', 'test')
    spark=SQLContext(sc)
    jdbcDF=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user", "root").option("password", "MYsql123!").load()
    jdbcDF.filter(jdbcDF.age>20).collect()      # 检测是否连接成功
    studentRDD = sc.parallelize(["3 Mary F 26","4 Tom M 23"]).map(lambda line : line.split(" "))
    schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
    rowRDD = studentRDD.map(lambda p : Row(int(p[0]),p[1].strip(), p[2].strip(),int(p[3])))
    employeeDF = spark.createDataFrame(rowRDD, schema)
    prop = {}
    prop['user'] = 'root'
    prop['password'] = 'MYsql123!'
    prop['driver'] = "com.mysql.jdbc.Driver"
    employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest",'employee','append', prop)
    jdbcDF.collect()
    jdbcDF.agg({"age": "max"}).show()
    jdbcDF.agg({"age": "sum"}).show()

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

然后直接运行该py程序即可得到结果:

python3 mysqlTest.py

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

五、最后我想说

本次实验的话,难度主要在后面两个题目中,在第二题中我遇见了两个错误:

  1. PipelinedRDD’ object has no attribute ‘toDF’
  2. ‘SparkSession’ object has no attribute ‘textFile’

第一个错误我是通过如下解决的:

spark = SparkSession(sc)

解决第一个错误之后,我再次运行的时候就开始报第二个错误了,第二个错误我是这样解决的:

from pyspark.sql import SQLContext
spark.sparkContext.textFile('filepath')

具体可以看我们上面对于的代码就可以明白了。

另外,很明显可以看见第三题第二问后面抛出了异常:

** BEGIN NESTED EXCEPTION ** 

javax.net.ssl.SSLException
MESSAGE: closing inbound before receiving peer's close_notify

STACKTRACE:

javax.net.ssl.SSLException: closing inbound before receiving peer's close_notify
	at sun.security.ssl.Alert.createSSLException(Alert.java:133)
	at sun.security.ssl.Alert.createSSLException(Alert.java:117)
	at sun.security.ssl.TransportContext.fatal(TransportContext.java:340)
	at sun.security.ssl.TransportContext.fatal(TransportContext.java:296)
	at sun.security.ssl.TransportContext.fatal(TransportContext.java:287)
	at sun.security.ssl.SSLSocketImpl.shutdownInput(SSLSocketImpl.java:737)
	at sun.security.ssl.SSLSocketImpl.shutdownInput(SSLSocketImpl.java:716)
	at com.mysql.jdbc.MysqlIO.quit(MysqlIO.java:2239)
	at com.mysql.jdbc.ConnectionImpl.realClose(ConnectionImpl.java:4267)
	at com.mysql.jdbc.ConnectionImpl.close(ConnectionImpl.java:1531)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$close$1(JDBCRDD.scala:259)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$compute$1.apply$mcV$sp(JDBCRDD.scala:308)
	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

这是因为与MySQL数据库的SSL连接失败了,我们只需要将数据源的URL后面添加**?useSSL=false**就可以解决,也就是禁用SSL:

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

编程实现利用dataframe读写mysql的数据。 (1)在mysql数据库中新建数据库sparktest,大数据学习,大数据,spark,分布式,mysql

但是它还是抛出了异常,只是异常没有之前那么多了,我上网查阅了一下相关错误,好像这样添加不能完全禁用SSL,具体原因我也不知道,可能跟底层C语言有关,这个我不了解,所以就先这样了。

本次实验到这里就结束了,谢谢你们的阅读!文章来源地址https://www.toymoban.com/news/detail-786849.html

到了这里,关于大数据编程实验三:SparkSQL编程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python编程实验五:文件的读写操作

    目录 一、实验目的与要求 二、实验内容 三、主要程序清单和程序运行结果 第1题 第2题 四、实验结果分析与体会 (1)通过本次实验,学生应掌握与文件打开、关闭相关的函数,以及与读写操作相关的常用方法的使用; (2)理解基于文件的词频统计以及数据分析的基本思路

    2024年03月14日
    浏览(37)
  • 《大数据系统与编程》MapReduce程序实现词频统计实验报告

    MapReduce程序实现词频统计            实验目的 1) 理解Hadoop中MapReduce模块的处理逻辑; 2)熟悉MapReduce编程; 实验平台 操作系统:Linux 工具:Eclipse或者Intellij Idea等Java IDE 实验内容 1) 在电脑上新建文件夹input,并input文件夹中创建三个文本文件:file1.txt,file2.txt,file3.tx

    2024年02月09日
    浏览(28)
  • 利用python读写mysql数据库数据

    方法一: 1. python连接mysql数据库:需要用到 pymysql 库和 sqlalchemy库: 创建连接 create_engine: 2. 提供 sql 语句 3. 执行查询,使用 read_sql_query 得到pandas的dataframe数据 4. 写数据到 mysql 数据库 方法二:使用 read_sql 方法三:直接使用pymysql进行查询,插入数据、更新数据、删除数据

    2024年02月09日
    浏览(47)
  • 大数据技术基础实验四:HDFS实验——读写HDFS文件

    在前面的实验中我们进行了HDFS的部署,并设置了一键启动HDFS操作,本期实验我们将使用HDFS开发环境进行HDFS写、读的编写,以及程序的运行,了解HDFS读写文件的调用流程,理解HDFS读写文件的原理。 好啦废话不多说,我们开始今天的实验操作。 会在Linux环境下编写读写HDFS文

    2023年04月16日
    浏览(29)
  • SparkSQL编程实践

    将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。 { \\\"id\\\":1 , \\\"name\\\":\\\" Ella\\\" , \\\"age\\\":36 } { \\\"id\\\":2, \\\"name\\\":\\\"Bob\\\",\\\"age\\\":29 } { \\\"id\\\":3 , \\\"name\\\":\\\"Jack\\\",\\\"age\\\":29 } { \\\"id\\\":4 , \\\"name\\\":\\\"Jim\\\",\\\"age\\\":28 } { \\\"id\\\":4 , \\\"name\\\":\\\"Jim\\\",\\\"age\\\":28 } { \\\"id\\\":5 , \\\"name\\\":\\\"Damon\\\" } { \\\"id\\\":5 , \\\"name\\\":\\\"Damon\\\" } 为employee.json创建DataFrame,并写出

    2024年02月09日
    浏览(21)
  • 2023_Spark_实验十四:SparkSQL入门操作

    1、将emp.csv、dept.csv文件上传到分布式环境,再用  hdfs  dfs -put dept.csv /input/ hdfs  dfs -put emp.csv /input/ 将本地文件put到hdfs文件系统的input目录下 2、或者调用本地文件也可以。区别:sc.textFile(\\\"file:///D:\\\\temp\\\\emp.csv\\\") StructType 是个case class,一般用于构建schema. 因为是case class,所以使

    2024年02月08日
    浏览(31)
  • (二)PySpark3:SparkSQL编程

    目录 一、SparkSQL介绍 二、创建DataFrame 1、通过ToDF方法 2、通过createDataFrame方法 3、通过读取文件或数据库 三、保存DataFrame 四、DataFrame API 1、显示数据 2、统计信息 3、类RDD操作 4、类Excel操作 5、类SQL表操作 五、DataFrame+SQL 1、注册视图 2、操作Hive表 六、总结  PySpark系列文章:

    2024年03月17日
    浏览(20)
  • Spark RDD编程 文件数据读写

    从本地文件系统读取数据,可以采用textFile()方法,可以为textFile()方法提供一个本地文件或目录地址,如果是一个文件地址,它会加载该文件,如果是一个目录地址,它会加载该目录下的所有文件的数据。 示例:读取一个本地文件word.txt val textFile中的textFile是变量名称,sc.t

    2024年02月05日
    浏览(31)
  • 【Java 编程】文件操作,文件内容的读写—数据流

    平时说的文件一般都是指存储在 硬盘 上的普通文件 形如 txt, jpg, mp4, rar 等这些文件都可以认为是普通文件,它们都是在硬盘上存储的 在计算机中,文件可能是一个 广义的概念 ,就不只是包含普通文件,还可以包含 目录 (把目录称为目录文件) 操作系统中,还会使用文件来描

    2023年04月08日
    浏览(37)
  • 【FPGA】Verilog编程实现SDRAM读写(一) ----- 初识SDRAM

    SDRAM(Synchronous Dynamic Random Access Memory),同步动态随机存储器。同步、动态、随机是其性能特点的外在说明: 同步(Synchronous )是指内存工作需要同步时钟,内部的命令的发送与数据的传输都以它为基准 动态(Dynamic )是指存储阵列 需要不断的刷新来保证数据不丢失 随机(

    2023年04月08日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包