离线数据处理 任务二:数据清洗

这篇具有很好参考价值的文章主要介绍了离线数据处理 任务二:数据清洗。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

hive数据库和表的创建

给dim添加最新状态记录

任务 

        接着上一篇数据抽取的任务继续 需用到上篇ods数据抽取的数据继续练习

hive数据库和表的创建

        1、创建dwd数据库

create database dwd;

        2、创建dim_user_info 表,分区字段etl_date

CREATE TABLE `dim_user_info`  (
  `id` bigint,
  `login_name` string,
  `nick_name` string,
  `passwd` string,
  `name` string,
  `phone_num` string,
  `email` string,
  `head_img` string,
  `user_level` string,
  `birthday` timestamp,
  `gender` string,
  `create_time` timestamp,
  `operate_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
)  PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "\001"
stored as textfile;

        3、创建dim_sku_info 表,分区字段 etl_dat 

CREATE TABLE `dim_sku_info`  (
  `id` bigint,
  `spu_id` bigint,
  `price` decimal(10, 0),
  `sku_name` string,
  `sku_desc`  string,
  `weight` decimal(10, 2),
  `tm_id` bigint,
  `category3_id` bigint,
  `sku_default_img` string,
  `create_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "\001"
stored as textfile;

         4、创建dim_base_province 表,分区字段 etl_date

CREATE TABLE `dim_base_province`  (
  `id` bigint,
  `name` string,
  `region_id` string,
  `area_code` string,
  `iso_code` string,
  `create_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
)  PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "\001"
stored as textfile;

        5、 创建dim_base_region 表,分区字段是 etl_date

CREATE TABLE `dim_base_region`  (
  `id` string,
  `region_name` string,
  `create_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "\001"
stored as textfile;

        6、 创建fact_order_info 表 分区字段etl_date 

CREATE TABLE `fact_order_info`  (
  `id` bigint,
  `consignee` string,
  `consignee_tel`string,
  `final_total_amount` decimal(16, 2),
  `order_status` string,
  `user_id` bigint,
  `delivery_address` string,
  `order_comment` string,
  `out_trade_no` string,
  `trade_body` string,
  `create_time` timestamp,
  `operate_time` timestamp,
  `expire_time` timestamp,
  `tracking_no` string,
  `parent_order_id` bigint,
  `img_url` string,
  `province_id` int,
  `benefit_reduce_amount` decimal(16, 2),
  `original_total_amount` decimal(16, 2),
  `feight_fee` decimal(16, 2),
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "\001"
stored as textfile;

        7、创建fact_order_detail 表 分区字段是etl_date 

CREATE TABLE `fact_order_detail`  (
  `id` bigint,
  `order_id` bigint,
  `sku_id` bigint,
  `sku_name` string,
  `img_url` string,
  `order_price` decimal(10, 2),
  `sku_num` string,
  `create_time` timestamp,
  `source_type` string,
  `source_id` bigint,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "\001"
stored as textfile;

离线数据处理 任务二:数据清洗

给dim添加最新状态记录

        这里可以自己给dwd数据库中dim打头的所有表 自己给里面添加最新状态(分区时间)的数据方便实验测试结果。测试完成可以在hive的dwd数据库的操作表中使用select查看按题目要求添加后的新状态记录以及题目要求的其余内容。

        1、dim_user_info表

INSERT INTO `dim_user_info` VALUES (82, 'rz4gxf1', '阿仁+1', NULL, '康仁', '13437298274', 'rz4gxf1@0355.net', NULL, '1', '2004-04-26', 'M', '2020-04-26 18:57:55', '2020-04-26 06:02:36','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');
INSERT INTO `dim_user_info` VALUES (85, 'cte9ov7mv', '波宁+1', NULL, '华良海', '13751595688', 'p6vilkg81l9w@qq.com', NULL, '1', '1989-04-26', 'M', '2020-04-26 18:57:55', '2020-04-26 23:54:52','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');

        2、dim_sku_info表

INSERT INTO `dim_sku_info` VALUES (1, 1, 2220, '荣耀10青春版+1 幻彩渐变 2400万AI自拍 全网通版4GB+64GB 渐变蓝 移动联通电信4G全面屏手机 双卡双待', 'new sku_desc', 0.24, 2, 61, 'http://AOvKmfRQEBRJJllwCwCuptVAOtBBcIjWeJRsmhbJ','2021-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');
INSERT INTO `dim_sku_info` VALUES (2, 2, 3321, 'TCL 55A950C 55英寸32核人工智能+1 HDR曲面超薄4K电视金属机身(枪色)', 'new sku_desc', 15.24, 4, 86, 'http://JfJSvAnPkErYPcUsbgCuokhjxKiLeqpDXakZqFeE','2021-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');

        3、dim_base_province表

INSERT INTO `dim_base_province` VALUES (1, '北京+1', '1', '110000', 'CN-11','2024-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');
INSERT INTO `dim_base_province` VALUES (2, '天津市+1', '1', '120000', 'CN-12','2024-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');

        4、dim_base_region表

INSERT INTO `dim_base_region` VALUES ('1', '华北+1','2024-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');
INSERT INTO `dim_base_region` VALUES ('2', '华东+1','2024-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');

任务 

要求:使用Scala编写spark工程代码,将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。

1、抽取ods库中user_info表中昨天的分区(任务一生成的分区)数据,并结合dim_user_info最新分区现有的数据,根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_user_info命令 

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    //结构 插入最终查询结果字段(字段开窗过滤(dos.user_info的查询字段 union all dwd.dim_user_info)条件 开窗排序=1)
    spark.sql(
      """
        |insert overwrite table dwd.dim_user_info
        |partition (etl_date = '20230403')
        |select
        |id,
        |login_name,
        |nick_name,
        |passwd,
        |name,
        |phone_num,
        |email,
        |head_img,
        |user_level,
        |birthday,
        |gender,
        |create_time,
        |operate_time,
        |dwd_insert_user,
        |insert_time,
        |dwd_modify_user,
        |dwd_modify_time
        |from
          |(select
          |id,
          |login_name,
          |nick_name,
          |passwd,
          |name,
          |phone_num,
          |email,
          |head_img,
          |user_level,
          |birthday,
          |gender,
          |create_time,
          |operate_time,
          |dwd_insert_user,
          |dwd_insert_time,
          |dwd_modify_user,
          |dwd_modify_time,
          |min(dwd_insert_time) over(partition by id) insert_time,
          |row_number() over(partition by id order by operate_time desc) row
          |from
            |(select
            |id,
            |login_name,
            |nick_name,
            |passwd,
            |name,
            |phone_num,
            |email,
            |head_img,
            |user_level,
            |birthday,
            |gender,
            |create_time,
            |if(operate_time is null,create_time,operate_time) operate_time,
            |"user1" dwd_insert_user,
            |cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
            |"user1" dwd_modify_user,
            |cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
            |from ods.user_info
            |where etl_date = '20230330'
            |union all
            |select
            |id,
            |login_name,
            |nick_name,
            |passwd,
            |name,
            |phone_num,
            |email,
            |head_img,
            |user_level,
            |birthday,
            |gender,
            |create_time,
            |if(operate_time is null,create_time,operate_time) operate_time,
            |dwd_insert_user,
            |dwd_insert_time,
            |dwd_modify_user,
            |dwd_modify_time
            |from dwd.dim_user_info
            |where etl_date = '20230403'
            |)dw
          |)tmp where row = 1
        |""".stripMargin)
    
    spark.sql("select * from dwd.dim_user_info").show()
    spark.sql("show partitions dwd.dim_user_info").show()
    
    spark.close()
  }

离线数据处理 任务二:数据清洗

2、抽取ods库sku_info表中昨天的分区(任务一生成的分区)数据,并结合dim_sku_info最新分区现有的数据,根据id合并数据到dwd库中dim_sku_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli查询表dim_sku_info的字段id、sku_desc、dwd_insert_user、dwd_modify_time、etl_date,条件为最新分区的数据,id大于等于15且小于等于20,并且按照id升序排序

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("""insert overwrite table dwd.dim_sku_info partition (etl_date = '20230403')
                |select
                |id,
                |spu_id,
                |price,
                |sku_name,
                |sku_desc,
                |weight,
                |tm_id,
                |category3_id,
                |sku_default_img,
                |create_time,
                |dwd_insert_user,
                |insert_time,
                |dwd_modify_user,
                |dwd_modify_time
                |from
                | (select
                | id,
                | spu_id,
                | price,
                | sku_name,
                | sku_desc,
                | weight,
                | tm_id,
                | category3_id,
                | sku_default_img,
                | create_time,
                | dwd_insert_user,
                | dwd_insert_time,
                | dwd_modify_user,
                | dwd_modify_time,
                | min(dwd_insert_time) over(partition by id)  insert_time,
                | row_number() over(partition by id order by create_time desc) rw
                |from
                |  (select
                |  id,
                |  spu_id,
                |  price,
                |  sku_name,
                |  sku_desc,
                |  weight,
                |  tm_id,
                |  category3_id,
                |  sku_default_img,
                |  cast(date_format(create_time , 'yyyy-MM-dd HH-mm-ss') as timestamp) create_time,
                |  'user1' dwd_insert_user,
                |  cast(date_format(current_timestamp() , 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
                |  'user1' dwd_modify_user,
                |  cast(date_format(current_timestamp() , 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
                |  from ods.sku_info
                |  where etl_date = '20230330'
                |  union all
                |  select
                |  id,
                |  spu_id,
                |  price,
                |  sku_name,
                |  sku_desc,
                |  weight,
                |  tm_id,
                |  category3_id,
                |  sku_default_img,
                |  cast(create_time as timestamp) create_time,
                |  dwd_insert_user,
                |  dwd_insert_time,
                |  dwd_modify_user,
                |  dwd_modify_time
                |  from dwd.dim_sku_info
                | where etl_date = '20230403'
                | ) dw
                | )tmp where rw = 1""".stripMargin)

    spark.sql("select id,sku_desc,dwd_insert_user,dwd_modify_time from dwd.dim_sku_info where id >= 15 and id <= 20 order by id").show()
    spark.sql("show partitions dwd.dim_sku_info").show()

    spark.close()
  }

离线数据处理 任务二:数据清洗

离线数据处理 任务二:数据清洗

3、抽取ods库base_province表中昨天的分区(任务一生成的分区)数据,并结合dim_province最新分区现有的数据,根据id合并数据到dwd库中dim_province的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli在表dwd.dim_province最新分区中,查询该分区中数据的条数 

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("""
                |insert overwrite table dwd.dim_base_province
                |partition (etl_date = '20230403')
                |select
                |id,
                |name,
                |region_id,
                |area_code,
                |iso_code,
                |create_time,
                |dwd_insert_user,
                |dwd_insert_time,
                |dwd_modify_user,
                |dwd_modify_time
                |from
                |	(select
                |	id,
                |	name,
                |	region_id,
                |	area_code,
                |	iso_code,
                |	create_time,
                |	dwd_insert_user,
                |	dwd_insert_time,
                |	dwd_modify_user,
                |	dwd_modify_time,
                |	min(dwd_insert_time) over(partition by id) insert_time,
                |	row_number() over(partition by id order by create_time desc) rw
                |	from
                |		(select
                |		id,
                |		name,
                |		region_id,
                |		area_code,
                |		iso_code,
                |		create_time,
                |		'user1' dwd_insert_user,
                |		cast(date_format(current_timestamp() , 'yyyy-MM-dd HH-mm-ss') as timestamp) dwd_insert_time,
                |		'user1' dwd_modify_user,
                |		cast(date_format(current_timestamp() , 'yyyy-MM-dd HH-mm-ss') as timestamp) dwd_modify_time
                |		from ods.base_province where etl_date = '20230405'
                |		union all
                |		select
                |		id,
                |		name,
                |		region_id,
                |		area_code,
                |		iso_code,
                |		create_time,
                |		dwd_insert_user,
                |		dwd_insert_time,
                |		dwd_modify_user,
                |		dwd_modify_time
                |		from dwd.dim_base_province where etl_date = '20230403'
                |		)dw
                |	)tmp where rw = 1
                | """.stripMargin)

    spark.sql("select count(*) from dwd.dim_base_province").show()
    spark.sql("show partitions dwd.dim_base_province").show()
    spark.close()
  }

离线数据处理 任务二:数据清洗

离线数据处理 任务二:数据清洗

4、抽取ods库base_region表中昨天的分区(任务一生成的分区)数据,并结合dim_region最新分区现有的数据,根据id合并数据到dwd库中dim_region的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli在表dwd.dim_region最新分区中,查询该分区中数据的条数 

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql(
      """
        |insert overwrite table dwd.dim_base_region partition (etl_date = '20230403')
        |select
        |id,
        |region_name,
        |create_time,
        |dwd_insert_user,
        |insert_time,
        |dwd_modify_user,
        |dwd_modify_time
        |from
        |	(select
        |	id,
        |	region_name,
        |	create_time,
        |	dwd_insert_user,
        |	dwd_insert_time,
        |	dwd_modify_user,
        |	dwd_modify_time,
        |	min(dwd_insert_time) over(partition by id) insert_time,
        |	row_number() over(partition by id order by create_time desc) rw
        |	from
        |		(select
        |		id,
        |		region_name,
        |		create_time,
        |		'user1' dwd_insert_user,
        |		cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
        |		'user1' dwd_modify_user,
        |		cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
        |		from ods.base_region where etl_date = '20230401'
        |		union all
        |		select
        |		id,
        |		region_name,
        |		create_time,
        |		dwd_insert_user,
        |		dwd_insert_time,
        |		dwd_modify_user,
        |		dwd_modify_time
        |		from dwd.dim_base_region
        |		where etl_date = '20230403'
        |		)dw
        |	)tmp where rw = 1
        |""".stripMargin)

    spark.sql("select * from dwd.dim_base_region").show()
    spark.sql("show partitions dwd.dim_base_region").show()
    spark.close()
  }

离线数据处理 任务二:数据清洗

5、将ods库中order_info表昨天的分区(任务一生成的分区)数据抽取到dwd库中fact_order_info的动态分区表,分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”,dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。使用hive cli执行show partitions dwd.fact_order_info命令 

使用动态分区需要在hive-site.xml配置文件种添加如下

<!--配置动态分区-->
    <property>
        <name>hive.exec.dynamic.partition.mode</name>
        <value>nonstrict</value>
    </property>
def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("""
                |insert overwrite table dwd.fact_order_info partition(etl_date)
                |select
                |	id,
                |	consignee,
                |	consignee_tel,
                |	final_total_amount,
                |	order_status,
                |	user_id,
                |	delivery_address,
                |	order_comment,
                |	out_trade_no,
                |	trade_body,
                |	cast(date_format(create_time,'yyyyMMdd') as timestamp) create_time,
                |	cast(date_format(if(operate_time is null,create_time,operate_time),'yyyyMMdd') as timestamp) operate_time,
                |	expire_time,
                |	tracking_no,
                |	parent_order_id,
                |	img_url,
                |	province_id,
                |	benefit_reduce_amount,
                |	original_total_amount,
                |	feight_fee,
                |	'user1' dwd_insert_user,
                |	cast(date_format(current_timestamp() , 'yyyy-MM-dd') as timestamp) dwd_insert_time,
                |	'user1' dwd_modify_user,
                |	cast(date_format(current_timestamp() , 'yyyy-MM-dd') as timestamp) dwd_modify_time,
                |	etl_date
                |	from ods.order_info where etl_date = '20230401'""".stripMargin)

    spark.sql("select * from dwd.fact_order_info").show()
    spark.sql("show partitions dwd.fact_order_info").show()
    spark.close()
  }

离线数据处理 任务二:数据清洗

6、将ods库中order_detail表昨天的分区(任务一中生成的分区)数据抽取到dwd库中fact_order_detail的动态分区表,分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”,dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。使用hive cli执行show partitions dwd.fact_order_detail命令 

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql(
      """
        |insert overwrite table dwd.fact_order_detail partition(etl_date)
        |select
        |id,
        |order_id,
        |sku_id,
        |sku_name,
        |img_url,
        |order_price,
        |sku_num,
        |cast(date_format(create_time,'yyyyMMdd') as timestamp) create_time,
        |source_type,
        |source_id,
        |'user1' dwd_insert_user,
        |cast(date_format(current_timestamp() , 'yyyy-MM-dd HH-mm-ss') as timestamp) dwd_insert_time,
        |'user1' dwd_modify_user,
        |cast(date_format(current_timestamp() , 'yyyy-MM-dd HH-mm-ss') as timestamp) dwd_modify_time,
        |etl_date
        |from ods.order_detail where etl_date = '20230401'
        |""".stripMargin)

    spark.sql("select * from dwd.fact_order_detail").show()
    spark.sql("show partitions dwd.fact_order_detail").show()
    spark.close()
  }

离线数据处理 任务二:数据清洗文章来源地址https://www.toymoban.com/news/detail-406718.html

到了这里,关于离线数据处理 任务二:数据清洗的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【数据处理】建立数据库索引并定时重建索引

    给表 建立索引能加速查询 (我的习惯是给经常查询的列建立索引,如果经常查询的是id列,我会给将id设置为主键),长时间查询后会变慢(具体原因目前不清楚),公司前辈说 定期重建索引就可以解决问题 ,我就在 Microsoft Sql Server Management Studio 里设置了定时“自动重建索

    2024年01月25日
    浏览(35)
  • Spring数据库事务处理

    事务回滚丢失更新: 目前大部分数据库已经通过锁的机制来避免了事务回滚丢失更新。 数据库锁的机制: 锁可以分为乐观锁和悲观锁,而悲观锁又分为:读锁(共享锁)和写锁(排它锁),而数据库实现了悲观锁中的读锁和写锁,而乐观锁则需要开发人员自己实现。 数据库在设

    2024年02月07日
    浏览(36)
  • 【Jetpack】使用 Room 中的 Migration 升级数据库异常处理 ( 多个数据库版本的迁移 | fallbackToDestructiveMigration() 函数处理升级异常 )

    Room Migration 数据库迁移工具 是 Android Jetpack Architecture Components ( 架构组件 ) 的一部分 , 它是一个方便的 数据库迁移工具 , 用于为 Android 中使用 Room 框架创建的数据库 提供 自动化迁移方案 ; Room Migration 数据库迁移工具用途如下 : 数据库修改 : 修改数据库表结构 ; 迁移代码 : 为

    2024年02月08日
    浏览(34)
  • 利用java.sql包--访问和处理数据库数据

    The java.sql package in Java provides the API for interacting with relational databases using JDBC (Java Database Connectivity). JDBC is a standard Java API that allows Java programs to connect to and interact with various database management systems (DBMS) using SQL (Structured Query Language). The java.sql package contains several important interfaces and

    2024年02月10日
    浏览(29)
  • JAVA开发(手工处理数据库表数据的一些示例算法)

    背景: 在项目开发中,有时候需要手动处理一下数据库表的数据。涉及到数据得到备份、恢复,清洗,计算,合并等操作。 举例记录一下最近对数据的一些处理过程。 1、对数据表进行数据量统计 2、记住数据库表的数据,然后进行备份 3、我们再对数据进行处理之前一定记

    2024年02月07日
    浏览(29)
  • 【软考数据库】第十三章 云计算与大数据处理

    目录 13.1 云计算 13.1.1 云计算的关键特征 13.1.2 云计算分类 13.1.3 云关键技术 13.1.4 云计算的安全 13.1.5 云安全实施的步骤 13.2 大数据  前言: 笔记来自《文老师软考数据库》教材精讲,精讲视频在b站,某宝都可以找到,个人感觉通俗易懂。 13.1.1 云计算的关键特征 云计算是与

    2024年01月23日
    浏览(37)
  • Python天气数据处理、数据清洗

    文章目录 前言 一、获取原始数据 二、数据处理 1.代码 2.处理结果 总结         在工作的时候,需要做一个天气情况的报表,一开始没学习爬虫的时候,需要手动到天气网站上去截取天气数据做到表格里,复制粘贴下来的数据需要做一些处理,考虑用Python简化这些步骤。

    2024年02月01日
    浏览(75)
  • 【数据库】Sql Server数据迁移,处理自增字段赋值

    给自己一个目标,然后坚持一段时间,总会有收获和感悟! 在实际项目开发中,如果遇到高版本导入到低版本,或者低版本转高版本,那么就会出现版本不兼容无法导入,此时通过程序遍历创建表和添加数据方式可以解决 在 SQL Server 中,数据迁移是常见的场景之一。 以下是

    2024年02月08日
    浏览(43)
  • (四)springboot 数据枚举类型的处理(从前端到后台数据库)

    枚举是一个被命名的整型常数的集合,用于声明一组带标识符的常数。枚举在曰常生活中很常见,例如一个人的性别只能是“男”或者“女”,一周的星期只能是 7 天中的一个等。类似这种当一个变量有几种固定可能的取值时,就可以将它定义为枚举类型。 注意实体类里的性

    2024年02月04日
    浏览(25)
  • [运维|数据库] PostgreSQL数据库对MySQL的 READS SQL DATA 修饰符处理

    在 PostgreSQL 中,访问权限通常是通过数据库角色和表级别的权限进行管理,而不需要类似 MySQL 中的 READS SQL DATA 修饰符。 要在 PostgreSQL 中管理数据库对象的访问权限,您可以使用以下 SQL 命令: GRANT :授予用户或角色对表、视图等对象的特定权限。 REVOKE :撤销用户或角色对

    2024年02月07日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包