MapReduce初级编程实践

这篇具有很好参考价值的文章主要介绍了MapReduce初级编程实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

实验环境

  • ubuntu18.04虚拟机和一个win10物理主机
  • 编程环境 IDEA
  • 虚拟机ip:192.168.1.108
  • JDK:1.8

实验内容


使用Java编程一个WordCount程序,并将该程序打包成Jar包在虚拟机内执行

首先使用IDEA创建一个Maven项目
在pom.xml文件内引入依赖和打包为Jar包的插件:

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>2.4.11</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>2.4.11</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.MyProgramDriver</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

编写对应的程序:
MyProgramDriver类用于执行程序入口:

import org.apache.hadoop.util.ProgramDriver;

public class MyProgramDriver {
    public static void main(String[] args) {
        int exitCode = -1;
        ProgramDriver programDriver = new ProgramDriver();
        try {
            programDriver.addClass("com.WordCount", WordCount.class, "com.WordCount Program");
            exitCode = programDriver.run(args);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        System.exit(exitCode);
    }
}


WordCount程序:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

public class WordCount {
    public WordCount() {
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        for(int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public IntSumReducer() {
        }

        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;

            IntWritable val;
            for(Iterator var5 = values.iterator(); var5.hasNext(); sum += val.get()) {
                val = (IntWritable)var5.next();
            }

            this.result.set(sum);
            context.write(key, this.result);
        }
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public TokenizerMapper() {
        }

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());

            while(itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }

        }
    }
}

项目结构截图:
MapReduce初级编程实践

在右侧点击maven的package进行项目打包为Jar文件
MapReduce初级编程实践

打包完成后的打包文件在target目录下
MapReduce初级编程实践
之后将打包好的Jar包发送到虚拟机内,我是放在/root/hadoop/a_dir目录下,放在哪随意,但自己要知道在哪
MapReduce初级编程实践

然后编写输入文件input1和input2,内容分别为:
MapReduce初级编程实践

然后将两个文件上传到hadoop的系统路径,这里我放在了hadoop的/root/input目录下,注意不是物理路径,是Hadoop启动后的网络路径
MapReduce初级编程实践

之后执行程序:

bin/hadoop jar a_dir/MyMapReduce-1.0-SNAPSHOT.jar com.WordCount /root/input/* /root/out

其中a_dir/MyMapReduce-1.0-SNAPSHOT.jar是需要执行的Jar包的路径,com.WordCount是需要执行的WordCount程序名称,这个名称就是在MyProgramDriver内注明的名称

MapReduce初级编程实践

/root/input/* 是输入的文件, /root/out是输出路径

查看输出:
MapReduce初级编程实践


编程实现文件合并和去重操作

输入样例:

20150101	x
20150102	y
20150103	x
20150104	y
20150105	z
20150106	x
20150101	y
20150102	y
20150103	x
20150104	z
20150105	y

主要思想:使用map将文件的每一行使用正则拆分为key,value ,如将20150101 x拆分后的key为20150101,value为x,类型为Text类型,将map处理后的由shuffle处理送往reduce进行处理,在reduce内使用HashSet的去重特性(在HashSet内的元素不重复)对输入的值进行去重。

;

Merge程序代码:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;

public class Merge {

    public Merge() {
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {


        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: merge <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "merge");
        job.setJarByClass(Merge.class);
        job.setMapperClass(Merge.MyMapper.class);
        job.setCombinerClass(Merge.MyReduce.class);
        job.setReducerClass(Merge.MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        for(int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }


    public static class MyMapper extends Mapper<Object, Text, Text, Text> {

        public MyMapper() {

        }

        @Override
        public void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String line = value.toString();
            //匹配空白符
            String[] split = line.split("\\s+");
            if (split.length <= 1) {
                return;
            }
            context.write(new Text(split[0]), new Text(split[1]));
        }
    }

    public static class MyReduce extends Reducer<Text, Text, Text, Text> {

        public MyReduce() {

        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            //使用HashSet进行去重操作
            HashSet<String> hashSet = new HashSet<>();
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()) {
                hashSet.add(iterator.next().toString());
            }
            Iterator<String> hashIt = hashSet.iterator();
            while (hashIt.hasNext()) {
                Text val = new Text(hashIt.next());
                context.write(key, val);
            }
        }
    }
}

将Merge程序写入MyProgramDriver类:


import org.apache.hadoop.util.ProgramDriver;

public class MyProgramDriver {
    public static void main(String[] args) {
        int exitCode = -1;
        ProgramDriver programDriver = new ProgramDriver();
        try {
            programDriver.addClass("com.WordCount", WordCount.class, "com.WordCount Program");
            programDriver.addClass("Merge", Merge.class, "xll");
            exitCode = programDriver.run(args);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
        System.exit(exitCode);
    }
}

将程序打包后发送到虚拟机,运行程序:

bin/hadoop jar a_dir/MyMapReduce-1.0-SNAPSHOT.jar Merge /root/input/* /root/out

运行结果:
MapReduce初级编程实践


编程实现对输入文件的排序

思路:在Map端将数值分离出来形成<key,1>这样的键值对,由于排序是MapReduce的默认操作,所以在Reduce端只需要将Map端分离出来的值进行输出就行,将Map端的key值设置为Reduce端的value值。

MyConf类代码:
这里我将一般需要进行的配置提取出来了,减少以后一下代码的重复

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class MyConf {
    public static void setConf(Class mainClass,Class outKeyClass, Class outValueClass, String[] args) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {

        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("otherArgs length error, length < 2");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, mainClass.getName());
        Class[] innerClass = mainClass.getClasses();
        for (Class c : innerClass) {
            if (c.getSimpleName().equals("MyReduce")) {
                job.setReducerClass(c);
//                job.setCombinerClass(c);
            } else if (c.getSimpleName().equals("MyMapper")) {
                job.setMapperClass(c);
            }
         }
        job.setJarByClass(mainClass);
        job.setOutputKeyClass(outKeyClass);
        job.setOutputValueClass(outValueClass);

        for(int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

Sort类:

import com.utils.MyConf;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Sort {
    public Sort() {}

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {

        MyConf.setConf(Sort.class, IntWritable.class, IntWritable.class, args);
    }

    public static class MyMapper extends Mapper<Object, org.apache.hadoop.io.Text, IntWritable, IntWritable> {

        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException {
            String var = value.toString();
            context.write(new IntWritable(Integer.parseInt(var.trim())), new IntWritable(1));
        }
    }

    public static class MyReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

       static int sort = 1;

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context) throws IOException, InterruptedException {

            for (IntWritable va : values) {
                context.write(new IntWritable(sort), key);
                sort++;
            }

        }
    }
}

然后再将Sort类注入MyProgramDriver类就可以了
MapReduce初级编程实践

程序的输入:
MapReduce初级编程实践

打包后放在虚拟机运行

bin/hadoop jar a_dir/MyMapReduce-1.0-SNAPSHOT.jar Sort /root/input* /root/out5

运行结果:
MapReduce初级编程实践


对给定的表格进行信息挖掘

思路:(参考),举个例子:
steven lucy
lucy mary
这个输入在经过map(map的具体逻辑参考下面的代码)出来后得到输出:
<steven,old#lucy>,<lucy,young#steven>,<lucy,old#mary>,<mary,young#lucy>,
之后经过shuffle处理之后得到输入:
<steven,old#lucy>,<lucy,<young#steven,old#mary>>,<mary,young#lucy>,
之后每个键值对作为Reduce端的输入
<lucy,<young#steven,old#mary>>键值对在经过reduce的逻辑处理后得到一个有效输出:
<steven, mary>

InfoFind类:

package com;

import com.utils.MyConf;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class InfoFind {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
        MyConf.setConf(InfoFind.class, Text.class, Text.class, args);
    }

    public static class MyMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] splStr = value.toString().split("\\s+");
            String child = splStr[0];
            String parent = splStr[1];

            if (child.equals("child") && parent.equals("parent"))
                return;
            context.write(new Text(child), new Text("old#" + parent));
            context.write(new Text(parent), new Text("young#" + child));
        }
    }

    public static class MyReduce extends Reducer<Text, Text, Text, Text> {
        private static boolean head = true ;
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException
        {
            if(head)
            {
                context.write(new Text("grandchild"), new Text("grandparent"));
                head = false;
            }
            ArrayList<String> grandchild = new ArrayList<>();
            ArrayList<String> grandparent = new ArrayList<>();
            String[] temp;
            for(Text val:values)
            {
                temp = val.toString().split("#");
                if(temp[0].equals("young"))
                    grandchild.add(temp[1]);
                else
                    grandparent.add(temp[1]);
            }

            for(String gc:grandchild)
                for(String gp:grandparent)
                    context.write(new Text(gc), new Text(gp));
        }
    }

}

输入:
MapReduce初级编程实践

运行:

bin/hadoop jar a_dir/MyMapReduce-1.0-SNAPSHOT.jar InfoFind /root/input/* /root/out6

输出:
MapReduce初级编程实践


参考资料

https://blog.csdn.net/u013384984/article/details/80229459 (一个重点内容)
https://blog.csdn.net/qq_43310845/article/details/123298811
https://blog.csdn.net/zhangwenbingbingyu/article/details/52210348
https://www.cnblogs.com/ginkgo-/p/13273671.html文章来源地址https://www.toymoban.com/news/detail-425398.html

到了这里,关于MapReduce初级编程实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 实验5 MapReduce初级编程实践(3)——对给定的表格进行信息挖掘

    通过实验掌握基本的MapReduce编程方法; 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。 操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04) Hadoop版本:3.1.3 下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格。

    2024年02月10日
    浏览(37)
  • 实验5 MapReduce初级编程实践(2)——编写程序实现对输入文件的排序

    通过实验掌握基本的MapReduce编程方法; 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。 操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04) Hadoop版本:3.1.3 现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数

    2024年02月09日
    浏览(35)
  • MapReduce初级编程实践

    ubuntu18.04虚拟机和一个win10物理主机 编程环境 IDEA 虚拟机ip:192.168.1.108 JDK:1.8 使用Java编程一个WordCount程序,并将该程序打包成Jar包在虚拟机内执行 首先使用IDEA创建一个Maven项目 在pom.xml文件内引入依赖和打包为Jar包的插件: 编写对应的程序: MyProgramDriver类用于执行程序入口

    2023年04月26日
    浏览(32)
  • 【大数据实验五】 MapReduce初级编程实践

    1实验目的 1.通过实验掌握基本的MapReduce编程方法; 2.掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。 2实验平台 已经配置完成的Hadoop伪分布式环境。 (1)操作系统:Linux(Ubuntu18.04) (2)Hadoop版本:3.1.3 3实验内容和要求 1.编程实现文件

    2024年02月03日
    浏览(137)
  • ubuntu18.04环境搭建

    sudo apt install make sudo apt install make-guile 运行Ubuntu的主机 打开终端,输入【sudo apt update】命令。 输入密码,确认授权。 输入【sudo apt install git】命令。 输入【Y】,确认命令执行。 输入【git --version】命令,查看安装版本。 Git当前版本为【2.30.2】,就此安装完成。 打开Ubuntu终端

    2023年04月22日
    浏览(97)
  • 实验SparkSQL编程初级实践

    实践环境: Oracle VM VirtualBox 6.1.12 Ubuntu 16.04 Hadoop3.1.3 JDK1.8.0_162 spark2.4.0 python3.5 Windows11系统下pycharm2019.1专业版 实验目的: 通过实验掌握Spark SQL的基本编程方法; 熟悉RDD到DataFrame的转化方法; 熟悉利用Spark SQL管理来自不同数据源的数据。 实验内容,步骤与实验结果: Spark S

    2024年02月04日
    浏览(53)
  • Ubuntu 18.04开发环境搭建

            工作不易,为了避免未来需要重装系统的进行折腾,个人进行了Ubuntu环境配置的整合,方便自己未来能顺畅的配置好开发环境,同时分享给大家。本文多出有转载其他文,并相应的标注了转载内容,如有侵权请联系博主删除。 vmware下载: 链接:https://pan.baidu.com

    2024年02月02日
    浏览(48)
  • ubuntu18.04安装部署环境

    更新apt源 下载mysql-server 查看mysql的状态 进入mysql终端 设置root密码 登录mysql 回到不用密码的方式登录 添加账户 root账号远程访问 下载redis-server 检查redis服务状态 修改配置文件开启远程连接 重启服务 下载nginx 配置文件夹 启动nginx服务

    2024年02月01日
    浏览(50)
  • 实验4 RDD编程初级实践

    (1)熟悉Spark的RDD基本操作及键值对操作; (2)熟悉使用RDD编程解决实际具体问题的方法。 操作系统:Ubuntu16.04 Spark版本:2.1.0 实验内容与完成情况: 1.spark-shell 交互式编程 (1)该系总共有多少学生; (2)该系共开设

    2023年04月13日
    浏览(67)
  • 实验8 Flink初级编程实践

    由于CSDN上传md文件总是会使图片失效 完整的实验文档地址如下: https://download.csdn.net/download/qq_36428822/85814518 实验环境:本机:Windows 10 专业版 Intel® Core™ i7-4790 CPU @ 3.60GHz 8.00 GB RAM 64 位操作系统, 基于 x64 的处理器 Oracle VM VirtualBox 虚拟机:Linux Ubuntu 64-bit RAM 2048MB 处理器数量

    2024年02月09日
    浏览(26)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包