27.hadoop系列之50G数据清洗入库秒查询实践

这篇具有很好参考价值的文章主要介绍了27.hadoop系列之50G数据清洗入库秒查询实践。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 项目背景

目前本地有50G的企业年报csv数据, 需要清洗出通信地址,并需要与原有的亿条数据合并以供业务查询最新的企业通信地址

27.hadoop系列之50G数据清洗入库秒查询实践

2. 技术选型

Hadoop + ClickHouse

3. Hadoop数据清洗

我们50G的数据无须上传至集群处理,上传目前带宽2M/S, 巨慢,我直接在本地hadoop处理

我们先看下数据格式,以@_@分割,最后一列是杂乱的数据

315@_@102878404@_@91430802MA4PPBWA9Y@_@3@_@2021-03-19 15:29:05@_@2021-03-19 15:29:04@_@-@_@2019@_@<tr> <!--180 285 145--> <td>统一社会信用代码/注册号</td> <td>91430802MA4PPBWA9Y</td> <td>企业名称</td> <td>张家界恒晟广告传媒有限公司</td></tr><tr> <td>企业联系电话</td> <td>15874401535</td> <td>邮政编码</td> <td>427000</td></tr><tr> <td>企业经营状态</td> <td>开业</td> <td>从业人数</td> <td>1人</td></tr><tr> <td>电子邮箱</td> <td>-</td> <td>是否有网站或网店</td> <td>否</td></tr><tr> <td>企业通信地址</td> <td>湖南省张家界市永定区大庸桥办事处大庸桥居委会月亮湾小区金月阁5601号</td> <td>企业是否有投资信息<br>或购买其他公司股权</td> <td>否</td></tr><tr> <td>资产总额</td> <td>企业选择不公示</td> <td>所有者权益合计</td> <td>企业选择不公示</td></tr><tr> <td>销售总额</td> <td>企业选择不公示</td> <td>利润总额</td> <td>企业选择不公示</td></tr><tr> <td>营业总收入中主营业务收入</td> <td>企业选择不公示</td> <td>净利润</td> <td>企业选择不公示</td></tr><tr> <td>纳税总额</td>
public class Company implements Tool {

    private Configuration conf;

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "company");
        job.setJarByClass(CompanyDriver.class);
        job.setMapperClass(CompanyMapper.class);
        job.setReducerClass(CompanyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static class CompanyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private Text keyOut = new Text();
        private Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split("@_@");
            keyOut.set(key.toString());
            String company_id = words[1];
            String unified_code = words[2];
            String year = words[7];
            String company = StringUtils.substringBetween(words[8], "<td>统一社会信用代码/注册号</td> <td>", "</td> <td>企业名称</td>")
                    .replaceAll("\"", "");
            String mailAddress = StringUtils.substringBetween(words[8], "<td>企业通信地址</td> <td>", "</td> <td>企业是否有投资信息")
                    .replaceAll("\"", "");
            if (!company.contains("td") && !mailAddress.contains("td")) {
                valueOut.set(key.toString() + '@' + company_id + '@' + unified_code + '@' + year + '@' + company + '@' + mailAddress);
                context.write(valueOut, NullWritable.get());
            }
        }
    }

    public static class CompanyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            // 防止相同数据丢失
            for (NullWritable value : values) {
                context.write(key, NullWritable.get());
            }
        }
    }
}
public class CompanyDriver {
    private static Tool tool;

    public static void main(String[] args) throws Exception {
        // 1. 创建配置文件
        Configuration conf = new Configuration();

        // 2. 判断是否有 tool 接口
        switch (args[0]) {
            case "company":
                tool = new Company();
                break;
            default:
                throw new RuntimeException(" No such tool: " + args[0]);
        }
        // 3. 用 Tool 执行程序
        // Arrays.copyOfRange 将老数组的元素放到新数组里面
        int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
        System.exit(run);
    }
}

参数传递运行与先前文章一致,25.hadoop系列之Yarn Tool接口实现动态传参 不在重复,10分钟左右处理完毕,处理后约1.8G

27.hadoop系列之50G数据清洗入库秒查询实践

27.hadoop系列之50G数据清洗入库秒查询实践

4. ClickHouse ReplaceMergeTree实践

现在我们将处理后数据导入ClickHouse

4.1 创建表company_report及导入处理后的part-r-00000文件
CREATE TABLE etl.company_report (
    id      String,
    company_id      String,
    unified_code      String,
    year  String,
    company    String,
    mail_address   String
) ENGINE MergeTree()
PARTITION BY substring(unified_code, 2, 2) PRIMARY KEY (id) ORDER BY (id);
clickhouse-client --format_csv_delimiter="@" --input_format_with_names_use_header=0 --query="INSERT INTO etl.company_report FORMAT CSV" --host=192.168.0.222 --password=shenjian < part-r-00000
4.2 关联插入dwd_company表

在左连接的子查询中,我们取当前企业最新的年报中的通信地址,如下图所示

27.hadoop系列之50G数据清洗入库秒查询实践

# 关联导入,可能DataGrip客户端超时,就在ClickHouse-Client命令行运行即可
INSERT INTO etl.dwd_company(district, ent_name, reg_addr, unified_code, authority, region_code, reg_addr1, province_code, city_code, province_name, city_name, region_name, mail_address)
SELECT district, ent_name, reg_addr, unified_code, authority, region_code, reg_addr1, province_code, city_code, province_name, city_name, region_name, cr.mail_address
FROM etl.dwd_company c
LEFT JOIN (
    SELECT unified_code, argMax(mail_address, year) mail_address, argMax(year, year) new_year FROM etl.company_report GROUP BY unified_code
)  cr ON c.unified_code=cr.unified_code
WHERE cr.mail_address!='' and cr.mail_address is not null;

27.hadoop系列之50G数据清洗入库秒查询实践

这插入速度还行吧,插入后,存在两条记录,对于ReplaceMergeTree来说,无妨,看过之前文章的你应该很熟悉为啥了吧

27.hadoop系列之50G数据清洗入库秒查询实践

4.3 清洗企业通信地址

新建字段mail_address1,剔除省市区前缀信息,列式存储,全量更新很快,请不要单条那种更新

ALTER TABLE etl.dwd_company update mail_address1=replaceRegexpAll(mail_address, '^(.{2,}(省|自治区))?(.{2,}市)?(.{2,}(区|县))?', '') WHERE 1=1

27.hadoop系列之50G数据清洗入库秒查询实践

4.4 手动执行分区合并

如果线上对ClickHouse服务稳定性要求极高不建议这样操作,可能影响服务,可以参考9.ClickHouse系列之数据一致性保证

optimize table etl.dwd_company final;

后面可以将dwd_company中所需字段数据导入数据中间层dwm_company,略

欢迎关注公众号算法小生与我沟通交流文章来源地址https://www.toymoban.com/news/detail-468421.html

到了这里,关于27.hadoop系列之50G数据清洗入库秒查询实践的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • HStore表全了解:实时入库与高效查询利器

    摘要: 本文章将从使用者角度介绍HStore概念以及使用。 本文分享自华为云社区《GaussDB(DWS)HStore表讲解》,作者:大威天龙:- 。 面对实时入库和实时查询要求越来越高的趋势,已有的列存储无法支持并发更新入库,行存查询性能无法做到实时返回且空间压缩表现不佳。GaussD

    2024年02月08日
    浏览(26)
  • 【重新定义matlab强大系列三】MATLAB清洗离群数据(查找、填充或删除离群值)

    🔗 运行环境:matlab 🚩 撰写作者:左手の明天 🥇 精选专栏:《python》 🔥  推荐专栏:《算法研究》 ####  防伪水印—— 左手の明天 #### 💗 大家好🤗🤗🤗,我是 左手の明天 !好久不见💗 💗今天开启新的系列—— 重新定义matlab强大系列 💗 📆 

    2024年02月04日
    浏览(34)
  • 深度学习 Day27——J6ResNeXt-50实战解析

    🍨 本文为🔗365天深度学习训练营 中的学习记录博客 🍖 原作者:K同学啊 | 接辅导、项目定制 🚀 文章来源:K同学的学习圈子 : pytorch实现ResNeXt50详解算法,tensorflow实现ResNeXt50详解算法,ResNeXt50详解 电脑系统:Windows 11 语言环境:python 3.8.6 编译器:pycharm2020.2.3 深度

    2024年01月24日
    浏览(34)
  • Hadoop3教程(十九):MapReduce之ETL清洗案例

    ETL,即 Extract-Transform-Load 的缩写,用来描述数据从源端,经过抽取(Extract)、转换(transform),最后加载(load)到目标端的处理过程。 ETL主要应用于数据仓库,但不只是应用于数据仓库,毕竟这个更像是一类思想。 在运行核心的MR程序之前,往往要对数据进行清理,清除掉

    2024年02月06日
    浏览(36)
  • 金蝶云星空对接打通管易云分布式调入单查询接口与其他入库单新增完结接口接口

    金蝶K/3Cloud在总结百万家客户管理最佳实践的基础上,提供了标准的管理模式;通过标准的业务架构:多会计准则、多币别、多地点、多组织、多税制应用框架等,有效支持企业的运营管理;K/3Cloud提供了标准的业务建模:35种标准ERP领域模型、1046种模型元素、21243种模型元素

    2024年02月13日
    浏览(25)
  • Hadoop理论及实践-HDFS读写数据流程(参考Hadoop官网)

    主节点和副本节点通常指的是Hadoop分布式文件系统(HDFS)中的NameNode和DataNode。 NameNode(主节点):NameNode是Hadoop集群中的一个核心组件,它负责管理文件系统的命名空间和元数据。它记录了文件的目录结构、文件的块分配信息以及每个文件块所在的DataNode等关键信息。NameNo

    2024年02月14日
    浏览(36)
  • 【Hadoop综合实践】手机卖场大数据综合项目分析

    🚀 本文章实现了基于MapReduce的手机浏览日志分析 🚀 文章简介:主要包含了数据生成部分,数据处理部分,数据存储部分与数据可视化部分 🚀 【本文仅供参考!!非唯一答案】其中需求实现的方式有多种,提供的代码并非唯一写法,选择适合的方式即可。 手机日志分析需

    2024年02月08日
    浏览(33)
  • 《Hadoop核心技术》Hbase集群部署,创建表,删除表,插入数据,查询数据

    额前言:         我是一名正在学习《Hadoop核心技术》的学生,今天跟大家分享一下在虚拟机上在Hadoop集群中用Hbase进行简单的增删查 可以进行随机访问的存取和检索数据的存储平台         HBase 是一个开源的、分布式的、版本化的 NoSQL 数据库(也即非关系型数据库

    2024年02月03日
    浏览(33)
  • 使用Hadoop进行大数据分析的步骤与实践

    作者:禅与计算机程序设计艺术 随着互联网、移动互联网、物联网等新型设备的广泛普及,以及各种应用系统的不断发展,越来越多的数据产生出来,而这些数据将会对我们带来巨大的商业价值。如何有效地从海量数据中挖掘商业价值,是企业面临的一项重要课题。 大数据

    2024年02月07日
    浏览(30)
  • 大数据经典技术解析:Hadoop+Spark大数据分析原理与实践

    作者:禅与计算机程序设计艺术 大数据时代已经来临。随着互联网、移动互联网、物联网等新兴技术的出现,海量数据开始涌现。而在这些海量数据的基础上进行有效的处理,成为迫切需要解决的问题之一。Apache Hadoop和Apache Spark是目前主流开源大数据框架。由于其易于部署

    2024年02月07日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包