【指标计算】Spark 统计连续三天下单且下单额保持增长的用户

这篇具有很好参考价值的文章主要介绍了【指标计算】Spark 统计连续三天下单且下单额保持增长的用户。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

需求说明

统计连续三天下单且下单金额保持增长(第一天 < 第二天 < 第三天)的用户,在其基础上计算各个用户连续三天下单的总金额与下单量。

数据集

模拟数据集如下所示:

user1, 2022-05-01, 51
user1, 2022-05-02, 35
user1, 2022-05-02, 35
user1, 2022-05-02, 37
user1, 2022-05-03, 74
user1, 2022-05-04, 43
user1, 2022-05-05, 65
user1, 2022-05-06, 87
user1, 2022-05-07, 35
user2, 2022-05-01, 13
user2, 2022-05-02, 21
user2, 2022-05-03, 35
user2, 2022-05-04, 41
user2, 2022-05-05, 54
user2, 2022-05-06, 60
user3, 2022-05-01, 55
user3, 2022-05-02, 46
user3, 2022-05-03, 34
user3, 2022-05-04, 27
user3, 2022-05-05, 18
user3, 2022-05-06, 45
user4, 2022-05-01, 30
user4, 2022-05-01, 35
user4, 2022-05-02, 34
user4, 2022-05-03, 31
user4, 2022-05-03, 31
user4, 2022-05-04, 35
user4, 2022-05-05, 30
user4, 2022-05-06, 26
user5, 2022-05-01, 10
user5, 2022-05-01, 11
user5, 2022-05-01, 12
user5, 2022-05-02, 27
user5, 2022-05-03, 25
user5, 2022-05-04, 42
user5, 2022-05-04, 46
user5, 2022-05-04, 13
user5, 2022-05-05, 61
user5, 2022-05-06, 80
user5, 2022-05-07, 10
user5, 2022-05-11, 20

字段排列顺序: 用户 ID,下单时间,下单金额。

需求分析

1.连续三天

  • 使用 lag 结合窗口函数分别获取前两天的日期

  • 计算当前日期与一天前日期之间的差值是否为 1

  • 计算一天前日期与两天前日期之间的差值是否为 1

2.连续增长

  • 使用 lag 结合窗口函数分别获取前两天每天的下单总金额

  • 计算当天下单总额是否大于一天前

  • 计算一天前下单总额是否大于两天前

3.连续三天下单总金额

  • 相邻的三天下单总金额求和,满足上面(1 和 2)两个条件

4.连续三天下单量

  • 相邻的三天下单量求和,满足上面(1 和 2)两个条件

需求实现 —— Spark SQL API

import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql._

object Continue3 {

    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession
                .builder()
                .appName("Test")
                .master("local[*]")
                .getOrCreate()

        import spark.implicits._

        // TODO 1.读取模拟数据集
        val data: DataFrame = spark.read
                .csv("test.txt")
                .toDF("user_id", "order_date" , "order_amount")
                .withColumn("order_date",functions.to_date($"order_date"))

        // TODO 2.计算相邻的三天
        
        // 创建窗口函数,以用户 ID 进行分组,按照下单日期顺序排列
        val windowSpec: WindowSpec = Window.partitionBy("user_id").orderBy("order_date")

        // 获取一天前的日期(前第一条数据),默认值设为:"1970-01-01"
        val lagDay: Column = functions.lag("order_date", 1, "1970-01-01").over(windowSpec)

        // 获取两天前的日期(前第二条数据),默认值设为:"1970-01-01"
        val lag2Day: Column = functions.lag("order_date", 2, "1970-01-01").over(windowSpec)

        // 获取当前日期与前一天日期的差值
        val value1: Column = functions.datediff($"order_date", lagDay)

        // 获取一天前日期与两天前日期的差值
        val value2: Column = functions.datediff(lagDay, lag2Day)


        // TODO 3.分别计算每个用户前两天的下单总金额和下单量

        // 获取每天的下单总金额与下单量
        val groupedOrders: DataFrame =
            data.groupBy("user_id", "order_date")
                .agg(
                    functions.sum("order_amount").alias("total_order_amount"),
                    functions.count("user_id").alias("total_order_cnt")
                )

        // 获取一天前的下单总金额
        val lagMoney: Column = functions.lag("total_order_amount", 1).over(windowSpec)

        // 获取两天前的下单总金额
        val lag2Money: Column = functions.lag("total_order_amount", 2).over(windowSpec)

        // 获取三天的下单总金额
        val threeDayTotalMoney: Column = functions.sum("total_order_amount").over(windowSpec.rowsBetween(-2, 0))

        // 获取三天的下单量
        val threeDayTotalCnt: Column = functions.sum("total_order_cnt").over(windowSpec.rowsBetween(-2, 0))


        // TODO 4.获取最终结果
        groupedOrders
                .withColumn("value1",value1)
                .withColumn("value2",value2)
                .withColumn("lagMoney",lagMoney)
                .withColumn("lag2Money",lag2Money)
                .withColumn("lagDay",lagDay)
                .withColumn("lag2Day",lag2Day)
                .withColumn("threeDayTotalMoney",threeDayTotalMoney)
                .withColumn("threeDayTotalCnt",threeDayTotalCnt)
                // 判断三天是否相邻且保持连续增长
                .where($"value1" - $"value2" === 0 && $"total_order_amount" > $"lagMoney" && $"lagMoney" > $"lag2Money")
                .show(false)


    }
}

输出结果如下所示:

+-------+----------+------------------+---------------+------+------+--------+---------+----------+----------+------------------+----------------+
|user_id|order_date|total_order_amount|total_order_cnt|value1|value2|lagMoney|lag2Money|lagDay    |lag2Day   |threeDayTotalMoney|threeDayTotalCnt|
+-------+----------+------------------+---------------+------+------+--------+---------+----------+----------+------------------+----------------+
|user1  |2022-05-06|87.0              |1              |1     |1     |65.0    |43.0     |2022-05-05|2022-05-04|195.0             |3               |
|user2  |2022-05-03|35.0              |1              |1     |1     |21.0    |13.0     |2022-05-02|2022-05-01|69.0              |3               |
|user2  |2022-05-04|41.0              |1              |1     |1     |35.0    |21.0     |2022-05-03|2022-05-02|97.0              |3               |
|user2  |2022-05-05|54.0              |1              |1     |1     |41.0    |35.0     |2022-05-04|2022-05-03|130.0             |3               |
|user2  |2022-05-06|60.0              |1              |1     |1     |54.0    |41.0     |2022-05-05|2022-05-04|155.0             |3               |
+-------+----------+------------------+---------------+------+------+--------+---------+----------+----------+------------------+----------------+

字段解析如下:

root
 |-- user_id: 用户 ID
 |-- order_date: 当前日期
 |-- total_order_amount: 当天的下单总金额
 |-- total_order_cnt: 当天的下单总量
 |-- value1: 当前日期与前一天日期的差值
 |-- value2: 一天前日期与两天前日期的差值
 |-- lagMoney: 一天前的下单总金额
 |-- lag2Money: 两天前的下单总金额
 |-- lagDay: 一天前日期
 |-- lag2Day: 两天前日期
 |-- threeDayTotalMoney: 连续三天的总下单金额
 |-- threeDayTotalCnt: 连续三天的总下单量

需求实现 —— Spark SQL

import org.apache.spark.sql._

object Continue3_sql_way {

    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession
                .builder()
                .appName("Test")
                .master("local[*]")
                .getOrCreate()

        // TODO 1.读取模拟数据集
        spark.read
                .csv("F:\\头歌相关信息\\国赛项目\\coder_first\\src\\main\\java\\task3\\test.txt")
                .toDF("user_id", "order_date" , "order_amount")
                .createOrReplaceTempView("data")

        // TODO 2.获取各个用户每天的的下单总额与下单量
        spark.sql(
            """
              |select
              |     user_id,
              |     order_date,
              |     count(user_id) total_order_cnt,
              |     sum(order_amount) total_order_amount
              |from
              |     data
              |group by
              |     user_id,
              |     order_date
              |""".stripMargin).createOrReplaceTempView("groupData")

        // TODO 3.获取各个用户前两天的日期、单天下单总额、连续三天的下单总额、连续三天的下单总量
        spark.sql(
            """
              |select
              |     user_id,
              |     order_date,
              |     total_order_cnt,
              |     total_order_amount,
              |     lag(order_date,1,"1970-01-01") over(partition by user_id order by order_date) lagDay,
              |     lag(order_date,2,"1970-01-01") over(partition by user_id order by order_date) lag2Day,
              |     lag(total_order_amount,1,"1970-01-01") over(partition by user_id order by order_date) lagMoney,
              |     lag(total_order_amount,2,"1970-01-01") over(partition by user_id order by order_date) lag2Money,
              |
              |     sum(total_order_amount) over(partition by user_id order by order_date rows between 2 preceding
              |     and current row) threeDayTotalMoney,
              |
              |     sum(total_order_cnt) over(partition by user_id order by order_date rows between 2 preceding
              |     and current row) threeDayTotalCnt
              |from
              |     groupData
              |""".stripMargin).createOrReplaceTempView("windowData")


        // TODO 4.判断是否连续三下单金额都保持增长
        spark.sql(
            """
              |select
              |     *
              |from
              |     windowData
              |where
              |     datediff(order_date,lagDay) = 1
              |     and
              |     datediff(lagDay,lag2Day) = 1
              |     and
              |     total_order_amount > lagMoney
              |     and
              |     lagMoney > lag2Money
              |""".stripMargin).show(false)



    }
}

输出结果如下所示:

+-------+-----------+---------------+------------------+-----------+-----------+--------+---------+------------------+----------------+
|user_id|order_date |total_order_cnt|total_order_amount|lagDay     |lag2Day    |lagMoney|lag2Money|threeDayTotalMoney|threeDayTotalCnt|
+-------+-----------+---------------+------------------+-----------+-----------+--------+---------+------------------+----------------+
|user1  | 2022-05-06|1              |87.0              | 2022-05-05| 2022-05-04|65.0    |43.0     |195.0             |3               |
|user2  | 2022-05-03|1              |35.0              | 2022-05-02| 2022-05-01|21.0    |13.0     |69.0              |3               |
|user2  | 2022-05-04|1              |41.0              | 2022-05-03| 2022-05-02|35.0    |21.0     |97.0              |3               |
|user2  | 2022-05-05|1              |54.0              | 2022-05-04| 2022-05-03|41.0    |35.0     |130.0             |3               |
|user2  | 2022-05-06|1              |60.0              | 2022-05-05| 2022-05-04|54.0    |41.0     |155.0             |3               |
+-------+-----------+---------------+------------------+-----------+-----------+--------+---------+------------------+----------------+

这里使用了 Spark SQL API 与 Spark SQL 两种方式来进行解决,思路都一样,希望对你有所帮助。

如果你有碰到复杂的指标计算需求无法解决,欢迎各位同学在评论区或者私信与我进行讨论(无偿),学无止境,冲呀!文章来源地址https://www.toymoban.com/news/detail-462059.html

到了这里,关于【指标计算】Spark 统计连续三天下单且下单额保持增长的用户的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • HQL解决连续三天登陆问题

    统计连续登录天数超过3天的用户,输出信息包括:用户id,登录天数,起始时间,结束时间; 思路1:row_number() 1.通过对用户id进行开窗函数row_number,对登陆时间进行降序排列 2.使用date_sub(login_date,rn)函数进行日期求出差值日期 3.对user_id和diff_date分组求出时间的区间范围 4.对结果

    2024年02月11日
    浏览(42)
  • 创新案例|Amazon如何打造增长飞轮保持每年20%以上的营收增速

    作为世界五百强中的头部企业,亚马逊的价值定位经历了三次转变,从成为“地球上最大的书店”,到成为最大的综合网络零售商,再到成为“最以客户为中心的企业”,亚马逊最终以“客户中心”破除了对企业价值定位的束缚,不再拘泥于某个单一行业。正是这种兼具激进

    2024年02月06日
    浏览(45)
  • 连续三年增长,徐福记为什么越战越勇?

    30年,一个零食品牌能发生什么变化?对徐福记来说,这是一场漫长的拉力赛。 这个过程,是研究消费者喜好变迁的过程,是孜孜不倦创新原料、产品、生产工艺和先进技术的过程,更是徐福记证明自身品牌价值的过程——回顾过往,过去三年是市场不确定性弥漫的三年,但

    2024年02月01日
    浏览(38)
  • 【云计算】3台虚拟机完成Spark Yarn集群部署并编写Scala应用程序实现单词计数统计

    目录 1.准备环境          2.安装spark并配置环境 3.安装scala并配置环境 4.安装编辑器idea 5.编写Spark Scala应用程序实现单词计数统计 6.Spark On Yarn配置 虚拟机:vmware workstation16 linux版本:centOS 7 linux 分布式环境:hadoop 3.1.1 (1)创建三台虚拟机,并准备好linux环境和hadoop,确保h

    2023年04月15日
    浏览(46)
  • 010:连续跌3天,同时这三天收盘价都在20日均线下,第四天上涨的概率--以京泉华为例

    对于《连续跌三天,压第四天上涨的盈利计算》,我们可以继续优化这个策略,增加条件:同时三天都收盘在20日均线下。 因为我们上一篇《获取20日均线数据到excel表中》获得了20日均线数据,我们可以利用均线数据来编写新的脚本。这里我们用$京泉华(SZ002885)$为例子。 步骤

    2024年02月07日
    浏览(38)
  • 破局数据分析滞后难题,赋能企业高速增长的指标管理解决方案

    指标是什么? 业务发展过程中,企业内外部都会产生很多的业务数据,对这些数据进行采集、计算、落库、分析后,形成的统计结果称为指标。简单来说,指标是业务被拆解、量化后形成的数量特征,企业利用数据指标对业务进行精准的号脉,实现对业务的科学管理和有效优

    2024年03月09日
    浏览(54)
  • 【Matlab数理统计知识点合集】新手入门第十三天

    掌握随机数的产生 了解概率密度函数等函数的使用 掌握统计图表的绘制方法 随机数是专门的随机试验的结果。在统计学的不同技术中需要使用随机数,比如在从统计总体中抽取有代表性的样本的时候,或者在将实验动物分配到不同的试验组的过程中,或者在进行蒙特卡罗模

    2023年04月11日
    浏览(46)
  • SQL统计连续登陆3天的用户(连续活跃超3天用户)

    1. 数据准备 2. 方法一: 差值计算 user_id active_date rn 10001 2023-02-01 1 10001 2023-02-03 2 10001 2023-02-04 3 10001 2023-02-05 4 10002 2023-02-02 1 10002 2023-02-03 2 10002 2023-02-04 3 10002 2023-02-05 4 10002 2023-02-07 5 … … … user_id active_date rn sub_date 10001 2023-02-01 1 2023-01-31 10001 2023-02-03 2 2023-02-01 10001 2023-02-04 3

    2024年04月25日
    浏览(30)
  • 大数据之指标计算(6) -- 编写Hive SQL代码,根据dwd层dwd.fact_environment_data表,统计检测设备的每月平均湿度与厂内检测结果做对比存入Mysql数据库中

      本题来源于全国职业技能大赛之大数据技术赛项工业数据处理赛题 - 离线数据处理 - 指标计算 注:由于个人设备问题,代码执行结果以及最后数据显示结果将不会给出。   提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写)   涉及组件:Hive 涉及知

    2024年02月08日
    浏览(56)
  • 实时数仓建设第2问:怎样使用flink sql快速无脑统计当天下单各流程(已发货,确认收货等等)状态的订单数量

    实时统计当天下单各流程状态(已支付待卖家发货,卖家通知物流揽收,待买家收货等等)中的订单数量。 订单表的binlog数据发送到kafka,flink从kafka接受消息进行指标统计。因为每笔订单的状态会发生变化,比如上午为【已支付待卖家发货】,这个时候【已支付待卖家发货】指标

    2024年02月16日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包