Hadoop-MapReduce排序(超级详细)

这篇具有很好参考价值的文章主要介绍了Hadoop-MapReduce排序(超级详细)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

N.1 MapReduce的模型

————————————————————————文章来源地址https://www.toymoban.com/news/detail-768283.html

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

———————————————————————— 

(1)map

map task会从本地⽂件系统读取数据,转换成key-value形式的键值对集合。使⽤的是hadoop内置的数据类型,⽐如longwritable、text等。

(2)shuffle

[1] 溢出

[2] 分区:mapper的key-value在输出之后会进⾏⼀个partition分区操作,默认使⽤的是hashpartitioner,可以通过重写hashpartitioner的getpartition⽅法来⾃定义分区规则。

[3] 归并排序:会对key进⾏进⾏sort排序,grouping分组操作将相同key的value合并分组输出。当然,在这⾥可以使⽤⾃定义的数据类型,重写WritableComparator的Comparator⽅法来⾃定义排序规则

[4] 预合并排序

[5] reduce拉取的总归并排序:和归并排序排序一样

(3)reduce

reduce阶段的排序,负责接接收shuffle处理好的数据,直接循环迭代( key,valus{..} )即可。最后将数据保存或者显⽰,结束整个job。(当然这里要在做排序也是可以的)

N.2 WritableComparable排序概念

0)排序阶段的介绍_sefl来源百度:

(1)shuffle过程中执行过程中的排序(默认字典排序,默认是过group分组,一个对象单独一组),分别是:

[1] 溢出

[2] 分区阶段:根据分区以及key进行快速排序

[3] 预合并阶段:是在排序的基础上做合并,在排序。

[4] 归并阶段:将同一个分区的多个溢写文件进行归并排序,合成大的溢出文件。

[5] reduce拉取阶段总归并:会对不同的mapTask进行归并排序。

(2)reduce阶段的排序,负责接接收shuffle处理好的数据,直接循环迭代( key,valus{..} )即可。

1)排序的分类:

(1)部分排序(某一个分区,或者是部分文件排序):

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

(2)全排序(就一个分区,且所有文件合在一起排序):

如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

(3)辅助排序(某一个分区,或者部分文件排序,这个一般涉及对象的排序):(GroupingComparator分组)执行默认的。

[1] 默认情况下,如果使用自定义对象排序,每new出一个就单独分成一组。

GroupingComparator是在reduce阶段分组来使用的,由于reduce阶段,如果key相同的一组,只取第一个key作为key,迭代所有的values。 如果reduce的key是自定义的对象,我们只需要bean里面的某个属性相同就认为这样的key是相同的,这是我们就需要之定义GroupCoparator来“欺骗”reduce了如果每一个对象key都是单独一组,那么有些状态数据就要传递了,就会增加复杂度。所以辅助排序,相当优化。

[2] 一般来说,大多数MapReduce程序会避免让reduce函数依赖于value的排序。但是,有时也需要通过特定的方法对key进行排序和分组等, 以实现对value的排序。

(4)二次排序(某一个分区,或者部分文件排序,这里是二次排序)

[1] 二次排序 就是对key进行第一与二次的排序,案例:如key是一个对象,对象有多个属性,而value是别的值,而使用二次排序可以对key 里面的属性进行二次排序。

[2] 在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序,如果是一个条件就是一次排序,定义一个类实现WritableComparable接口重写compareTo方法,就可以实现排序。

@Override

public int compareTo(FlowBean o) {

        // 倒序排列,从大到小

//FlowBean o表示后面的一个对象。-1表示交换位置,1表示不交换位置。

        return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

N.3 二次排序

1)源数据:

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

2)以下代码使用的是一个分区,输出也是一个文件。

package study190616_2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean>{

private int id;

private double price;

//反序列化调用

public FlowBean(){

}

//比较器

public int compareTo(FlowBean o) {

//-1不交换位置(负数) 1交换位置(正数)

//比较原则 从上到下 “升”序排序,如果遇到相同的数据,就按后面的数据降序。

if(this.id>o.id){

return 1;

}else if(this.id<o.id){

return -1;

}else {

//自定义的二次排序,

return this.price>o.price? -1:1;

}

}

//序列化

public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeInt(this.id);

dataOutput.writeDouble(this.price);

}

//反序列化

public void readFields(DataInput dataInput) throws IOException {

this.id = dataInput.readInt();

this.price = dataInput.readDouble();

}

public void setId(int id) {

this.id = id;

}

public double getPrice() {

return price;

}

public void setPrice(double price) {

this.price = price;

}

@Override

public String toString() {

return this.id+"\t"+this.price;

}

}

package study190616_2;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

* KEYIN, VALUEIN, KEYOUT, VALUEOUT

* LongWritable,Text, FlowBean,NullWritable

* map 的输入 map的输出

*

* */

public class SortMap extends Mapper<LongWritable,Text,FlowBean,NullWritable> {

FlowBean f = new FlowBean();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//获取数据

String line = value.toString();

//切分

String[] split = line.split("\t");

//赋值/数据封装

//数据行:0000001 Pdt_01 222.8

f.setId(Integer.parseInt(split[0]));

f.setPrice(Double.parseDouble(split[2]));

context.write(f,NullWritable.get()); //注意 这里是按照对象传到reduce ,而每一个对象是通过new出来的 ,所以地址不一样,在reduce接受的时候,每个对象不合在一起,它们的value也不会集中成一个迭代器,也就是说个对象都有自己的迭代器。所以后面的章节会学到赋值排序,让它们合起来

}

}

package study190616_2;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReduce extends Reducer<FlowBean,NullWritable,FlowBean,NullWritable> {

@Override

protected void reduce(FlowBean flowBean, Iterable <NullWritable> values, Context context) throws IOException, InterruptedException {

//输出

context.write(flowBean, NullWritable.get()); //这里的reduce自己写 因为每一个对象不一样 ,迭代器不会聚集值。

}

}

package study190616_2;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MainWritable {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args=new String[]{"C:\\Users\\HMTX\\Desktop\\0616-压缩\\GroupingComparator.txt","C:\\Users\\HMTX\\Desktop\\0616-压缩\\t11"};

// 1 获取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 2 加载jar包

job.setJarByClass(MainWritable.class);

// 3 关联map和reduce

job.setMapperClass(SortMap.class);

job.setReducerClass(SortReduce.class);

// 4 设置最终输出类型

job.setMapOutputKeyClass(FlowBean.class);

job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(FlowBean.class);

job.setOutputValueClass(NullWritable.class);

// 5 设置输入和输出路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交

job.waitForCompletion(true);

}

}

3)远行结果

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

 ————————————————————————

N.4 GroupingComparator分组排序

1)如果使用某一个字段进行辅助排序,那么这个字段"必须"在之前"有过排序"的处理,所有"辅助"顾名思义就是在前者排序好的基础上发挥的作用, 单独使用的辅助排序 很可能生成的结果顺序是乱的,最好不要使用。而辅助里面在使用排序 一般 都跟之前的字段排序规则一样,其实辅助主要的不是排序,而是分组才是关键(可以认为写个排序的代码就是想分组)。所以 下面的案例 使用了二次排序作为辅助排序的基础。

2)源数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

package study190616_3;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

private int id;

private double price;

public OrderBean(){

}

//排序,先按ID排序,相等ID在价格排序

public int compareTo(OrderBean o) {

//-1不交换位置(负数) 1交换位置(正数)

if(this.id>o.id){

return 1;

}else if(this.id<o.id){

return -1;

}else {

return this.price>o.price? -1:1;

}

}

//序列化

public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeInt(this.id);

dataOutput.writeDouble(this.price);

}

//返序列化

public void readFields(DataInput dataInput) throws IOException {

this.id = dataInput.readInt();

this.price = dataInput.readDouble();

}

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public double getPrice() {

return price;

}

public void setPrice(double price) {

this.price = price;

}

@Override

public String toString() {

return this.id+"\t"+this.price;

}

}

package study190616_3;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMap extends Mapper<LongWritable,Text,OrderBean, Text> {

//输入文件的原的行数据格式:0000002 Pdt_06 722.4

OrderBean f = new OrderBean();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//获取数据

String line = value.toString();

//切分

String[] split = line.split("\t");

//赋值/数据封装

f.setId(Integer.parseInt(split[0]));

f.setPrice(Double.parseDouble(split[2]));

context.write(f,new Text(split[1]));

}

}

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReduce extends Reducer<OrderBean, Text,OrderBean,Text> {

@Override

protected void reduce(OrderBean flowBean, Iterable <Text> values, Context context) throws IOException, InterruptedException {

//输出

for(Text text:values){

//输出验证下

//System.out.println("key:" + flowBean.toString() + "," + "value:" + text.toString());

context.write(flowBean,text); //注意 这里的value 会合并成迭代器 ,因为后面的代码设置了分组

/**使用对象的某字段值分组,对象间的某字段的值相同,则这些对象就会组成一个"变化key",而value会聚集成迭代器,而这个变化key是根据遍历迭代器产生value"对应的对象做为key"。

如果不遍历迭代器,context.write(Bean,value)只使用一次 默认就是使用排序为第一的对象作为变量key

**/

}

//输出验证下

System.out.println("=========分隔符==========");

}

}

/**

注意这里 遍历的时候 没有跳出循环 对象却换了

* key:3 222.8,value:Pdt_01

* key:3 33.8,value:Pdt_02

* =========分隔符==========

* key:1 222.8,value:Pdt_01

* key:1 25.8,value:Pdt_06

* =========分隔符==========

* key:2 722.4,value:Pdt_05

* key:2 522.8,value:Pdt_03

* key:2 122.4,value:Pdt_04

* =========分隔符==========

当然 如果不遍历迭代器,就写一个

System.out.println("key:" + flowBean.toString() + "," + "value:" + text.toString());

那么输出的结构就是每一个组的第一个数据:

* key:3 222.8,value:Pdt_01

* =========分隔符==========

* key:1 222.8,value:Pdt_01

* =========分隔符==========

* key:2 722.4,value:Pdt_05

* =========分隔符=========

*/

package study190616_3;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartition extends Partitioner<OrderBean, Text> {

@Override

public int getPartition(OrderBean orderBean, Text text, int i) {

//&表示位运算(还可以做逻辑运算符),orderBean.getId() & 2147483647得出结果为1,1%3=0...1,所以余数为1

//有0、1 、2的返回值

return (orderBean.getId() & 2147483647) % i;

}

}

package study190616_3;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

public class GroupComparee extends WritableComparator{

//构造方法

protected GroupComparee(){

//调用父类方法,里面传入参数,true表示开启辅助排序

//调用父类方法的WritableComparator(Class<? extends WritableComparable> keyClass, boolean createInstances){..}

super(OrderBean.class,true);

}

@Override

public int compare(WritableComparable a, WritableComparable b) {

OrderBean oa = (OrderBean) a;

OrderBean ob = (OrderBean) b;

/**使用对象的某字段值分组,对象间的某字段的值相同,则这些对象就会组成一个"变化key,就是一个key",而value会聚集成迭代器,而这个变化key是根据遍历迭代器产生value"对应的对象做为key"。

如果不遍历迭代器,context.write(Bean,value)只使用一次 默认就是使用排序为第一的对象作为变量key

**/

//对key排序就是对key进行分组,在组排序

if(oa.getId()>ob.getId()){

return 1;

}else if(oa.getId()<ob.getId()){

return -1;

}else {

///相等

return 0;

}

}

}

package study190616_3;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MainOrder {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args=new String[]{"C:\\Users\\HMTX\\Desktop\\0616-压缩\\GroupingComparator.txt","C:\\Users\\HMTX\\Desktop\\0616-压缩\\t111qq11"};

// 1 获取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 2 加载jar包

job.setJarByClass(MainOrder.class);

// 3 关联map和reduce

job.setMapperClass(OrderMap.class);

job.setReducerClass(OrderReduce.class);

// 4 设置最终输出类型

job.setMapOutputKeyClass(OrderBean.class);

job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(OrderBean.class);

job.setOutputValueClass(Text.class);

//指定辅助排序

job.setGroupingComparatorClass(GroupComparee.class);

//指定分区

job.setPartitionerClass(OrderPartition.class);

job.setNumReduceTasks(3);

// 5 设置输入和输出路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交

job.waitForCompletion(true);

}

}

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

到了这里,关于Hadoop-MapReduce排序(超级详细)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MapReduce排序机制(Hadoop)

    在MapReduce中, 排序的目的是为了方便Reduce阶段的处理,通常是为了将相同键的键值对聚合在一起,以便进行聚合操作或其他处理。 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定 阈值 后,再对缓冲区中的数据进行一次快速排序,并将这

    2024年04月24日
    浏览(34)
  • Hadoop3教程(十四):MapReduce中的排序

    排序是MR中最重要的操作之一,也是面试中可能被问到的重点。 MapTask和ReduceTask中都会对数据按照KEY来排序,主要是为了效率,排完序之后,相同key值的数据会被放在一起,更方便下一步(如Reducer())的汇总处理。 默认排序是按照 字典顺序 (字母由小到大,或者是数字由小

    2024年02月07日
    浏览(68)
  • Hadoop之MapReduce 详细教程

    MapReduce 思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。 1、 Map 负责“分” ,即把复杂

    2024年02月03日
    浏览(67)
  • 【大数据】Hadoop_MapReduce➕实操(附详细代码)

    MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一是分布式计算框,就是mapreduce,二者缺一不可,也就是说,可以通过mapreduce很容易在hadoop平台上进行分布式的计算编程 sftp命令:Windows下登录Hadoop102 xftp root@hadoop102 , lcd 切换Windows路径,

    2024年02月01日
    浏览(39)
  • Hadoop3教程(十一):MapReduce的详细工作流程

    本小节将展示一下整个MapReduce的全工作流程。 首先是Map阶段: 首先,我们有一个待处理文本文件的集合; 客户端开始切片规划; 客户端提交各种信息(如切片规划文件、代码文件及其他配置数据)到yarn; yarn接收信息,计算所需的MapTask数量(按照切片数); MapTask启动,读

    2024年02月07日
    浏览(51)
  • 【云计算与大数据技术】Hadoop MapReduce的讲解(图文解释,超详细必看)

    MapReduce 是一种分布式计算框架,能够处理大量数据 ,并提供容错 、可靠等功能 , 运行部署在大规模计算集群中,MapReduce计算框架采用主从架构,由 Client、JobTracker、TaskTracker组成 用户编写 MapReduce程序,通过Client提交到JobTracker JobTracker负责管理运行的 TaskTracker节点;负责Job的调度

    2024年02月13日
    浏览(46)
  • 大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——MapTask工作机制

    MapTask工作机制如下图所示。 (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。 (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。 (3)Collect收集阶段:在用户编写map()函数中,当数据处

    2023年04月08日
    浏览(66)
  • Hadoop MapReduce解析

    Hadoop MapReduce是一个用于处理大量数据的编程模型和一个相应的实现框架。MapReduce作业通常分为两个阶段:Map阶段和Reduce阶段。 Map阶段 在Map阶段,你编写的Map函数会对输入数据进行处理。每个输入数据片段(例如一行文本)都会被Map函数处理,并产生中间键值对。 以单词计数

    2024年04月14日
    浏览(32)
  • 【Hadoop】MapReduce详解

    🦄 个人主页 ——🎐开着拖拉机回家_大数据运维-CSDN博客 🎐✨🍁 🪁🍁🪁🍁🪁🍁🪁🍁 🪁🍁🪁🍁🪁🍁🪁 🪁🍁🪁🍁🪁🍁🪁🍁🪁🍁🪁🍁 感谢点赞和关注 ,每天进步一点点!加油! 目录 一、MapReduce概述 1. 1 MapReduce 介绍 1.2 MapReduce 定义 1.3 MapReduce优缺点 1.2.1.优

    2024年02月05日
    浏览(51)
  • Hadoop 2:MapReduce

    理解MapReduce思想 MapReduce的思想核心是“先分再合,分而治之”。 所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,然后把各部分的结果组成整个问题的最终结果。 这种思想来源于日

    2024年02月06日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包