云计算与大数据实验五 MapReduce编程

这篇具有很好参考价值的文章主要介绍了云计算与大数据实验五 MapReduce编程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、实验目的

  1. 了解Mapper类,Reducer类和Job类

  2. 掌握什么是MapReduce及使用MapReduce进行运算

  3. 掌握挖掘父子辈关系,给出祖孙辈关系的表格

二、实验内容

  1. 使用Map/Reduce计算班级中年龄最大的学生

  2. 使用Map/Reduce编程实现文件合并和去重操作

  3. 对给定的表格进行信息挖掘

  4. 编写实现日期操作的程序

三、实验步骤

(一)使用Map/Reduce计算班级中年龄最大的学生

什么是MapReduce

MapReduce是一种可用于数据处理的编程模型,我们现在设想一个场景,你接到一个任务,任务是:挖掘分析我国气象中心近年来的数据日志,该数据日志大小有3T,让你分析计算出每一年的最高气温,如果你现在只有一台计算机,如何处理呢?我想你应该会读取这些数据,并且将读取到的数据与目前的最大气温值进行比较。比较完所有的数据之后就可以得出最高气温了。不过以我们的经验都知道要处理这么多数据肯定是非常耗时的。

云计算与大数据实验五 MapReduce编程

创建file01输入内容:

Hello World Bye World

创建file02输入内容:

Hello Hadoop Goodbye Hadoop

将文件上传到HDFS/usr/input/目录下:

不要忘了启动DFSstart-dfs.sh

然后创建文件夹并上传:

云计算与大数据实验五 MapReduce编程

点击评测,运行代码,可以看到/usr/output目录下已经生成了文件。

云计算与大数据实验五 MapReduce编程

我们来查看part--r-00000文件的内容:

云计算与大数据实验五 MapReduce编程

可以看到统计的数据已经生成在文件中了。

编程要求

使用MapReduce计算班级每个学生的最好成绩,输入文件路径为/user/test/input,请将计算后的结果输出到/user/test/output/目录下。

输入文件的数据格式如下: 张三 12 李四 13 张三 89 李四 92 ...

依照如上格式你应该输出:

张三 89 李四 92

相关代码:

1.	//首先在命令行启动 hadoop: start-all.sh
2.	import java.io.IOException;
3.	import java.util.StringTokenizer;
4.	
5.	import java.io.IOException;
6.	import java.util.StringTokenizer;
7.	import org.apache.hadoop.conf.Configuration;
8.	import org.apache.hadoop.fs.Path;
9.	import org.apache.hadoop.io.*;
10.	import org.apache.hadoop.io.Text;
11.	import org.apache.hadoop.mapreduce.Job;
12.	import org.apache.hadoop.mapreduce.Mapper;
13.	import org.apache.hadoop.mapreduce.Reducer;
14.	import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15.	import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
16.	import org.apache.hadoop.util.GenericOptionsParser;
17.	
18.	public class WordCount {
19.	    /********** Begin **********/
20.	    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
21.	        private final static IntWritable one = new IntWritable(1);
22.	        private Text word = new Text();
23.	        private int maxValue = 0;
24.	        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
25.	            StringTokenizer itr = new StringTokenizer(value.toString(),"\n");
26.	            while (itr.hasMoreTokens()) {
27.	                String[] str = itr.nextToken().split(" ");
28.	                String name = str[0];
29.	                one.set(Integer.parseInt(str[1]));
30.	                word.set(name);
31.	                context.write(word,one);
32.	            }
33.	            //context.write(word,one);
34.	        }
35.	    }
36.	    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
37.	        private IntWritable result = new IntWritable();
38.	        public void reduce(Text key, Iterable<IntWritable> values, Context context)
39.	                throws IOException, InterruptedException {
40.	            int maxAge = 0;
41.	            int age = 0;
42.	            for (IntWritable intWritable : values) {
43.	                maxAge = Math.max(maxAge, intWritable.get());
44.	            }
45.	            result.set(maxAge);
46.	            context.write(key, result);
47.	        }
48.	    }
49.	    public static void main(String[] args) throws Exception {
50.	        Configuration conf = new Configuration();
51.	        Job job = new Job(conf, "word count");
52.	        job.setJarByClass(WordCount.class);
53.	        job.setMapperClass(TokenizerMapper.class);
54.	        job.setCombinerClass(IntSumReducer.class);
55.	        job.setReducerClass(IntSumReducer.class);
56.	        job.setOutputKeyClass(Text.class);
57.	        job.setOutputValueClass(IntWritable.class);
58.	        String inputfile = "/user/test/input";
59.	        String outputFile = "/user/test/output/";
60.	        FileInputFormat.addInputPath(job, new Path(inputfile));
61.	        FileOutputFormat.setOutputPath(job, new Path(outputFile));
62.	        job.waitForCompletion(true);
63.	    /********** End **********/
64.	    }
65.	}

(二)使用Map/Reduce编程实现文件合并和去重操作

map类

首先我们来看看Mapper对象:

云计算与大数据实验五 MapReduce编程

在编写MapReduce程序时,要编写一个类继承Mapper类,这个Mapper类是一个泛型类型,它有四个形参类型,分别指定了map()函数的输入键,输入值,和输出键,输出值的类型。就第一关的例子来说,输入键是一个长整型,输入值是一行文本,输出键是单词,输出值是单词出现的次数。

云计算与大数据实验五 MapReduce编程

Hadoop提供了一套可优化网络序列化传输的基本类型,而不是直接使用Java内嵌的类型。这些类型都在org.apache.hadoop.io包中,这里使用LongWritable(相当于Java中的Long类型),Text类型(相当于Java中的String类型)和IntWritable(相当于Integer类型)。

map()函数的输入是一个键和一个值,我们一般首先将包含有一行输入的text值转换成JavaString类型,然后再使用对字符串操作的类或者其他方法进行操作即可。

Reducer类

同样Reducer也有四个参数类型用于指定输入和输出类型,reduce()函数的输入类型必须匹配map函数的输出类型,即Text类型和IntWritable类型,在这种情况下,reduce函数的输出类型也必须是TextIntWritable类型,即分别输出单词和次数。

云计算与大数据实验五 MapReduce编程

Job类

云计算与大数据实验五 MapReduce编程

一般我们用Job对象来运行MapReduce作业,Job对象用于指定作业执行规范,我们可以用它来控制整个作业的运行,我们在Hadoop集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop在集群上发布的这个文件),不用明确指定JAR文件的名称,在Job对象的setJarByClass()函数中传入一个类即可,Hadoop利用这个类来查找包含他的JAR文件。addInputPath()函数和setOutputPath()函数用来指定作业的输入路径和输出路径。值的注意的是,输出路径在执行程序之前不能存在,否则Hadoop会拒绝执行你的代码。

最后我们使用waitForCompletion()方法提交代码并等待执行,该方法唯一的参数是一个布尔类型的值,当该值为true时,作业会把执行过程打印到控制台,该方法也会返回一个布尔值,表示执行的成败。

编程要求

对于两个输入文件,即文件file1和文件file2,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件file3。 为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的不同文件合并到一个没有重复的整合文件,规则如下:

  • 第一列按学号排列;
  • 学号相同,按x,y,z排列;
  • 输入文件路径为:/user/tmp/input/
  • 输出路径为:/user/tmp/output/

程序会对你编写的代码进行测试: 输入已经指定了测试文本数据:需要你的程序输出合并去重后的结果。 下面是输入文件和输出文件的一个样例供参考。

输入文件file1的样例如下: 20150101 x 20150102 y 20150103 x 20150104 y 20150105 z 20150106 x

输入文件file2的样例如下: 20150101 y 20150102 y 20150103 x 20150104 z 20150105 y

根据输入文件file1file2合并得到的输出文件file3的样例如下:

20150101 x 20150101 y 20150102 y 20150103 x 20150104 y 20150104 z 20150105 y 20150105 z 20150106 x

相关代码:

1.	import java.io.IOException;
2.	
3.	
4.	import java.util.*;
5.	import org.apache.hadoop.conf.Configuration;
6.	import org.apache.hadoop.fs.Path;
7.	import org.apache.hadoop.io.*;
8.	import org.apache.hadoop.mapreduce.Job;
9.	import org.apache.hadoop.mapreduce.Mapper;
10.	import org.apache.hadoop.mapreduce.Reducer;
11.	import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12.	import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13.	import org.apache.hadoop.util.GenericOptionsParser;
14.	public class Merge {
15.	
16.	    /**
17.	     * @param args
18.	     * 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C
19.	     */
20.	    //在这重载map函数,直接将输入中的value复制到输出数据的key上 注意在map方法中要抛出异常:throws IOException,InterruptedException
21.	    /********** Begin **********/
22.	    public static class Map extends Mapper<LongWritable, Text, Text, Text >
23.	    {
24.	        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
25.	                throws IOException, InterruptedException {
26.	            String str = value.toString();
27.	            String[] data = str.split(" ");
28.	            Text t1= new Text(data[0]);
29.	            Text t2 = new Text(data[1]);
30.	            context.write(t1,t2);
31.	        }
32.	    } 
33.	    /********** End **********/
34.	
35.	    //在这重载reduce函数,直接将输入中的key复制到输出数据的key上  注意在reduce方法上要抛出异常:throws IOException,InterruptedException
36.	    /********** Begin **********/
37.	    public static class Reduce  extends Reducer<Text, Text, Text, Text>
38.	    {
39.	        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
40.	                throws IOException, InterruptedException {
41.	            List<String> list = new ArrayList<>();
42.	            for (Text text : values) {
43.	                String str = text.toString();
44.	                if(!list.contains(str)){
45.	                    list.add(str);
46.	                }
47.	            }
48.	            Collections.sort(list);
49.	            for (String text : list) {
50.	                context.write(key, new Text(text));
51.	            }
52.	        }
53.	    /********** End **********/
54.	    }
55.	
56.	    public static void main(String[] args) throws Exception{
57.	        Configuration conf = new Configuration();
58.	         Job job = new Job(conf, "word count");
59.	        job.setJarByClass(Merge.class);
60.	        job.setMapperClass(Map.class);
61.	        job.setCombinerClass(Reduce.class);
62.	        job.setReducerClass(Reduce.class);
63.	        job.setOutputKeyClass(Text.class);
64.	        job.setOutputValueClass(Text.class);
65.	        String inputPath = "/user/tmp/input/";  //在这里设置输入路径
66.	        String outputPath = "/user/tmp/output/";  //在这里设置输出路径
67.	        FileInputFormat.addInputPath(job, new Path(inputPath));
68.	        FileOutputFormat.setOutputPath(job, new Path(outputPath));
69.	        System.exit(job.waitForCompletion(true) ? 0 : 1);
70.	    }
71.	}

(三)对给定的表格进行信息挖掘

编程要求

你编写的程序要能挖掘父子辈关系,给出祖孙辈关系的表格。规则如下:

  • 孙子在前,祖父在后;
  • 输入文件路径:/user/reduce/input
  • 输出文件路径:/user/reduce/output

输入文件内容如下: child parent Steven Lucy Steven Jack Jone Lucy Jone Jack Lucy Mary Lucy Frank Jack Alice Jack Jesse David Alice David Jesse Philip David Philip Alma Mark David Mark Alma

输出文件内容如下:

grand_child    grand_parent
Mark    Jesse
Mark    Alice
Philip    Jesse
Philip    Alice
Jone    Jesse
Jone    Alice
Steven    Jesse
Steven    Alice
Steven    Frank
Steven    Mary
Jone    Frank
Jone    Mary

相关代码:

1.	import java.io.IOException;
2.	import java.util.*;
3.	
4.	import org.apache.hadoop.conf.Configuration;
5.	import org.apache.hadoop.fs.Path;
6.	import org.apache.hadoop.io.IntWritable;
7.	import org.apache.hadoop.io.Text;
8.	import org.apache.hadoop.mapreduce.Job;
9.	import org.apache.hadoop.mapreduce.Mapper;
10.	import org.apache.hadoop.mapreduce.Reducer;
11.	import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12.	import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13.	import org.apache.hadoop.util.GenericOptionsParser;
14.	
15.	public class simple_data_mining {
16.	    public static int time = 0;
17.	
18.	    /**
19.	     * @param args
20.	     * 输入一个child-parent的表格
21.	     * 输出一个体现grandchild-grandparent关系的表格
22.	     */
23.	    //Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志
24.	    public static class Map extends Mapper<Object, Text, Text, Text>{
25.	        public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
26.	            /********** Begin **********/
27.	
28.	
29.	            String child_name = new String();
30.	            String parent_name = new String();
31.	            String relation_type = new String();
32.	            String line = value.toString();
33.	            int i = 0;
34.	            while(line.charAt(i) != ' '){
35.	                i++;
36.	            }
37.	            String[] values = {line.substring(0,i),line.substring(i+1)};
38.	            if(values[0].compareTo("child") != 0){
39.	                child_name = values[0];
40.	                parent_name = values[1];
41.	                relation_type = "1";//左右表区分标志
42.	                context.write(new Text(values[1]), new Text(relation_type+"+"+child_name+"+"+parent_name));
43.	                //左表
44.	                relation_type = "2";
45.	                context.write(new Text(values[0]), new Text(relation_type+"+"+child_name+"+"+parent_name));
46.	                //右表
47.	            }
48.	
49.	
50.	
51.	
52.	
53.	
54.	            /********** End **********/
55.	        }
56.	    }
57.	
58.	    public static class Reduce extends Reducer<Text, Text, Text, Text>{
59.	        public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
60.	                /********** Begin **********/
61.	            if(time == 0){   //输出表头
62.	                context.write(new Text("grand_child"), new Text("grand_parent"));
63.	                time++;
64.	            }
65.	            int grand_child_num = 0;
66.	            String grand_child[] = new String[10];
67.	            int grand_parent_num = 0;
68.	            String grand_parent[]= new String[10];
69.	            Iterator ite = values.iterator();
70.	            while(ite.hasNext()){
71.	                String record = ite.next().toString();
72.	                int len = record.length();
73.	                int i = 2;
74.	                if(len == 0) continue;
75.	                char relation_type = record.charAt(0);
76.	                String child_name = new String();
77.	                String parent_name = new String();
78.	                //获取value-list中value的child
79.	                while(record.charAt(i) != '+'){
80.	                    child_name = child_name + record.charAt(i);
81.	                    i++;
82.	                }
83.	                i=i+1;
84.	                //获取value-list中value的parent
85.	                while(i<len){
86.	                    parent_name = parent_name+record.charAt(i);
87.	                    i++;
88.	                }
89.	                //左表,取出child放入grand_child
90.	                if(relation_type == '1'){
91.	                    grand_child[grand_child_num] = child_name;
92.	                    grand_child_num++;
93.	                }
94.	                else{//右表,取出parent放入grand_parent
95.	                    grand_parent[grand_parent_num] = parent_name;
96.	                    grand_parent_num++;
97.	                }
98.	            }
99.	            if(grand_parent_num != 0 && grand_child_num != 0 ){
100.	                for(int m = 0;m<grand_child_num;m++){
101.	                    for(int n=0;n<grand_parent_num;n++){
102.	                        context.write(new Text(grand_child[m]), new Text(grand_parent[n]));
103.	                        //输出结果
104.	                    }
105.	                }
106.	            }
107.	
108.	                /********** End **********/
109.	
110.	        }
111.	    }
112.	    public static void main(String[] args) throws Exception{
113.	        // TODO Auto-generated method stub
114.	        Configuration conf = new Configuration();
115.	        Job job = Job.getInstance(conf,"Single table join");
116.	        job.setJarByClass(simple_data_mining.class);
117.	        job.setMapperClass(Map.class);
118.	        job.setReducerClass(Reduce.class);
119.	        job.setOutputKeyClass(Text.class);
120.	        job.setOutputValueClass(Text.class);
121.	        String inputPath = "/user/reduce/input";   //设置输入路径
122.	        String outputPath = "/user/reduce/output";   //设置输出路径
123.	        FileInputFormat.addInputPath(job, new Path(inputPath));
124.	        FileOutputFormat.setOutputPath(job, new Path(outputPath));
125.	        System.exit(job.waitForCompletion(true) ? 0 : 1);
126.	
127.	    }
128.	
129.	}

四、实验心得 

掌握了什么是MapReduce及使用MapReduce进行运算

掌握了挖掘父子辈关系,给出祖孙辈关系的表格文章来源地址https://www.toymoban.com/news/detail-423540.html

到了这里,关于云计算与大数据实验五 MapReduce编程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 大数据实验 实验四:NoSQL 和关系数据库的操作比较

    理解四种数据库(MySQL、HBase、Redis 和 MongoDB)的概念以及不同点; 熟练使用四种数据库操作常用的 Shell 命令; 熟悉四种数据库操作常用的 Java API。 操作系统:centos7 Hadoop 版本:3.3; MySQL 版本:8.0.22; HBase 版本:2.4.11; Redis 版本:5.0.5; MongoDB 版本:5.0; JDK 版本:1.8; Java

    2024年04月16日
    浏览(29)
  • 云计算与大数据入门实验四 —— MapReduce 初级编程实践

    通过实验掌握基本的 MapReduce 编程方法 掌握用 MapReduce 解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等 (一)编程实现文件合并和去重操作 对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个

    2024年02月05日
    浏览(29)
  • 大数据实验 实验二:熟悉HDFS常用操作

    附件中有word版本的实验报告 理解HDFS在Hadoop体系结构中的角色。 熟练使用HDFS操作常用的Shell命令。 熟悉HDFS操作常用的Java API。 Oracle VM VirtualBox虚拟机 系统版本centos7 JDK1.8版本 Hadoop-3.1.3 Windows11 Java IDE:IDEA 1.向HDFS中上传任意文本文件,如果指定的文件在HDFS中已经存在,由用户

    2024年04月12日
    浏览(27)
  • 大数据实验 实验三:熟悉常用的HBase操作

    (1)理解HBase在Hadoop体系结构中的角色; (2)熟练使用HBase操作常用的Shell命令; (3)熟悉HBase操作常用的Java API。 操作系统:centos7; Hadoop版本:3.3; HBase版本:2.2.2; JDK版本:1.8; Java IDE:IDEA。 (1) 列出HBase所有的表的相关信息,例如表名、创建时间等; (2) 在终端

    2024年02月04日
    浏览(34)
  • 大数据实验三-HBase编程实践

    目录 一.实验内容 二.实验目的 三.实验过程截图及说明 1、安装HBase 2、配置伪分布式模式: 3、使用hbase的shell命令来操作表: 4、使用hbase提供的javaAPI来编程实现类似操作: 5、实验总结及心得体会 6、完整报告在文章开头,挂载。 HBase编程实践: 1)在Hadoop基础上安装H

    2024年04月12日
    浏览(55)
  • 云计算与大数据实验三 HDFS的基本操作

    一、实验目的 理解 HDFS 架构和工作原理 掌握 HDFS 部署环境和步骤 掌握 HDFS( 集群的启动 start-dfs.sh) 启动 使用 Hadoop 命令 ( 文件的增 / 删 / 改 / 查 / 上传 / 下载 ) 来操作分布式文件系统 二、实验内容 HDFS 伪分布式环境搭建 HDFS( 集群的启动 start-dfs.sh) 启动 练习 Hadoop 命令 ( 文件

    2024年02月04日
    浏览(26)
  • 云计算技术 实验七 MapReduce编程基础

    参考资料为: 教材代码-林子雨编著《大数据基础编程、实验和案例教程(第2版)》教材所有章节代码_厦大数据库实验室博客 1 . 实验学时 4学时 2 . 实验目的 熟悉MapReduce编程框架。 了解Map部分和Reduce部分的工作原理。 实现简单的MapReduce编程。 3 . 实验内容 (一)实现词频统

    2024年02月05日
    浏览(31)
  • 云计算与大数据实验七 HBase的安装与基本操作

    一、实验目的 回顾 Hadoop 和 Zookeeper安装与配置 掌握 HBase 安装与配置 理解HBase工作原理 掌握HBase表的基本操作 二、实验内容 HBase 安装与配置 使用 HBase shell 指令创建表 使用 HBase shell 命令添加/删除数据 使用命令删除表 三、实验步骤 (一)HBase 安装与配置 HBase安装 HBase 的安

    2024年02月03日
    浏览(32)
  • 大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现

    图1:MaxCompute MapReduce各个阶段思路设计 设计思路分析分为六个模块:input输入数据、splitting拆分、Mapping映射、Shuffing派发、Reducing缩减、Final result输出。 输入数据:直接读入文本不进行分片,数据项本身作为单个Map Worker的输入。 Map阶段:Map处理输入,每获取一个数字,将数

    2024年02月05日
    浏览(39)
  • 【大数据实训】基于赶集网租房信息的数据分析与可视化(七)

    温馨提示:文末有 CSDN 平台官方提供的博主 的联系方式,有偿帮忙部署 一、实验环境 (1)Linux: Ubuntu 16.04 (2)Python: 3.6 (3)Hadoop:3.1.3(4)Spark: 2.4.0(5)Web框架:flask 1.0.3 (6)可视化工具:Echarts (7)开发工具:Visual Studio Code 二、小组成员及分工 (1)成员:林xx,x

    2024年02月03日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包