Hadoop三大框架之MapReduce工作流程

这篇具有很好参考价值的文章主要介绍了Hadoop三大框架之MapReduce工作流程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、MapReduce基础

MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。

  • Map负责“分”,把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

  • Reduce负责“合”,即对map阶段的结果进行全局汇总。

  • MapReduce运行在yarn集群。ResourceManager+NodeManager这两个阶段合起来就是MapReduce思想的体现。

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大数据,Powered by 金山文档

1.1 MapReduce设计构思

MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。

MapReduce设计并提供了一个同意的计算框架,为程序员隐藏了绝大多数系统层面的处理细节,为程序员提供了一个抽象和高层的编程接口和框架。程序员仅需要关心应用层的具体计算问题,仅需要编写少量的处理应用本身计算问题的程序代码。

Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现Map和Reduce,MapReduce处理的数据类型是<key,value>键值对。

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大数据,Powered by 金山文档

一个完整的MapReduce程序在分布式运行时有三类实例进程:

  1. MRAppMaster 负责整个程序的过程调度及状态协调

  1. MapTask 负责map阶段的整个数据处理流程

  1. ReduceTask 负责reduce阶段的整个数据处理流程

1.2 MapReduce工作原理

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大数据,Powered by 金山文档
mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大数据,Powered by 金山文档

1、分片操作:

FileInputstream,首先要计算切片大小,FileInputstream是一个抽象类,继承InputFormat接口,真正完成工作的是它的实现类,默认为是TextInputFormat,TextInputFormat是读取文件的,默认为一行一行读取,将输入文件切分为逻辑上的多个input split,input split是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念。

在进行Map计算之前,MapReduce会根据输入文件计算的切片数开启map任务,一个输入切片对应一个maptask,输入分片存储的并非数据本身,而是一个分片长度和一个记录数据位置的集合,每个input spilt中存储着该分片的数据信息如:文件块信息、起始位置、数据长度、所在节点列表等,并不是对文件实际分割成多个小文件,输入切片大小往往与hdfs的block关系密切,默认一个切片对应一个block,大小为128M;注意:尽管我们可以使用默认块大小或自定义的方式来定义分片的大小,但一个文件的大小,如果是在切片大小的1.1倍以内,仍作为一个片存储,而不会将那多出来的0.1单独分片。

2、数据格式化操作:

TextInputFormat 会创建RecordReader去读取数据,通过getCurrentkey,getCurrentvalue,nextkey,value等方法来读取,读取结果会形成key,value形式返回给maptask,key为偏移量,value为每一行的内容,此操作的作用为在分片中每读取一条记录就调用一次map方法,反复这一过程直到将整个分片读取完毕。

3、map阶段操作:

此阶段就是程序员通过需求偏写了map函数,将数据格式化的<K,V>键值对通过Mapper的map()方法逻辑处理,形成新的<k,v>键值对,通过Context.write输出到OutPutCollector收集器

map端的shuffle(数据混洗)过程:溢写(分区,排序,合并,归并)

溢写:

由map处理的结果并不会直接写入磁盘,而是会在内存中开启一个环形内存缓冲区,先将map结果写入缓冲区,这个缓冲区默认大小为100M,并且在配置文件里为这个缓冲区设了一个阀值,默认为0.8,同时map还会为输出操作启动一个守护线程,如果缓冲区内存达到了阀值0.8,这个线程会将内容写入到磁盘上,这个过程叫作spill(溢写)。

分区Partition

当数据写入内存时,决定数据由哪个Reduce处理,从而需要分区,默认分区方式采用hash函数对key进行哈布后再用Reduce任务数量进行取模,表示为hash(key)modR,这样就可以把map输出结果均匀分配给Reduce任务处理,Partition与Reduce是一一对应关系,类似于一个分片对应一个map task,最终形成的形式为(分区号,key,value)

排序Sort:

在溢出的数据写入磁盘前,会对数据按照key进行排序,默认采用快速排序,第一关键字为分区号,第二关键字为key。

合并combiner:

程序员可选是否合并,数据合并,在Reduce计算前对相同的key数据、value值合并,减少输出量,如(“a”,1)(“a”,1)合并之后(“a”,2)

归并menge

每块溢写会成一个溢写文件,这些溢写文件最终需要被归并为一个大文件,生成key对应的value-list,会进行归并排序<"a",1><"a",1>归并后<"a",<1,1>>。

Reduce 端的shffle

数据copy:map端的shffle结束后,所有map的输出结果都会保存在map节点的本地磁盘上,文件都经过分区,不同的分区会被copy到不同的Recuce任务并进行并行处理,每个Reduce任务会不断通过RPC向JobTracker询问map任务是否完成,JobTracker检测到map位务完成后,就会通过相关Reduce任务去aopy拉取数据,Recluce收到通知就会从Map任务节点Copy自己分区的数据此过程一般是Reduce任务采用写个线程从不同map节点拉取

归并数据

Map端接取的数据会被存放到 Reduce端的缓存中,如果缓存被占满,就会溢写到磁盘上,缓存数据来自不同的Map节点,会存在很多合并的键值对,当溢写启动时,相同的keg会被归并,最终各个溢写文件会被归并为一个大类件归并时会进行排序,磁盘中多个溢写文许归并为一个大文许可能需要多次归并,一次归并溢写文件默认为10个

4、Reduce阶段:

Reduce任务会执行Reduce函数中定义的各种映射,输出结果存在分布式文件系统中。

二、 MapReduce编程模型

2.1 编程模型概述

2.1.1 Map阶段2个步骤

  1. 设置InputFormat类,将数据切分为Key-Value(K1和V1)对,输入到第二步

  1. 自定义Map逻辑,将第一步的结果转换为另外的Key-Value(K2和V2)对,输出结果

2.1.2 Shuffle阶段4个步骤

  1. 对输出的Key-Value进行分区

  1. 对不同分区的数据按照相同的key排序

  1. (可选)对分组过的数据初步规约,降低数据的网络拷贝

  1. 对数据进行分组,相同的Key和Value放入一个集合中

2.1.3 Reduce阶段2个步骤

  1. 对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的Key-Value进行处理,转为新的Key-Value(K3和V3)输出

  1. 设置OutputFromat处理并保存Reduce输出的Key-Value数据

2.2 编程模型三部曲

(1)Input:一系列(K1,V1)。

(2)Map和Reduce:

Map:(K1,V1) -> list(K2,V2) (其中K2/V2是中间结果对)

Reduce:(K2,list(v2)) -> list(K3,V3)

(3)Output:一系列(K3,V3)。

三、词频统计WordCount

3.1 添加Maven依赖

# 将maven版本改为1.8
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

# 添加如下依赖
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>${hadoop-version}</version>
</dependency>

3.2 代码实现

《wordcount.txt》

hello java
hello hadoop
hello java
hello java hadoop
java hadoop
hadoop java

Mapper类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        IntWritable intWritable = new IntWritable();

        System.out.println("WordCountMap stage Key:"+key+" Value:"+value);
        String[] words = value.toString().split(" "); // "hello world"->[hello,world]
        for (String word : words) {
            text.set(word);
            intWritable.set(1);
            context.write(text,intWritable); // <hello,1> <word,1>
        }
    }
}

Reduce类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable,Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("Reduce stage Key:"+key+" Values"+values.toString());
        int count=0;
        for (IntWritable intWritable : values) {
            count += intWritable.get();
        }

        LongWritable longWritable = new LongWritable(count);
        System.out.println("Key:"+key+" ResultValue:"+longWritable.get());

        context.write(key,longWritable);
    }
}

Driver类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);

        // 设置mapper类
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 设置Reduce类
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 指定map输入的文件路径
        FileInputFormat.
                setInputPaths(job,new Path("D:\\Servers\\hadoopstu\\in\\wordcount.txt"));
        // 指定reduce结果的输出路径
        Path path = new Path("D:\\Servers\\hadoopstu\\out1");

        FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
        if(fileSystem.exists(path)){
            fileSystem.delete(path,true);
        }

        FileOutputFormat.setOutputPath(job,path);

        job.waitForCompletion(true);

    }
}

3.3 代码说明

3.3.1 对于map函数的方法

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大数据,Powered by 金山文档

protected void map(LongWritable key, Text value, Context context)

继承Mapper类,实现map方法,重写的map方法中包含三个参数,key、value就是输入的key、value键值对(<K1,V1>),context记录的是整个上下文,可以通过context将数据写出去。

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大数据,Powered by 金山文档

3.3.2 对于reduce函数的方法

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大数据,Powered by 金山文档

protected void reduce(Text key, Iterable<IntWritable> values, Context context)

继承Reduce类,实现reduce方法,reduce函数输入的是一个<K,V>形式,但是这里的value是以迭代器的形式Iterable<IntWritable> value。即,reduce的输入是一个key对应一组value。

reduce中context参数与map中的reduce参数作用一致。

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大数据,Powered by 金山文档

3.3.3 对于main函数的调用

mapreduce的工作流程,# MapReduce,hadoop,mapreduce,大数据,Powered by 金山文档
  1. 创建configuration()类,作用是读取MapReduce系统配置信息

  1. 创建job类

  1. 设置map函数、map函数输出的key、value类型

  1. 设置reduce函数、reduce函数输出的key、value类型,即最终存储再HDFS结果文件中的key、value类型文章来源地址https://www.toymoban.com/news/detail-786888.html

到了这里,关于Hadoop三大框架之MapReduce工作流程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——MapTask工作机制

    MapTask工作机制如下图所示。 (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。 (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。 (3)Collect收集阶段:在用户编写map()函数中,当数据处

    2023年04月08日
    浏览(65)
  • Hadoop三大“金刚”完美剖析 ─────── HDFS、MapReduce、YARN

    因为HDFS是分布式储存文件的模式,所以在储存文件的数据时,会将文件切分为大小一致的数据块, 如果出现文件大小不是128M的倍数时,那么最后一个文件会与之前切分文件大小不一致。 被切分成的数据块就是Block块,NameNode将Block块进行分布式储存到DataNode中。    (Block块

    2024年04月10日
    浏览(41)
  • Hadoop MapReduce 是如何工作的?

    作者:禅与计算机程序设计艺术 Hadoop MapReduce(以下简称MR)是一个分布式计算框架,基于Google开发,用于并行处理海量数据集。其提供简单、高效的数据处理能力,并可运行于多种平台上,广泛应用于数据分析领域。因此,掌握MR的原理及其工作方式对于利用它进行海量数据

    2024年02月10日
    浏览(37)
  • Hadoop-MapReduce-跟着日志理解整体流程

    vi input_01.txt vi input_02.txt vi input_03.txt 文本内容如下: -----------------input_01.txt---------------- java    scala   python c++     java    js go      go      vba c       c       c++ java    scala   python php     css     html js      java    java    scala   vba c#      .net R       R       R    

    2024年01月25日
    浏览(29)
  • Hadoop的第二个核心组件:MapReduce框架第四节

    MapReduce可以对海量数据进行计算,但是有些情况下,计算的结果可能来自于多个文件,每个文件的数据格式是不一致,但是多个文件存在某种关联关系,类似于MySQL中外键关系,如果想计算这样的结果,MR程序也是支持的。这种计算我们称之为join计算。 MR的join根据join数据的位

    2024年02月09日
    浏览(60)
  • Hadoop的第二个核心组件:MapReduce框架第三节

    InputFormat阶段 :两个作用 负责对输入的数据进行切片,切片的数据和Mapper阶段的MapTask的数量是相对应的。 负责MapTask读取切片数据时,如何将切片的数据转换成为Key-value类型的数据,包括key-value的数据类型的定义。 Mapper阶段 作用处理每一个切片数据的计算逻辑。 map方法的执

    2024年02月09日
    浏览(44)
  • Hadoop的第二个核心组件:MapReduce框架第一节

    Hadoop解决了大数据面临的两个核心问题:海量数据的存储问题、海量数据的计算问题 其中MapReduce就是专门设计用来解决海量数据计算问题的,同时MapReduce和HDFS不一样的地方在于,虽然两者均为分布式组件,但是HDFS是一个完善的软件,我们只需要使用即可,不需要去进行任何

    2024年02月09日
    浏览(32)
  • Hadoop的第二个核心组件:MapReduce框架第二节

    1、客户端在执行MR程序时,客户端先根据设置的InputFormat实现类去对输入的数据文件进行切片(getSplits),如果没有设置InputFormat实现类,MR程序会使用默认的实现类(TextInputFormat–FileInputFormat的子类)进行切片规划,生成一个切片规划文件 2、客户端的切片规划文件生成以后

    2024年02月09日
    浏览(37)
  • Hadoop入门学习笔记——四、MapReduce的框架配置和YARN的部署

    视频课程地址:https://www.bilibili.com/video/BV1WY4y197g7 课程资料链接:https://pan.baidu.com/s/15KpnWeKpvExpKmOC8xjmtQ?pwd=5ay8 Hadoop入门学习笔记(汇总) 本次YARN的部署结构如下图所示: 当前,共有三台服务器(虚拟机)构成集群,集群规划如下所示: 主机 部署的服务 node1 ResourceManager、N

    2024年02月04日
    浏览(44)
  • MapReduce是Hadoop的一个核心组件,它是一个编程模型和计算框架

    MapReduce是Hadoop的一个核心组件,它是一个编程模型和计算框架,用于处理和生成大数据集。MapReduce模型将大数据处理任务分解为两个阶段:Map阶段和Reduce阶段。在Map阶段,输入的数据被分割成一系列的键值对,然后通过用户定义的函数进行处理,生成中间的键值对。在Reduce阶

    2024年02月03日
    浏览(36)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包