hadoop --- MapReduce

这篇具有很好参考价值的文章主要介绍了hadoop --- MapReduce。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

MapReduce定义:

MapReduce可以分解为Map (映射) + Reduce (规约) , 具体过程: 

  1.  Map : 输入数据集被切分成多个小块,并分配给不同的计算节点进行处理
  2. Shuffle and Sort:洗牌和排序,在 Map 阶段结束后,将每个 Mapper 生成的键值对按照键进行排序,并将相同键的值归并在一起,并将相同的键发送给后续的reduce
  3. Reduce: 规约计算,每个计算节点独立处理它们的键值对,并生成最终的输出结果。

         MapReduce是一个分布式运算程序的编程框架,用于用户开发“基于Hadoop的数据分析应用”的核心框架。f核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

优点:

  1. 易于编程:用户只关心,业务逻辑。实现框架的接口。
  2. 良好扩展性:可以动态增加服务器,解决计算资源不够问题
  3. 高容错性:任何一台机器挂掉,可以将任务转移到其他节点。
  4. 并行处理:能够有效地利用集群中多个计算节点进行并行计算,提高处理速度。
  5. 适合海量数据计算(TB、PB)几千台服务器共同计算

缺点:

  1. 不擅长实时计算。Mysql
  2. 不擅长流式计算。Sparkstreaming flink
  3. 不擅长DAG有向无环图计算。Spark 

MapReduce架构: 

        MapReduce中,执行MapReduce任务的机器角色有两种: JobTracker 和 TaskTracker, 其中JobTracker 用于任务调度, TaskTracker用于执行任务。 一个Hadoop集群中, 只有一台JobTracker。

hadoop --- MapReduce,大数据,hadoop,mapreduce,大数据

         当Client向JobTracker提交作业时, JobTracker会讲作业拆分到多个TaskTracker去执行, TaskTracker会定时发送心跳信息,如果一段时间JobTracker未收到TaskTracker的心跳信息,则认定该TaskTracker出现故障, 会讲该TaskTracker的任务分配给其他TackTracker。

MapReduce执行过程: 

hadoop --- MapReduce,大数据,hadoop,mapreduce,大数据

  1.  客户端启动一个job
  2. 客户端向JobTracker请求一个JobID
  3. JobClient讲运行作业所需要的资源复制到HDFS上, 包括jar文件、配置文件、客户端计算所得的输入划分信息,并存档在以JobID 为名的文件夹中。
  4. JobClient 提交任务给JobTracker.
  5. JobTracker 调度作业,并根据输入划分信息为每一个划分创建一个map任务,并将map任务分配给taskTracker执行。【图中的5/6步骤】
  6. TaskTracker每隔一段时间给JobTracker发送一个Heartbeat告诉JobTracker它仍然在运行,同时心跳还携带很多比如map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把作业设置成“成功”,JobClient再传达信息给用户。

MapReduce并行度与调优

        MapReduce并行度是指在执行MapReduce任务时,同时处理的数据块的数量。它与系统资源的利用、任务执行效率和任务完成时间等方面密切相关。正确设置并行度可以提高任务的并行性和整体性能。

        并行度调优是通过合理设置MapReduce任务中的并行度参数来优化任务的执行效率和资源利用,并行度调优策略: 

  1. Map任务并行度:可以通过适当增加并行度来充分利用集群中的计算资源。可以通过调整参数mapreduce.job.maps来设置Map任务的数量。通常情况下,Map任务的数量可以设置为输入分片的数量或者集群中可用的计算槽位数,以充分利用集群资源。比如:

    1. 如果硬件配置为2*12core + 64G,恰当的map并行度是大约每个节点20-100个map,最好每个map的执行时间至少一分钟。
    2. 如果job的每个map或者 reduce task的运行时间都只有30-40秒钟,那么就减少该job的map或者reduce数,每一个task(map|reduce)的setup和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间
  2. Reduce任务并行度:可以通过调整并行度来提高任务的执行效率。可以手动设置 job.setNumReduceTasks(num), 也可以通过调整参数mapreduce.job.reduces来设置Reduce任务的数量

    1. 如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜。
    2. ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask
    3. 尽量不要运行太多的ReduceTask,对大多数job来说,最好reduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小 ,这个对于小集群而言,尤其重要。
  3. Combiner的使用:Combiner是在Map阶段之后,在将数据传递给Reduce任务之前对中间结果进行合并的函数。通过合理设置Combiner的使用,可以减少数据传输和磁盘IO的开销,从而提高任务的执行效率。Combiner使用的原则是,有或没有都不能影响业务逻辑。

    1. 减少数据传输:Combiner可以将Map任务局部合并的结果发送给下一个阶段的Reduce任务,减少了网络传输的开销。
    2. 减少磁盘IO:Combiner的结果可以在Map任务本地进行合并,减少了磁盘读写的次数和数据量。
    3. 提高任务执行效率:通过减少数据传输和磁盘IO的开销,Combiner可以加快任务的执行速度,提高整体性能。
  4. 数据本地性优化:在设置并行度时,可以考虑数据的本地性,尽量将处理同一数据块的任务调度到该数据块所在的节点执行,以减少数据的网络传输开销。
  5. 资源调优:MapReduce任务的并行度调优还涉及到对集群资源的调优。可以通过增加节点的数量、增加计算和存储资源,或者调整任务执行时的内存分配等方式来提高并行度和任务的整体性能

   注意:      

        Combiner函数不是MapReduce框架的强制要求,其是否被使用由具体的MapReduce程序决定。且Combiner使用的前提是输入和输出类型要保持一致。可以通过实现自定义的Combiner函数或使用原始类型的内置Combiner函数(如Sum、Max、Min等)来实现合并操作。

Shuffle机制:

        Shuffle机制指在Map阶段输出后,将Mapper产生的中间键值对按照键进行分组和排序的过程,以及将相同键的值分发给相应的Reducer任务进行最终的聚合操作。

shuffle过程主要分三个步骤: 

  1. partition(分区):在Map阶段输出后, 中间键值对需要按照制定的分区进行划分,以确定哪些键值应该由哪个Reducer任务进行处理。默认的分区函数会根据键的hash值进行分区,确保相同键的数据被分到同一个Reducer上。
  2. sort(排序):分区完成后,相同分区的键值对将根据键进行排序,以保证相同的键值在排序后是连续的,方便后续的聚合操作。排序可以使用外排算法或者内排算法来实现,具体实现取决于系统的可用内存和数据量大小
  3. Combine and Transfer (合并和传输):排序后, 相同的键的值可被合并,并发送给对应的Reducer任务进行聚合操作。这里设计网络传输的过程,将数据从Map节点发送到对应的Reducer节点, 为了提高性能,可使用Combiner函数进行局部合并,减少数据传输的开销

   Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快 。

MapReduce wordCount的案例: 

hadoop数据类型
Java类型 Hadoop Writable类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
Null NullWritabl

统计文档中单词出现的频次

1、引入pom依赖: 

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency> 

2、序列化类 Writable

【当前案例中并未使用到自定义的序列化接口】

Hadoop有自己的序列化机制--Writable, 相比于Java的序列化,hadoop的序列化更紧凑、快速、支持多语言。

Hadoop的序列化步骤: 

  1. 实现Writable接口
  2. 反序列化时需要调用无参构造,所以序列化对象必须要有无参构造
  3. 重写序列化方法write() 
  4. 重写反序列化方法readFidlds()
  5. 反序列化的顺序和序列化的顺序必须完全一致 
  6. 重写toString() ,将结果显示在文件中 
  7. 实现Comparable接口,将自定义的序列化对象放在key中传输
//1 实现Writable接口
@Data
public class FlowBeanWritable implements Writable, Comparable<FlowBeanWritable> {
    private long upFlow; 
    private long downFlow; 
    private long sumFlow; 
    //2 提供无参构造
    public FlowBeanWritable() { }
     
    //4 实现序列化和反序列化方法,注意顺序一定要保持一致
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    //5 重写ToString
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // 6 如果作为Key传输,则还需要实现compareTo方法
    @Override
	public int compareTo(FlowBeanWritable o) {
		// 倒序排列,从大到小
		return this.sumFlow > o.getSumFlow() ? -1 : 1;
	}
}

3、编写Mapper 类,实现Mapper接口

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text outK = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // 1 获取一行并将其转成String类型来处理
        String line = value.toString();
        // 2 将String类型按照空格切割后存进String数组
        String[] words = line.split(" ");
        // 3 依次取出单词,将每个单词和次数包装成键值对,写入context上下文中供后续调用
        for (String word : words) {
            // 先将String类型,转为text,再包装成健值对
            outK.set(word);
            context.write(outK, outV);
        }
    }
}

Mapper<LongWritable, Text, Text, IntWritable> 泛型里面有四个类, 这里其实是两对键值对:

  1. LongWritable 、Text :表示输入数据,LongWritable表示数据的索引,类似于第几行数据; Text表示读取的文件内容。一般使用系统默认的键值对。
  2. Text、IntWritable: 表示输出数据, Text表示输入的单词, IntWritable表示该单词出现的次数。这个键值对需要根据业务需求来确定。

4、编写Reducer类,继承Reduce抽象类

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        //写出
        context.write(key,outV);
    }
}

Reducer<Text, IntWritable, Text, IntWritable>  泛型里面有四个类, 这里也是两对键值对:

  • Text, IntWritable:第一个键值对,要跟Mapper的输出泛型保持一致
  • Text, IntWritable:第二个键值对,表示输出的结果数据,因为这里要输出的是单词出现的次数,所以还是 Text、IntWritable 类型 

Reduce是每组会执行一次,就是相同的key是会分到同一组的,所以此处只需计算每个key的count叠加即可

5、编写Driver驱动类 文章来源地址https://www.toymoban.com/news/detail-555958.html


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取配置信息以及job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 关联当前Driver程序的jar
        job.setJarByClass(WordCountDriver.class);

        // 指定Mapper和Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 设置输入、输出的k、v类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 将job提交给yarn运行
        Boolean result = job.waitForCompletion(Boolean.TRUE);
    }

}

到了这里,关于hadoop --- MapReduce的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【大数据】Hadoop_MapReduce➕实操(附详细代码)

    MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一是分布式计算框,就是mapreduce,二者缺一不可,也就是说,可以通过mapreduce很容易在hadoop平台上进行分布式的计算编程 sftp命令:Windows下登录Hadoop102 xftp root@hadoop102 , lcd 切换Windows路径,

    2024年02月01日
    浏览(37)
  • Hadoop mapreduce课程设计-全球历史平均气温数据分析

    文章目录 前言 一、工具介绍 二、mapreduce数据处理 1.数据集准备  2.要求:对不同洲的平均温度处理--得到各大洲的平均温度 2.1 mapper阶段 2.2 reduce阶段 2.3 分区 2.4 Driver阶段 3.结果展示  4.将数据放入mongodb数据库 4.1 ktr展示 4.2 mongodb数据展示 ​编辑  5.使用pandas和pyecharts将数据

    2024年02月03日
    浏览(49)
  • 大数据技术之Hadoop:MapReduce与Yarn概述(六)

    目录 一、分布式计算 二、分布式资源调度 2.1 什么是分布式资源调度 2.2 yarn的架构 2.2.1 核心架构 2.2.2 辅助架构 前面我们提到了Hadoop的三大核心功能:分布式存储、分布式计算和资源调度,分别由Hadoop的三大核心组件可以担任。 即HDFS是分布式存储组件,MapReduce是分布式计算

    2024年02月09日
    浏览(36)
  • 大数据面试题集锦-Hadoop面试题(三)-MapReduce

    你准备好面试了吗?这里有一些面试中可能会问到的问题以及相对应的答案。如果你需要更多的面试经验和面试题,关注一下\\\"张飞的猪大数据分享\\\"吧,公众号会不定时的分享相关的知识和资料。 目录 1、谈谈Hadoop序列化和反序列化及自定义bean对象实现序列化? 2、FileInputForma

    2024年02月11日
    浏览(55)
  • Hadoop学习:深入解析MapReduce的大数据魔力之数据压缩(四)

    压缩的优点:以减少磁盘IO、减少磁盘存储空间。 压缩的缺点:增加CPU开销。 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 1)压缩算法对比介绍 2)压缩性能的比较 压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否 可以

    2024年02月12日
    浏览(42)
  • Hadoop快速入门+MapReduce案例(赠送17到23年往年真题答案+MapReduce代码文件)-----大数据与人工智能比赛

    Hadoop的核心就是HDFS和MapReduce HDFS为海量数据提供了 存储 而MapReduce为海量数据提供了 计算框架 一.HDFS 整个HDFS有三个重要角色: NameNode (名称节点)、 DataNode (数据节点)和 Client (客户机) NameNode :是Master节点(主节点) DataNode : 是Slave节点(从节点),是文件存储的基本

    2024年02月20日
    浏览(50)
  • 大数据:Hadoop基础常识hive,hbase,MapReduce,Spark

    Hadoop是根据Google三大论文为基础研发的,Google 三大论文分别是: MapReduce、 GFS和BigTable。 Hadoop的核心是两个部分: 一、分布式存储(HDFS,Hadoop Distributed File System)。 二、分布式计算(MapReduce)。 MapReduce MapReduce是“ 任务的分解与结果的汇总”。 Map把数据切分——分布式存放

    2024年04月25日
    浏览(54)
  • 大型数据集处理之道:深入了解Hadoop及MapReduce原理

    在大数据时代,处理海量数据是一项巨大挑战。而Hadoop作为一个开源的分布式计算框架,以其强大的处理能力和可靠性而备受推崇。本文将介绍Hadoop及MapReduce原理,帮助您全面了解大型数据集处理的核心技术。 Hadoop简介 Hadoop是一个基于Google MapReduce论文和Google文件系统的分布

    2024年02月07日
    浏览(40)
  • 大数据技术之Hadoop:提交MapReduce任务到YARN执行(八)

    目录 一、前言 二、示例程序 2.1 提交wordcount示例程序 2.2 提交求圆周率示例程序 三、写在最后 我们前面提到了MapReduce,也说了现在几乎没有人再写MapReduce代码了,因为它已经过时了。然而不写代码不意味着它没用,当下很火的HIve技术就到了MapReduce,所以MapReduce还是相当重要

    2024年02月08日
    浏览(41)
  • 【云计算与大数据技术】大数据系统总体架构概述(Hadoop+MapReduce )

    企业级大数据应用框架需要满足业务的需求,一是要求能够满足基于数据容量大,数据类型多,数据流通快的大数据基本处理需求,能够支持大数据的采集,存储,处理和分析,二是要能够满足企业级应用在可用性,可靠性,可扩展性,容错性,安全性和隐私性等方面的基本

    2024年02月09日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包