数据清洗【大数据比赛长期更新】

这篇具有很好参考价值的文章主要介绍了数据清洗【大数据比赛长期更新】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

数据清洗

1.题目分析

使用Scala编写spark工程代码,将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。

根据以上提示分析得出以下:

  1. 当然是使用scala编写spark代码
  2. 将ods库的全部数据抽取到hive的dwd库中,ods和dwd都是数仓中的分层(具体可看数仓的分层概念)
  3. 表中涉及到的timestamp类型或者缺少时分秒的字段,需要进行时间格式化,转换为 yyyy-MM-dd HH:mm:ss格式

接下来我们看具体的题目:

抽取ods库中user_info表中昨天的分区(任务一生成的分区)数据,并结合dim_user_info最新分区现有的数据,根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_user_info命令,将结果截图粘贴至对应报告中;

首先这个题目是要我们实现从ods.user_info抽取一个分区的数据与dwd.dim_user_info最新分区的数据根据id进行合并,取出operate_time字段最大的一条数据,分区字段依旧是etldate,剩下的都是对dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time这四个字段填充值得解释,这道题得难点其实在在合并数据上。

2.代码实现

接下来我们针对题目理解开始代码实现

  1. 首先我们需要确保服务器已经启动hadoop、hive的metastore
	hadoop 启动命令:start-all.sh
	hive metastore 启动命令:hive --service metastore &

jps(jdk/bin下的命令)命令:查看hadoop各个组件是否成功运行
netstat -ntpl | grep 9083 :查看hive metastore 默认的9083端口是否已经正在运行

  1. 创建scala on spark工程,添加pom依赖,添加scala框架支持(如果idea创建文件时候没有scala选项)
    此过程没有难点,所以不在叙述

  2. 项目的初始化

    先将hive安装目录下的配置文件(hive-site.xml)复制到resources目录下,目的是为了让spark程序读取hive的连接信息
    scala spark 数据清洗,大数据比赛,大数据,spark,hive,hadoop,scala

编写初始化代码

package com.rj.qx

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.{UserDefinedFunction, Window}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

import java.util.Properties

object UserInfo {
  //设置hadoop的用户
  System.setProperty("HADOOP_USER_NAME", "root")
  // 设置日志等级,oFF代表关闭日志
  Logger.getLogger("org").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession
      .builder()
      // 集群跑需要注释
      .master("local[*]")
      .appName("Merge user_info from MYSQL to ODS")
      // 客户端访问datanode的时候是通过主机域名访问,就不会出现通过内网IP来访问了
      .config("dfs.client.use.datanode.hostname", "true")
      // hive开启动态分区
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      // 连接服务中的hive metastore
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      // 需要开启hive支持,不开启会连接默认hive
      .enableHiveSupport()
      .getOrCreate()

	  // 关闭程序
      session.stop()
      }
}
  1. 经过分析我们第一步需要先抽取ods昨天的分区,比赛中也就是数据抽取阶段抽取的数据
 // 读取ods的昨天分区数据
    val ods_user_nfo: Dataset[Row] = session
      .table("ods.user_info")
      .where(col("etldate") === "20230321")
  1. 读取dwd最新分区,取出分区字段最大的即可
  // 读取dwd的最新分区数据
    val dwd_user_info: Dataset[Row] = session
      .table("dwd.dim_user_info")
      .where("etldate = (SELECT MAX(etldate) FROM dwd.dim_user_info)")
  1. 接下来就是最关键的一步,就是合并数据

    这一步需要使用spark的窗口函数ROW_NUMBER实现,row_number()函数是为每一行数据生成行号,从1开始

    那我们可以对id进行分组,根据operate_time倒序排序取出行号为1的那条数据就是我们最终合并保留的数据

// 合并ods和dwd的分区数据,添加一个行号列,取出行号为1的后删除字段
val merged_user_info: Dataset[Row] = ods_user_nfo.union(dwd_user_info)
      // 根据ID进行分组
      .withColumn("rowNumber",
        row_number()
          .over(Window.partitionBy("id").orderBy(desc("operate_time"))
          )
      )
      .where(col("rowNumber") === 1)
      .drop("rowNumber")
  1. 我们对题目要求的时间进行填充
// 如果operate_time为空,则使用create_time填充
    val user_info_with_operate_time: DataFrame = merged_user_info.withColumn("operate_time",
      when(col("operate_time").isNull, col("create_time")).otherwise(col("operate_time"))
    )
    import session.implicits._

    val modify_timeFunction: UserDefinedFunction = session
      .udf
      .register("modify_time", (id: Long) => {
        val insert_time: String = dwd_user_info.filter((r: Row) => {
          r.get(0).toString.toLong.equals(id)
        }).select("dwd_insert_time").first().get(0).toString
        insert_time
      })

    val ids: Array[Long] = dwd_user_info.select("id").map((_: Row) (0).toString.toLong).collect()

    val user_info_with_dwd_cols: DataFrame = user_info_with_operate_time
      .withColumn("dwd_insert_user", lit("user1"))
      .withColumn("dwd_modify_user", lit("user1"))
      .withColumn("dwd_insert_time",
        when(col("id").isin(ids: _*), modify_timeFunction(col("id")))
          .otherwise(date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
      )
      .withColumn("dwd_modify_time",
        when(col("dwd_modify_time").isNull, date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
          .otherwise(when(col("id").isin(ids: _*), date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
            .otherwise(col("dwd_modify_time"))
          )
      )

  1. 将结果写入dwd库的dim_user_info分区表

注意:因为本人的数据库中dwd.dim_user_info只有一个分区的数据,所以可以进行覆盖,但是如果还有其他分区,需要将我们合并的结果覆盖掉dwd.dim_user_info最新的一个分区

  // 将结果写入dwd库的dim_user_info分区表
    user_info_with_dwd_cols.write
      .mode("append")
      .partitionBy("etldate")
      .saveAsTable("dwd.dim_user_info")
  1. 显示dim_user_info的所有分区
session.sql("SHOW PARTITIONS dwd.dim_user_info").show(false)

如果有问题,欢迎一起讨论!文章来源地址https://www.toymoban.com/news/detail-752420.html

到了这里,关于数据清洗【大数据比赛长期更新】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于Hadoop的豆瓣电影的数据抓取、数据清洗、大数据分析(hdfs、flume、hive、mysql等)、大屏可视化

    项目介绍 有需要整个项目的可以私信博主,提供部署和讲解,对相关案例进行分析和深入剖析 环境点击顶部下载 = 本研究旨在利用Python的网络爬虫技术对豆瓣电影网站进行数据抓取,并通过合理的数据分析和清洗,将非结构化的数据转化为结构化的数据,以便于后续的大数

    2024年02月11日
    浏览(46)
  • Spark学习——DataFrame清洗HDFS日志并存入Hive中

    目录 1.开启Hadoop集群和Hive元数据、Hive远程连接 2.配置 3.读取日志文件并清洗 4.单独处理第四列的数据——方法一: 5.单独处理第四列的数据——方法二:  6.单独处理第四列的数据——方法三:  7.数据清洗结果展示 8.存入Hive中 9.DataGrip中的代码 HDFS日志文件内容: 我们要将

    2023年04月12日
    浏览(33)
  • 大数据:Hadoop基础常识hive,hbase,MapReduce,Spark

    Hadoop是根据Google三大论文为基础研发的,Google 三大论文分别是: MapReduce、 GFS和BigTable。 Hadoop的核心是两个部分: 一、分布式存储(HDFS,Hadoop Distributed File System)。 二、分布式计算(MapReduce)。 MapReduce MapReduce是“ 任务的分解与结果的汇总”。 Map把数据切分——分布式存放

    2024年04月25日
    浏览(54)
  • Spark、RDD、Hive 、Hadoop-Hive 和传统关系型数据库区别

    Hive Hadoop Hive 和传统关系型数据库区别 Spark 概念 基于内存的分布式计算框架 只负责算 不负责存 spark 在离线计算 功能上 类似于mapreduce的作用 MapReduce的缺点 运行速度慢 (没有充分利用内存) 接口比较简单,仅支持Map Reduce 功能比较单一 只能做离线计算 Spark优势 运行速度快

    2024年02月13日
    浏览(42)
  • 利用Hadoop处理离线数据:Hive和Spark离线数据处理实现

    作者:禅与计算机程序设计艺术 引言 随着大数据时代的到来,越来越多的数据产生于各种业务系统。这些数据往往需要在离线环境中进行处理,以降低数据处理的时间和成本。Hadoop作为目前最为流行的分布式计算框架,提供了强大的离线数据处理能力。Hive和Spark作为Hadoop生

    2024年02月11日
    浏览(40)
  • 大数据毕业设计选题推荐-收视点播数据分析-Hadoop-Spark-Hive

    ✨ 作者主页 :IT研究室✨ 个人简介:曾从事计算机专业培训教学,擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐 ⬇⬇⬇ Java项目 Python项目 安卓项目 微信小程序项目

    2024年02月05日
    浏览(48)
  • 大数据篇 | Hadoop、HDFS、HIVE、HBase、Spark之间的联系与区别

    Hadoop是一个开源的分布式计算框架,用于存储和处理大规模数据集。它提供了一个可扩展的分布式文件系统(HDFS)和一个分布式计算框架(MapReduce),可以在大量廉价硬件上进行并行计算。 HDFS(Hadoop Distributed File System)是Hadoop的分布式文件系统。它被设计用于在集群中存储

    2024年02月16日
    浏览(55)
  • Mac 配置Hadoop、spark、Scala、jdk

    下载地址: Java Downloads | Oracle 1.下载好使用 终端 进行解压 2.配置环境变量 1.终端打开 .bash_profile 2.将以下代码放进 .bash_profile 里面(注意修改路径) 3.esc按键 + :号键 输入 wq (保存并退出) 4.重新加载 .bash_profile 文件 5.输入以下代码检查配置是否成功 下载链接: News | Apache

    2024年03月26日
    浏览(42)
  • 大数据毕业设计选题推荐-热门旅游景点数据分析-Hadoop-Spark-Hive

    ✨ 作者主页 :IT研究室✨ 个人简介:曾从事计算机专业培训教学,擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐 ⬇⬇⬇ Java项目 Python项目 安卓项目 微信小程序项目

    2024年02月05日
    浏览(56)
  • 大数据毕业设计选题推荐-旅游景点游客数据分析-Hadoop-Spark-Hive

    ✨ 作者主页 :IT毕设梦工厂✨ 个人简介:曾从事计算机专业培训教学,擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐 ⬇⬇⬇ Java项目 Python项目 安卓项目 微信小程序

    2024年02月05日
    浏览(49)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包