Hadoop3 - MapReduce COVID-19 案例实践

这篇具有很好参考价值的文章主要介绍了Hadoop3 - MapReduce COVID-19 案例实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、COVID-19 案例

上篇文章对 MapReduce 进行了介绍,并编写了 WordCount 经典案例的实现,本篇为继续加深 MapReduce 的用法,实践 COVID-19 新冠肺炎案例,下面是上篇文章的地址:

https://blog.csdn.net/qq_43692950/article/details/127195121

COVID-19,简称“新冠肺炎”,世界卫生组织命名为“2019冠状病毒病” [1-2] ,是指2019新型冠状病毒感染导致的肺炎。现有美国 2021-01-28 号,各个县county的新冠疫情累计案例信息,包括确诊病例和死亡病例,数据格式如下所示:

date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28,Shelby,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28,Sumter,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28,Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,Washington,Alabama,01129,1454,27

数据集下载

https://download.csdn.net/download/qq_43692950/86805389

二、计算各个州的累积cases、deaths

创建 VO 类存储 cases、deaths 个数:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements Writable {

    private Long cases;//确诊病例数
    private Long deaths;//死亡病例数

    public void set(long cases, long deaths) {
        this.cases = cases;
        this.deaths = deaths;
    }

    /**
     *  序列化方法
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    /**
     * 反序列化方法 注意顺序
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.cases = in.readLong();
        this.deaths =in.readLong();
    }

    @Override
    public String toString() {
        return  cases +"\t"+ deaths;
    }

}

创建 Mapper 类,截取出cases、deaths,以为 key ,CountVO 为 Value :

public class SumMapper extends Mapper<LongWritable, Text, Text, CountVO> {

    Text outKey = new Text();
    CountVO outValue = new CountVO();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        //州
        outKey.set(fields[2]);
        //Covid数据 确诊病例 死亡病例
        outValue.set(Long.parseLong(fields[fields.length-2]),Long.parseLong(fields[fields.length-1]));
        context.write(outKey,outValue);
    }
}

创建 Reducer ,对 cases、deaths 累加:

public class SumReducer extends Reducer<Text, CountVO,Text, CountVO> {

    CountVO outValue = new CountVO();

    @Override
    protected void reduce(Text key, Iterable<CountVO> values, Context context) throws IOException, InterruptedException {
        long totalCases = 0;
        long totalDeaths =0;
        //累加统计
        for (CountVO value : values) {
            totalCases += value.getCases();
            totalDeaths +=value.getDeaths();
        }
        outValue.set(totalCases,totalDeaths);
        context.write(key,outValue);
    }
}

创建驱动类,加载上面的 Mapper 和 Reducer :

public class SumDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception{
        //配置文件对象
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new SumDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
        // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // 输出目录必须为空,如果不为空则会报错提示
        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(output)){
            fs.delete(output,true);
        }
        // 创建作业实例
        Job job = Job.getInstance(getConf(), SumDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(SumDriver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReducer.class);

        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CountVO.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountVO.class);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, input);
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true)? 0:1;
    }
}

数据集目录和输出目录通过参数传递进来,这里我将数据集放在了 D:/test/input 下:

Hadoop3 - MapReduce COVID-19 案例实践

如果是打包后放在 hadoop 集群运行,则:

hadoop jar <jar path> <driver class path> <args>
# 或者
yarn jar <jar path> <driver class path> <args>

Hadoop3 - MapReduce COVID-19 案例实践
运行成功后,到输出目录查看结果:

Hadoop3 - MapReduce COVID-19 案例实践
已成功统计出相关结果。

三、对上面计算的结果根据deaths进行倒叙排列

上麦已经计算出了每个州的cases、deaths,如果还需要根据deaths进行倒叙排列的话,我们可以针对上面 job 输出的结果在进行处理,利用 MapReducekey的排序行为,将上个 jobvalue 作为本次 jobkey

CountVO 进行修改,通过实现 Comparable 实现排序的效果,不过在上面我们已经实现了 Writable接口,在上篇文章中就讲到 Hadoop 为我们提供了 WritableComparable 已经实现好了 Writable, Comparable ,下面将 CountVO 中的 Writable 换成 WritableComparable

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO> {

    private Long cases;//确诊病例数
    private Long deaths;//死亡病例数

    public void set(long cases, long deaths) {
        this.cases = cases;
        this.deaths = deaths;
    }

    /**
     *  序列化方法
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    /**
     * 反序列化方法 注意顺序
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.cases = in.readLong();
        this.deaths =in.readLong();
    }

    @Override
    public String toString() {
        return  cases +"\t"+ deaths;
    }

    @Override
    public int compareTo(CountVO o) {
        return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
    }
}

compareTo 方法用于将当前对象与方法的参数进行比较。如果指定的数与参数相等返回0。如果指定的数小于参数返回 -1。如果指定的数大于参数返回 1。

创建 MapperkeyCountVO

public class SortSumMapper extends Mapper<LongWritable, Text, CountVO, Text> {

    CountVO outKey = new CountVO();
    Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        outKey.set(Long.parseLong(fields[1]),Long.parseLong(fields[2]));
        outValue.set(fields[0]);
        context.write(outKey,outValue);
    }
}

编写 Reducer, 无需做任何操作直接 write 即可

public class SortSumReducer extends Reducer<CountVO, Text, Text,CountVO> {
    @Override
    protected void reduce(CountVO key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Text outKey = values.iterator().next();
        context.write(outKey,key);
    }
}

编写驱动类:

public class SortSumDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception{
        //配置文件对象
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new SortSumDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
        // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // 输出目录必须为空,如果不为空则会报错提示
        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(output)){
            fs.delete(output,true);
        }
        // 创建作业实例
        Job job = Job.getInstance(getConf(), SortSumDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(SortSumDriver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(SortSumMapper.class);
        job.setReducerClass(SortSumReducer.class);

        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(CountVO.class);
        job.setMapOutputValueClass(Text.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountVO.class);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, input);
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true)? 0:1;
    }
}

将上个 job 的结果放在 D:/test/input1 下,执行该驱动类:
Hadoop3 - MapReduce COVID-19 案例实践

Hadoop3 - MapReduce COVID-19 案例实践
执行成功后,到输出目录查看结果:

Hadoop3 - MapReduce COVID-19 案例实践
已经实现根据 死亡病例进行倒叙排列

四、对每个州的 deaths 筛选出Top3的县

修改 CountVO

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO> {

    private String county;//县
    private Long cases;//确诊病例数
    private Long deaths;//死亡病例数

    public CountVO(CountVO vo){
        this.county = vo.getCounty();
        this.cases = vo.getCases();
        this.deaths = vo.getDeaths();
    }

    public void set(long cases, long deaths, String county) {
        this.cases = cases;
        this.deaths = deaths;
        this.county = county;
    }

    /**
     * 序列化方法
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(cases);
        out.writeLong(deaths);
        out.writeUTF(county);
    }

    /**
     * 反序列化方法 注意顺序
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.cases = in.readLong();
        this.deaths = in.readLong();
        this.county = in.readUTF();
    }

    @Override
    public String toString() {
        return county + "\t" + cases + "\t" + deaths;
    }

    @Override
    public int compareTo(CountVO o) {
        return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
    }
}

修改 SumMapper 类:

public class SumMapper extends Mapper<LongWritable, Text, Text, CountVO> {

    Text outKey = new Text();
    CountVO outValue = new CountVO();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        //州
        outKey.set(fields[2]);
        //Covid数据 确诊病例 死亡病例 县
        outValue.set(Long.parseLong(fields[fields.length - 2]), Long.parseLong(fields[fields.length - 1]), fields[1]);
        context.write(outKey, outValue);
    }
}

修改 SumReducer 类:

public class SumReducer extends Reducer<Text, CountVO, Text, CountVO> {

    CountVO outValue = new CountVO();

    @Override
    protected void reduce(Text key, Iterable<CountVO> values, Context context) throws IOException, InterruptedException {
        List<CountVO> vList = new ArrayList<>();
        values.forEach(v -> vList.add(new CountVO(v)));
        vList.sort(CountVO::compareTo);
        vList.stream().filter(Objects::nonNull).limit(3).forEach(c -> {
            outValue.set(c.getCases(), c.getDeaths(), c.getCounty());
            try {
                context.write(key, outValue);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

驱动类无需修改,下面执行 Job ,到输出目录查看结果:

Hadoop3 - MapReduce COVID-19 案例实践
已经计算出了每个州的死亡病例 Top3

五、将二、三两个任务合并在一起执行

上面第三点依赖于第二点的结果,但是上面是分成了两个驱动类执行,在 MapReduce 中提供了工作流,可以通过一个提交来完成原来需要提交2次的任务。

修改驱动类:

public class SumDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ControlledJob ctrljob1 = getJob1(conf);
        ControlledJob ctrljob2 = getJob2(conf);
        //设置依赖job的依赖关系
        ctrljob2.addDependingJob(ctrljob1);
        // 主控制容器,控制上面的总的两个子作业
        JobControl jobCtrl = new JobControl("mainCtrl");
        // 添加到总的JobControl里,进行控制
        jobCtrl.addJob(ctrljob1);
        jobCtrl.addJob(ctrljob2);
        // 在子线程启动
        Thread t = new Thread(jobCtrl);
        t.start();
        while(true) {
            if (jobCtrl.allFinished()) {// 如果作业成功完成,就打印成功作业的信息
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
        }

    }

    private static ControlledJob getJob1(Configuration conf) throws IOException {
        Job job = Job.getInstance(conf, SumDriver.class.getSimpleName());
        job.setJarByClass(SumDriver.class);
        // 设置作业mapper reducer类
        job.setMapperClass(SumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CountVO.class);

        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountVO.class);

        Path input = new Path("D:/test/input");
        Path output = new Path("D:/test/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        //转化成受控作业
        ControlledJob ctrljob = new ControlledJob(conf);
        ctrljob.setJob(job);
        return ctrljob;
    }

    private static ControlledJob getJob2(Configuration conf) throws IOException {
        Job job = Job.getInstance(conf, SumDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(SumDriver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(SortSumMapper.class);
        job.setMapOutputKeyClass(CountVO.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(SortSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountVO.class);

        Path input = new Path("D:/test/output");
        Path output = new Path("D:/test/output1");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        //转化成受控作业
        ControlledJob ctrljob = new ControlledJob(conf);
        ctrljob.setJob(job);
        return ctrljob;
    }
}

执行后可以看到两个结果目录:
Hadoop3 - MapReduce COVID-19 案例实践
Hadoop3 - MapReduce COVID-19 案例实践文章来源地址https://www.toymoban.com/news/detail-478447.html

到了这里,关于Hadoop3 - MapReduce COVID-19 案例实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Hadoop3教程(十一):MapReduce的详细工作流程

    Hadoop3教程(十一):MapReduce的详细工作流程

    本小节将展示一下整个MapReduce的全工作流程。 首先是Map阶段: 首先,我们有一个待处理文本文件的集合; 客户端开始切片规划; 客户端提交各种信息(如切片规划文件、代码文件及其他配置数据)到yarn; yarn接收信息,计算所需的MapTask数量(按照切片数); MapTask启动,读

    2024年02月07日
    浏览(7)
  • Hadoop3教程(三十四):(生产调优篇)MapReduce生产经验汇总

    Hadoop3教程(三十四):(生产调优篇)MapReduce生产经验汇总

    MR程序执行效率的瓶颈,或者说当你觉得你的MR程序跑的比较慢的时候,可以从以下两点来分析: 计算机性能 节点的CPU、内存、磁盘、网络等,这种属于硬件上的检查; IO操作上的检查 是否发生了数据倾斜?即单一reduce处理了绝大部分数据 Map运行时间过长,导致Reduce一直在

    2024年02月08日
    浏览(13)
  • Hadoop3.0大数据处理学习3(MapReduce原理分析、日志归集、序列化机制、Yarn资源调度器)

    Hadoop3.0大数据处理学习3(MapReduce原理分析、日志归集、序列化机制、Yarn资源调度器)

    前言:如果想知道一堆牌中有多少张红桃,直接的方式是一张张的检查,并数出有多少张红桃。 而MapReduce的方法是,给所有的节点分配这堆牌,让每个节点计算自己手中有几张是红桃,然后将这个数汇总,得到结果。 官方介绍:MapReduce是一种分布式计算模型,由Google提出,

    2024年02月08日
    浏览(13)
  • HDFS编程实践(Hadoop3.1.3)

    HDFS编程实践(Hadoop3.1.3)

    1.目录操作 在HDFS中为hadoop用户创建一个用户目录 显示HDFS中与当前用户hadoop对应的用户目录下的内容: 创建一个input目录: 可以使用rm命令删除一个目录 上面命令中,“-r”参数表示如果删除“input”目录及其子目录下的所有内容,如果要删除的一个目录包含了子目录,则必

    2023年04月13日
    浏览(7)
  • HDFS编程实践(Hadoop3.1.3)

    HDFS编程实践(Hadoop3.1.3)

    Hadoop 分布式文件系统(Hadoop Distributed File System,HDFS)是Hadoop核心组件之一,如果已经安装了 Hadoop,其中就已经包含了 HDFS 组件,不需要另外安装。 接下来介绍Linux操作系统中关于HDFS文件操作的常用Shell命令,利用Web界面查看和管理Hadoop文件系统,以及利用Hadoop提供的Java API进

    2024年02月07日
    浏览(13)
  • Hadoop3教程(二十五):Yarn的多队列调度器使用案例

    Hadoop3教程(二十五):Yarn的多队列调度器使用案例

    生产环境下怎么创建队列? 调度器默认只会开一个default队列,这个肯定是不满足生产要求的; 可以按照框架来划分队列。比如说hive/spark/flink的任务分别放在不同的队列里,不过这么做的效率不高,企业用的不是很多。 按照业务模块来划分队列。比如说登录注册的业务,单

    2024年02月02日
    浏览(7)
  • hadoop学习:mapreduce入门案例二:统计学生成绩

    hadoop学习:mapreduce入门案例二:统计学生成绩

    这里相较于 wordcount,新的知识点在于学生实体类的编写以及使用 数据信息: 1. Student 实体类 2.  mapper 阶段,StudentMapper 类 3. reduce 阶段,StudentReduce 类 4. 驱动类,studentDriver 类

    2024年02月11日
    浏览(7)
  • 虚拟机+Hadoop下MapReduce的Wordcount案例

    虚拟机+Hadoop下MapReduce的Wordcount案例

    环境:ubuntu18.04 前提:Hadoop已经搭建好 抄作业记得改标题 输入内容(可以自定义,抄作业别写一样的) yarn-site.xml 内容如下,注意第一个property要改: ·输入hadoop classpath(任意路径下均可),将返回的内容复制在第一个property的value中 *修改配置文件之后要重启hadoop(关了又

    2024年02月07日
    浏览(5)
  • 【Hadoop_06】MapReduce的概述与wc案例

    【Hadoop_06】MapReduce的概述与wc案例

    MapReduce是一个 分布式运算程序 的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是 将用户编写的业务逻辑代码 和 自带默认组件 整合成一个完整的 分布式运算程序 ,并发运行在一个Hadoop集群上。 1)MapReduce易于编程 它简单的实现一些接口

    2024年02月04日
    浏览(23)
  • 第三节 Hadoop学习案例——MapReduce课程设计 好友推荐功能

    第三节 Hadoop学习案例——MapReduce课程设计 好友推荐功能

    提示:文章内容主要以案例为主 目录 前言 项目说明 一,程序需求 1.需求 2.数据 二,编码操作 1.项目建包目录 2.FriendsRecommend.java  3.FriendsRecommendMapper.java 4.FriendsRecommendReduce.java 三,Xshell运行的步骤 1.创建目录 2.上传程序  3.分布式文件系统上传测试数据  4.执行程序 5. 查看结

    2024年02月07日
    浏览(11)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包