MapReduce序列化之统计各部门员工薪资总和

这篇具有很好参考价值的文章主要介绍了MapReduce序列化之统计各部门员工薪资总和。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

MapReduce序列化之统计各部门员工薪资总和

1.1 实验目的

通过MapReduce的序列化方法统计各个部门员工薪水总和。

1.2 实验环境

  1. 搭建IDEA+Maven开发环境
  2. VMwarePro16+Centos7+xshell7+Xftp7
  3. Hadoop2.7.7

1.3 需求描述

员工的数据如下:
MapReduce序列化之统计各部门员工薪资总和
各字段描述如下:

MapReduce序列化之统计各部门员工薪资总和

7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

将此文件保存在hdfs,作为输入文件。

hdfs dfs -put emp.csv /input_emp

(改为自己的文件目录)

1.4 实验步骤

1.4.1 采用IDEA创建一个Maven工程

在本地运行时需要添加以下依赖jar包;若不在本地运行,这里可以不添加依赖,Linux系统布署的hadoop已经有这些jar包了。(注:我的本地windows系统没有安装hadoop,所以并没有在本地运行)

修改pom.xml,增加节点,代码如下:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.serial</groupId>
  <artifactId>serialsalarytol</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>serialsalarytol</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.7</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.7</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.7.7</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.7</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

1.4.2 自己动手开发Java程序

(1)编写Emplyee.java类,代码如下:

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Employee implements Writable {

    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        //案例三:序列化
        dataOutput.writeInt(this.empno);
        dataOutput.writeUTF(this.ename);
        dataOutput.writeUTF(this.job);
        dataOutput.writeInt(this.mgr);
        dataOutput.writeUTF(this.hiredate);
        dataOutput.writeInt(this.sal);
        dataOutput.writeInt(this.comm);
        dataOutput.writeInt(this.deptno);

    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        //反序列化
        this.empno = dataInput.readInt();
        this.ename = dataInput.readUTF();
        this.job = dataInput.readUTF();
        this.mgr = dataInput.readInt();
        this.hiredate = dataInput.readUTF();
        this.sal = dataInput.readInt();
        this.comm = dataInput.readInt();
        this.deptno = dataInput.readInt();

    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }


}

(2)编写SalaryTotalMapper.java类,并继承父类Mapper,代码如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SalaryTotalMapper  extends Mapper<LongWritable, Text, IntWritable,Employee> {
    @Override
    protected  void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        //模式匹配
        String data = value1.toString();
        //分词
        String [] words = data.split(",");
        //创建员工对象
        Employee e = new Employee();
        //设置员工属性
        e.setEmpno(Integer.parseInt(words[0]));//员工号
        e.setEname(words[1]);//姓名
        e.setJob(words[2]);//职位
        try{
            e.setMgr(Integer.parseInt(words[3]));//老板号
        }catch (Exception ex){
            e.setMgr(-1);
        }
        e.setHiredate(words[4]);//入职日期
        e.setSal(Integer.parseInt(words[5]));//月薪
        try{
            e.setComm(Integer.parseInt(words[6]));//奖金
        }catch (Exception ex){
            e.setComm(0);
        }
        e.setDeptno(Integer.parseInt(words[7]));//部门号
        //输出:k2部门号 v2员工对象
        context.write(new IntWritable(e.getDeptno()),e);
    }

}

(3)编写SalaryTotalReducer.java类,并继承父类Reducer,代码如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SalaryTotalReducer extends Reducer<IntWritable,Employee,IntWritable,IntWritable> {
    //对v3中的每个员工进行工资求和
    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3, Context context) throws IOException, InterruptedException {
        int total = 0;
        for(Employee e:v3){
            total = total + e.getSal();

        }
        context.write(k3,new IntWritable(total));
    }
}

(4)写主类,让程序能够运行起来,代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class SalaryTotalMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SalaryTotalMain.class);
        //指定job的mapper和输出类型 k2 v2
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Employee.class);
        //指定job的reducer和输出类型 k4 v4
        job.setReducerClass(SalaryTotalReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        //指定job的输入输出的路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        job.waitForCompletion(true);
    }
}

以上工作完成之后,打jar包,使用maven时,将以下工作目录改成自己的本地目录。

MapReduce序列化之统计各部门员工薪资总和

1.4.3 使用maven生命周期package打jar包

MapReduce序列化之统计各部门员工薪资总和

1.4.4 通过xftp将jar包上传到linux系统

MapReduce序列化之统计各部门员工薪资总和

1.4.5 在hadoop环境运行jar包

   通过命令:**“hadoop jar jar包名 包名.主类名 输入数据文件地址(前面将emp.csv传到HDFS的位置) 输出数据文件地址(自己不用创建,会自动创建,不能重复)”**在linux系统运行。
hadoop jar saltotal-1.0-SNAPSHOT.jar 自己写的包名.SalaryTotalMain  /input1_emp /output_emp2

MapReduce序列化之统计各部门员工薪资总和

出现map 100% reduce 100%即运行成功。

1.4.6 查看输出结果

MapReduce序列化之统计各部门员工薪资总和
MapReduce序列化之统计各部门员工薪资总和

或者直接使用shell命令查看。

MapReduce序列化之统计各部门员工薪资总和

1.5 实验中遇到的问题总结

实验过程中遇到了很多问题,不过都解决了,下面是问题小结:

1.5.1 问题描述

(1)emp.csv 数据源导入错误,第一行少了部分数据,导致运行时数据类型转换错误。
(2)Browse Directory 下载输出结果报错。
MapReduce序列化之统计各部门员工薪资总和

(3)Reducer 处理过程写错了,统计成各部门Mgr(直接领导的员工ID),造成统计数据结果错误。
MapReduce序列化之统计各部门员工薪资总和
MapReduce序列化之统计各部门员工薪资总和

1.5.2 问题分析

当发现输出结果出现偏差时,需要认真检查自己写的程序代码和源数据。

1.5.3 解决方法

(1)Browse Directory 下载输出结果报错,采取了CSDN给出的解决方法:在本地配置主机映射。

MapReduce序列化之统计各部门员工薪资总和

如果还有其它问题,欢迎大家评论交流~~~文章来源地址https://www.toymoban.com/news/detail-401952.html

到了这里,关于MapReduce序列化之统计各部门员工薪资总和的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 序列化,反序列化之实例

    介绍文章 __construct() 当一个对象创建时自动调用 __destruct() 当对象被销毁时自动调用 (php绝大多数情况下会自动调用销毁对象) __sleep() 使**用serialize()函数时触发 __wakeup 使用unserialse()**函数时会自动调用 __toString 当一个对象被当作一个字符串被调用 __call() 在对象上下文中调用不

    2024年02月14日
    浏览(45)
  • Qt 对象序列化/反序列化

    阅读本文大概需要 3 分钟 日常开发过程中,避免不了对象序列化和反序列化,如果你使用 Qt 进行开发,那么有一种方法实现起来非常简单和容易。 我们知道 Qt 的元对象系统非常强大,基于此属性我们可以实现对象的序列化和反序列化操作。 比如有一个学生类,包含以下几

    2024年02月13日
    浏览(42)
  • 【网络】协议定制+序列化/反序列化

    如果光看定义很难理解序列化的意义,那么我们可以从另一个角度来推导出什么是序列化, 那么究竟序列化的目的是什么? 其实序列化最终的目的是为了对象可以 跨平台存储,和进行网络传输 。而我们进行跨平台存储和网络传输的方式就是IO,而我们的IO支持的数据格式就是

    2024年02月08日
    浏览(43)
  • 协议,序列化,反序列化,Json

    协议究竟是什么呢?首先得知道主机之间的网络通信交互的是什么数据,像平时使用聊天APP聊天可以清楚,用户看到的不仅仅是聊天的文字,还能够看到用户的头像昵称等其他属性。也就可以证明网络通信不仅仅是交互字符串那么简单。事实上网络通信还可能会通过一个结构

    2024年02月13日
    浏览(40)
  • Spring Boot 序列化、反序列化

    在软件开发中,序列化和反序列化是一种将对象转换为字节流以便存储或传输的机制。序列化将对象转换为字节流,而反序列化则将字节流转换为对象。序列化和反序列化在许多应用场景中都起着重要的作用,比如在网络通信中传输对象、将对象存储到数据库中、实现分布式

    2024年02月15日
    浏览(43)
  • Unity-序列化和反序列化

    序列化是指把对象转换为字节序列的过程,而反序列化是指把字节序列恢复为对象的过程。序列化最主要的用途就是传递对象和保存对象。 在Unity中保存和加载、prefab、scene、Inspector窗口、实例化预制体等都使用了序列化与反序列化。 1 自定义的具有Serializable特性的非抽象、

    2024年01月24日
    浏览(57)
  • 【Linux】序列化和反序列化

    在网络编程中,直接使用 结构体 进行数据传输会出错,因为 本质上socket无法传输结构体 ,我们只有将结构体装换为字节数组,或者是字符串格式来传输,然后对端主机收到了数据,再将其转化为结构体,这就是序列化和反序列化的过程! 序列化 (Serialization)是将对象的状态

    2024年02月10日
    浏览(43)
  • Java序列化和反序列化

    目录 一、序列化和反序列化 二、Java序列化演示 三、反序列化漏洞 1、含义 ​序列化就是内存中的对象写入到IO流中,保存的格式可以是二进制或者文本内容。反序列化就是IO流还原成对象。 2、用途 (1)传输网络对象 (2)保存Session 1、序列化 java.io.ObjectOutputStream代表对象

    2023年04月25日
    浏览(39)
  • 什么是序列化和反序列化?

    JSON(JavaScript Object Notation)和XML(eXtensible Markup Language)是两种常用的数据交换格式,用于在不同系统之间传输和存储数据。 JSON是一种轻量级的数据交换格式,它使用易于理解的键值对的形式表示数据。JSON数据结构简单明了,易于读写和解析,是基于JavaScript的一种常用数据

    2024年02月09日
    浏览(58)
  • 协议定制 + Json序列化反序列化

    1.1 结构化数据 协议是一种 “约定”,socket api的接口, 在读写数据时,都是按 “字符串” 的方式来发送接收的。如果我们要传输一些\\\"结构化的数据\\\" 怎么办呢? 结构化数据: 比如我们在QQ聊天时,并不是单纯地只发送了消息本身,是把自己的头像、昵称、发送时间、消息本身

    2024年02月09日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包