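Requirements
Find the users who placed orders on three consecutive days with a strictly increasing daily order amount (day 1 < day 2 < day 3), then compute each such user's total order amount and order count over those three days.
Dataset
The sample dataset is shown below: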
user1, 2022-05-01, 51
user1, 2022-05-02, 35
user1, 2022-05-02, 35
user1, 2022-05-02, 37
user1, 2022-05-03, 74
user1, 2022-05-04, 43
user1, 2022-05-05, 65
user1, 2022-05-06, 87
user1, 2022-05-07, 35
user2, 2022-05-01, 13
user2, 2022-05-02, 21
user2, 2022-05-03, 35
user2, 2022-05-04, 41
user2, 2022-05-05, 54
user2, 2022-05-06, 60
user3, 2022-05-01, 55
user3, 2022-05-02, 46
user3, 2022-05-03, 34
user3, 2022-05-04, 27
user3, 2022-05-05, 18
user3, 2022-05-06, 45
user4, 2022-05-01, 30
user4, 2022-05-01, 35
user4, 2022-05-02, 34
user4, 2022-05-03, 31
user4, 2022-05-03, 31
user4, 2022-05-04, 35
user4, 2022-05-05, 30
user4, 2022-05-06, 26
user5, 2022-05-01, 10
user5, 2022-05-01, 11
user5, 2022-05-01, 12
user5, 2022-05-02, 27
user5, 2022-05-03, 25
user5, 2022-05-04, 42
user5, 2022-05-04, 46
user5, 2022-05-04, 13
user5, 2022-05-05, 61
user5, 2022-05-06, 80
user5, 2022-05-07, 10
user5, 2022-05-11, 20
Column order: user ID, order date, order amount.
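Each sample line has a space after every comma, so the date and amount fields carry leading whitespace if the line is split naively. As a minimal sketch in plain Scala (no Spark required), the following parses one line into a typed record. The names `Order` and `OrderParser` are hypothetical and are not part of the original code.

```scala
import java.time.LocalDate
import scala.util.Try

// Hypothetical record type for one line of the sample file.
final case class Order(userId: String, orderDate: LocalDate, amount: Double)

object OrderParser {
  // Split on commas and trim every field, because the sample lines
  // look like "user1, 2022-05-01, 51" (a space follows each comma).
  // Returns None when the line does not have exactly three fields
  // or when the date or amount cannot be parsed.
  def parse(line: String): Option[Order] =
    line.split(",").map(_.trim) match {
      case Array(user, date, amount) =>
        Try(Order(user, LocalDate.parse(date), amount.toDouble)).toOption
      case _ => None
    }
}
```

Returning an `Option` keeps malformed lines from aborting the whole load; whether to drop or report such lines is a design choice left open here.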
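Analysis
1. Three consecutive days
- Use lag with a window function to get the dates of the two preceding order days.
- Check that the difference between the current date and the previous date is 1.
- Check that the difference between the previous date and the date before it is 1.
2. Increasing amounts
- Use lag with a window function to get the daily order total of each of the two preceding days.
- Check that the current day's total is greater than the previous day's.
- Check that the previous day's total is greater than that of the day before.
3. Total order amount over the three days
- Sum the daily order totals of the three adjacent days, for rows that satisfy both conditions 1 and 2.
4. Order count over the three days
- Sum the daily order counts of the three adjacent days, for rows that satisfy both conditions 1 and 2.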
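The four steps above can be sketched without Spark, which may make the logic easier to follow before reading the window-function version. This is an illustrative sketch only: it assumes the orders of a single user have already been aggregated to one row per day, and the names `Daily`, `Hit`, and `findWindows` are hypothetical.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object ThreeDayGrowth {
  // One row per day for a single user, already aggregated (hypothetical type).
  final case class Daily(date: LocalDate, amount: Double, cnt: Long)
  // One qualifying three-day window, identified by its last day.
  final case class Hit(endDate: LocalDate, totalAmount: Double, totalCnt: Long)

  def findWindows(days: Seq[Daily]): Seq[Hit] =
    days
      .sortBy(_.date.toEpochDay)
      .sliding(3) // every run of three adjacent rows
      .collect {
        case Seq(a, b, c)
            // Step 1: the three rows are calendar-consecutive.
            if ChronoUnit.DAYS.between(a.date, b.date) == 1 &&
              ChronoUnit.DAYS.between(b.date, c.date) == 1 &&
              // Step 2: the daily totals strictly increase.
              a.amount < b.amount && b.amount < c.amount =>
          // Steps 3 and 4: sum amounts and counts over the window.
          Hit(c.date, a.amount + b.amount + c.amount, a.cnt + b.cnt + c.cnt)
      }
      .toSeq
}
```

For user1 in the sample data, the daily totals are 51, 107, 74, 43, 65, 87, 35 (with three orders on 2022-05-02), so the only qualifying window is the one ending on 2022-05-06, with a total of 195.0 over 3 orders.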
Implementation: Spark SQL API
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql._

object Continue3 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Test")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Step 1: read the sample dataset
    val data: DataFrame = spark.read
      .csv("test.txt")
      .toDF("user_id", "order_date", "order_amount")
      .withColumn("order_date", functions.to_date($"order_date"))

    // Step 2: find three adjacent days
    // Window spec: partition by user ID, order by order date.
    // The columns below are applied to groupedOrders (one row per user per day),
    // so "the previous row" means the user's previous order day.
    val windowSpec: WindowSpec = Window.partitionBy("user_id").orderBy("order_date")
    // Date of the previous order day (one row back); default: "1970-01-01"
    val lagDay: Column = functions.lag("order_date", 1, "1970-01-01").over(windowSpec)
    // Date of the order day before that (two rows back); default: "1970-01-01"
    val lag2Day: Column = functions.lag("order_date", 2, "1970-01-01").over(windowSpec)
    // Difference in days between the current date and the previous date
    val value1: Column = functions.datediff($"order_date", lagDay)
    // Difference in days between the previous date and the date before it
    val value2: Column = functions.datediff(lagDay, lag2Day)

    // Step 3: compute each user's daily totals and the totals of the two preceding days
    // Daily order amount and order count per user
    val groupedOrders: DataFrame =
      data.groupBy("user_id", "order_date")
        .agg(
          functions.sum("order_amount").alias("total_order_amount"),
          functions.count("user_id").alias("total_order_cnt")
        )
    // Order total of the previous day
    val lagMoney: Column = functions.lag("total_order_amount", 1).over(windowSpec)
    // Order total of the day before the previous day
    val lag2Money: Column = functions.lag("total_order_amount", 2).over(windowSpec)
    // Order total over the current row and the two preceding rows
    val threeDayTotalMoney: Column = functions.sum("total_order_amount").over(windowSpec.rowsBetween(-2, 0))
    // Order count over the current row and the two preceding rows
    val threeDayTotalCnt: Column = functions.sum("total_order_cnt").over(windowSpec.rowsBetween(-2, 0))

    // Step 4: build the final result
    groupedOrders
      .withColumn("value1", value1)
      .withColumn("value2", value2)
      .withColumn("lagMoney", lagMoney)
      .withColumn("lag2Money", lag2Money)
      .withColumn("lagDay", lagDay)
      .withColumn("lag2Day", lag2Day)
      .withColumn("threeDayTotalMoney", threeDayTotalMoney)
      .withColumn("threeDayTotalCnt", threeDayTotalCnt)
      // Keep rows whose three days are adjacent (both gaps are exactly 1 day)
      // and whose daily totals strictly increase. Comparing the two gaps with
      // each other would also accept equal gaps larger than 1, e.g. 2 and 2.
      .where($"value1" === 1 && $"value2" === 1 && $"total_order_amount" > $"lagMoney" && $"lagMoney" > $"lag2Money")
      .show(false)
  }
}
The output is as follows:
+-------+----------+------------------+---------------+------+------+--------+---------+----------+----------+------------------+----------------+
|user_id|order_date|total_order_amount|total_order_cnt|value1|value2|lagMoney|lag2Money|lagDay    |lag2Day   |threeDayTotalMoney|threeDayTotalCnt|
+-------+----------+------------------+---------------+------+------+--------+---------+----------+----------+------------------+----------------+
|user1  |2022-05-06|87.0              |1              |1     |1     |65.0    |43.0     |2022-05-05|2022-05-04|195.0             |3               |
|user2  |2022-05-03|35.0              |1              |1     |1     |21.0    |13.0     |2022-05-02|2022-05-01|69.0              |3               |
|user2  |2022-05-04|41.0              |1              |1     |1     |35.0    |21.0     |2022-05-03|2022-05-02|97.0              |3               |
|user2  |2022-05-05|54.0              |1              |1     |1     |41.0    |35.0     |2022-05-04|2022-05-03|130.0             |3               |
|user2  |2022-05-06|60.0              |1              |1     |1     |54.0    |41.0     |2022-05-05|2022-05-04|155.0             |3               |
+-------+----------+------------------+---------------+------+------+--------+---------+----------+----------+------------------+----------------+
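Field descriptions:
root
|-- user_id: user ID
|-- order_date: current date
|-- total_order_amount: total order amount for the current day
|-- total_order_cnt: number of orders for the current day
|-- value1: difference in days between the current date and the previous date
|-- value2: difference in days between the previous date and the date before it
|-- lagMoney: total order amount of the previous day
|-- lag2Money: total order amount of the day before the previous day
|-- lagDay: previous date
|-- lag2Day: the date before the previous date
|-- threeDayTotalMoney: total order amount over the three consecutive days
|-- threeDayTotalCnt: total number of orders over the three consecutive days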
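One point worth illustrating: threeDayTotalMoney and threeDayTotalCnt are summed over the current row and the two preceding rows, not over three calendar days, so they are only meaningful on rows that also pass the date-gap check. The plain Scala sketch below mimics that row-based frame on user5's last four order days from the sample data. The names `RowFrameDemo` and `rowFrameSums` are hypothetical, and the sketch assumes the rows are sorted by date with one row per day.

```scala
import java.time.LocalDate

object RowFrameDemo {
  // (date, daily total) for user5's last four order days in the sample data.
  val user5Tail: Seq[(LocalDate, Double)] = Seq(
    LocalDate.parse("2022-05-05") -> 61.0,
    LocalDate.parse("2022-05-06") -> 80.0,
    LocalDate.parse("2022-05-07") -> 10.0,
    LocalDate.parse("2022-05-11") -> 20.0
  )

  // For each row, sum the current row and up to two preceding rows
  // (a row-based frame), and flag whether the frame holds three rows
  // that span exactly three calendar days.
  def rowFrameSums(rows: Seq[(LocalDate, Double)]): Seq[(LocalDate, Double, Boolean)] =
    rows.indices.map { i =>
      val frame = rows.slice(math.max(0, i - 2), i + 1)
      val consecutive =
        frame.size == 3 && frame.head._1.plusDays(2) == frame.last._1
      (rows(i)._1, frame.map(_._2).sum, consecutive)
    }
}
```

For the 2022-05-11 row the frame sum is 80 + 10 + 20 = 110.0 even though the three rows span seven calendar days, which is why the date-gap filter has to accompany the frame sums.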
Implementation: Spark SQL
import org.apache.spark.sql._

object Continue3_sql_way {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("Test")
      .master("local[*]")
      .getOrCreate()

    // Step 1: read the sample dataset
    spark.read
      .csv("F:\\头歌相关信息\\国赛项目\\coder_first\\src\\main\\java\\task3\\test.txt")
      .toDF("user_id", "order_date", "order_amount")
      .createOrReplaceTempView("data")

    // Step 2: compute each user's daily order total and order count
    spark.sql(
      """
        |select
        |  user_id,
        |  order_date,
        |  count(user_id) total_order_cnt,
        |  sum(order_amount) total_order_amount
        |from
        |  data
        |group by
        |  user_id,
        |  order_date
        |""".stripMargin).createOrReplaceTempView("groupData")

    // Step 3: for each user, get the dates and daily totals of the two preceding days,
    // plus the order total and order count over the three rows.
    // The amount lags take no default value: a date string is not a valid default
    // for a numeric column, and a null lag already fails the comparisons in step 4.
    spark.sql(
      """
        |select
        |  user_id,
        |  order_date,
        |  total_order_cnt,
        |  total_order_amount,
        |  lag(order_date, 1, '1970-01-01') over(partition by user_id order by order_date) lagDay,
        |  lag(order_date, 2, '1970-01-01') over(partition by user_id order by order_date) lag2Day,
        |  lag(total_order_amount, 1) over(partition by user_id order by order_date) lagMoney,
        |  lag(total_order_amount, 2) over(partition by user_id order by order_date) lag2Money,
        |
        |  sum(total_order_amount) over(partition by user_id order by order_date rows between 2 preceding
        |    and current row) threeDayTotalMoney,
        |
        |  sum(total_order_cnt) over(partition by user_id order by order_date rows between 2 preceding
        |    and current row) threeDayTotalCnt
        |from
        |  groupData
        |""".stripMargin).createOrReplaceTempView("windowData")

    // Step 4: keep rows where the three days are consecutive and the daily totals keep increasing
    spark.sql(
      """
        |select
        |  *
        |from
        |  windowData
        |where
        |  datediff(order_date, lagDay) = 1
        |  and
        |  datediff(lagDay, lag2Day) = 1
        |  and
        |  total_order_amount > lagMoney
        |  and
        |  lagMoney > lag2Money
        |""".stripMargin).show(false)
  }
}
The output is as follows:
+-------+-----------+---------------+------------------+-----------+-----------+--------+---------+------------------+----------------+
|user_id|order_date |total_order_cnt|total_order_amount|lagDay     |lag2Day    |lagMoney|lag2Money|threeDayTotalMoney|threeDayTotalCnt|
+-------+-----------+---------------+------------------+-----------+-----------+--------+---------+------------------+----------------+
|user1  | 2022-05-06|1              |87.0              | 2022-05-05| 2022-05-04|65.0    |43.0     |195.0             |3               |
|user2  | 2022-05-03|1              |35.0              | 2022-05-02| 2022-05-01|21.0    |13.0     |69.0              |3               |
|user2  | 2022-05-04|1              |41.0              | 2022-05-03| 2022-05-02|35.0    |21.0     |97.0              |3               |
|user2  | 2022-05-05|1              |54.0              | 2022-05-04| 2022-05-03|41.0    |35.0     |130.0             |3               |
|user2  | 2022-05-06|1              |60.0              | 2022-05-05| 2022-05-04|54.0    |41.0     |155.0             |3               |
+-------+-----------+---------------+------------------+-----------+-----------+--------+---------+------------------+----------------+
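This post solved the problem in two ways, with the Spark SQL API and with Spark SQL. The idea is the same in both; I hope it helps.
If you run into a complex metric calculation you cannot solve, feel free to discuss it with me in the comments or by private message (free of charge). There is always more to learn, so keep going!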