分布式计算模型MapReduce
1.MapReduce设计思想 2.MapReduce分布式计算的基本原理 3.使用Java进行MapReduce编程 4.在Hadoop集群中提交MapReduce任务 5.Yarn工作机制 |
1. MapReduce设计思想
1.1 什么是MapReduce 1)MapReduce是一个分布式计算框架 它将大型数据操作作业分解为可以跨服务器集群并行执行的单个任务。 起源于Google 2)适用于大规模数据处理场景 每个节点处理存储在该节点的数据 3)每个job包含Map(分类kv)和Reduce(计算)两部分 |
1.2 MapReduce的设计思想 1)分而治之:简化并行计算的编程模型 2)构建抽象模型:Map和Reduce 开发人员专注于实现Mapper和Reducer函数 3)隐藏系统层细节:开发人员专注于业务逻辑实现 |
1.3 MapReduce特点 1)优点:易于编程;可扩展性;高容错性;高吞吐量 2)不适用领域:难以实时计算;不适合流式计算;不适合DGA(有向图)计算 |
1.4 MapReduce编程规范 1)MapReduce框架处理的数据格式是<K,V>键值对形式 Mapper Map端接收<K,V>键值对数据,经过处理输出新的<K,V>键值对 Map端处理逻辑写在Mapper类中map()方法中 2)Reducer Reduce端搜集多个Mapper端输出的<K,V>数据,进行汇总 Reducer的业务逻辑写在reduce()方法中 每一组相同k的<k,Iterator<v>>组调用一次reduce()方法 |
2. 使用Java进行MapReduce编程
3.1 WordCount功能实现 (1)物料准备:wordcount (2)WordCountMapper (3)WordCountReduce (4)WordCountDriver 执行后输出: |
3. Hadoop集群中提交MapReduce任务
Idea打包工程成jar包,执行命令 |
4. MapReduce分布式计算的基本原理
4.1 Hadoop序列化 什么是序列化:序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。 (1)必须可序列化(serializable) 作用:网络传输以及持久化存储 IntWritable、LongWriteable、FloatWritable、Text、DoubleWritable, BooleanWritable、NullWritable等 (2)都继承了Writable接口 并实现write()和readFields()方法 (3)Keys必须实现WritableComparable接口 MapReduce框架会按照Key进行排序 Reduce阶段需要sort keys需要可比较 |
4.2 MapReduce框架原理 MapReduce执行流程: (1)split阶段:计算分片 (2)map阶段:调用map()方法对数据进行处理 (3)shffule阶段:主要负责将map端生成的数据传递给reduce端 (4)reduce阶段:对Shffule阶段传来的数据进行最后的整理合并 |
4.3 MapTask (1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value。 (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。 (3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。 (4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。 溢写阶段详情: 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。 (5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。 |
4.4 ReduceTask (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。 (2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。 (3)Reduce阶段:reduce()函数将计算结果写到HDFS上。 |
4.5 InputFormat数据输入接口 切片与MapTask并行度决定机制1)问题引出 MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。 思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度? 2)MapTask并行度决定机制 数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。 |
4.6 InputSplit(输入分片) |
4.7 Shuffle阶段 数据从Map输出到Reduce输入的过程 |
4.8 Combiner类 (1)Combiner相当于本地化的Reduce操作 在shuffle之前进行本地聚合 用于性能优化,可选项 输入和输出类型一致 (2)Reducer可以被用作Combiner的条件 符合交换律和结合律 (3)实现Combiner job.setCombinerClass(WCReducer.class) |
4.9 Partitioner类 (1)用于在Map端对key进行分区 默认使用的是HashPartitioner 获取key的哈希值 使用key的哈希值对Reduce任务数求模 决定每条记录应该送到哪个Reducer处理 (2)自定义Partitioner 继承抽象类Partitioner,重写getPartition方法 job.setPartitionerClass(MyPartitioner.class) |
5. MapReduce实现 SQL Join操作
4.1 map端join 1)使用场景 Map Join适用于一张表十分小、一张表很大的场景。 2)优点 思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办? 在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。 3)具体办法:采用DistributedCache (1)在Mapper的setup阶段,将文件读取到缓存集合中。 (2)在Driver驱动类中加载缓存。 //缓存普通文件到Task运行节点。 job.addCacheFile(new URI("file:///e:/cache/pd.txt")); //如果是集群运行,需要设置HDFS路径 job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt")); |
4.2 reduce端join Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。 Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。 |
6. Yarn
Yarn工作机制 (1)MR程序提交到客户端所在的节点。 (2)YarnRunner向ResourceManager申请一个Application。 (3)RM将该应用程序的资源路径返回给YarnRunner。 (4)该程序将运行所需资源提交到HDFS上。 (5)程序资源提交完毕后,申请运行mrAppMaster。 (6)RM将用户的请求初始化成一个Task。 (7)其中一个NodeManager领取到Task任务。 (8)该NodeManager创建容器Container,并产生MRAppmaster。 (9)Container从HDFS上拷贝资源到本地。 (10)MRAppmaster向RM 申请运行MapTask资源。 (11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。 (12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。 (13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。 (14)ReduceTask向MapTask获取相应分区的数据。 (15)程序运行完毕后,MR会向RM申请注销自己。 HDFS、YARN、MapReduce三者关系 作业提交过程之HDFS & MapReduce 作业提交全过程详解 (1)作业提交 第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAppMaster。 (2)作业初始化 第6步:当RM收到Client的请求后,将该job添加到容量调度器中。 第7步:某一个空闲的NM领取到该Job。 第8步:该NM创建Container,并产生MRAppmaster。 第9步:下载Client提交的资源到本地。 (3)任务分配 第10步:MrAppMaster向RM申请运行多个MapTask任务资源。 第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。 (4)任务运行 第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。 第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。 第14步:ReduceTask向MapTask获取相应分区的数据。 第15步:程序运行完毕后,MR会向RM申请注销自己。 (5)进度和状态更新 YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。 (6)作业完成文章来源:https://www.toymoban.com/news/detail-695883.html 除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。文章来源地址https://www.toymoban.com/news/detail-695883.html |
到了这里,关于hadoop-MapReduce的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!