Hadoop-MapReduce排序(超级详细)

这篇具有很好参考价值的文章主要介绍了Hadoop-MapReduce排序(超级详细)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

N.1 MapReduce的模型

————————————————————————文章来源地址https://www.toymoban.com/news/detail-768283.html

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

———————————————————————— 

(1)map

map task会从本地⽂件系统读取数据,转换成key-value形式的键值对集合。使⽤的是hadoop内置的数据类型,⽐如longwritable、text等。

(2)shuffle

[1] 溢出

[2] 分区:mapper的key-value在输出之后会进⾏⼀个partition分区操作,默认使⽤的是hashpartitioner,可以通过重写hashpartitioner的getpartition⽅法来⾃定义分区规则。

[3] 归并排序:会对key进⾏进⾏sort排序,grouping分组操作将相同key的value合并分组输出。当然,在这⾥可以使⽤⾃定义的数据类型,重写WritableComparator的Comparator⽅法来⾃定义排序规则

[4] 预合并排序

[5] reduce拉取的总归并排序:和归并排序排序一样

(3)reduce

reduce阶段的排序,负责接接收shuffle处理好的数据,直接循环迭代( key,valus{..} )即可。最后将数据保存或者显⽰,结束整个job。(当然这里要在做排序也是可以的)

N.2 WritableComparable排序概念

0)排序阶段的介绍_sefl来源百度:

(1)shuffle过程中执行过程中的排序(默认字典排序,默认是过group分组,一个对象单独一组),分别是:

[1] 溢出

[2] 分区阶段:根据分区以及key进行快速排序

[3] 预合并阶段:是在排序的基础上做合并,在排序。

[4] 归并阶段:将同一个分区的多个溢写文件进行归并排序,合成大的溢出文件。

[5] reduce拉取阶段总归并:会对不同的mapTask进行归并排序。

(2)reduce阶段的排序,负责接接收shuffle处理好的数据,直接循环迭代( key,valus{..} )即可。

1)排序的分类:

(1)部分排序(某一个分区,或者是部分文件排序):

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

(2)全排序(就一个分区,且所有文件合在一起排序):

如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

(3)辅助排序(某一个分区,或者部分文件排序,这个一般涉及对象的排序):(GroupingComparator分组)执行默认的。

[1] 默认情况下,如果使用自定义对象排序,每new出一个就单独分成一组。

GroupingComparator是在reduce阶段分组来使用的,由于reduce阶段,如果key相同的一组,只取第一个key作为key,迭代所有的values。 如果reduce的key是自定义的对象,我们只需要bean里面的某个属性相同就认为这样的key是相同的,这是我们就需要之定义GroupCoparator来“欺骗”reduce了如果每一个对象key都是单独一组,那么有些状态数据就要传递了,就会增加复杂度。所以辅助排序,相当优化。

[2] 一般来说,大多数MapReduce程序会避免让reduce函数依赖于value的排序。但是,有时也需要通过特定的方法对key进行排序和分组等, 以实现对value的排序。

(4)二次排序(某一个分区,或者部分文件排序,这里是二次排序)

[1] 二次排序 就是对key进行第一与二次的排序,案例:如key是一个对象,对象有多个属性,而value是别的值,而使用二次排序可以对key 里面的属性进行二次排序。

[2] 在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序,如果是一个条件就是一次排序,定义一个类实现WritableComparable接口重写compareTo方法,就可以实现排序。

@Override

public int compareTo(FlowBean o) {

        // 倒序排列,从大到小

//FlowBean o表示后面的一个对象。-1表示交换位置,1表示不交换位置。

        return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

N.3 二次排序

1)源数据:

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

2)以下代码使用的是一个分区,输出也是一个文件。

package study190616_2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean>{

private int id;

private double price;

//反序列化调用

public FlowBean(){

}

//比较器

public int compareTo(FlowBean o) {

//-1不交换位置(负数) 1交换位置(正数)

//比较原则 从上到下 “升”序排序,如果遇到相同的数据,就按后面的数据降序。

if(this.id>o.id){

return 1;

}else if(this.id<o.id){

return -1;

}else {

//自定义的二次排序,

return this.price>o.price? -1:1;

}

}

//序列化

public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeInt(this.id);

dataOutput.writeDouble(this.price);

}

//反序列化

public void readFields(DataInput dataInput) throws IOException {

this.id = dataInput.readInt();

this.price = dataInput.readDouble();

}

public void setId(int id) {

this.id = id;

}

public double getPrice() {

return price;

}

public void setPrice(double price) {

this.price = price;

}

@Override

public String toString() {

return this.id+"\t"+this.price;

}

}

package study190616_2;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

* KEYIN, VALUEIN, KEYOUT, VALUEOUT

* LongWritable,Text, FlowBean,NullWritable

* map 的输入 map的输出

*

* */

public class SortMap extends Mapper<LongWritable,Text,FlowBean,NullWritable> {

FlowBean f = new FlowBean();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//获取数据

String line = value.toString();

//切分

String[] split = line.split("\t");

//赋值/数据封装

//数据行:0000001 Pdt_01 222.8

f.setId(Integer.parseInt(split[0]));

f.setPrice(Double.parseDouble(split[2]));

context.write(f,NullWritable.get()); //注意 这里是按照对象传到reduce ,而每一个对象是通过new出来的 ,所以地址不一样,在reduce接受的时候,每个对象不合在一起,它们的value也不会集中成一个迭代器,也就是说个对象都有自己的迭代器。所以后面的章节会学到赋值排序,让它们合起来

}

}

package study190616_2;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReduce extends Reducer<FlowBean,NullWritable,FlowBean,NullWritable> {

@Override

protected void reduce(FlowBean flowBean, Iterable <NullWritable> values, Context context) throws IOException, InterruptedException {

//输出

context.write(flowBean, NullWritable.get()); //这里的reduce自己写 因为每一个对象不一样 ,迭代器不会聚集值。

}

}

package study190616_2;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MainWritable {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args=new String[]{"C:\\Users\\HMTX\\Desktop\\0616-压缩\\GroupingComparator.txt","C:\\Users\\HMTX\\Desktop\\0616-压缩\\t11"};

// 1 获取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 2 加载jar包

job.setJarByClass(MainWritable.class);

// 3 关联map和reduce

job.setMapperClass(SortMap.class);

job.setReducerClass(SortReduce.class);

// 4 设置最终输出类型

job.setMapOutputKeyClass(FlowBean.class);

job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(FlowBean.class);

job.setOutputValueClass(NullWritable.class);

// 5 设置输入和输出路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交

job.waitForCompletion(true);

}

}

3)远行结果

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

 ————————————————————————

N.4 GroupingComparator分组排序

1)如果使用某一个字段进行辅助排序,那么这个字段"必须"在之前"有过排序"的处理,所有"辅助"顾名思义就是在前者排序好的基础上发挥的作用, 单独使用的辅助排序 很可能生成的结果顺序是乱的,最好不要使用。而辅助里面在使用排序 一般 都跟之前的字段排序规则一样,其实辅助主要的不是排序,而是分组才是关键(可以认为写个排序的代码就是想分组)。所以 下面的案例 使用了二次排序作为辅助排序的基础。

2)源数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

package study190616_3;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

private int id;

private double price;

public OrderBean(){

}

//排序,先按ID排序,相等ID在价格排序

public int compareTo(OrderBean o) {

//-1不交换位置(负数) 1交换位置(正数)

if(this.id>o.id){

return 1;

}else if(this.id<o.id){

return -1;

}else {

return this.price>o.price? -1:1;

}

}

//序列化

public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeInt(this.id);

dataOutput.writeDouble(this.price);

}

//返序列化

public void readFields(DataInput dataInput) throws IOException {

this.id = dataInput.readInt();

this.price = dataInput.readDouble();

}

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public double getPrice() {

return price;

}

public void setPrice(double price) {

this.price = price;

}

@Override

public String toString() {

return this.id+"\t"+this.price;

}

}

package study190616_3;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMap extends Mapper<LongWritable,Text,OrderBean, Text> {

//输入文件的原的行数据格式:0000002 Pdt_06 722.4

OrderBean f = new OrderBean();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//获取数据

String line = value.toString();

//切分

String[] split = line.split("\t");

//赋值/数据封装

f.setId(Integer.parseInt(split[0]));

f.setPrice(Double.parseDouble(split[2]));

context.write(f,new Text(split[1]));

}

}

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReduce extends Reducer<OrderBean, Text,OrderBean,Text> {

@Override

protected void reduce(OrderBean flowBean, Iterable <Text> values, Context context) throws IOException, InterruptedException {

//输出

for(Text text:values){

//输出验证下

//System.out.println("key:" + flowBean.toString() + "," + "value:" + text.toString());

context.write(flowBean,text); //注意 这里的value 会合并成迭代器 ,因为后面的代码设置了分组

/**使用对象的某字段值分组,对象间的某字段的值相同,则这些对象就会组成一个"变化key",而value会聚集成迭代器,而这个变化key是根据遍历迭代器产生value"对应的对象做为key"。

如果不遍历迭代器,context.write(Bean,value)只使用一次 默认就是使用排序为第一的对象作为变量key

**/

}

//输出验证下

System.out.println("=========分隔符==========");

}

}

/**

注意这里 遍历的时候 没有跳出循环 对象却换了

* key:3 222.8,value:Pdt_01

* key:3 33.8,value:Pdt_02

* =========分隔符==========

* key:1 222.8,value:Pdt_01

* key:1 25.8,value:Pdt_06

* =========分隔符==========

* key:2 722.4,value:Pdt_05

* key:2 522.8,value:Pdt_03

* key:2 122.4,value:Pdt_04

* =========分隔符==========

当然 如果不遍历迭代器,就写一个

System.out.println("key:" + flowBean.toString() + "," + "value:" + text.toString());

那么输出的结构就是每一个组的第一个数据:

* key:3 222.8,value:Pdt_01

* =========分隔符==========

* key:1 222.8,value:Pdt_01

* =========分隔符==========

* key:2 722.4,value:Pdt_05

* =========分隔符=========

*/

package study190616_3;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartition extends Partitioner<OrderBean, Text> {

@Override

public int getPartition(OrderBean orderBean, Text text, int i) {

//&表示位运算(还可以做逻辑运算符),orderBean.getId() & 2147483647得出结果为1,1%3=0...1,所以余数为1

//有0、1 、2的返回值

return (orderBean.getId() & 2147483647) % i;

}

}

package study190616_3;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

public class GroupComparee extends WritableComparator{

//构造方法

protected GroupComparee(){

//调用父类方法,里面传入参数,true表示开启辅助排序

//调用父类方法的WritableComparator(Class<? extends WritableComparable> keyClass, boolean createInstances){..}

super(OrderBean.class,true);

}

@Override

public int compare(WritableComparable a, WritableComparable b) {

OrderBean oa = (OrderBean) a;

OrderBean ob = (OrderBean) b;

/**使用对象的某字段值分组,对象间的某字段的值相同,则这些对象就会组成一个"变化key,就是一个key",而value会聚集成迭代器,而这个变化key是根据遍历迭代器产生value"对应的对象做为key"。

如果不遍历迭代器,context.write(Bean,value)只使用一次 默认就是使用排序为第一的对象作为变量key

**/

//对key排序就是对key进行分组,在组排序

if(oa.getId()>ob.getId()){

return 1;

}else if(oa.getId()<ob.getId()){

return -1;

}else {

///相等

return 0;

}

}

}

package study190616_3;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MainOrder {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args=new String[]{"C:\\Users\\HMTX\\Desktop\\0616-压缩\\GroupingComparator.txt","C:\\Users\\HMTX\\Desktop\\0616-压缩\\t111qq11"};

// 1 获取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 2 加载jar包

job.setJarByClass(MainOrder.class);

// 3 关联map和reduce

job.setMapperClass(OrderMap.class);

job.setReducerClass(OrderReduce.class);

// 4 设置最终输出类型

job.setMapOutputKeyClass(OrderBean.class);

job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(OrderBean.class);

job.setOutputValueClass(Text.class);

//指定辅助排序

job.setGroupingComparatorClass(GroupComparee.class);

//指定分区

job.setPartitionerClass(OrderPartition.class);

job.setNumReduceTasks(3);

// 5 设置输入和输出路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交

job.waitForCompletion(true);

}

}

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

mapreduce 排序,# Hadoop生态圈,hadoop,mapreduce,大数据

————————————————————————

到了这里,关于Hadoop-MapReduce排序(超级详细)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MapReduce排序机制(Hadoop)

    在MapReduce中, 排序的目的是为了方便Reduce阶段的处理,通常是为了将相同键的键值对聚合在一起,以便进行聚合操作或其他处理。 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使 用率达到一定 阈值 后,再对缓冲区中的数据进行一次快速排序,并将这

    2024年04月24日
    浏览(23)
  • Hadoop3教程(十四):MapReduce中的排序

    排序是MR中最重要的操作之一,也是面试中可能被问到的重点。 MapTask和ReduceTask中都会对数据按照KEY来排序,主要是为了效率,排完序之后,相同key值的数据会被放在一起,更方便下一步(如Reducer())的汇总处理。 默认排序是按照 字典顺序 (字母由小到大,或者是数字由小

    2024年02月07日
    浏览(59)
  • Hadoop之MapReduce 详细教程

    MapReduce 思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。 1、 Map 负责“分” ,即把复杂

    2024年02月03日
    浏览(56)
  • 【大数据】Hadoop_MapReduce➕实操(附详细代码)

    MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一是分布式计算框,就是mapreduce,二者缺一不可,也就是说,可以通过mapreduce很容易在hadoop平台上进行分布式的计算编程 sftp命令:Windows下登录Hadoop102 xftp root@hadoop102 , lcd 切换Windows路径,

    2024年02月01日
    浏览(29)
  • Hadoop3教程(十一):MapReduce的详细工作流程

    本小节将展示一下整个MapReduce的全工作流程。 首先是Map阶段: 首先,我们有一个待处理文本文件的集合; 客户端开始切片规划; 客户端提交各种信息(如切片规划文件、代码文件及其他配置数据)到yarn; yarn接收信息,计算所需的MapTask数量(按照切片数); MapTask启动,读

    2024年02月07日
    浏览(40)
  • 【云计算与大数据技术】Hadoop MapReduce的讲解(图文解释,超详细必看)

    MapReduce 是一种分布式计算框架,能够处理大量数据 ,并提供容错 、可靠等功能 , 运行部署在大规模计算集群中,MapReduce计算框架采用主从架构,由 Client、JobTracker、TaskTracker组成 用户编写 MapReduce程序,通过Client提交到JobTracker JobTracker负责管理运行的 TaskTracker节点;负责Job的调度

    2024年02月13日
    浏览(33)
  • 大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——MapTask工作机制

    MapTask工作机制如下图所示。 (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。 (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。 (3)Collect收集阶段:在用户编写map()函数中,当数据处

    2023年04月08日
    浏览(53)
  • Hadoop 2:MapReduce

    理解MapReduce思想 MapReduce的思想核心是“先分再合,分而治之”。 所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,然后把各部分的结果组成整个问题的最终结果。 这种思想来源于日

    2024年02月06日
    浏览(34)
  • 【Hadoop】- MapReduce概述[5]

    目录 前言 一、分布式计算框架 - MapReduce 二、MapReduce执行原理 MapReduce是一种 分布式计算框架 ,由Google开发。它的设计目标是将大规模数据集的处理和生成任务分布到一个由廉价计算机组成的集群中。 在MapReduce模型中,输入数据被分割成若干小块,并在集群中的多个节点上并

    2024年04月25日
    浏览(25)
  • Hadoop MapReduce解析

    Hadoop MapReduce是一个用于处理大量数据的编程模型和一个相应的实现框架。MapReduce作业通常分为两个阶段:Map阶段和Reduce阶段。 Map阶段 在Map阶段,你编写的Map函数会对输入数据进行处理。每个输入数据片段(例如一行文本)都会被Map函数处理,并产生中间键值对。 以单词计数

    2024年04月14日
    浏览(24)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包