实验SparkSQL编程初级实践

这篇具有很好参考价值的文章主要介绍了实验SparkSQL编程初级实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

实验SparkSQL编程初级实践

实践环境:

  1. Oracle VM VirtualBox 6.1.12

  2. Ubuntu 16.04

  3. Hadoop3.1.3

  4. JDK1.8.0_162

  5. spark2.4.0

  6. python3.5

  7. Windows11系统下pycharm2019.1专业版

实验目的:

  1. 通过实验掌握Spark SQL的基本编程方法;

  2. 熟悉RDD到DataFrame的转化方法;

  3. 熟悉利用Spark SQL管理来自不同数据源的数据。

实验内容,步骤与实验结果:

  1. Spark SQL 基本操作

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ “id”:1 , “name”:" Ella" , “age”:36 } { “id”:2, “name”:“Bob”,“age”:29 } { “id”:3 , “name”:“Jack”,“age”:29 } { “id”:4 , “name”:“Jim”,“age”:28 } { “id”:4 , “name”:“Jim”,“age”:28 } { “id”:5 , “name”:“Damon” } { “id”:5 , “name”:“Damon” }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

import os
os.environ[“JAVA_HOME”]=“/usr/lib/jvm/jdk1.8.0_162”
os.environ[“PYSPARK_PYTHON”]=‘/usr/bin/python3.5’
#import SparkSession
from pyspark.sql import SparkSession
#create spar session object
spark=SparkSession.builder.appName(‘data_processing’).getOrCreate()
# Load csv Dataset
df=spark.read.json(“employee.json”)

1 .查询所有数据;
df.show()实验SparkSQL编程初级实践

2.查询所有数据,并去除重复的数据;
df.distinct().show()
实验SparkSQL编程初级实践
3.查询所有数据,打印时去除id字段;
df.drop(“id”).show()
实验SparkSQL编程初级实践
4.筛选出age>30的记录;
df.filter(“age”>30).show()
实验SparkSQL编程初级实践
5.将数据按age分组;
df.groupBy(“age”).count().show()
实验SparkSQL编程初级实践
6.将数据按name升序排列;
df.sort(df.name.asc()).show()
实验SparkSQL编程初级实践
7.取出前3行数据;
df.show(3)
实验SparkSQL编程初级实践
8. 查询所有记录的name列,并为其取别名为username;
df.distinct().show()
实验SparkSQL编程初级实践
9.查询年龄age的平均值;
df.agg({“age”:“mean”}).show()
实验SparkSQL编程初级实践
10. 查询年龄age的最小值。
df.agg({“age”:“min”}).show()
实验SparkSQL编程初级实践

2.编程实现将RDD转换为DataFrame

源文件内容如下(包含id,name,age):

1,Ella,36 2,Bob,29 3,Jack,29

请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

#反射机制 – 针对数据项已知
import os
os.environ[“JAVA_HOME”]=“/usr/lib/jvm/jdk1.8.0_162”
os.environ[“PYSPARK_PYTHON”]=‘/usr/bin/python3.5’

# 导入Spark相关包
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

# 构建 spark 单元
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
# 构建表头
schemaString = “id name age”
fields = [StructField(field_name ,StringType(),True) for field_name in schemaString.split(" ")]
schema = StructType(fields)

# 加载数据
filename = “employee.txt”
people= spark.sparkContext.textFile(filename)
# print(people.collect())
# 数据预处理
people_data = people.map(lambda x : x.split(“,”))
# print(people_data.collect())
# 处理为 ROW 对象模式
people_rows = people_data.map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2])))
# 构建 DataFrame
schemapeople = spark.createDataFrame(people_rows,schema)
# 构建临时表
schemapeople.createOrReplaceTempView(“employee”)
# SQL 查询
DF_people = spark.sql(“select * from employee”)
# DF – RDD
people_rdd = DF_people.rdd.map(lambda p : “id:” + p.id + “,” + “name:” + p.name + “,” + “Age:” + str(p.age))
# print(people_rdd.collect())
# print(people_rdd.collect())
for i in people_rdd.collect():
print(i)
实验SparkSQL编程初级实践

3. 编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表5-2所示的两行数据。

表5-2 employee表原有数据

id name gender Age
1 Alice F 22
2 John M 25

mysql> create database sparktest;

mysql> show databases;

mysql> use sparktest;

mysql> create table employee(id int(4),name char(20),gender char(4),age int(4));

mysql> insert into employee values(1,“Alice”,“F”,22);

mysql> insert into employee values(2,“John”,“M”,25);

mysql> select * from employee;
实验SparkSQL编程初级实践
实验SparkSQL编程初级实践
实验SparkSQL编程初级实践
(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表5-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

表5-3 employee表新增数据

id name gender age
3 Mary F 26
4 Tom M 23

关于数据库的相关参数 driver – com.mysql.jdbc.Driver 数据库的JDBC驱动 url – 数据库的连接地址 jdbc:mysql://localhost:3306/spark dbtable – 访问的数据表 student user – 数据库的用户名 test password – 123456 数据库的用户密码
查看数据库内容并插入数据代码如下:
# -*- coding:utf-8 -*-
#反射机制 – 针对数据项已知
import os
os.environ[“JAVA_HOME”]=“/usr/lib/jvm/jdk1.8.0_162”
os.environ[“PYSPARK_PYTHON”]=‘/usr/bin/python3.5’
SUBMIT_ARGS = “–packages mysql:mysql-connector-java:5.1.40 pyspark-shell”
os.environ[“PYSPARK_SUBMIT_ARGS”] = SUBMIT_ARGS

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

jdbcStr = “jdbc:mysql://localhost:3306/sparktest”
# 构建 spark 单元
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
# 读取数据库中的数据
jdbcDF=spark.read.format(“jdbc”).option(“url”,jdbcStr).option(“driver”,“com.mysql.jdbc.Driver”).option(“dbtable”,“employee”).option(“user”, “root”).option(“password”, “123456”).load()
# 数据查看
jdbcDF.show()

# 下面设置模式信息
schema = StructType([StructField(“id”, IntegerType(), True),StructField(“name”, StringType(), True),StructField(“gender”, StringType(), True),StructField(“age”, IntegerType(), True)])
# 下面设置两条数据,表示两个学生的信息
studentRDD = spark.sparkContext.parallelize([“3 Mary F 26”,“4 Tom M 23”]).map(lambda x:x.split(“\t”))
# 下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
# 建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema)
studentDF.show()

# 写入数据库
prop = {}
prop[‘user’] = ‘root’
prop[‘password’] = ‘123456’
prop[‘driver’] = “com.mysql.jdbc.Driver”
studentDF.write.jdbc(jdbcStr,‘employee’,‘append’, prop)

# 读取数据库中的数据
jdbcDF=spark.read.format(“jdbc”).option(“url”,jdbcStr).option(“driver”,“com.mysql.jdbc.Driver”).option(“dbtable”,“employee”).option(“user”, “root”).option(“password”, “123456”).load()
# 数据查看
jdbcDF.show()

print(type(jdbcDF))
#查询年龄age的最大值
jdbcDF.agg({“age”:“max”}).show()
#查询年龄age的总和值
jdbcDF.agg({“age”:“sum”}).show()
实验SparkSQL编程初级实践
实验SparkSQL编程初级实践
出现的问题与解决方案:
问题一:spark连接mysql时报错,找不到JDBC。
问题原因:可能是实验前没有将jdbc放入spark的jars里面也可能是代码差点什么。

解决方法
将适合的jdbc放入spark的jars里面,并将jdbc路径添加在spark-env.sh中。
实验SparkSQL编程初级实践
以上步骤操作后重启后并没有解决。
复制spark-defaults.conf.template文件,修改spark-defaults.conf文件内容。
实验SparkSQL编程初级实践
再次重启尝试依旧报错。在代码中加入如下两行:
实验SparkSQL编程初级实践
运行成功!!!(我也不知道是不是只是因为这两行而执行成功的,但总归解决了)文章来源地址https://www.toymoban.com/news/detail-439741.html

到了这里,关于实验SparkSQL编程初级实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 实验四 Spark Streaming编程初级实践

    数据流  :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。 1.下载安装包 https://www.apache.org/dyn/closer.lua/flume/

    2024年04月26日
    浏览(37)
  • 大数据实验 实验六:Spark初级编程实践

    实验环境:Windows 10 Oracle VM VirtualBox 虚拟机:cnetos 7 Hadoop 3.3 因为Hadoop版本为3.3所以在官网选择支持3.3的spark安装包 解压安装包到指定文件夹 配置spark-env.sh 启动成功 (1) 在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数; (2) 在spark-shell中读

    2024年02月04日
    浏览(71)
  • 【Spark编程基础】实验三RDD 编程初级实践(附源代码)

    1、熟悉 Spark 的 RDD 基本操作及键值对操作; 2、熟悉使用 RDD 编程解决实际具体问题的方法 1、Scala 版本为 2.11.8。 2、操作系统:linux(推荐使用Ubuntu16.04)。 3、Jdk版本:1.7或以上版本。 请到本教程官网的“下载专区”的“数据集”中下载 chapter5-data1.txt,该数据集包含了某大

    2024年03月25日
    浏览(53)
  • 大数据技术原理与应用实验4——MapReduce初级编程实践

    链接: 大数据技术原理与应用实验1——熟悉常用的HDFS操作 链接: 大数据技术原理与应用实验2——熟悉常用的Hbase操作 链接: 大数据技术原理与应用实验3——NoSQL和关系数据库的操作比较 (1)通过实验掌握基本的MapReduce编程方法; (2)掌握用MapReduce解决一些常见的数据处理

    2024年02月06日
    浏览(44)
  • 实验5 MapReduce初级编程实践(1)——编程实现文件合并和去重操作

    通过实验掌握基本的MapReduce编程方法; 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。 操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04) Hadoop版本:3.1.3 编程实现文件合并和去重操作 对于两个输入文件,即文件A和文件B,请编写MapReduce程序,

    2023年04月15日
    浏览(42)
  • 大数据技术原理及应用课实验5 :MapReduce初级编程实践

    目录 一、实验目的 二、实验平台 三、实验步骤(每个步骤下均需有运行截图) (一)编程实现文件合并和去重操作 (二)编写程序实现对输入文件的排序 (三)对给定的表格进行信息挖掘 四、实验总结 五、优化及改进(选做) 实验5  MapReduce初级编程实践 1. 通过实验掌

    2024年01月21日
    浏览(40)
  • 大数据技术原理及应用课实验7 :Spark初级编程实践

    实验7  Spark初级编程实践 一、实验目的 1. 掌握使用Spark访问本地文件和HDFS文件的方法 2. 掌握Spark应用程序的编写、编译和运行方法 二、实验平台 1. 操作系统:Ubuntu18.04(或Ubuntu16.04); 2. Spark版本:2.4.0; 3. Hadoop版本:3.1.3。 三、实验步骤(每个步骤下均需有运行截图) 实

    2024年01月22日
    浏览(45)
  • 云计算与大数据入门实验四 —— MapReduce 初级编程实践

    通过实验掌握基本的 MapReduce 编程方法 掌握用 MapReduce 解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等 (一)编程实现文件合并和去重操作 对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个

    2024年02月05日
    浏览(35)
  • 实验5 MapReduce初级编程实践(3)——对给定的表格进行信息挖掘

    通过实验掌握基本的MapReduce编程方法; 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。 操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04) Hadoop版本:3.1.3 下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。

    2024年02月10日
    浏览(37)
  • 实验5 MapReduce初级编程实践(2)——编写程序实现对输入文件的排序

    通过实验掌握基本的MapReduce编程方法; 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。 操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04) Hadoop版本:3.1.3 现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数

    2024年02月09日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包