Spark的reduceByKey方法使用

这篇具有很好参考价值的文章主要介绍了Spark的reduceByKey方法使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、需求

在ODPS上我们有如下数据:

id category_id attr_id attr_name attr_value
205348 10000046 2 最优粘度 ["0W-40"]
205348 10000046 1 基础油类型 ["全合成"]
205348 10000046 3 级别 ["BMW Longlife 01"]

我们希望得到的结果如下:

(205348, 10000046, "基础油类型:全合成\n最优粘度:0W-40\n级别:BMW Longlife 01\n")

需求解读:

需要将(id, category_id)作为key,然后将(attr_id, attr_name, attr_value)进行reduce操作,在reduce之后的数据中对attr_id进行排序,再将attr_name和attr_value合并在一起。

二、reduce操作之字符串方式

这个是最简单的方式,大致思路如下:

首先,将(id, category_id)作为key。

然后,将attr_id、attr_name、attr_value合并成一个字符串attr_info:attr_id + "#" + attr_name + "#" + attr_value,然后attr_info再通过"&"进行合并。

示例代码如下:

xx.map{case(id, category_id, attr_id, attr_name, attr_value) => ((id, category_id), attr_id + "#" + attr_name + "#" + attr_value)}
	.reduceByKey(_ + "&" + _, 100)

然后在接下来的流程中首先split("#")得到不同的attr信息,再通过split("&")得到不同的attr的列信息。这就要求attr_id,attr_name,attr_value中不能包含"#"和"&"字符串。

所以这种方式有缺陷,就是当attr_id,attr_name,attr_value包含了"#"和"&"字符串时需要先replace一下,这样就改变了原数据的值。

三、reduce操作之列表方式

这种方式相对复杂一点,需要对输入数据进行预处理,但是逻辑清晰。

输入数据中(id, category_id)是key保持不变,(item_id, item_name, item_value)是一组tuple。

reduce操作会在同一个partition中,不同的partition之间进行数据合并,这要求数据的输入、输出类型保持不变

spark reducebykey用法,spark,spark,java,ajax

我们的初步想法:将item_id, item_name, item_value分别放到3个列表中,合并时就是列表之间的合并,合并完毕后使用时只需要遍历列表即可。

因为reduce操作的输入、输出类型不能变化,所以先放item_id, item_name, item_value初始化为一个列表,然后再进行列表之间的合并。

示例代码如下:

xx.map{case(id, category_id, attr_id, attr_name, attr_value) => 
	  val itemIdList = new ArrayList[Long]()
	  itemIdList.add(attr_id)
	  val itemNameList = new ArrayList[String]()
	  itemNameList.add(attr_name)
	  val itemValueList = new ArrayList[String]()
	  itemValueList.add(attr_value)
	  ((id, category_id), (itemIdList, itemNameList, itemValueList))

}.reduceByKey((x, y) => {
	  val itemIdList = new ArrayList[Long]()
	  for(i <- 0 until x._1.size()){
		itemIdList.add(x._1.get(i))
	  }
	  for(i <- 0 until y._1.size()){
		itemIdList.add(y._1.get(i))
	  }

	  val itemNameList = new ArrayList[String]()
	  for(i <- 0 until x._2.size()){
		itemNameList.add(x._2.get(i))
	  }
	  for(i <- 0 until y._2.size()){
		itemNameList.add(y._2.get(i))
	  }

	  val itemValueList = new ArrayList[String]()
	  for(i <- 0 until x._3.size()){
		itemValueList.add(x._3.get(i))
	  }
	  for(i <- 0 until y._3.size()){
		itemValueList.add(y._3.get(i))
	  }

	  (itemIdList, itemNameList, itemValueList)
}, 100)

再简单一点如下示例:

 

carCaseRawInfo.map(x => {
      val stepInfoList = new util.ArrayList[(Long, String, String, String)]()
      stepInfoList.add((x._4, x._5, x._6, x._7))

      ((x._1, x._3, x._3), stepInfoList)
    })
      .reduceByKey((x, y) => {
        val stepInfoList = new ArrayList[(Long, String, String, String)]()
        for(i <- 0 until x.size()){
          stepInfoList.add(x.get(i))
        }
        for(i <- 0 until y.size()){
          stepInfoList.add(y.get(i))
        }

        stepInfoList
      }, GlobalConfig.DEFAULT_PARTITIONS_NUM)

四、reduce之partition属性

首先提一下Shuffle过程,它的本意是洗牌、混乱的意思,类似于java中的Colletions.shuffle(List)方法,它会随机地打乱参数list里地元素顺序。MapReduce的Shuffle过程大致可以理解成:数据从map task输出到reduce task输入的这段过程。

而partition过程:分割map每个节点的结果,按照key分别映射给不同的reduce,这个是可以自定义的。

通过设置reduce中的numPartitions值,会在reduce操作之后进行repartition,避免数据不均衡堆在一个partition中。

五、reduceByKey和groupByKey的区别

从 shuffle 的角度: reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。

从功能的角度: reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey 。reduceByKey的分区内和分区间的计算规则是一样的文章来源地址https://www.toymoban.com/news/detail-852610.html

到了这里,关于Spark的reduceByKey方法使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java语言在Spark3.2.4集群中使用Spark MLlib库完成XGboost算法

    XGBoost是一种基于决策树的集成学习算法,它在处理结构化数据方面表现优异。相比其他算法,XGBoost能够处理大量特征和样本,并且支持通过正则化控制模型的复杂度。XGBoost也可以自动进行特征选择并对缺失值进行处理。 1、导入相关库 2、加载数据 3、准备特征向量 4、划分

    2023年04月12日
    浏览(27)
  • Java语言在Spark3.2.4集群中使用Spark MLlib库完成朴素贝叶斯分类器

    贝叶斯定理是关于随机事件A和B的条件概率,生活中,我们可能很容易知道P(A|B),但是我需要求解P(B|A),学习了贝叶斯定理,就可以解决这类问题,计算公式如下:     P(A)是A的先验概率 P(B)是B的先验概率 P(A|B)是A的后验概率(已经知道B发生过了) P(B|A)是

    2023年04月12日
    浏览(24)
  • 【Spark原理系列】Accumulator累加器原理用法示例源码详解

    源自专栏《SparkML:Spark ML系列专栏目录》 Accumulator是Spark中的一种分布式变量,用于在并行计算中进行累加操作。它是由MapReduce模型中的“全局计数器”概念演化而来的。 Accumulator提供了一个可写的分布式变量,可以在并行计算中进行累加操作。在Spark中,当一个任务对Accum

    2024年03月14日
    浏览(51)
  • Spark SQL示例用法所有函数示例权威详解一【建议收藏】

    Spark中所有功能的入口点是 SparkSession 类。要创建一个基本的 SparkSession ,只需使用 SparkSession.builder() : 完整示例代码可在Spark存储库的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”中找到。 在Spark 2.0中, SparkSession 提供了 对Hive功能的内置支持 ,包括 使用Hi

    2024年02月05日
    浏览(40)
  • 【Spark ML系列】Frequent Pattern Mining频繁挖掘算法功能用法示例源码论文详解

    挖掘频繁项、项集、子序列或其他子结构通常是分析大规模数据集的首要步骤,在数据挖掘领域已经成为一个活跃的研究课题。我们建议用户参考维基百科上关于关联规则学习的相关信息。 FP-growth算法在《Han et al., Mining frequent patterns without candidate generation》一文中进行了描述

    2024年02月19日
    浏览(27)
  • 【spark】java类在spark中的传递,scala object在spark中的传递

    记录一个比较典型的问题,先讲一下背景,有这么一个用java写的类 然后在spark中使用的时候: 原因: scala的object对应的就是java的静态成员,可以反过来理解java的所有静态成员可被抽取成伴生对象(虽然现实中是scala最终编译成java)。以上面的JavaClass0 例子可理解为等价的

    2024年02月11日
    浏览(26)
  • Spark内存资源分配——spark.executor.memory等参数的设置方法

    基于论坛上一些关于spark内存设置的文章,我对一个项目中实际运行的任务进行了内存参数分析和优化。如果要了解更多详细设置原理,可见文末的参考文章链接。 已知内存分配存在通过用户提交的参数设置进行静态分配,和yarn进行动态分配两种,所以本文对两种状况都根据

    2023年04月13日
    浏览(101)
  • Spark数据倾斜及解决方法

    数据倾斜是指少量的Task运行大量的数据,可能会导致OOM。数据过量是所有的Task都很慢。避免数据倾斜的方式主要有: 按照Key分组后,一组数据拼接成一个字符串,这样一个Key只有一条数据了。这个方式个人觉得有点僵硬。 增大或缩小Key的粒度:增大粒度一个Key包含更多的数

    2024年02月15日
    浏览(26)
  • Spark Executor端日志打印的方法

    大数据平台采用yarn client模式提交spark 任务,并且多个离线Spark作业共用一个Driver,好处便在于——节省提交任务的时间。但同时也加大了运维工作的难度,因为任务日志打印到同一个文件中。 为了区分开各个业务流程的日志,平台引入了log4j2 RoutingAppender,配置如下所示:

    2023年04月22日
    浏览(23)
  • 深入理解Spark编程中的map方法

    先上结论:不拘泥于形式,给一个东西,一顿操作,返回一个东西。且对每一条数据都相同的处理,处理完会生成新的东西,不改变之前你传进来的东西。        这个东西你可以理解为任何形式的数据,如map里的,对象,各种形式文件等等。  当深入理解 map 方法时,我们

    2024年01月24日
    浏览(31)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包