【Spark】Spark SQL基础使用详解和案例

这篇具有很好参考价值的文章主要介绍了【Spark】Spark SQL基础使用详解和案例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark SQL是Apache Spark的一个模块,它提供了一种基于结构化数据的编程接口。
Spark SQL支持结构化数据的处理,包括数据的读取、转换和查询。它可以将传统的基于表和SQL的操作和Spark的分布式计算相结合,提供强大的数据处理和分析能力。
Spark SQL也可以与其他Spark组件集成,如MLlib和GraphX,以支持更广泛的数据处理场景。

  1. 读入数据
val spark: SparkSession = SparkSession.builder().master("local").appName("agent_log_df").getOrCreate()

val fileRDD: RDD[String] = spark.sparkContext.textFile("datas/agent.log")
val rowRDD: RDD[Row] = fileRDD.map(_.split(" ")).map(
line => Row(line(0), line(1).toInt, line(2).toInt)
)
  1. 创建表结构
// 定义表结构
val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD,
StructType(Seq(StructField("t1", StringType), StructField("t2", IntegerType), StructField("t3", IntegerType)))
)
  1. 创建临时表
df.createTempView("tmp_table")
  1. sql逻辑
val sql =
  """
    |select t1,t2,t3
    |from (
    |select t1, sum(t2) as t2, sum(t3) as t3 from tmp_table group by t1
    |) t
    |order by t2 desc,t3 desc
    |limit 10
    |""".stripMargin
  1. sql执行
val result: DataFrame = spark.sql(sql)
  1. 结果展示
result.show()

上述中有几个关键的类和方法:

  1. sqlContext

Spark的SQLContext是负责Spark SQL操作的上下文对象,它提供了许多与SQL相关的功能,包括读取和处理各种数据源中的数据、执行SQL查询、创建数据框架和表等等。
通过SQLContext,用户可以使用DataFrame API来以结构化和类型安全的方式处理数据,并可以使用SQL语言和Spark SQL的内置函数来进行数据分析和查询。
总体来说,Spark的SQLContext是非常强大和灵活的,可以适应各种数据处理和分析需求,并且在处理大规模数据时具有出色的性能和扩展性。
在使用Spark的SqlContext之前,需要首先初始化一个SparkContext对象并创建一个RDD。

使用SqlContext需要进行以下步骤:

创建一个SparkConf对象,并设置一些参数,如AppName和Master。
使用SparkConf对象创建一个SparkContext对象。
通过SparkContext对象创建一个SqlContext对象。
使用SqlContext对象加载数据,并将其转换为DataFrame类型。
以下是具体的代码示例:

from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf

#创建SparkConf
conf = SparkConf().setAppName("sql_example").setMaster("local[*]")

#创建SparkContext
sc = SparkContext(conf=conf)

#创建SqlContext
sqlContext = SQLContext(sc)

#读取数据文件
people = sc.textFile("people.txt")

#将数据转换为一个DataFrame对象
people_df = sqlContext.createDataFrame(people.map(lambda row: row.split(",")), ["name", "age"])

在以上示例中,我们首先创建了一个SparkConf对象,并设置了AppName和Master属性。然后使用SparkConf对象创建了一个SparkContext对象,并将其传递给SqlContext构造函数。接着读取了一个数据文件,并使用SqlContext对象将数据转换成DataFrame对象。

注意:使用SqlContext时需要将数据转换成DataFrame对象,而不是RDD对象。如果需要在SqlContext中使用RDD对象,可以将其转换为DataFrame对象,再进行操作。

  1. StructType

Spark的StructType是一种定义结构化数据的数据类型。
它类似于SQL表的结构,每个StructType都由一组结构字段组成,每个结构字段都有一个名称和数据类型。
使用StructType,用户可以对结构化数据进行索引、查询和分析。
StructType被广泛应用于Spark中的DataFrame API和SQL查询中。

使用方法:

导入 Spark SQL 的相关包
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
定义 StructType
例如,假设要定义一个包含 name 和 age 两个字段的 StructType。则可以按照以下方式定义:

val schema = StructType(
    StructField("name", StringType, true) ::
    StructField("age", IntegerType, true) :: Nil
)
// 其中,StructType 用于表示整个数据结构,StructField 用于表示每个字段的信息,StringType 用于表示字段类型为字符串类,IntegerType 用于表示字段类型为整数。

// 使用 StructType
// 在创建 DataFrame 时,可以通过传递定义好的 StructType 对象来指定 DataFrame 的结构。例如:

val data = spark.sparkContext.parallelize(Seq(("John", 25), ("Mary", 30), ("Jack", 22)))
val df = spark.createDataFrame(data).toDF("name", "age")

df.printSchema()
df.show()

输出结果为:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+----+---+
|name|age|
+----+---+
|John| 25|
|Mary| 30|
|Jack| 22|
+----+---+

栗子

给一个日志文件,过滤出两张表,然后设计表结构,使用Spark SQL实现两张表的连接文章来源地址https://www.toymoban.com/news/detail-611258.html

object spark_sql_code_1 {
  def main(args: Array[String]): Unit = {
    // TODO 1: 创建spark环境
    val spark: SparkSession = SparkSession.builder().master("local").appName("spark sql code").getOrCreate()

    // TODO 2: 读取数据
    val rowRDD: RDD[Row] = spark.sparkContext.textFile("datas/agent.log")
      .map(line => {
        val words: Array[String] = line.split(" ");
        Row(words(1), words(2).toInt, words(3).toInt)
      })

    rowRDD.persist()

    val tableRDD1: RDD[Row] = rowRDD.filter(row => {
      row.getInt(1) % 2 == 0
    })

    val tableRDD2: RDD[Row] = rowRDD.filter(row => {
      row.getInt(2) % 2 == 0
    })


    // TODO 3: 创建表结构和临时表
    // 定义表结构
    val df1: DataFrame = spark.sqlContext.createDataFrame(tableRDD1,
      StructType(Seq(StructField("t1", StringType), StructField("t2", IntegerType), StructField("t3", IntegerType)))
    )

    df1.createTempView("t")

    val df2: DataFrame = spark.sqlContext.createDataFrame(tableRDD2,
      StructType(Seq(StructField("r1", StringType), StructField("r2", IntegerType), StructField("r3", IntegerType)))
    )

    df2.createTempView("r")

    // TODO 4: sql逻辑
    val sql: String =
      """
        |select r1 as t1, r2 as t2, r3 as t3, 'r' as tp from r
        |""".stripMargin


    // TODO 5: 执行sql
    val result: DataFrame = spark.sql(sql)

    // TODO 6: 结果显示
    result.show()

    // TODO 7: 关闭spark环境
    spark.stop()
  }

到了这里,关于【Spark】Spark SQL基础使用详解和案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark---SparkSQL介绍

    Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎,由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用。除了基于Spark的特性外,Shark是完全兼容Hive的语法,表结构以及

    2024年01月21日
    浏览(34)
  • 【spark】SparkSQL

    什么是SparkSQL SparkSQL是Spark的一个模块,用于处理海量 结构化数据 为什么学习SparkSQL SparkSQL是非常成熟的海量结构化数据处理框架: 学习SparkSQL主要在2个点: SparkSQL本身十分优秀,支持SQL语言、性能强、可以自动优化、API简单、兼容HIVE等等 企业大面积在使用SparkSQL处理业务数

    2024年01月20日
    浏览(39)
  • 电影评分数据分析案例-Spark SQL

    1. 2. 3. 4. 5. 6.

    2024年02月08日
    浏览(43)
  • Spark(16):SparkSQL之DataSet

    目录 0. 相关文章链接 1. DataSet的定义 2. 创建DataSet 2.1. 使用样例类序列创建 DataSet 2.2. 使用基本类型的序列创建 DataSet 2.3. 注意 3. RDD 转换为 DataSet 4. DataSet 转换为 RDD  Spark文章汇总  DataSet 是具有强类型的数据集合,需要提供对应的类型信息。 在实际使用的时候,很少用到

    2024年02月13日
    浏览(33)
  • Spark(15):SparkSQL之DataFrame

    目录 0. 相关文章链接 1. DataFrame的作用 2. 创建DataFrame 3. SQL 语法 4. DSL 语法 5. RDD 转换为 DataFrame 6. DataFrame 转换为 RDD  Spark文章汇总          Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有

    2024年02月13日
    浏览(30)
  • spark第四章:SparkSQL基本操作

    spark第一章:环境安装 spark第二章:sparkcore实例 spark第三章:工程化代码 spark第四章:SparkSQL基本操作 接下来我们学习SparkSQL他和Hql有些相似。Hql是将操作装换成MR,SparkSQL也是,不过是使用Spark引擎来操作,效率更高一些 以上是这次博客需要的所有依赖,一次性全加上。 一共

    2024年02月07日
    浏览(33)
  • 【Spark精讲】一文讲透SparkSQL执行过程

    逻辑计划阶段会将用户所写的 SQL语句转换成树型数据结构( 逻辑算子树 ), SQL语句中蕴含的逻辑映射到逻辑算子树的不同节点。 顾名思义,逻辑计划阶段生成的逻辑算子树并不会直接提交执行,仅作为中间阶段 。 最终逻辑算子树的生成过程经历 3 个子阶段,分别对应 未解析

    2024年02月03日
    浏览(25)
  • 2023_Spark_实验十四:SparkSQL入门操作

    1、将emp.csv、dept.csv文件上传到分布式环境,再用  hdfs  dfs -put dept.csv /input/ hdfs  dfs -put emp.csv /input/ 将本地文件put到hdfs文件系统的input目录下 2、或者调用本地文件也可以。区别:sc.textFile(\\\"file:///D:\\\\temp\\\\emp.csv\\\") StructType 是个case class,一般用于构建schema. 因为是case class,所以使

    2024年02月08日
    浏览(31)
  • spark中Rdd依赖和SparkSQL介绍--学习笔记

    1.1概念 rdd的特性之一 相邻rdd之间存在依赖关系(因果关系) 窄依赖 每个父RDD的一个Partition最多被子RDD的一个Partition所使用 父rdd和子rdd的分区是一对一(多对一) 触发窄依赖的算子 map(),flatMap(),filter() 宽依赖 父RDD的一个partition会被子rdd的多个Partition所使用 父rdd和子rdd的

    2024年01月17日
    浏览(35)
  • SparkSQL的分布式执行引擎(Spark ThriftServer)

    Spark ThriftServer 相当于一个持续性的Spark on Hive集成模式,可以启动并监听在10000端口,持续对外提供服务,可以使用数据库客户端工具或代码连接上来,操作Spark bin/spark-sql 脚本,类似于Hive的 bin/hive脚本 ( 内部内置了hive的hiveServer2服务或Spark执行引擎,每次脚本执行,都会启动

    2024年02月11日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包