Hadoop3教程(十四):MapReduce中的排序

这篇具有很好参考价值的文章主要介绍了Hadoop3教程(十四):MapReduce中的排序。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

(99)WritableComparable排序

什么是排序

排序是MR中最重要的操作之一,也是面试中可能被问到的重点。

MapTask和ReduceTask中都会对数据按照KEY来排序,主要是为了效率,排完序之后,相同key值的数据会被放在一起,更方便下一步(如Reducer())的汇总处理。

默认排序是按照字典顺序(字母由小到大,或者是数字由小到大)排序,且实现该排序的方法是快速排序

什么时候需要排序

MR的过程中,什么时候用到了排序呢?

Map阶段:

  • 环形缓冲区溢写到磁盘之前,会将每个分区内数据分别进行一个快排,这个排序是在内存中完成的;(对key的索引,按照字典顺序排列)
  • 环形缓冲区多轮溢写完毕后,会形成一堆文件,这时候会对这些文件做merge归并排序,我理解是单个MapTask最终会汇总形成一个文件;

Reduce阶段:

  • ReduceTask会主动拉取MapTask们的输出文件,理论上是会优先保存到内存里,但是往往内存里放不下,所以多数情况下会直接溢写到磁盘,于是我们会得到多个文件。当文件数量超过阈值,之后需要做归并排序,合并成一个大文件。如果是内存中的数据超过阈值,则会进行一次合并后将数据溢写到磁盘。当所有数据拷贝完后,ReduceTask会统一对内存和磁盘上的所有数据进行一次归并排序
  • 文件合并后其实还可以进行一个分组排序,过于复杂,这里就不介绍了。

排序有哪些分类

MR里的排序还有部分排序全排序辅助排序二次排序的不同说法,注意,它们之间不是像那种传统的排序算法之间的区别,只是当排序在不同场景的时候,分别起了个名字。

MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部是有序的,这就是部分排序

最终输出结果只有一个文件,且文件内部有序。这就是全排序

全排序的实现方式是只设置一个ReduceTask。但是这种方式在处理大型文件时效率很低很低,因为一台机器处理全部数据,完全没有利用MR所提供的并行架构的优势,生产环境上完全不适用。

所以生产环境里,常用的还是部分排序。

辅助排序,就是GroupingComparator分组。

这个似乎是可选的,是在Reduce阶段,Reducer在从Map阶段主动拉取完数据后,会对所有文件做一次归并排序。做完归并排序之后,理论上就可以进行辅助排序。

辅助排序有啥用呢,就是当接收到的Key是个bean对象时,辅助排序可以让一个或者几个字段相同的key(全部字段不相同)进入同一个Reduce(),所以也起名叫做分组排序。

二次排序比较简单,在自定义排序过程中,如果compareTo中的判断条件为两个,那它就是二次排序。

如何实现自定义排序

说到这里,那 如何实现自定义排序 呢?

如果是bean对象作为key传输,那需要实现WritableComparable接口,重写compareTo方法,就可以实现自定义排序。

@Override
public int compareTo(FlowBean bean) {

	int result;
		
	// 按照总流量大小,倒序排列
	if (this.sumFlow > bean.getSumFlow()) {
		result = -1;
	}else if (this.sumFlow < bean.getSumFlow()) {
		result = 1;
	}else {
		result = 0;
	}

	return result;
}

(100)全排序案例

案例需求

之前我们做过一个案例,输入文件有一个,里面放的是每个手机号的上行流量和下行流量,输出同样是一个文件,里面放的除了手机号的上行流量和下行流量之外,还多了一行总流量。

这时候我们提一个新需求,就是我不止要这个输出文件,我还要这个文件里的内容,按照总流量降序排列。

思路分析

MapReduce里,只能对Key进行排序。在先前的需求里,我们是用手机号作为key,上行流量、下行流量和总流量组成一个bean,作为value,这样的安排显然不适合新需求。

因此我们需要改变一下,将上行流量、下行流量和总流量组成的bean作为key,而将手机号作为value,如此来排序。

所以第一步,我们需要对我们自定义的FlowBean对象声明WritableComparable接口,并重写CompareTo方法,这一步的目的是使得FlowBean可进行算数比较,从而允许排序:

@Override
public int CompareTo(FlowBean o){
    // 按照总流量,降序排列
    return this.sumFlow > o.getSumFlow()?-1:1;
}

注意这里,因为Hadoop里默认的字典排序是从小到大排序,如果想实现案例里由大到小的排序,那么当大于的时候,就要返回-1,从而将大的值排在前面。

其次,Mapper类里:

context.write(bean, 手机号)

bean成了key,手机号成了value。

最后,Reduce类里,需要循环输出,避免出现总流量相同的情况。

for (Text text: values){
    context.write(text, key);	// 注意顺序,原先的key放在value位置
}

2023-7-19 11:16:04 这里没懂。。。

哦哦明白了,什么样的数据会进一个Reducer呢,当然是key 值相同的会进同一个,又因为我们之前compareTo的时候用的是总流量,所以最后是总流量相同的记录会送进同一个Reducer,然后汇总成一条记录做输出,毕竟reducer就是用来做汇总的。

但"汇总成一条记录"这并不是我们想要的,我们需要的是把这些数据原模原样输出来。这就是为什么我们在Reducer的reduce()里面,要加上循环输出的原因。

实际代码

贴一下教程里的代码实现:

首先是FlowBean对象,需要声明WritableComparable接口,并重写CompareTo()

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow; //上行流量
    private long downFlow; //下行流量
    private long sumFlow; //总流量

    //提供无参构造
    public FlowBean() {
    }

    //生成三个属性的getter和setter方法
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    //实现序列化和反序列化方法,注意顺序一定要一致
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    //重写ToString,最后要输出FlowBean
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {

        //按照总流量比较,倒序排列
        if(this.sumFlow > o.sumFlow){
            return -1;
        }else if(this.sumFlow < o.sumFlow){
            return 1;
        }else {
            return 0;
        }
    }
}

然后编写Mapper类:

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1 获取一行数据
        String line = value.toString();

        //2 按照"\t",切割数据
        String[] split = line.split("\t");

        //3 封装outK outV
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);

        //4 写出outK outV
        context.write(outK,outV);
    }
}

然后编写Reducer类:

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //遍历values集合,循环写出,避免总流量相同的情况
        for (Text value : values) {
            //调换KV位置,反向写出
            context.write(value,key);
        }
    }
}

最后编写驱动类:

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2 关联本Driver类
        job.setJarByClass(FlowDriver.class);

        //3 关联Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4 设置Map端输出数据的KV类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //5 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));

        //7 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

完成,仅做了解即可。

(101)二次排序案例

二次排序的概念很简单,其实之前提过了,就是在自定义排序的时候,判断条件有两个。

比如说,原先我对一堆人排序,是按照身高从高到低排,但是身高一样的就没法排序了,这时候我可以再加入一个判断条件,比如说如果身高一样的话,就按体重排序。

具体就是修改FlowBean的CompareTo方法,在第一条件相等的时候,添加第二判定条件。

public int compareTo(FlowBean o) {

    //按照总流量比较,倒序排列
    if(this.sumFlow > o.sumFlow){
        return -1;
    }else if(this.sumFlow < o.sumFlow){
        return 1;
    }else {
        if (this.upFlow > o.upFlow){
            return 1;
        } else if (this.upFlow < o.upFlow){
            return -1;
        }
        else {
            return 0;
        }
        
    }
}

如果有需要的话,还可以继续加第三判定条件。

(102) 区内排序案例

还是之前的手机号案例,之前我们想要的是,只有一个文件,然后文件内所有数据按照总流量降序排列。

现在我们提出一个新要求,按照前3位来分区输出,比如说136的在一个文件里,137的在一个文件里,以此类推。而且每个文件内部,还需要按照总流量降序排列。

本质上就是之前说的分区 + 排序,这两部分的结合。需要额外定义好Partitioner类。

贴一下教程里的代码示例,其实只需要在上一小节的基础上补充自定义分区类即可:

首先自定义好分区类:

package com.atguigu.mapreduce.partitionercompable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        //获取手机号前三位
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        //定义一个分区号变量partition,根据prePhone设置分区号
        int partition;
        if("136".equals(prePhone)){
            partition = 0;
        }else if("137".equals(prePhone)){
            partition = 1;
        }else if("138".equals(prePhone)){
            partition = 2;
        }else if("139".equals(prePhone)){
            partition = 3;
        }else {
            partition = 4;
        }

        //最后返回分区号partition
        return partition;
    }
}

然后在驱动类里注册好分区器:

// 设置自定义分区器
job.setPartitionerClass(ProvincePartitioner2.class);

// 设置对应的ReduceTask的个数
job.setNumReduceTasks(5);

其他跟上一小节保持一致即可。文章来源地址https://www.toymoban.com/news/detail-724441.html

参考文献

  1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】

到了这里,关于Hadoop3教程(十四):MapReduce中的排序的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hadoop3教程(二十四):Yarn的常用命令与参数配置实例

    本章我是仅做了解,所以很多地方并没有深入去探究,用处估计不大,可酌情参考。 列出所有Application : yarn application -list 根据Application状态过滤出指定Application ,如过滤出已完成的Application: yarn application -list -appStates FINISHED Application的状态有:ALL、NEW、NEW_SAVING、SUBMITTED、

    2024年02月08日
    浏览(81)
  • Hadoop3.0大数据处理学习3(MapReduce原理分析、日志归集、序列化机制、Yarn资源调度器)

    前言:如果想知道一堆牌中有多少张红桃,直接的方式是一张张的检查,并数出有多少张红桃。 而MapReduce的方法是,给所有的节点分配这堆牌,让每个节点计算自己手中有几张是红桃,然后将这个数汇总,得到结果。 官方介绍:MapReduce是一种分布式计算模型,由Google提出,

    2024年02月08日
    浏览(54)
  • Hadoop3 - MapReduce COVID-19 案例实践

    上篇文章对 MapReduce 进行了介绍,并编写了 WordCount 经典案例的实现,本篇为继续加深 MapReduce 的用法,实践 COVID-19 新冠肺炎案例,下面是上篇文章的地址: https://blog.csdn.net/qq_43692950/article/details/127195121 COVID-19,简称“新冠肺炎”,世界卫生组织命名为“2019冠状病毒病” [1-

    2024年02月08日
    浏览(38)
  • 【大数据基础】Hadoop3.1.3安装教程

    来源: https://dblab.xmu.edu.cn/blog/2441/ 前言:重装解决一切bug!事实上,问题中的绝大部分衍生问题都可以通过重装解决。 创建Hadoop用户 首先按 ctrl+alt+t 打开终端窗口,输入如下命令创建新用户 : 接着使用如下命令设置密码,可简单设置为 hadoop,按提示输入两次密码: 可为

    2024年02月09日
    浏览(65)
  • Hadoop3教程(三十六):(生产调优篇)企业开发场景中的参数调优案例概述

    这章仅做兴趣了解即可。 需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4 3 3) 当然,这只是个案例演示,生产环境中一般是结合机器

    2024年02月08日
    浏览(42)
  • 大数据-安装 Hadoop3.1.3 详细教程-伪分布式配置(Centos7)

    **相关资源:**https://musetransfer.com/s/q43oznf6f(有效期至2023年3月16日)|【Muse】你有一份文件待查收,请点击链接获取文件 1.检查是否安装ssh (CentOS 7 即使是最小化安装也已附带openssh 可跳过本步骤) 若已安装进行下一步骤 若未安装 请自行百度 本教程不做过多讲解 2.配置ss

    2023年04月08日
    浏览(40)
  • MapReduce排序机制(Hadoop)

    在MapReduce中, 排序的目的是为了方便Reduce阶段的处理,通常是为了将相同键的键值对聚合在一起,以便进行聚合操作或其他处理。 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定 阈值 后,再对缓冲区中的数据进行一次快速排序,并将这

    2024年04月24日
    浏览(33)
  • Hadoop3教程(二十八):(生产调优篇)NN、DN的多目录配置及磁盘间数据均衡

    NN多目录的意思是,本地目录可以配置成多个,且每个目录存放内容相同,这样的目的是增加可靠性。比如说下图这样: 但其实生产中不常用哈, 生产中要增加NN的可靠性的话,一般会开启NN的高可用,即在不同节点上开启多个NN,靠zookeeper来协调 。 所以本节就 了解一下即可

    2024年02月08日
    浏览(42)
  • Hadoop-MapReduce排序(超级详细)

    ———————————————————————— ————————————————————————  (1)map map task 会从本地⽂件系统读取数据,转换成key-value形式的键值对集合。使⽤的是hadoop内置的数据类型,⽐如longwritable、text等。 (2)shuffle [1] 溢出 [2] 分区

    2024年02月03日
    浏览(44)
  • Hadoop(01) Hadoop3.3.6安装教程,单机/伪分布式配置

    在安装 Hadoop 3.3.6 前,需要满足以下前置条件: Java Development Kit (JDK):Hadoop 是用 Java 编写的,因此需要安装并配置适当版本的 JDK。Hadoop 3.3.6 建议使用 JDK 8 或更高版本。确保正确安装 JDK,并设置 JAVA_HOME 环境变量。 SSH:Hadoop 集群中的节点需要通过 SSH 进行通信和管理。确保在

    2024年02月06日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包