Spark RDD编程 文件数据读写

这篇具有很好参考价值的文章主要介绍了Spark RDD编程 文件数据读写。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、本地文件系统的数据读写

1,从文件中读取数据创建RDD

从本地文件系统读取数据,可以采用textFile()方法,可以为textFile()方法提供一个本地文件或目录地址,如果是一个文件地址,它会加载该文件,如果是一个目录地址,它会加载该目录下的所有文件的数据。

示例:读取一个本地文件word.txt

scala> val  textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")

val textFile中的textFile是变量名称,sc.textFile()中的textFile是方法名称,二者同时使用时要注意区分,它们所代表的含义是不同的。执行上面这条命令以后,并不会马上显示结果

Spark采用惰性机制。可以查看textFile中的内容:

scala> textFile.first()

正因为Spark采用了惰性机制,在执行转换操作的时候,即使输入了错误的语句,spark-shell也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来。
示例:
使用一个根本就不存在的word123.txt,执行上面语句时,spark-shell根本不会报错,因为,没有遇到“行动”类型的操作first()之前,这个加载操作是不会真正执行的。

scala> val  textFile = sc.
      |  textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")

2,把RDD写入到文本文件中

可以使用saveAsTextFile()方法把RDD中的数据保存到文本文件中。需要注意的是,saveAsTextFile()中提供的参数,不是文件名称,而是一个目录名称,因为,Spark通常是在分布式环境下执行,RDD会存在多个分区,由多个任务对这些分区进行并行计算,每个分区的计算结果都会保存到一个单独的文件中。例如,如果RDD有3个分区,saveAsTextFile()方法就会产生part-00001、part-00002和part-00003,以及一个_SUCCESS文件,其中,part-00001、part-00002和part-00003包含了RDD中的数据,_SUCCESS文件只是用来表示写入操作已经成功执行,该文件里面是空的,可以忽略该文件。因此,在Spark编程中,需要改变传统单机环境下编程的思维习惯,在单机编程中,我们已经习惯把数据保存到一个文件中,而作为分布式编程框架,因为RDD被分成多个分区,由多个任务并行执行计算,Spark通常都会产生多个文件,我们需要为这些文件提供一个保存目录,因此,需要为saveAsTextFile()方法提供一个目录地址,而不是一个文件地址。saveAsTextFile()要求提供一个事先不存在的保存目录,如果事先已经存在该目录,Spark就会报错。所以,如果是在独立应用程序中执行,最好在程序执行saveAsTextFile()之前先判断一下目录是否存在。

把textFile变量中的内容再次写回到另外一个目录wordback中,命令如下:

scala> val  textFile = sc.
      |  textFile("file:///usr/local/spark/mycode/wordcount/word.txt") 
scala> textFile.       
	  |  saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")

打开一个新的Linux终端,进入到“/usr/local/spark/mycode/wordcount/”目录,可以看到这个目录下面多了一个名称为“writeback”的子目录。进入writeback子目录以后,可以看到该目录中生成了两个文件part-00000和_SUCCESS(可以忽略),part-00000文件就包含了刚才写入的数据。之所以writeback目录下只包含一个文件part-00000,而不是多个part文件。

在启动进入Spark Shell环境:

$ cd /usr/local/spark
$ ./bin/spark-shell

spark-shell命令后面没有带上任何参数,则系统默认采用local模式启动spark-shell,即只使用一个Worker线程本地化运行Spark(完全不并行)。而且,在读取文件时,sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)语句的圆括号中的参数只有文件地址,并没有包含分区数量,因此,生成的textFile这个RDD就只有一个分区,这样导致saveAsTextFile()最终生成的文件就只有一个part-00000。

在读取文件时进行分区:

scala> val  textFile = sc.
      |  textFile("file:///usr/local/spark/mycode/wordcount/word.txt",2) 
scala> textFile.       
      |  saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")

可以在writeback子目录下看到两个part文件,即part-00000和part-00001。

二、分布式文件系统HDFS的数据读写

从分布式文件系统HDFS中读取数据,也是采用textFile()方法,可以为textFile()方法提供一个HDFS文件或目录地址,如果是一个文件地址,它会加载该文件,如果是一个目录地址,它会加载该目录下的所有文件的数据。

读取一个HDFS文件:

scala> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> textFile.first()

为textFile()方法提供的文件地址格式可以有多种,如下3条语句都是等价的:

scala> val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala> val textFile = sc.textFile("/user/hadoop/word.txt")
scala> val textFile = sc.textFile("word.txt")

可以使用saveAsTextFile()方法把RDD中的数据保存到HDFS文件中:

scala> val textFile = sc.textFile("word.txt")
scala> textFile.saveAsTextFile("writeback")

三、JSON文件的读取

JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,它基于ECMAScript规范的一个子集,采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得JSON成为理想的数据交换语言,不仅易于阅读和编写,同时也易于机器解析和生成,并能够有效提升网络传输效率。Spark提供了一个JSON样例数据文件,存放在“/usr/local/spark/examples/src/main/resources/people.json”中(注意,“/usr/local/spark/”是Spark的安装目录)。

people.json文件内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Scala中有一个自带的JSON库——scala.util.parsing.json.JSON,可以实现对JSON数据的解析,JSON.parseFull(jsonString:String)函数以一个JSON字符串作为输入并进行解析,如果解析成功,则返回一个Some(map: Map[String,Any]),如果解析失败,则返回None。
新建一个JSONRead.scala代码文件,输入以下内容:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
._ import org.apache.spark.SparkConf import scala.util.parsing.json.JSON 
object JSONRead {     
	def main(args: Array[String]) { 
        val  inputFile  =  "file:///usr/local/spark/examples/src/main/resources/people. json"         
        val conf = new SparkConf().setAppName("JSONRead")         
        val sc = new SparkContext(conf)         
        val jsonStrs = sc.textFile(inputFile)         
        val result = jsonStrs.map(s => JSON.parseFull(s))         
        result.foreach( {
        	r => r match {                         
        		case Some(map: Map[String, Any]) => println(map)                         
        		case None =>println("Parsing failed")                         
        		case other => println("Unknown data structure: " + other)                 
        		}         
        	}         
        	)     
        } 
}

val jsonStrs = sc.textFile(inputFile)语句执行后,会生成一个名称为jsonStrs的RDD,这个RDD中的每个元素都是来自people.json文件中的一行,即一个JSON字符串。valresult =jsonStrs.map(s => JSON.parseFull(s))语句会对jsonStrs中的每个元素(即每个JSON字符串)进行解析,解析后的结果,存放到一个新的RDD(即result)中。如果解析成功,则返回一个Some(map: Map[String,Any]),如果解析失败,则返回None。所以,result中的元素,或者是一个Some(map: Map[String, Any]),或者是一个None。result.foreach()语句执行时,会依次扫描result中的每个元素,并对当前取出的元素进行模式匹配,如果是一个Some(map: Map[String, Any]),就打印出来,如果是一个None,就打印出“Parsing failed”。

使用sbt工具把JSONRead.scala代码文件编译打包成JAR包,通过spark-submit运行程序,屏幕上会输出如下信息:

Map(name -> Michael)
Map(name -> Andy, age -> 30.0)
Map(name -> Justin, age -> 19.0)

文章来源:《Spark编程基础》 作者:林子雨

文章内容仅供学习交流,如有侵犯,联系删除哦!文章来源地址https://www.toymoban.com/news/detail-450469.html

到了这里,关于Spark RDD编程 文件数据读写的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark【RDD编程(四)综合案例】

    输入数据:   处理代码: 代码解析:  运行结果: 要求:输入三个文件(每行一个数字),要求输出一个文件,文件内文本格式为(序号 数值)。         我们会发现,如果我们不调用 foreach 这个行动操作而是直接在转换操作中进行输出的话,这样是输出不来结果的,

    2024年02月09日
    浏览(38)
  • Spark RDD编程基本操作

    RDD是Spark的核心概念,它是一个只读的、可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用。Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作,从而实现各种复杂的应用。 Spark采用textFile()方法来从文件系统中加

    2024年02月06日
    浏览(85)
  • 大数据编程实验一:HDFS常用操作和Spark读取文件系统数据

    这是我们大数据专业开设的第二门课程——大数据编程,使用的参考书是《Spark编程基础》,这门课跟大数据技术基础是分开学习的,但这门课是用的我们自己在电脑上搭建的虚拟环境进行实验的,不是在那个平台上,而且搭建的还是伪分布式,这门课主要偏向于有关大数据

    2024年04月10日
    浏览(54)
  • Spark综合大作业:RDD编程初级实践

    Spark综合大作业:RDD编程初级实践 实验配置:操作系统:Ubuntu16.04 | 环境:Spark版本:2.4.0 | 软件:Python版本:3.4.3。 (1)熟悉Spark的RDD基本操作及键值对操作; (2)熟悉使用RDD编程解决实际具体问题的方法。 本次大作业的实验是操作系统:Ubuntu16.04,Spark版本:2.4.0,Python版

    2023年04月26日
    浏览(47)
  • Spark避坑系列二(Spark Core-RDD编程)

    大家想了解更多大数据相关内容请移驾我的课堂: 大数据相关课程 剖析及实践企业级大数据 数据架构规划设计 大厂架构师知识梳理:剖析及实践数据建模 PySpark避坑系列第二篇,该篇章主要介绍spark的编程核心RDD,RDD的概念,基础操作 RDD(Resilient Distributed Dataset)叫做弹性

    2024年02月02日
    浏览(39)
  • 【Spark编程基础】实验三RDD 编程初级实践(附源代码)

    1、熟悉 Spark 的 RDD 基本操作及键值对操作; 2、熟悉使用 RDD 编程解决实际具体问题的方法 1、Scala 版本为 2.11.8。 2、操作系统:linux(推荐使用Ubuntu16.04)。 3、Jdk版本:1.7或以上版本。 请到本教程官网的“下载专区”的“数据集”中下载 chapter5-data1.txt,该数据集包含了某大

    2024年03月25日
    浏览(57)
  • Spark RDD 文件读取与保存(text、sequence、object)

    RDD 文件读取与保存 Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。 文件格式分为:text 文件、csv 文件、sequence 文件以及 Object 文件; 文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。 text 文件 sequence 文件 SequenceFile 文件是 Hadoop 用来存储

    2024年02月02日
    浏览(38)
  • 大数据 - Spark系列《六》- RDD详解

    Spark系列文章: 大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客 大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客 大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客 大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客 大数据

    2024年02月20日
    浏览(46)
  • 【Java 编程】文件操作,文件内容的读写—数据流

    平时说的文件一般都是指存储在 硬盘 上的普通文件 形如 txt, jpg, mp4, rar 等这些文件都可以认为是普通文件,它们都是在硬盘上存储的 在计算机中,文件可能是一个 广义的概念 ,就不只是包含普通文件,还可以包含 目录 (把目录称为目录文件) 操作系统中,还会使用文件来描

    2023年04月08日
    浏览(50)
  • Spark大数据处理讲课笔记---Spark RDD典型案例

    利用RDD计算总分与平均分 利用RDD统计每日新增用户 利用RDD实现分组排行榜 针对成绩表,计算每个学生总分和平均分   读取成绩文件,生成lines;定义二元组成绩列表;遍历lines,填充二元组成绩列表;基于二元组成绩列表创建RDD;对rdd按键归约得到rdd1,计算总分;将rdd1映射

    2024年02月06日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包