Source: the metric computation module of Task B, Set 1, of the 2023 National Vocational College Skills Competition (Big Data track).
Subtask 3: Metric Computation
Write Scala code and use Spark to compute the required metrics.
Note: for metric computation, ignore the value of the order_status field in the order info table and treat every order as a valid order. When computing an order amount or a total order amount, use only the final_total_amount field. Also note that every dwd dimension table must be read from its latest partition.
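The "latest partition" requirement applies to every query below. A minimal sketch of the filter, assuming the dwd dimension tables carry an etl_date partition column as they do in the queries that follow:

-- read only the newest partition of a dwd dimension table
select id, name, region_id
from dwd.dim_province
where etl_date = (
  select max(etl_date) from dwd.dim_province
);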
Question 1
Using the dwd-layer tables, compute the number of orders and the total order amount for each province, each region, and each month, and store the result in the provinceeverymonth table of the MySQL database shtd_result (structure below). Then, in the MySQL command line on Linux, sort by total order count, total order amount, and province primary key, all descending, and query the top 5 rows. Copy the SQL statement into the corresponding task number in 【Release\任务B提交结果.docx】 on the client desktop, and paste a screenshot of the execution result under the same task number.
| Field | Type | Meaning | Remark |
| --- | --- | --- | --- |
| provinceid | int | province primary key | |
| provincename | text | province name | |
| regionid | int | region primary key | |
| regionname | text | region name | |
| totalconsumption | double | total order amount | total order amount for the month |
| totalorder | int | total order count | total order count for the month |
| year | int | year | year the order was placed |
| month | int | month | month the order was placed |
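For reference, a possible MySQL DDL for the target table, assuming the listed types map one-to-one onto MySQL column types (the competition environment may already provide this table):

CREATE TABLE IF NOT EXISTS shtd_result.provinceeverymonth (
    provinceid       INT,
    provincename     TEXT,
    regionid         INT,
    regionname       TEXT,
    totalconsumption DOUBLE,
    totalorder       INT,
    year             INT,
    month            INT
);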
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Compute01 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    // TODO Create the Spark session
    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute01")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    // Enable non-strict dynamic partitioning
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    // Silence log output
    spark.sparkContext.setLogLevel("OFF")
    // TODO Run the core query: one row per province / region / year / month
    val result = spark.sql(
      """
        |select
        |  province.id provinceid,
        |  province.name provincename,
        |  region.id regionid,
        |  region.region_name regionname,
        |  sum(od.final_total_amount) totalconsumption,
        |  count(od.id) totalorder,
        |  year(od.create_time) year,
        |  month(od.create_time) month
        |from (
        |  select
        |    id,
        |    province_id,
        |    final_total_amount,
        |    create_time
        |  from dwd.act_order_info
        |) od
        |left join (
        |  select
        |    id,
        |    name,
        |    region_id
        |  from dwd.dim_province
        |  where etl_date = (
        |    select max(etl_date)
        |    from dwd.dim_province
        |  )
        |) province on od.province_id = province.id
        |left join (
        |  select
        |    id,
        |    region_name
        |  from dwd.dim_region
        |  where etl_date = (
        |    select max(etl_date)
        |    from dwd.dim_region
        |  )
        |) region on province.region_id = region.id
        |group by
        |  province.id,
        |  province.name,
        |  region.id,
        |  region.region_name,
        |  year(od.create_time),
        |  month(od.create_time)
        |""".stripMargin)
    // Inspect the result
    result.show()
    // TODO Save the result to the target MySQL table
    result.write
      .format("jdbc")                // write to MySQL over JDBC
      .mode(SaveMode.Append)         // append to the existing table
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306/shtd_result") // adjust host/port to the competition environment
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "provinceeverymonth") // shtd_result.provinceeverymonth
      .save()
    // TODO Close the Spark session
    spark.close()
  }
}
Result query SQL
-- by total order count
select *
from shtd_result.provinceeverymonth
order by totalorder desc
limit 5;
-- by total order amount
select *
from shtd_result.provinceeverymonth
order by totalconsumption desc
limit 5;
-- by province primary key
select *
from shtd_result.provinceeverymonth
order by provinceid desc
limit 5;
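The task statement can also be read as a single ranking over all three keys at once; if that reading is intended, one query covers it:

select *
from shtd_result.provinceeverymonth
order by totalorder desc, totalconsumption desc, provinceid desc
limit 5;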
Question 2
Using the dwd-layer tables, compute each province's average order amount for April 2020 and compare it with the average order amount across all provinces ("高"/"低"/"相同", i.e. higher/lower/same), and store the result in the provinceavgcmp table of the MySQL database shtd_result (structure below). Then, in the MySQL command line on Linux, sort by province primary key and by the province's average order amount, both descending, and query the top 5 rows. Copy the SQL statement into the corresponding task number in 【Release\任务B提交结果.docx】 on the client desktop, and paste a screenshot of the execution result under the same task number.
| Field | Type | Meaning | Remark |
| --- | --- | --- | --- |
| provinceid | int | province primary key | |
| provincename | text | province name | |
| provinceavgconsumption | double | the province's average order amount | |
| allprovinceavgconsumption | double | average order amount across all provinces | |
| comparison | text | comparison result | result of comparing the province's average order amount with the all-province average; value is 高 (higher) / 低 (lower) / 相同 (same) |
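As before, a possible MySQL DDL for the target table under the same type-mapping assumption:

CREATE TABLE IF NOT EXISTS shtd_result.provinceavgcmp (
    provinceid                INT,
    provincename              TEXT,
    provinceavgconsumption    DOUBLE,
    allprovinceavgconsumption DOUBLE,
    comparison                TEXT
);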
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Compute02 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute02")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    spark.sparkContext.setLogLevel("OFF")
    val result = spark.sql(
      """
        |select
        |  province.provinceid,
        |  province.provincename,
        |  od.provinceavgconsumption,
        |  avgorder.allprovinceavgconsumption,
        |  case
        |    when od.provinceavgconsumption > avgorder.allprovinceavgconsumption then '高'
        |    when od.provinceavgconsumption < avgorder.allprovinceavgconsumption then '低'
        |    else '相同'
        |  end comparison -- comparison result: higher / lower / same
        |from (
        |  select
        |    province_id,
        |    avg(final_total_amount) provinceavgconsumption -- the province's average order amount in April 2020
        |  from dwd.act_order_info
        |  where date_format(create_time, 'yyyy-MM') = '2020-04'
        |  group by province_id
        |) od
        |left join (
        |  select
        |    id provinceid,
        |    name provincename
        |  from dwd.dim_province
        |  where etl_date = (
        |    select max(etl_date) from dwd.dim_province
        |  )
        |) province on od.province_id = province.provinceid
        |cross join (
        |  -- single scalar: the average over all April 2020 orders, attached to every province row
        |  select
        |    avg(final_total_amount) allprovinceavgconsumption
        |  from dwd.act_order_info
        |  where date_format(create_time, 'yyyy-MM') = '2020-04'
        |) avgorder
        |""".stripMargin)
    result.write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306/shtd_result") // adjust host/port to the competition environment
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "provinceavgcmp")
      .save()
    spark.close()
  }
}
Result query SQL
-- by province primary key
select *
from shtd_result.provinceavgcmp
order by provinceid desc
limit 5;
-- by the province's average order amount
select *
from shtd_result.provinceavgcmp
order by provinceavgconsumption desc
limit 5;
Question 3
Using the dwd-layer tables, find the users who placed orders on two consecutive days with the order amount increasing from the first day to the second, and store the result in the usercontinueorder table of the MySQL database shtd_result (structure below). Then, in the MySQL command line on Linux, sort by total order count, total order amount, and customer primary key, all descending, and query the top 5 rows. Copy the SQL statement into the corresponding task number in 【Release\任务B提交结果.docx】 on the client desktop, and paste a screenshot of the execution result under the same task number.
| Field | Type | Meaning | Remark |
| --- | --- | --- | --- |
| userid | int | customer primary key | |
| username | text | customer name | |
| day | text | day | the two order dates, formatted as yyyyMMdd_yyyyMMdd, e.g. 20220101_20220102 |
| totalconsumption | double | total order amount | total order amount over the two consecutive days |
| totalorder | int | total order count | total order count over the two consecutive days |
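And a possible MySQL DDL for this target table, under the same assumption (day is quoted because it is also a MySQL keyword):

CREATE TABLE IF NOT EXISTS shtd_result.usercontinueorder (
    userid           INT,
    username         TEXT,
    `day`            TEXT,
    totalconsumption DOUBLE,
    totalorder       INT
);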
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Compute03 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    val conf = new SparkConf().setMaster("local[*]").setAppName("Compute03")
    val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    spark.sparkContext.setLogLevel("OFF")
    val result = spark.sql(
      """
        |select
        |  userid,
        |  username,
        |  concat(date_format(first_day, 'yyyyMMdd'), '_', date_format(second_day, 'yyyyMMdd')) day,
        |  first_amount + second_amount totalconsumption,
        |  first_count + second_count totalorder
        |from (
        |  select
        |    user_id userid,
        |    username,
        |    buy_date first_day,
        |    day_amount first_amount,
        |    day_count first_count,
        |    lead(buy_date) over (partition by user_id order by buy_date) second_day,      -- the same user's next order day
        |    lead(day_amount) over (partition by user_id order by buy_date) second_amount, -- that day's total amount
        |    lead(day_count) over (partition by user_id order by buy_date) second_count    -- that day's order count
        |  from (
        |    -- first aggregate each user's orders by day
        |    select
        |      user_id,
        |      max(consignee) username,
        |      to_date(create_time) buy_date,
        |      sum(final_total_amount) day_amount,
        |      count(id) day_count
        |    from dwd.act_order_info
        |    group by user_id, to_date(create_time)
        |  ) daily
        |) paired
        |where datediff(second_day, first_day) = 1 -- orders on two consecutive days
        |  and second_amount > first_amount        -- with the amount increasing
        |""".stripMargin)
    result.write
      .format("jdbc")
      .mode(SaveMode.Append)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://localhost:3306/shtd_result") // adjust host/port to the competition environment
      .option("user", "root")
      .option("password", "000000")
      .option("dbtable", "usercontinueorder")
      .save()
    spark.close()
  }
}
Result query SQL
-- by total order count
select *
from shtd_result.usercontinueorder
order by totalorder desc
limit 5;
-- by total order amount
select *
from shtd_result.usercontinueorder
order by totalconsumption desc
limit 5;
-- by customer primary key
select *
from shtd_result.usercontinueorder
order by userid desc
limit 5;
The hard part of the metric-computation module is the multi-table queries and the proper use of window functions, so being fluent with the advanced functions in Hive/Spark SQL matters a great deal; without them this module is difficult to finish.
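As a minimal illustration of the distinction that matters here, using the same dwd.act_order_info columns as above: a GROUP BY collapses rows to one per key, while a window function keeps every row and attaches an aggregate or a neighbouring row to it. Question 1 relies on the former for its per-group totals, Question 3's lead() on the latter.

-- one row per user: each user's total order amount
select user_id, sum(final_total_amount) total_amount
from dwd.act_order_info
group by user_id;

-- every order row kept, with the same user's next order amount attached
select id, user_id, final_total_amount,
       lead(final_total_amount) over (partition by user_id order by create_time) next_amount
from dwd.act_order_info;

Both patterns appear repeatedly in the three solutions above.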