Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】

这篇具有很好参考价值的文章主要介绍了Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

从 RDD 转换得到 DataFrame

Saprk 提供了两种方法来实现从 RDD 转换得到 DataFrame:

  1. 利用反射机制推断 RDD 模式
  2. 使用编程方式定义 RDD 模式

下面使用到的数据 people.txt :

Tom, 21
Mike, 25
Andy, 18

1、利用反射机制推断 RDD 模式

        在利用反射机制推断 RDD 模式的过程时,需要先定义一个 case 类,因为只有 case 类才能被 Spark 隐式地转换为DataFrame对象。

object Tese{
  
    // 反射机制推断必须使用 case 类,case class 必须放到main方法之外
    case class Person(name: String,age: Long)  //定义一个case类

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rdd to df 1")
      .getOrCreate()


    import spark.implicits._ //这里的spark不是org.apache.spark这个包 而是我们创建的SparkSession对象 它支持把一个RDD隐式地转换为一个 DataFrame对象


    val rdd: RDD[Person] = spark.sparkContext
      .textFile("data/sql/people.txt")
      .map(line => line.split(","))
      .map(t => Person(t(0), t(1).trim.toInt))

    // 将RDD对象转为DataFrame对象
    val df: DataFrame = rdd.toDF()

    df.createOrReplaceTempView("people")

    spark.sql("SELECT * FROM people WHERE age > 20").show()

    spark.stop()
  }
}

注意事项1:

case 类必须放到伴生对象下,main方法之外,因为在隐式转换的时候它会自动通过 伴生对象名.case类名 来调用case类,如果放到main下面就找不到了。

注意事项2:

import spark.implicits._
这里的spark不是org.apache.spark这个包 而是我们上面创建的SparkSession对象 它支持把一个RDD隐式地转换为一个 DataFrame对象

2、使用编程方式定义 RDD 模式

        反射机制推断时需要定义 case class,但当无法定义 case 类时,就需要采用编程式来定义 RDD 模式了。这种方法看起来比较繁琐,但是很好用,不容易报错。

        我们现在同样加载 people.txt 中的数据,生成 RDD 对象,再把RDD对象转为DataFrame对象,进行SparkSQL 查询。主要包括三个步骤:

  1. 制作表头 schema: StructType
  2. 制作表中记录 rowRDD: RDD[Row]
  3. 合并表头和记录 df:DataFramw
def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rdd to df 2")
      .getOrCreate()

    //1.制作表头-也就是定义表的模式
    val schema: StructType = StructType(Array(StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    //2.加载表中的记录-也就是读取文件生成RDD
    val rowRdd: RDD[Row] = spark.sparkContext
      .textFile("data/sql/people.txt")
      .map(_.split(","))
      .map(attr => Row(attr(0), attr(1).trim.toInt))
    //3.把表头和记录拼接在一起
    val peopleDF: DataFrame = spark.createDataFrame(rowRdd, schema)

    peopleDF.createOrReplaceTempView("people")

    spark.sql("SELECT * FROM people WHERE age > 20").show()

    spark.stop()
  }

运行结果:

+----+---+
|name|age|
+----+---+
| Tom| 21|
|Mike| 25|
+----+---+

Spark SQL读取数据库

导入依赖

根据自己本地的MySQL版本导入对应的驱动。

注意:mysql8.0版本在JDBC中的url是:" com.mysql.cj.jdbc.Driver "

<dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>

读取 MySQL 中的数据

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jdbc spark sql")
      .getOrCreate()

    val mysql: DataFrame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("dbtable", "student")
      .option("user", "root")
      .option("password", "Yan1029.")
      .load()

    mysql.show()

    spark.stop()
  }

运行结果:

默认显示整张表

+---+----+---+---+
| id|name|age|sex|
+---+----+---+---+
|  1| Tom| 21| 男|
|  2|Andy| 20| 女|
+---+----+---+---+

向 MySQL 写入数据

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jdbc spark sql")
      .getOrCreate()

    //导入两条student信息
    val rdd: RDD[Array[String]] = spark.sparkContext
      .parallelize(Array("3 Mike 22 男", "4 Cindy 23 女"))
      .map(_.split(" "))

    //设置模式信息-创建表头
    val schema: StructType = StructType(Array(StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("sex", StringType, true)))

    //创建Row对象 每个 Row对象都是表中的一行-创建记录
    val rowRDD = rdd.map(stu => Row(stu(0).toInt, stu(1), stu(2).toInt, stu(3)))

    //创建DataFrame对象 拼接表头和记录
    val df = spark.createDataFrame(rowRDD, schema)

    //创建一个 prop 变量 用来保存 JDBC 连接参数
    val prop = new Properties()
    prop.put("user","root")
    prop.put("password","Yan1029.")
    prop.put("driver","com.mysql.cj.jdbc.Driver")

    //写入数据 采用 append 模式追加
    df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)

    spark.stop()
  }

运行结果:

Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】,Spark,数据库,spark,sql,大数据


总结

        今天上午就学到这里,本想着今天专门看看StructType、StructField和Row这三个类的,没想到就在这节课。这一篇主要学了RDD对象向DataFrame对象的转换以及Spark SQL如何读取数据库、写入数据库。

        下午学完这一章最后的DataSet。文章来源地址https://www.toymoban.com/news/detail-705634.html

到了这里,关于Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark RDD、DataFrame、DataSet比较

    在Spark的学习当中,RDD、DataFrame、DataSet可以说都是需要着重理解的专业名词概念。尤其是在涉及到数据结构的部分,理解清楚这三者的共性与区别,非常有必要。 RDD,作为Spark的核心数据抽象,是Spark当中不可或缺的存在,而在SparkSQL中,Spark为我们提供了两个新的抽象,分别

    2024年02月04日
    浏览(23)
  • RDD转换为DataFrame

            spark官方提供了两种方法实现从RDD转换到DataFrame。第一种方法是利用反射机制来推断包含特定类型对象的Schema,这种方式适用于对已知的数据结构的RDD转换;第二种方法通过编程接口构造一个 Schema ,并将其应用在已知的RDD数据中。         在Windows系统下开发

    2023年04月23日
    浏览(22)
  • 【Spark基础】-- RDD 转 Dataframe 的三种方式

    目录 一、环境说明 二、RDD 转 Dataframe 的方法 1、通过 StructType 创建 Dataframe(强烈推荐使用这种方法)

    2024年01月19日
    浏览(25)
  • Spark的核心概念:RDD、DataFrame和Dataset

    Apache Spark,其核心概念包括RDD(Resilient Distributed Dataset)、DataFrame和Dataset。这些概念构成了Spark的基础,可以以不同的方式操作和处理数据,根据需求选择适当的抽象。 RDD是Spark的基本数据抽象,它代表一个不可变、分布式的数据集合。下面我们将更详细地探讨RDD: RDD的特性

    2024年02月04日
    浏览(26)
  • spark sql 查看全部数据库的表

    大数据环境下,metastore一般都交个hive处理,随着数据库 表 越来越多,进行源数据管理的就会成为痛点,如何能够查询出所有的数据库下的所有表 Spark 官方文档Tables 官方给的sample中,只能一个库一个库查询,如果有成百上千个库呢? 从 Python 3.6 开始,Python f 字符串可用。

    2024年02月14日
    浏览(27)
  • GaussDB数据库SQL系列-行列转换

    一、前言 二、简述 1、行转列概念 2、列转行概念 三、GaussDB数据库的行列转行实验示例 1、行转列示例 1)创建实验表(行存表) 2)静态行转列 3)行转列(结果值:拼接式) 4)动态行转列(拼接SQL式) 2、列转行示例 1)创建实验表(复用前面的测试数据) 2)使用union a

    2024年02月10日
    浏览(29)
  • 【Spark】RDD转换算子

    目录 map mapPartitions mapPartitionsWithIndex flatMap glom groupBy shuffle filter sample distinct coalesce repartition sortBy ByKey intersection union subtract zip partitionBy reduceByKey groupByKey reduceByKey 和 groupByKey 的区别 aggregateByKey foldByKey combineByKey reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别 join leftOuterJoin

    2024年02月12日
    浏览(58)
  • [SQL Server]SQL Server数据库中如何将时间日期类型(DateTime)转换成字符串类型(varchar,nvarchar)

    SQL Server数据库中,如何将时间日期类型(DateTime)的数据转换成字符串类型(varchar,nvarchar),并对其进行 yyyy-mm-dd 形式的格式化输出 使用SQL Server的 CONVERT() 函数,如下: SELECT LEFT(CONVERT(VARCHAR, GETDATE(), 120), 10) 或者 SELECT CONVERT(VARCHAR(10), GETDATE(), 120) 在SQL Server 2012及以上版本中,新增

    2024年02月07日
    浏览(48)
  • Spark---RDD算子(单值类型转换算子)

    RDD算子是用于对RDD进行转换(Transformation)或行动(Action)操作的方法或函数。通俗来讲,RDD算子就是RDD中的函数或者方法,根据其功能,RDD算子可以分为两大类: 转换算子(Transformation): 转换算子用于从一个RDD生成一个新的RDD,但是原始RDD保持不变。常见的转换算子包括

    2024年01月21日
    浏览(34)
  • python将dataframe数据导入MongoDB非关系型数据库

    pymongo连接 新建数据库和集合 pandas导入数据 使用 df.to_dict 函数,返回结果为列表,列表中的每个元素为json型,是原来excel中的一条记录。 插入数据 数据查看

    2024年02月16日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包