N.1 MapReduce的模型
————————————————————————文章来源地址https://www.toymoban.com/news/detail-768283.html
————————————————————————
(1)map map task会从本地⽂件系统读取数据,转换成key-value形式的键值对集合。使⽤的是hadoop内置的数据类型,⽐如longwritable、text等。 (2)shuffle [1] 溢出 [2] 分区:mapper的key-value在输出之后会进⾏⼀个partition分区操作,默认使⽤的是hashpartitioner,可以通过重写hashpartitioner的getpartition⽅法来⾃定义分区规则。 [3] 归并排序:会对key进⾏进⾏sort排序,grouping分组操作将相同key的value合并分组输出。当然,在这⾥可以使⽤⾃定义的数据类型,重写WritableComparator的Comparator⽅法来⾃定义排序规则 [4] 预合并排序 [5] reduce拉取的总归并排序:和归并排序排序一样 (3)reduce reduce阶段的排序,负责接接收shuffle处理好的数据,直接循环迭代( key,valus{..} )即可。最后将数据保存或者显⽰,结束整个job。(当然这里要在做排序也是可以的) |
N.2 WritableComparable排序概念
0)排序阶段的介绍_sefl来源百度: (1)shuffle过程中执行过程中的排序(默认字典排序,默认是过group分组,一个对象单独一组),分别是: [1] 溢出 [2] 分区阶段:根据分区以及key进行快速排序 [3] 预合并阶段:是在排序的基础上做合并,在排序。 [4] 归并阶段:将同一个分区的多个溢写文件进行归并排序,合成大的溢出文件。 [5] reduce拉取阶段总归并:会对不同的mapTask进行归并排序。 (2)reduce阶段的排序,负责接接收shuffle处理好的数据,直接循环迭代( key,valus{..} )即可。 1)排序的分类: (1)部分排序(某一个分区,或者是部分文件排序): MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。 (2)全排序(就一个分区,且所有文件合在一起排序): 如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。 替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。 (3)辅助排序(某一个分区,或者部分文件排序,这个一般涉及对象的排序):(GroupingComparator分组)执行默认的。 [1] 默认情况下,如果使用自定义对象排序,每new出一个就单独分成一组。 GroupingComparator是在reduce阶段分组来使用的,由于reduce阶段,如果key相同的一组,只取第一个key作为key,迭代所有的values。 如果reduce的key是自定义的对象,我们只需要bean里面的某个属性相同就认为这样的key是相同的,这是我们就需要之定义GroupCoparator来“欺骗”reduce了。如果每一个对象key都是单独一组,那么有些状态数据就要传递了,就会增加复杂度。所以辅助排序,相当优化。 [2] 一般来说,大多数MapReduce程序会避免让reduce函数依赖于value的排序。但是,有时也需要通过特定的方法对key进行排序和分组等, 以实现对value的排序。 (4)二次排序(某一个分区,或者部分文件排序,这里是二次排序): [1] 二次排序 就是对key进行第一与二次的排序,案例:如key是一个对象,对象有多个属性,而value是别的值,而使用二次排序可以对key 里面的属性进行二次排序。 [2] 在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序,如果是一个条件就是一次排序,定义一个类实现WritableComparable接口重写compareTo方法,就可以实现排序。 @Override public int compareTo(FlowBean o) { // 倒序排列,从大到小 //FlowBean o表示后面的一个对象。-1表示交换位置,1表示不交换位置。 return this.sumFlow > o.getSumFlow() ? -1 : 1; } |
N.3 二次排序
1)源数据: |
————————————————————————
————————————————————————
————————————————————————
2)以下代码使用的是一个分区,输出也是一个文件。
package study190616_2; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FlowBean implements WritableComparable<FlowBean>{ private int id; private double price; //反序列化调用 public FlowBean(){ } //比较器 public int compareTo(FlowBean o) { //-1不交换位置(负数) 1交换位置(正数) //比较原则 从上到下 “升”序排序,如果遇到相同的数据,就按后面的数据降序。 if(this.id>o.id){ return 1; }else if(this.id<o.id){ return -1; }else { //自定义的二次排序, return this.price>o.price? -1:1; } } //序列化 public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(this.id); dataOutput.writeDouble(this.price); } //反序列化 public void readFields(DataInput dataInput) throws IOException { this.id = dataInput.readInt(); this.price = dataInput.readDouble(); } public void setId(int id) { this.id = id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public String toString() { return this.id+"\t"+this.price; } } |
package study190616_2; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * KEYIN, VALUEIN, KEYOUT, VALUEOUT * LongWritable,Text, FlowBean,NullWritable * map 的输入 map的输出 * * */ public class SortMap extends Mapper<LongWritable,Text,FlowBean,NullWritable> { FlowBean f = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取数据 String line = value.toString(); //切分 String[] split = line.split("\t"); //赋值/数据封装 //数据行:0000001 Pdt_01 222.8 f.setId(Integer.parseInt(split[0])); f.setPrice(Double.parseDouble(split[2])); context.write(f,NullWritable.get()); //注意 这里是按照对象传到reduce ,而每一个对象是通过new出来的 ,所以地址不一样,在reduce接受的时候,每个对象不合在一起,它们的value也不会集中成一个迭代器,也就是说个对象都有自己的迭代器。所以后面的章节会学到赋值排序,让它们合起来 } } |
package study190616_2; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class SortReduce extends Reducer<FlowBean,NullWritable,FlowBean,NullWritable> { @Override protected void reduce(FlowBean flowBean, Iterable <NullWritable> values, Context context) throws IOException, InterruptedException { //输出 context.write(flowBean, NullWritable.get()); //这里的reduce自己写 因为每一个对象不一样 ,迭代器不会聚集值。 } } |
package study190616_2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class MainWritable { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args=new String[]{"C:\\Users\\HMTX\\Desktop\\0616-压缩\\GroupingComparator.txt","C:\\Users\\HMTX\\Desktop\\0616-压缩\\t11"}; // 1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 加载jar包 job.setJarByClass(MainWritable.class); // 3 关联map和reduce job.setMapperClass(SortMap.class); job.setReducerClass(SortReduce.class); // 4 设置最终输出类型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(NullWritable.class); // 5 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 提交 job.waitForCompletion(true); } } |
3)远行结果 |
————————————————————————
————————————————————————
N.4 GroupingComparator分组排序
1)如果使用某一个字段进行辅助排序,那么这个字段"必须"在之前"有过排序"的处理,所有"辅助"顾名思义就是在前者排序好的基础上发挥的作用, 单独使用的辅助排序 很可能生成的结果顺序是乱的,最好不要使用。而辅助里面在使用排序 一般 都跟之前的字段排序规则一样,其实辅助主要的不是排序,而是分组才是关键(可以认为写个排序的代码就是想分组)。所以 下面的案例 使用了二次排序作为辅助排序的基础。 2)源数据 |
————————————————————————
————————————————————————
————————————————————————
package study190616_3; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class OrderBean implements WritableComparable<OrderBean> { private int id; private double price; public OrderBean(){ } //排序,先按ID排序,相等ID在价格排序 public int compareTo(OrderBean o) { //-1不交换位置(负数) 1交换位置(正数) if(this.id>o.id){ return 1; }else if(this.id<o.id){ return -1; }else { return this.price>o.price? -1:1; } } //序列化 public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(this.id); dataOutput.writeDouble(this.price); } //返序列化 public void readFields(DataInput dataInput) throws IOException { this.id = dataInput.readInt(); this.price = dataInput.readDouble(); } public int getId() { return id; } public void setId(int id) { this.id = id; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public String toString() { return this.id+"\t"+this.price; } } |
package study190616_3; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class OrderMap extends Mapper<LongWritable,Text,OrderBean, Text> { //输入文件的原的行数据格式:0000002 Pdt_06 722.4 OrderBean f = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取数据 String line = value.toString(); //切分 String[] split = line.split("\t"); //赋值/数据封装 f.setId(Integer.parseInt(split[0])); f.setPrice(Double.parseDouble(split[2])); context.write(f,new Text(split[1])); } } |
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class OrderReduce extends Reducer<OrderBean, Text,OrderBean,Text> { @Override protected void reduce(OrderBean flowBean, Iterable <Text> values, Context context) throws IOException, InterruptedException { //输出 for(Text text:values){ //输出验证下 //System.out.println("key:" + flowBean.toString() + "," + "value:" + text.toString()); context.write(flowBean,text); //注意 这里的value 会合并成迭代器 ,因为后面的代码设置了分组 /**使用对象的某字段值分组,对象间的某字段的值相同,则这些对象就会组成一个"变化key",而value会聚集成迭代器,而这个变化key是根据遍历迭代器产生value"对应的对象做为key"。 如果不遍历迭代器,context.write(Bean,value)只使用一次 默认就是使用排序为第一的对象作为变量key **/ } //输出验证下 System.out.println("=========分隔符=========="); } } /** 注意这里 遍历的时候 没有跳出循环 对象却换了 * key:3 222.8,value:Pdt_01 * key:3 33.8,value:Pdt_02 * =========分隔符========== * key:1 222.8,value:Pdt_01 * key:1 25.8,value:Pdt_06 * =========分隔符========== * key:2 722.4,value:Pdt_05 * key:2 522.8,value:Pdt_03 * key:2 122.4,value:Pdt_04 * =========分隔符========== 当然 如果不遍历迭代器,就写一个 System.out.println("key:" + flowBean.toString() + "," + "value:" + text.toString()); 那么输出的结构就是每一个组的第一个数据: * key:3 222.8,value:Pdt_01 * =========分隔符========== * key:1 222.8,value:Pdt_01 * =========分隔符========== * key:2 722.4,value:Pdt_05 * =========分隔符========= */ |
package study190616_3; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class OrderPartition extends Partitioner<OrderBean, Text> { @Override public int getPartition(OrderBean orderBean, Text text, int i) { //&表示位运算(还可以做逻辑运算符),orderBean.getId() & 2147483647得出结果为1,1%3=0...1,所以余数为1 //有0、1 、2的返回值 return (orderBean.getId() & 2147483647) % i; } } |
package study190616_3; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class GroupComparee extends WritableComparator{ //构造方法 protected GroupComparee(){ //调用父类方法,里面传入参数,true表示开启辅助排序 //调用父类方法的WritableComparator(Class<? extends WritableComparable> keyClass, boolean createInstances){..} super(OrderBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean oa = (OrderBean) a; OrderBean ob = (OrderBean) b; /**使用对象的某字段值分组,对象间的某字段的值相同,则这些对象就会组成一个"变化key,就是一个key",而value会聚集成迭代器,而这个变化key是根据遍历迭代器产生value"对应的对象做为key"。 如果不遍历迭代器,context.write(Bean,value)只使用一次 默认就是使用排序为第一的对象作为变量key **/ //对key排序就是对key进行分组,在组排序 if(oa.getId()>ob.getId()){ return 1; }else if(oa.getId()<ob.getId()){ return -1; }else { ///相等 return 0; } } } |
package study190616_3; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class MainOrder { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args=new String[]{"C:\\Users\\HMTX\\Desktop\\0616-压缩\\GroupingComparator.txt","C:\\Users\\HMTX\\Desktop\\0616-压缩\\t111qq11"}; // 1 获取job信息 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 加载jar包 job.setJarByClass(MainOrder.class); // 3 关联map和reduce job.setMapperClass(OrderMap.class); job.setReducerClass(OrderReduce.class); // 4 设置最终输出类型 job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(Text.class); //指定辅助排序 job.setGroupingComparatorClass(GroupComparee.class); //指定分区 job.setPartitionerClass(OrderPartition.class); job.setNumReduceTasks(3); // 5 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 提交 job.waitForCompletion(true); } } |
————————————————————————
————————————————————————
————————————————————————
文章来源:https://www.toymoban.com/news/detail-768283.html
————————————————————————
到了这里,关于Hadoop-MapReduce排序(超级详细)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!