数据清洗
1.题目分析
使用Scala编写spark工程代码,将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。
根据以上提示分析得出以下:
- 当然是使用scala编写spark代码
- 将ods库的全部数据抽取到hive的dwd库中,ods和dwd都是数仓中的分层(具体可看数仓的分层概念)
- 表中涉及到的timestamp类型或者缺少时分秒的字段,需要进行时间格式化,转换为 yyyy-MM-dd HH:mm:ss格式
接下来我们看具体的题目:
抽取ods库中user_info表中昨天的分区(任务一生成的分区)数据,并结合dim_user_info最新分区现有的数据,根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_user_info命令,将结果截图粘贴至对应报告中;
首先这个题目是要我们实现从ods.user_info抽取一个分区的数据与dwd.dim_user_info最新分区的数据根据id进行合并,取出operate_time字段最大的一条数据,分区字段依旧是etldate,剩下的都是对dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time这四个字段填充值得解释,这道题得难点其实在在合并数据上。
2.代码实现
接下来我们针对题目理解开始代码实现
- 首先我们需要确保服务器已经启动hadoop、hive的metastore
hadoop 启动命令:start-all.sh
hive metastore 启动命令:hive --service metastore &
jps(jdk/bin下的命令)命令:查看hadoop各个组件是否成功运行
netstat -ntpl | grep 9083 :查看hive metastore 默认的9083端口是否已经正在运行
-
创建scala on spark工程,添加pom依赖,添加scala框架支持(如果idea创建文件时候没有scala选项)
此过程没有难点,所以不在叙述 -
项目的初始化
先将hive安装目录下的配置文件(hive-site.xml)复制到resources目录下,目的是为了让spark程序读取hive的连接信息
编写初始化代码
package com.rj.qx
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.{UserDefinedFunction, Window}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import java.util.Properties
object UserInfo {
//设置hadoop的用户
System.setProperty("HADOOP_USER_NAME", "root")
// 设置日志等级,oFF代表关闭日志
Logger.getLogger("org").setLevel(Level.OFF)
def main(args: Array[String]): Unit = {
val session: SparkSession = SparkSession
.builder()
// 集群跑需要注释
.master("local[*]")
.appName("Merge user_info from MYSQL to ODS")
// 客户端访问datanode的时候是通过主机域名访问,就不会出现通过内网IP来访问了
.config("dfs.client.use.datanode.hostname", "true")
// hive开启动态分区
.config("hive.exec.dynamic.partition.mode", "nonstrict")
// 连接服务中的hive metastore
.config("hive.metastore.uris", "thrift://bigdata1:9083")
// 需要开启hive支持,不开启会连接默认hive
.enableHiveSupport()
.getOrCreate()
// 关闭程序
session.stop()
}
}
- 经过分析我们第一步需要先抽取ods昨天的分区,比赛中也就是数据抽取阶段抽取的数据
// 读取ods的昨天分区数据
val ods_user_nfo: Dataset[Row] = session
.table("ods.user_info")
.where(col("etldate") === "20230321")
- 读取dwd最新分区,取出分区字段最大的即可
// 读取dwd的最新分区数据
val dwd_user_info: Dataset[Row] = session
.table("dwd.dim_user_info")
.where("etldate = (SELECT MAX(etldate) FROM dwd.dim_user_info)")
-
接下来就是最关键的一步,就是合并数据
这一步需要使用spark的窗口函数ROW_NUMBER实现,row_number()函数是为每一行数据生成行号,从1开始
那我们可以对id进行分组,根据operate_time倒序排序取出行号为1的那条数据就是我们最终合并保留的数据
// 合并ods和dwd的分区数据,添加一个行号列,取出行号为1的后删除字段
val merged_user_info: Dataset[Row] = ods_user_nfo.union(dwd_user_info)
// 根据ID进行分组
.withColumn("rowNumber",
row_number()
.over(Window.partitionBy("id").orderBy(desc("operate_time"))
)
)
.where(col("rowNumber") === 1)
.drop("rowNumber")
- 我们对题目要求的时间进行填充
// 如果operate_time为空,则使用create_time填充
val user_info_with_operate_time: DataFrame = merged_user_info.withColumn("operate_time",
when(col("operate_time").isNull, col("create_time")).otherwise(col("operate_time"))
)
import session.implicits._
val modify_timeFunction: UserDefinedFunction = session
.udf
.register("modify_time", (id: Long) => {
val insert_time: String = dwd_user_info.filter((r: Row) => {
r.get(0).toString.toLong.equals(id)
}).select("dwd_insert_time").first().get(0).toString
insert_time
})
val ids: Array[Long] = dwd_user_info.select("id").map((_: Row) (0).toString.toLong).collect()
val user_info_with_dwd_cols: DataFrame = user_info_with_operate_time
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_insert_time",
when(col("id").isin(ids: _*), modify_timeFunction(col("id")))
.otherwise(date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
)
.withColumn("dwd_modify_time",
when(col("dwd_modify_time").isNull, date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.otherwise(when(col("id").isin(ids: _*), date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.otherwise(col("dwd_modify_time"))
)
)
- 将结果写入dwd库的dim_user_info分区表
注意:因为本人的数据库中dwd.dim_user_info只有一个分区的数据,所以可以进行覆盖,但是如果还有其他分区,需要将我们合并的结果覆盖掉dwd.dim_user_info最新的一个分区文章来源:https://www.toymoban.com/news/detail-752420.html
// 将结果写入dwd库的dim_user_info分区表
user_info_with_dwd_cols.write
.mode("append")
.partitionBy("etldate")
.saveAsTable("dwd.dim_user_info")
- 显示dim_user_info的所有分区
session.sql("SHOW PARTITIONS dwd.dim_user_info").show(false)
如果有问题,欢迎一起讨论!文章来源地址https://www.toymoban.com/news/detail-752420.html
到了这里,关于数据清洗【大数据比赛长期更新】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!