横扫Spark之 - 9个常见的行动算子

这篇具有很好参考价值的文章主要介绍了横扫Spark之 - 9个常见的行动算子。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

水善利万物而不争,处众人之所恶,故几于道💦

1. collect()

  收集RDD每个分区的数据以数组封装之后发给Driver
  如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,工作中一般需要将Driver内存设置为5-10G。可以通过bin/spark-submit --driver-memory 10G 这样设置

  @Test
  def collect(): Unit ={

    val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))

    val arr = rdd1.collect()

    println(arr.toList)

  }

结果:
横扫Spark之 - 9个常见的行动算子,Spark,spark,大数据,分布式,行动算子

2. count()

  返回RDD中元素的个数

@Test
def count(): Unit ={

  val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))

  println(rdd1.count())
}

结果:
横扫Spark之 - 9个常见的行动算子,Spark,spark,大数据,分布式,行动算子

3. first()

  返回RDD中的第一个元素
  他会从多个分区取数据,如果0号分区取到了数据的话就只有一个job;如果0号分区没有取到数据,或者取到的数据不够,那就会再启动一个job去其他分区取

  @Test
  def first(): Unit ={

  val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),7)
	// 0号分区没有数据所以就会再启动一个job从后面的分区取,所以web页面看到有两个job
  val i = rdd1.first()

  println(i)

  Thread.sleep(10000000)
}

结果:
横扫Spark之 - 9个常见的行动算子,Spark,spark,大数据,分布式,行动算子

4. take()

  返回RDD中前n个元素组成的数组
  take和first一样如果取到就一个job如果取不到或者没取够就再来一个job去取

@Test
def take(): Unit ={

  val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),3)

  println(rdd1.take(3).toList)

  Thread.sleep(10000000)
}

结果:
横扫Spark之 - 9个常见的行动算子,Spark,spark,大数据,分布式,行动算子

5. takeOrdered()

  这个是取排序之后的前几个元素
  takeOrdered没有shuffle,因为只需要每个分区取前三然后拉到一起再取一次前三就完事了

@Test
def takeOrdered(): Unit ={

val rdd1 = sc.parallelize(List(1, 7,98,3,7,86,23,54, 9, 42, 6),3)

val ints = rdd1.takeOrdered(3)

println(ints.toList)

Thread.sleep(1000000)
}

结果:
横扫Spark之 - 9个常见的行动算子,Spark,spark,大数据,分布式,行动算子

6. countByKey()

  统计每个key出现的次数,返回的结果是(key,次数)

@Test
def countByKey(): Unit ={
  val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))

  val rdd2 = rdd1.countByKey()

  println(rdd2.toList)
}

结果:
横扫Spark之 - 9个常见的行动算子,Spark,spark,大数据,分布式,行动算子

7. saveAS…()

  saveAsTextFile(path)将数据保存成text文件,有几个task就保存几个文件
  saveAsSequenceFile(path)将数据保存成Sequencefile文件【只有kv类型RDD有该操作,单值的没有】
  saveAsObjectFile(path)将数据序列化成对象保存到文件

@Test
def save(): Unit ={

  val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))

  rdd1.saveAsTextFile("output/text")  // 为啥保存出来8个文件因为有8个task
  rdd1.saveAsObjectFile("output/ObjectFile")
  rdd1.saveAsSequenceFile("output/SequenceFile")
}

结果:
横扫Spark之 - 9个常见的行动算子,Spark,spark,大数据,分布式,行动算子

8. foreach()

  遍历RDD中的每个元素

@Test
def foreach(): Unit = {
  val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))

  rdd1.foreach(println)
}

结果:
横扫Spark之 - 9个常见的行动算子,Spark,spark,大数据,分布式,行动算子

9. foreachPartition() ***

  对每个分区遍历,参数列表传入的函数是针对每个分区的操作,有多少个分区函数就执行多少次
  foreachPartition的使用场景是:一般用于将数据写入mysql/redis/hbase等位置,可以减少连接的创建、销毁次数,提高效率

@Test
def foreachPartition(): Unit ={

  val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))

  rdd1.foreachPartition(it=>{

    var connection:Connection = null
    var statement:PreparedStatement = null

    try{
      connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")
      statement = connection.prepareStatement("insert into wc values(?,?)")

      //  计数器
      var count = 0

      it.foreach(x=>{
        statement.setString(1,x._1)
        statement.setInt(2,x._2)

        // 添加到批中,一批一批的执行
        statement.addBatch()

        // 满1000条执行一批
        if(count % 1000 == 0){
          statement.executeBatch()
          // todo 执行完批后要记得clearBatch !!!!!
          statement.clearBatch()
        }
        count = count+1
      })
      // 最后不满1000条的也执行一次
      statement.executeBatch()

    }catch {
      case e:Exception => e.printStackTrace()
    }finally {
      if (connection != null) {
        connection.close()
      }
      if (statement != null) {
        statement.close()
      }
    }
  })
}

结果:
横扫Spark之 - 9个常见的行动算子,Spark,spark,大数据,分布式,行动算子文章来源地址https://www.toymoban.com/news/detail-829643.html

到了这里,关于横扫Spark之 - 9个常见的行动算子的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据课程K2——Spark的RDD弹性分布式数据集

    文章作者邮箱:yugongshiye@sina.cn              地址:广东惠州 ⚪ 了解Spark的RDD结构; ⚪ 掌握Spark的RDD操作方法; ⚪ 掌握Spark的RDD常用变换方法、常用执行方法; 初学Spark时,把RDD看做是一个集合类型(类似于Array或List),用于存储数据和操作数据,但RDD和普通集合的区别

    2024年02月12日
    浏览(33)
  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(31)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(45)
  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(36)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(41)
  • 【Spark分布式内存计算框架——Spark 基础环境】1. Spark框架概述

    第一章 说明 整个Spark 框架分为如下7个部分,总的来说分为Spark 基础环境、Spark 离线分析和Spark实时分析三个大的方面,如下图所示: 第一方面、Spark 基础环境 主要讲述Spark框架安装部署及开发运行,如何在本地模式和集群模式运行,使用spark-shell及IDEA开发应用程序,测试及

    2024年02月11日
    浏览(41)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(41)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(55)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(31)
  • Spark分布式内存计算框架

    目录 一、Spark简介 (一)定义 (二)Spark和MapReduce区别 (三)Spark历史 (四)Spark特点 二、Spark生态系统 三、Spark运行架构 (一)基本概念 (二)架构设计 (三)Spark运行基本流程 四、Spark编程模型 (一)核心数据结构RDD (二)RDD上的操作 (三)RDD的特性 (四)RDD 的持

    2024年02月04日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包