前言
在18年初刚开始接触学习spark的时候,买了一本《Spark大数据处理技术》的书,虽然后来一些Spark开发的知识都是从官网和实践中得来的,但是这本书对我来说是启蒙和领路的作用。
还记得这本书编程的开篇就是Spark程序”Hello World“!果然,这辈子是摆脱不了”Hello World“了。
Hello World
在大数据里第一次遇到Hello World,还是在经典的MapReduce WordCount,以此讨论如何实现map和reduce的过程。
后来学习Spark,又遇到了Hello World,同样也是用WrodCount案例,来演示Spark对数据集的基本操作。那么就借Hello World来做一个Spark的入门教程。
认识Spark
Spark是一个大数据的分布式计算框架。既能和一个普通的程序一样,运行在本地(local)IDE中,也能运行在搭建的Spark集群(Cluster)上,不过现在已经很少见。最常见的就是运行在第三方的计算调度平台上,例如yarn和K8s。
我测试使用local模式,生产是yarn,所以Spark就围绕着这两个来写。先说说在IDE中如何开发local的Spark程序。
Spark开发语言一共有三种:Java、python、scala。我使用scala来完成Spark开发,原因:
- 定理变量无需指定类型,使用val或var
- lambada操作,更符合流式计算的感觉(我开发流式计算比较多)
- 调用无参方法可以不写括号
- 趁机多掌握一门语言,而且Spark源码大多为scala
程序开发
因为Spark源码是java和scala开发的,所以要配置java和scala环境,在选择spark版本的同时,一起选择对应的scala的版本。
为了和生产保持一致,Spark版本我用的是2.3.2,scala版本2.11.8。至于spark3的新特性什么的,用到的时候再讲也不迟。
先创建一个scala的maven程序。
然后删除项目自带的scala,将自己需要的scala版本添加到项目中。
这样项目就有了2.11.8的scala编译、运行环境了。
maven依赖
Spark作为计算框架,和其他Java框架一样,需要引入依赖的jar。
定义了spark.version和scala.binary.version来统一控制spark的版本,这样在后面引用SparkStreaming、SparkSql、mlib等组件依赖的时候,就可以使用变量来指定。
对于最spark最基本的功能,我也称之为离线处理、批处理,只需要引入spark-core即可。
开发程序
新建一个scala object文件,而不是class,因为在scala中只有object才能定义main函数。然后就是模板化的程序开发。
1. 创建sparkContext
SparkContext是Spark程序的入口点,用于创建RDD、累加器和广播变量等。
val conf = new SparkConf().setAppName("test").setMaster("local[2]")
val sc = new SparkContext(conf)
setMaster() 用来指定程序运行在哪里。
local[4]表示使用cpu的4个core来执行任务,local[*]表示使用cpu的所有核心。当然,这只是在IDE中的写法。生产环境通常是通过shell脚本,提交到Hadoop的yarn上运行,所以都是在启动脚本里指定master,就不用在程序中指定了。
spark-submit --master yarn [...]
master指定为yarn。
2. 数据集
在大数据处理技术架构中,程序一般就分为三个模块:数据源、数据处理、数据输出。WorkCount的数据源可以定义为外部文件,也可以在程序内直接使用字符串变量表示,这里为了方便,就用字符串表示数据源。
Spark的计算数据是以RDD的形式存在的(这里RDD先可以理解成数据集合),Spark通过api接口从外部文件、数据源读取的数据,都会被抽象化成各种RDD,我们这里是在程序内指定的字符串,没有与数据源交互,所以需要我们调用makeRDD手动创建RDD。
val words = "Hello,World,Hello,World,Hello,Spark,Spark,Scala".split(",")
val wordKV = sc.makeRDD(words)
.filter(x => x.equals("Hello") || x.equals("World"))
.map(x => (x, 1))
.reduceByKey((x, y) => y + y)
先将字符串分割之后创建RDD,然后通过filter来过滤”Hello World“字符串,通过map处理成(Hello, 1)的形式,最后通过reduceByKey对具有相同key的value进行累加,最后输出。
wordKV返回的同样是一个RDD,我们使用map或者foreach进行遍历输出。
这样,关于Hello World的WordCount就完成,你可以看到这里我用的foreach进行遍历输出的,如果我用map呢。
???我的输出去哪里了?哈哈,别着急,等我操作一下,加一个collect方法,然后启动程序。
你看,结果又能照常输出了。
至于原因,就留在Spark运行架构或RDD的时候一起写。文章来源:https://www.toymoban.com/news/detail-839365.html
结语
本篇文章主要利用wordcount的小案例,带大家认识了一下Spark,内容比较浅显,后续会更加深入,期待与大家共同学习。
文章来源地址https://www.toymoban.com/news/detail-839365.html
到了这里,关于自学Spark,又是Hello World?的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!