Hadoop生态之Mapreduce

这篇具有很好参考价值的文章主要介绍了Hadoop生态之Mapreduce。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

今天给大家带来的是Hadoop生态中的Mapreduce,看到这里诸佬们可能就有疑惑了呢,啥是Mapreduce?小小的脑袋大大的疑惑。
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式
在上篇博客中博主使用了王者来举例子,如果把Hadoop当作王者的话,HDFS是后台存储点券数据的系统的话,那么我们今天介绍的Mapreduce就是某者用来计算优惠力度,并且计算游戏里最终到账的点券。(虽然博主不怎么充钱)
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

1.MapReduce概述

1.1 MapReduce定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

1.2 MapReduce优缺点

1.2.1 优点

1)MapReduce易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。
2)良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
3)高容错性
MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
4)适合PB级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。

1.2.2 缺点

1)不擅长实时计算
MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。
2)不擅长流式计算
流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
3)不擅长DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

2.MapReduce的运行机制

诸佬们从MapReduce的名字以及上面的介绍中,应该也可以知道,MapReduce实现中最重要的两个概念:Map和Reduce。

Map
Map的任务是:处理原始数据、为数据打标签、对数据进行分发(严格来说这并不完全是map的职责)

处理原始数据
这一阶段是对原始数据进行预处理的阶段,可以从行和列两个角度来考虑。

行:比如我们需要对数据按照时间过滤,只选择本周一的数据,其他数据过滤掉不处理。

列:比如原始数据有10列,我们只需要其中的5列,其他列过滤掉不处理。

举例:

假如我的HDFS上有一周的支出数据,我们想统计周一周二的支出情况,接下来我们会一步步解释这个过程。下图是其中一部分记录:

输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

map:处理原始数据
可以看出过滤掉了非周一周二的数据,并且删除了使用人的字段。

为数据打标签
map处理完原始数据之后,接下来就要将数据分组,从而分配给合适的reduce去处理,分组的第一步就是打标签。
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

map:为数据打标签
可以看出,对每一条数据加了一条对应天数的标签。

对数据进行分发
打完标签之后,就需要对数据进行分发,严格来说,这并不完全属于Map的职责,其中也用到了一个神秘的中间环节:shuffle。不过入门来看,我们就单纯任务这属于Map。

分发的意思是,打完标签之后,要对数据进行分类处理,然后再发送给Reduce;分类的依据,就是上面对其打的自定义标签。
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

map:分发有标签的数据
可以看出,对每一条数据,按照标签分配,由原来的一个列表,变成了现在的两个列表。

Map阶段到此完成,接下来的任务就是要等着Reduce来取数了。

Reduce
Reduce的任务是:拉取Map分类好的数据(这也并不完全是Reduce的职责)、执行具体的计算

拉取Map分类好的数据
之前说到,Map已经将数据分类,我们直接拉取Reduce需要的数据就好了;但是要注意的是,我们是在一个分布式的环境中执行的任务,所以,Reduce的数据来源可能是多个Map中属于自己的块。
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

reduce:获取map分发的数据
可以看到,Reduce按照Map分类的key拉取到了自己应该处理的当日数据。

执行具体的计算
Reduce在拿到所有自己的数据之后,接下来就可以执行自定义的计算逻辑了,最简单的就是计数、去重。
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

reduce:执行具体的计算
可以看到,Reduce已经完成了所需要的单日支出计算功能。

PS:
Map和Reduce的职责并不是完全绝对的,比如过滤操作可以在Map,也可以在Reduce,只是因为在Map做可以减少传输的数据量,减少网络IO压力和时间消耗,所以做了上述的分工。

3. Hadoop序列化

3.1 序列化概述

1)什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2)为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
3)为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
4)Hadoop序列化特点:
(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互

3.2 自定义bean对象实现序列化接口(Writable)

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步。
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
	super();
}

(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {
	out.writeLong(upFlow);
	out.writeLong(downFlow);
	out.writeLong(sumFlow);
}

(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {
	upFlow = in.readLong();
	downFlow = in.readLong();
	sumFlow = in.readLong();
}

(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。。

@Override
public int compareTo(FlowBean o) {
	// 倒序排列,从大到小
	return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

诸佬们如果想深入理解序列化案例,可以参考硅谷的经典wordcount案例,有时候经常阅读源码也是一个非常好的习惯呢
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

4.MapReduce的框架原理

Hadoop 划分工作为任务。有两种类型的任务:
Map 任务 (分割及映射)
Reduce 任务 (重排,还原)
如上所述完整的执行流程(执行 Map 和 Reduce 任务)是由两种类型的实体的控制,称为Jobtracker : 就像一个主(负责提交的作业完全执行)
多任务跟踪器 : 充当角色就像从机,它们每个执行工作
对于每一项工作提交执行在系统中,有一个 JobTracker 驻留在 Namenode 和 Datanode 驻留多个 TaskTracker。

输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

4.1 InputFormat数据输入

4.1.1 切片与MapTask并行度决定机制

1)问题引出
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?
2)MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

4.1.2 Job提交流程源码和切片源码详解

1)Job提交流程源码详解
waitForCompletion()

submit();

// 1建立连接
	connect();	
		// 1)创建提交Job的代理
		new Cluster(getConfiguration());
			// (1)判断是本地运行环境还是yarn集群运行环境
			initialize(jobTrackAddr, conf); 

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)

	// 1)创建给集群提交数据的Stag路径
	Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

	// 2)获取jobid ,并创建Job路径
	JobID jobId = submitClient.getNewJobID();

	// 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);	
	rUploader.uploadFiles(job, jobSubmitDir);

	// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
		maps = writeNewSplits(job, jobSubmitDir);
		input.getSplits(job);

	// 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
	conf.writeXml(out);

	// 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

5.MapReduce开发总结

1)输入数据接口:InputFormat

(1)默认使用的实现类是:TextInputFormat
(2)TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。
(3)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。

2)逻辑处理接口:Mapper

用户根据业务需求实现其中三个方法:map() setup() cleanup ()

3)Partitioner分区

(1)有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces
(2)如果业务上有特别的需求,可以自定义分区。

4)Comparable排序

(1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
(2)部分排序:对最终输出的每一个文件进行内部排序。
(3)全排序:对所有数据进行排序,通常只有一个Reduce。
(4)二次排序:排序的条件有两个。

5)Combiner合并

Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

6)逻辑处理接口:Reducer

用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()

7)输出数据接口:OutputFormat

(1)默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对,向目标文本文件输出一行。
(2)用户还可以自定义OutputFormat。

6.MapReduce常见面试题

最后博主再给诸佬奉上几个常见面试题希望大家能三连一波。
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

1. Mapreduce 的 map 数量 和 reduce 数量是由什么决定的 ,怎么配置

map数量是由任务提交时,传来的切片信息决定的,切片有多少,map数量就有多少
科普:什么是切片?切片的数量怎么决定?
举例:输入路径中有两个文件,a.txt(130M),b.txt(1M),切片是一块128M,但是不会跨越文件,每个文件单独切片,所以这个路径提交之后获得的切片数量是3,大小分别是128M,2M,1M

reduce的数量是可以自己设置的

2. MapReduce优化经验

设置合理的map和reduce的个数。合理设置blocksize
避免出现数据倾斜
combine函数
对数据进行压缩
小文件处理优化:事先合并成大文件,combineTextInputformat,在hdfs上用- mapreduce将小文件合并成SequenceFile大文件(key:文件名,value:文件内容)
参数优化

3. 分别举例什么情况要使用 combiner,什么情况不使用?

求平均数的时候就不需要用combiner,因为不会减少reduce执行数量,运行结果也会出错。在其他的时候,可以依据情况,使用combiner,来减少map的输出数量,减少拷贝到reduce的文件,从而减轻reduce的压力,节省网络开销,提升执行效率

4. MR运行流程解析

这些都是必须记住的,面试经常考

map操作
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

reduce操作
输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,

5. suffle阶段运行流程(必背)

输入分片映射重排合并输出,hadoop,mapreduce,云原生,大数据,分布式

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
具体Shuffle过程详解,如下:
(1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
(3)多个溢出文件会被合并成大的溢出文件
(4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
(5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
(6)ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
(7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
注意:
(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。

6. map端的shuffle

(1)partition:

(1)过程:经过map函数处理后输出新的<key,value>,它首先被存储到环形缓冲区的kvbuffer,环形缓冲区默认是100M,并且对每个key/value对hash一个partition值,相同的partition值为同一个分区。

(2)作用:分区之后,每个reduce就会处理对应的partition,减少reduce的压力。

(2)sort/combiner/compress:

(1)过程:对环形缓冲区内的partition值和key值进行排序;如果用户设置了combiner,会对每个partition中的数据进行预处理,相当于是map端的reduce;如果用户设置了compress,会对combiner的数据进行压缩。

(2)作用:sort作用是在内部排序,减少reduce的压力;combiner作用是节省网络带宽和本地磁盘的IO;compress作用是减少本地磁盘的读写和减少reduce拷贝map端数据时的网络带宽。

(3)spill:

(1)过程:因为环形缓冲区的内存不够用,所以必须要写到本地磁盘中。将排序好的数据spill到本地磁盘中。

(2)作用:数据量非常大,全部放到内存不现实,所以最后还是会存到本地磁盘中。

(4)merge:

(1)过程:因为会产生多次spill,本身存放数据的out文件和存放数据偏移量索引index文件会产生多个,把多个文件合并在一起。

(2)作用:方便reduce的一次性拷贝。

7. reduce端的shuffle

(1)copy:

(1)过程:reduce拷贝map最终的输出的磁盘数据,一个reduce拷贝每个map节点的相同partition数据。

(2)作用:拷贝后的数据不止一份,先进行合并操作,为后面的排序做准备。

(2)merge:

(1)过程:reduce拷贝map最终的输出的磁盘数据,一个reduce拷贝每个map节点的相同partition数据。

(2)作用:拷贝后的数据不止一份,先进行合并操作,为后面的排序做准备。

(3)、sort:这里和map端的一样,但是reduce端的缓冲区更加灵活一点,如果内存够用,就是内存到内存的merge,不够用了就是内存到磁盘的merge,最后是磁盘到磁盘的merge。

(4)、group:将排序好的数据进行分组,分组默认是将相同的key的value放在一起。作用是为了reduce函数更好的计算相同key值出现的次数。

8.Split

8.1 分片概念

这里的分片只是逻辑分片,根据文件的字节索引进行分割。比如0—1MB位置定义为第一个分片,1MB-2MB定义为为第二个分片,依次类推……而原来的大文件还是原来的大文件,不会受到影响.
因此,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。

8.2 分片数量与Map Task数量的关系

Map Task的个数等于split的个数。 mapreduce在处理大文件的时候,会根据一定的规则,把大文件划分成多个分片,这样能够提高map的并行度。 划分出来的就是InputSplit,每个map处理一个InputSplit,因此,有多少个InputSplit,就有多少个map task。

8.3 由谁来划分分片?

主要是 InputFormat类 来负责划分Split。InputFormat类有2个重要的作用:

1)将输入的数据切分为多个逻辑上的InputSplit,其中每一个InputSplit作为一个map的输入。

2)提供一个RecordReader,用于将InputSplit的内容转换为可以作为map输入的k,v键值对。

FileInputFormat是InputFormat的子类,是使用比较广泛的类,输入格式如果是hdfs上的文件,基本上用的都是FileInputFormat的子类,如TextInputFormat用来处理普通的文件,SequceFileInputFormat用来处理Sequce格式文件。 FileInputFormat类中的getSplits(JobContext job)方法是划分split的主要逻辑。

8.4 分片的大小

每个输入分片的大小是固定的,默认为128M。

分片大小范围可以在mapred-site.xml中设置

8.5 默认分片大小与Block分块大小相同的原因是什么?

优点就是可以实现分块优化,减少网络传输数据,使用本地数据运行map任务。

如果分片跨越两个数据块的话,对于任何一个HDFS节点,分片中的另外一块数据就需要通过网络传输到Map任务节点,效率更低!文章来源地址https://www.toymoban.com/news/detail-783116.html

到了这里,关于Hadoop生态之Mapreduce的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hadoop 2:MapReduce

    理解MapReduce思想 MapReduce的思想核心是“先分再合,分而治之”。 所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,然后把各部分的结果组成整个问题的最终结果。 这种思想来源于日

    2024年02月06日
    浏览(45)
  • hadoop --- MapReduce

    MapReduce定义: MapReduce可以分解为Map (映射) + Reduce (规约) , 具体过程:   Map : 输入数据集被切分成多个小块,并分配给不同的计算节点进行处理 Shuffle and Sort:洗牌和排序,在 Map 阶段结束后,将每个 Mapper 生成的键值对按照键进行排序,并将相同键的值归并在一起,并将相

    2024年02月15日
    浏览(53)
  • hadoop之MapReduce简介

    MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 简单说MapReduce是一个框架,一个分布式计算框架,

    2024年02月04日
    浏览(52)
  • 【Hadoop】MapReduce详解

    🦄 个人主页 ——🎐开着拖拉机回家_大数据运维-CSDN博客 🎐✨🍁 🪁🍁🪁🍁🪁🍁🪁🍁 🪁🍁🪁🍁🪁🍁🪁 🪁🍁🪁🍁🪁🍁🪁🍁🪁🍁🪁🍁 感谢点赞和关注 ,每天进步一点点!加油! 目录 一、MapReduce概述 1. 1 MapReduce 介绍 1.2 MapReduce 定义 1.3 MapReduce优缺点 1.2.1.优

    2024年02月05日
    浏览(51)
  • hadoop-MapReduce

    1. MapReduce设计思想 2. MapReduce分布式计算的基本原理 3. 使用Java进行MapReduce编程 4. 在Hadoop集群中提交MapReduce任务 5.Yarn工作机制 1. MapReduce设计思想 1.1  什么是MapReduce 1 )MapReduce是一个分布式计算框架 它将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务。 起源于

    2024年02月10日
    浏览(42)
  • MapReduce排序机制(Hadoop)

    在MapReduce中, 排序的目的是为了方便Reduce阶段的处理,通常是为了将相同键的键值对聚合在一起,以便进行聚合操作或其他处理。 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定 阈值 后,再对缓冲区中的数据进行一次快速排序,并将这

    2024年04月24日
    浏览(34)
  • hadoop之mapreduce详解

         优化前我们需要知道hadoop适合干什么活,适合什么场景,在工作中,我们要知道业务是怎样的,能才结合平台资源达到最有优化。除了这些我们当然还要知道mapreduce的执行过程,比如从文件的读取,map处理,shuffle过程,reduce处理,文件的输出或者存储。在工作中,往往

    2024年02月15日
    浏览(44)
  • 【Hadoop】- MapReduce概述[5]

    目录 前言 一、分布式计算框架 - MapReduce 二、MapReduce执行原理 MapReduce是一种 分布式计算框架 ,由Google开发。它的设计目标是将大规模数据集的处理和生成任务分布到一个由廉价计算机组成的集群中。 在MapReduce模型中,输入数据被分割成若干小块,并在集群中的多个节点上并

    2024年04月25日
    浏览(34)
  • Hadoop之MapReduce概述

    MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 MapReduce优缺点 优点 1)MapReduce易于编

    2024年02月08日
    浏览(57)
  • Hadoop MapReduce 调优参数

    前言: 下列参数基于 hadoop v3.1.3 版本,共三台服务器,配置都为 4 核, 4G 内存。 MapReduce 调优参数详解 这个参数定义了在 Reduce 阶段同时进行的拷贝操作的数量,用于从 Map 任务获取数据,增加此值可以加速 Shuffle 阶段的执行。 默认值: 5 建议配置: 10 定义了在 Reduce 阶段输

    2024年02月10日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包