Spark【RDD编程(三)键值对RDD】

这篇具有很好参考价值的文章主要介绍了Spark【RDD编程(三)键值对RDD】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

简介

        键值对 RDD 就是每个RDD的元素都是 (key,value)类型的键值对,是一种常见的 RDD,可以应用于很多场景。        

        因为毕竟通过我们之前Hadoop的学习中,我们就可以看到对数据的处理,基本都是以键值对的形式进行统一批处理的,因为MapReduce模型中,Mapper和Reducer之间的联系就是通过键和值进行连接产生关系的。

键值对RDD的创建

        其实就是个RDD 的创建,无非就是通过并行集合创建和通过文件系统创建,然后文件系统又分为本地文件系统和HDFS。

常用的键值对RDD转换操作

1、reduceByKey(func)

 和上一篇文章中的用法一致。

2、groupByKey(func)

和上一篇文章中的用法一致。

3、keys

返回键值对 RDD 中所有的key,构成一个新的 RDD。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object KV_RDD {

  def main(args: Array[String]): Unit = {
    //创建SparkContext对象
    val conf = new SparkConf()
    conf.setAppName("kv_rdd").setMaster("local")
    val sc:SparkContext = new SparkContext(conf)

    //通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    val res: RDD[String] = rdd.keys
    res.foreach(println)

    //关闭SparkContext
   sc.stop()
  }
}

输出结果:

Spark
Hadoop
Spark
Flink

4、values

返回键值对 RDD 中所有的key,构成一个新的 RDD。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    val res: RDD[Int] = rdd.values
    res.foreach(println)

运行结果:

1
1
1
1

5、sortByKey(Boolean asce)

返回一个根据 key 排序(字典序)的RDD。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    val res: RDD[(String,Int)] = rdd.sortByKey()
    res.foreach(println)

运行结果:

(Flink,1)
(Hadoop,1)
(Spark,1)
(Spark,1)

设置升序/降序

默认我们sortByKey()方法是升序排序的,如果要降序可以传入一个false的值。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",1),("Spark",1),("Flink",1))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)
    //降序
    val res: RDD[(String,Int)] = rdd.sortByKey(false)
    res.foreach(println)

运行结果:

(Spark,1)
(Spark,1)
(Hadoop,1)
(Flink,1)

6、sortBy()

可以根据其他字段进行排序。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    //按照value升序排序
    val res: RDD[(String,Int)] = rdd.sortBy(kv=>kv._2,true)
    res.foreach(println)

运行结果:

(Spark,1)
(Hive,2)
(Flink,3)
(Hadoop,5)

7、mapValues(func)

        之前我们处理的RDD 都是文本或数字类型的,之前我们的map(func)中的func函数是对整个RDD的元素进行处理。但是这里换成了mapValues(func),这里func函数处理的是我们(key,value)中的所有value,而key 不会发生变化。

//通过并行集合创建RDD
    val arr = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))
    val rdd: RDD[(String, Int)] = sc.parallelize(arr)

    //所有的value+1
    val res: RDD[(String,Int)] = rdd.mapValues(value=>value+1)
    res.foreach(println)

运行结果:

(Spark,2)
(Hadoop,6)
(Hive,3)
(Flink,4)

8、join()

内连接,(K,V1)和(K,V2)进行内连接生成(K,(V1,V2))。

//通过并行集合创建RDD
    val arr1 = Array(("Spark",1),("Hadoop",5),("Hive",2),("Flink",3))
    val arr2 = Array(("Spark","fast"),("Hadoop","good"))
    val rdd1: RDD[(String,Int)] = sc.parallelize(arr1)
    val rdd2: RDD[(String,String)] = sc.parallelize(arr2)

    //所有的value+1
//    val res: RDD[(String,(Int,Int))] = rdd1.join(rdd2)
  val res: RDD[(String, (Int, String))] = rdd1.join(rdd2)
    res.foreach(println)

运行结果:

(Spark,(1,fast))
(Hadoop,(5,good))

我们可以看到,返回的RDD 的元素都是满足连接表rdd2的K的。 

9、combineByKey()

这个函数的参数比较多,下面做个介绍:

  1. createCombiner:用于将RDD中的每个元素转换为一个类型为C(V=>C)的值。这个函数在第一次遇到某个key的时候会被调用,用于创建一个累加器。
  2. mergeValue:用于将RDD中的每个value值合并到已经存在的累加器中。这个函数在遇到相同key的value时会被调用。
  3. mergeCombiners:用于将不同分区中的累加器值进行合并。这个函数在每个分区处理完后,将各个分区的累加器值进行合并。

案例-统计公司三个季度的总收入和平均收入

//通过并行集合创建RDD
    val arr = Array(("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92))

    val rdd: RDD[(String, Int)] = sc.parallelize(arr,3)

    val res: RDD[(String,Int,Float)] = rdd.combineByKey(
      income=>(income,1),
      (acc:(Int,Int),income)=>(acc._1+income,+acc._2+1),
      (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)
    ).map({
      case (key,value) => (key,value._1,value._1/value._2.toFloat)
    })
    //重新分配分区 将3个分区合并为1个
    res.repartition(1).saveAsTextFile("data/kv_rdd/")

运行结果中-part-00000文件内容:

(company-3,266,88.666664)
(company-1,269,89.666664)
(company-2,254,84.666664)

其中,第一列为季度名称。第二列为总收入,第三列为平均收入。

参数解析

        第一个参数的作用是:当我们取出的RDD元素是第一次遇到的key,那么就创建一个组合器函数createCombiner(),负责将我们的键值对(K:季度名称,V:收入额)中的 V:收入额转为 C格式(总收入额,1)的格式,其中的1代表当前已经累加了一个月的收入。

        第二个参数是合并值函数 mergeValue(),它的作用是:如果遇到相同的key,比如都是"company-1",那么就对相同key的的value进行mergeValue()中定义的操作。

        第三个参数的作用是 :由于我们开启了多个分区,所以最后要对不同分区的数据进行一个对总,这个函数中定义的就是对两个 C格式 的键值对进行的操作。

最后我们进行了一个模式匹配,对于结果返回的(k,v)形式的数据,其中 k 就是指季度名称, v 是一个键值对(总收入额,月份数),我们将它转为 (季度名称,总收入额,平均收入额)。

分区1:
1-调用createCombiner()函数
(company-1,88) => (company-1,(88,1))
2-调用mergeValue()函数
(company-1,96) => (company-1,(184,2))
分区2:
1-调用createCombiner()函数
(company-1,85) => (company-1,(85,1))

3-调用mergeCombiners()函数
(company-1,(184,2)) + (company-1,(85,1)) => (company-1,(269,3))

10、flatMapValues(fubc)

        flatMapValues(func)的操作和mapValues(func)相似。它们都是对键值对类型的RDD进行操作,mapValues(func)是对(ke要,value)的value通过函数 func 进行一个处理,而key不变。而flatMapValues(func)则是对value先通过函数 func 进行处理,然后再处理后的值和key组成一系列新的键值对。

输入数据:

("k1","hadoop,spark,flink")
("k2","hadoop,hive,hbase")

处理

//通过并行集合创建RDD
    val arr = Array(("k1","hadoop,spark,flink"),("k2","hadoop,hive,hbase"))
    val rdd: RDD[(String, String)] = sc.parallelize(arr)

    //flatMapValues(func)
    //val res: Array[(String, String)] = rdd.flatMapValues(value =>   value.split(",")).collect()  
    //mapValues(func)
    val res: Array[(String, Array[String])] =rdd.mapValues(value => value.split(",")).collect()

value.split(",")).collect()
    res.foreach(println)

运行结果:

(k1,hadoop)
(k1,spark)
(k1,flink)
(k2,hadoop)
(k2,hive)
(k2,hbase)

而我们的mapValues(func)执行后的RDD集合内为:

(k1,Array("hadoop","spark","flink"))
(k2,Array("hadoop","hive","hbase"))

显然我们的flatMapValues(func)是多进行了一部扁平化的操作,将集合内的元素与key一一组成一系列心得键值对。文章来源地址https://www.toymoban.com/news/detail-706252.html

到了这里,关于Spark【RDD编程(三)键值对RDD】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 云计算实验2 Spark分布式内存计算框架配置及编程案例

    掌握分布式多节点计算平台Spark配置,Spark编程环境IDEA配置,示例程序启动与运行 Linux的虚拟机环境、线上操作视频和实验指导手册 完成Spark开发环境安装、熟悉基本功能和编程方法。 请按照线上操作视频和实验指导手册 ,完成以下实验内容: 实验2-1 Spark安装部署:Standal

    2023年04月13日
    浏览(63)
  • Spark 【RDD编程(一)RDD编程基础】

            在Spark中,RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写。通俗来讲,RDD是一种抽象的数据结构,用于表示分布式计算中的数据集合。它是Spark中最基本的数据模型,可以看作是一个不可变的、可分区、可并行处理的数据集合。这个数据集的全部或部分可

    2024年02月09日
    浏览(56)
  • Spark弹性分布式数据集

    1. Spark RDD是什么 RDD(Resilient Distributed Dataset,弹性分布式数据集)是一个不可变的分布式对象集合,是Spark中最基本的数据抽象。在代码中RDD是一个抽象类,代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 每个RDD都被分为多个分区,这些分区运行在集群中

    2024年02月13日
    浏览(60)
  • 大数据学习06-Spark分布式集群部署

    配置好IP vim /etc/sysconfig/network-scripts/ifcfg-ens33 修改主机名 vi /etc/hostname 做好IP映射 vim /etc/hosts 关闭防火墙 systemctl status firewalld systemctl stop firewalld systemctl disable firewalld 配置SSH免密登录 ssh-keygen -t rsa 下载Scala安装包 配置环境变量 添加如下配置 使环境生效 验证 Spark官网 解压 上

    2024年02月10日
    浏览(70)
  • spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

    目录 1. RDD队列 2 textFileStream 3 DIY采集器 4 kafka数据源【重点】        a、使用场景:测试        b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理     1. 自定义采集器     2. 什么情况下需要自定采集器呢?          比

    2024年02月07日
    浏览(51)
  • Spark RDD编程基本操作

    RDD是Spark的核心概念,它是一个只读的、可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用。Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作,从而实现各种复杂的应用。 Spark采用textFile()方法来从文件系统中加

    2024年02月06日
    浏览(84)
  • Spark【RDD编程(四)综合案例】

    输入数据:   处理代码: 代码解析:  运行结果: 要求:输入三个文件(每行一个数字),要求输出一个文件,文件内文本格式为(序号 数值)。         我们会发现,如果我们不调用 foreach 这个行动操作而是直接在转换操作中进行输出的话,这样是输出不来结果的,

    2024年02月09日
    浏览(38)
  • Spark避坑系列二(Spark Core-RDD编程)

    大家想了解更多大数据相关内容请移驾我的课堂: 大数据相关课程 剖析及实践企业级大数据 数据架构规划设计 大厂架构师知识梳理:剖析及实践数据建模 PySpark避坑系列第二篇,该篇章主要介绍spark的编程核心RDD,RDD的概念,基础操作 RDD(Resilient Distributed Dataset)叫做弹性

    2024年02月02日
    浏览(39)
  • Spark综合大作业:RDD编程初级实践

    Spark综合大作业:RDD编程初级实践 实验配置:操作系统:Ubuntu16.04 | 环境:Spark版本:2.4.0 | 软件:Python版本:3.4.3。 (1)熟悉Spark的RDD基本操作及键值对操作; (2)熟悉使用RDD编程解决实际具体问题的方法。 本次大作业的实验是操作系统:Ubuntu16.04,Spark版本:2.4.0,Python版

    2023年04月26日
    浏览(47)
  • 【Spark编程基础】实验三RDD 编程初级实践(附源代码)

    1、熟悉 Spark 的 RDD 基本操作及键值对操作; 2、熟悉使用 RDD 编程解决实际具体问题的方法 1、Scala 版本为 2.11.8。 2、操作系统:linux(推荐使用Ubuntu16.04)。 3、Jdk版本:1.7或以上版本。 请到本教程官网的“下载专区”的“数据集”中下载 chapter5-data1.txt,该数据集包含了某大

    2024年03月25日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包