第1关:MapReduce综合应用案例 — 电信数据清洗

这篇具有很好参考价值的文章主要介绍了第1关:MapReduce综合应用案例 — 电信数据清洗。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。

数据说明如下: a.txt

数据切分方式:,

数据所在位置:/user/test/input/a.txt

15733218050,15778423030,1542457633,1542457678,450000,530000

15733218050 15778423030 1542457633 1542457678 450000 530000
呼叫者手机号 接受者手机号 开始时间戳(s) 接受时间戳(s) 呼叫者地址省份编码 接受者地址省份编码

Mysql数据库:

用户名:root 密码:123123

数据库名:mydb

用户表:userphone

列名 类型 非空 是否自增 介绍
id int(11) 用户ID
phone varchar(255) 手机号
trueName varchar(255) 真实姓名

地址省份表:allregion

列名 类型 非空 是否自增 介绍
id int(11) 用户ID
CodeNum varchar(255) 编号
Address varchar(255) 地址

清洗规则:

  • 处理数据中的时间戳(秒级)将其转化为2017-06-21 07:01:58,年-月-日 时:分:秒 这种格式;

  • 处理数据中的省份编码,结合mysql的表数据对应,将其转换成省份名称;

  • 处理用户手机号,与mysql的表数据对应,关联用户的真实姓名;

  • 处理数据中的开始时间与结束时间并计算通信时长(以秒为单位);

  • 设置数据来源文件路径及清洗后的数据存储路径: 数据来源路径为: /user/test/input/a.txt (HDFS); 清洗后的数据存放于:/user/test/output (HDFS)

数据清洗后如下:

邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市

邓二 张倩 13666666666 15151889601 2018-03-29 10:58:12 2018-03-29 10:58:42 30 黑龙江省 上海市
用户名A 用户名B 用户A的手机号 用户B的手机号 开始时间 结束时间

step/com/LogMR.java

package com;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogMR {
    /********** begin **********/
    static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
        Map<String, String> userMap = new HashMap<>();
        Map<String, String> addressMap = new HashMap<>();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        PhoneLog pl = new PhoneLog();
        Text text = new Text();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Connection connection = DBHelper.getConnection();
            try {
                Statement statement = connection.createStatement();
                String sql = "select * from userphone";
                ResultSet resultSet = statement.executeQuery(sql);
                while (resultSet.next()) {
                    String phone = resultSet.getString(2);
                    String trueName = resultSet.getString(3);
                    userMap.put(phone, trueName);
                }
                String sql2 = "select * from allregion";
                ResultSet resultSetA = statement.executeQuery(sql2);
                while (resultSetA.next()) {
                    String phone = resultSetA.getString(2);
                    String trueName = resultSetA.getString(3);
                    addressMap.put(phone, trueName);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String str = value.toString();
            String[] split = str.split(",");
            if (split.length == 6) {
                String trueName1 = userMap.get(split[0]);
                String trueName2 = userMap.get(split[1]);
                String address1 = addressMap.get(split[4]);
                String address2 = addressMap.get(split[5]);
                long startTimestamp = Long.parseLong(split[2]);
                String startTime = sdf.format(startTimestamp * 1000);
                long endTimestamp = Long.parseLong(split[3]);
                String endTime = sdf.format(endTimestamp * 1000);
                long timeLen = endTimestamp - startTimestamp;
                pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,
                        address2);
                context.write(pl, NullWritable.get());
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogMR.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(PhoneLog.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        Path inPath = new Path("/user/test/input/a.txt");
        Path out = new Path("/user/test/output");
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, out);
        job.waitForCompletion(true);
    }
    /********** end **********/
}

step/com/DBHelper.java

package com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DBHelper {
    /********** begin **********/
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    private static final String username = "root";// 数据库的用户名
    private static final String password = "123123";// 数据库的密码:这个是自己安装数据库的时候设置的,每个人不同。
    private static Connection conn = null; // 声明数据库连接对象
    static {
        try {
            Class.forName(driver);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    public static Connection getConnection() {
        if (conn == null) {
            try {
                conn = DriverManager.getConnection(url, username, password);
            } catch (SQLException e) {
                e.printStackTrace();
            } // 连接数据库
            return conn;
        }
        return conn;
    }
    /********** end **********/
}

step/com/phonelog.java

package com;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class PhoneLog implements WritableComparable<PhoneLog> {
    private String userA;
    private String userB;
    private String userA_Phone;
    private String userB_Phone;
    private String startTime;
    private String endTime;
    private Long timeLen;
    private String userA_Address;
    private String userB_Address;
    public PhoneLog() {
    }
    public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
            String endTime, Long timeLen, String userA_Address, String userB_Address) {
        this.userA = userA;
        this.userB = userB;
        this.userA_Phone = userA_Phone;
        this.userB_Phone = userB_Phone;
        this.startTime = startTime;
        this.endTime = endTime;
        this.timeLen = timeLen;
        this.userA_Address = userA_Address;
        this.userB_Address = userB_Address;
    }
    public String getUserA_Phone() {
        return userA_Phone;
    }
    public void setUserA_Phone(String userA_Phone) {
        this.userA_Phone = userA_Phone;
    }
    public String getUserB_Phone() {
        return userB_Phone;
    }
    public void setUserB_Phone(String userB_Phone) {
        this.userB_Phone = userB_Phone;
    }
    public String getUserA() {
        return userA;
    }
    public void setUserA(String userA) {
        this.userA = userA;
    }
    public String getUserB() {
        return userB;
    }
    public void setUserB(String userB) {
        this.userB = userB;
    }
    public String getStartTime() {
        return startTime;
    }
    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }
    public String getEndTime() {
        return endTime;
    }
    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }
    public Long getTimeLen() {
        return timeLen;
    }
    public void setTimeLen(Long timeLen) {
        this.timeLen = timeLen;
    }
    public String getUserA_Address() {
        return userA_Address;
    }
    public void setUserA_Address(String userA_Address) {
        this.userA_Address = userA_Address;
    }
    public String getUserB_Address() {
        return userB_Address;
    }
    public void setUserB_Address(String userB_Address) {
        this.userB_Address = userB_Address;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userA);
        out.writeUTF(userB);
        out.writeUTF(userA_Phone);
        out.writeUTF(userB_Phone);
        out.writeUTF(startTime);
        out.writeUTF(endTime);
        out.writeLong(timeLen);
        out.writeUTF(userA_Address);
        out.writeUTF(userB_Address);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        userA = in.readUTF();
        userB = in.readUTF();
        userA_Phone = in.readUTF();
        userB_Phone = in.readUTF();
        startTime = in.readUTF();
        endTime = in.readUTF();
        timeLen = in.readLong();
        userA_Address = in.readUTF();
        userB_Address = in.readUTF();
    }
    @Override
    public String toString() {
        return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
                + timeLen + "," + userA_Address + "," + userB_Address;
    }
     @Override
     public int compareTo(PhoneLog pl) {
     if(this.hashCode() == pl.hashCode()) {
     return 0;
     }
     return -1;
     }
}

最后重启hadoop#start-all.sh  完成评测文章来源地址https://www.toymoban.com/news/detail-436289.html

到了这里,关于第1关:MapReduce综合应用案例 — 电信数据清洗的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Hadoop系统应用之MapReduce相关操作【IDEA版】---经典案例“倒排索引、数据去重、TopN”

      倒排索引是文档检索系统中最常用的数据结构,被广泛应用于全文搜索引擎。倒排索引主要用来存储某个单词(或词组)在一组文档中的存储位置的映射,提供了可以根据内容来查找文档的方式,而不是根据文档来确定内容,因此称为倒排索引(Inverted Index)。带有倒排索引

    2024年02月07日
    浏览(38)
  • Pandas实战100例 | 案例 3: 数据清洗 - 处理缺失值

    案例 3: 数据清洗 - 处理缺失值 知识点讲解 在现实世界的数据集中,经常会遇到缺失值。Pandas 提供了多种方法来处理这些缺失值,包括填充缺失值、删除含有缺失值的行或列。 示例代码 检测缺失值 填充缺失值

    2024年01月20日
    浏览(33)
  • 生态经济学领域里的R语言机器学(数据的收集与清洗、综合建模评价、数据的分析与可视化、数据的空间效应、因果推断等)

    近年来,人工智能领域已经取得突破性进展,对经济社会各个领域都产生了重大影响,结合了统计学、数据科学和计算机科学的机器学习是人工智能的主流方向之一,目前也在飞快的融入计量经济学研究。表面上机器学习通常使用大数据,而计量经济学则通常使用较小样本,

    2024年02月11日
    浏览(43)
  • 大数据MapReduce学习案例:数据去重

    数据去重主要是为了掌握利用并行化思想来对数据进行有意义的筛选,数据去重指去除重复数据的操作。在大数据开发中,统计大数据集上的多种数据指标,这些复杂的任务数据都会涉及数据去重。 文件file1.txt本身包含重复数据,并且与file2.txt同样出现重复数据,现要求使用

    2024年02月07日
    浏览(57)
  • Hive综合应用案例——用户学历查询

    任务描述 本关任务:查询出每一个用户从出生到现在的总天数 编程要求 在右侧编辑器补充hql语句,查询出每一个用户从出生到现在的总天数。 创建数据库:mydb 创建表:usertab 字段名 类型 注释 id int 用户id sex string 性别,f:女性,m:男性 time string 出生日期 education string 学历 oc

    2024年02月09日
    浏览(39)
  • 云计算安全综合应用案例分析

    阿里云致力于以 在线公共服务 的方式, 提供安全、可靠的计算和数据处理能力 让计算和人工智能成为普惠科技 此外,阿里云为全球客户部署 200 多个 飞天数据中心 ,通过底层统一的 飞天操作系统 ,为客户提供全球独有的混合云体验 根据已公开的资料, 阿里云的安全体系

    2024年02月03日
    浏览(75)
  • 【生态经济学】利用R语言进行经济学研究技术——从数据的收集与清洗、综合建模评价、数据的分析与可视化、因果推断等方面入手

    查看原文 如何快速掌握利用R语言进行经济学研究技术——从数据的收集与清洗、综合建模评价、数据的分析与可视化、因果推断等方面入手 近年来,人工智能领域已经取得突破性进展,对经济社会各个领域都产生了重大影响,结合了统计学、数据科学和计算机科学的机器学

    2024年02月12日
    浏览(39)
  • 模糊综合评价在实际问题中的应用(案例)

    目录 一、概述 二、一级模糊综合评价模型 三、多级模糊综合评价模型         模糊综合评价问题 是要 把论域中的对象对应评语集中一个指定的评语 或者 将方案作为评语集并选择一个最优的方案 。(两个角度)         在模糊综合评价中,引入了三个集合:      

    2024年02月06日
    浏览(67)
  • educoder中Hive综合应用案例 — 用户搜索日志分析

    第1关:2018年点击量最高的10个网站域名 第2关:同一种搜索词,哪个网站域名被用户访问最多

    2024年02月01日
    浏览(39)
  • Hadoop3.0大数据处理学习4(案例:数据清洗、数据指标统计、任务脚本封装、Sqoop导出Mysql)

    直播公司每日都会产生海量的直播数据,为了更好地服务主播与用户,提高直播质量与用户粘性,往往会对大量的数据进行分析与统计,从中挖掘商业价值,我们将通过一个实战案例,来使用Hadoop技术来实现对直播数据的统计与分析。下面是简化的日志文件,详细的我会更新

    2024年02月08日
    浏览(38)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包