spark 编程案例

这篇具有很好参考价值的文章主要介绍了spark 编程案例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

综合案例

以下案例结合了spark sql、dataframe、udf、读写文件等操作

# encoding:utf8
from pyspark.sql import SparkSession 
from pyspark.sql import functions as F

#需求1:各省销售额的统计
#需求2:T0P3销售省份中,有多少店铺达到过日销售额1000+
#需求3:T0P3省份中,各省的平均单单价
#需求4:T0P3省份中,各个省份的支付类型比例
#receivable:订单金额
#storeProvince:店铺省份
#dateTs:订单的销售日期
#payType:支付类型
#storeID:店铺ID
if __name__ == '__main__':
    spark = SparkSession.builder.\
        appName("SparkSQL ExampLe").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions","2").\ #local模式下调整为2
        config("spark.sql.warehouse.dir","hdfs://node1:8020/user/hive/warehouse").\ #hdfs配置
        config("hive.metastore.uris","thrift://node3:9083").\ # metastore配置,配置spark on hive
        enableHiveSupport().\
        getorCreate()
    #1.读取数据
    #省份信息,缺失值过滤,同时省份信息中会有”nu1”字符串
    #订单的金额,数据集中有的订单的金额是单笔超过10800的,这些是测试数
    #列值剪(SparkSQL会自动做这个优化)
    df = spark.read.format("json").load("../../data/input/mini.json").\
        dropna(thresh=1, subset=['storeProvince']).\
        filter("storeProvince != 'null'").\
        filter("receivable 10000").\
        select("storeProvince","storeID","receivable","dateTs","payType") 

    # 筛选必须数据
    #T0D0需求1:各省销售额统计
    province_sale_df = df.groupBy("storeProvince").sum("receivable").\
        withColumnRenamed("sum(receivable)", "money").\ # sum求和后,新生成的列名默认为聚合函数名和操作的列名,此处重命名
        withColumn("money", F.round("money",2)).\ # round四舍五入
        orderBy ("money",ascending=False)
        province_sale_df.show(truncate=False)
    #写出MySQL
    province_sale_df.write.mode("overwrite").\
        format("jdbc").\
        option("url","jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\
        option("dbtable","province_sale").\
        option("user","root").\
        option("password","2212072ok1").\
        option("encoding","utf-8").\
        save()
    #写出Hive表saveAsTable可以写出表要求已经配置好Spark On Hive,配置好后
    #会将表写入到HiVe的数据仓库中
    province_sale_df.write.mode ("overwrite").saveAsTable ("default.province_sale","parquet")

    #T000需求2:T0P3销售省份中,有多少店铺达到过日销售额1000+
    #2.1先找到T0P3的销售省份
    top3_province_df = province_sale_df.limit(3).select("storeProvince").withColumnRenamed("storeProvince","top3_storeProvince") #这里需要对top3的stroprovince列重命名,否则下面groupby会有问题
    #2.2和原始的DF进行内关联,数据关联后,就是全部都是T0P3省份的销售数据了
    top3_province_df_joined = df.join(top3_province_df, on = df['storeProvince'] == top3_province_df['top3_province'])
    top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)
    #广东省1 2021-01-03 1005
    #广东省2
    #广东省3·
    #湖南省1.
    #湖南省2。
    #广东省33
    #湖南省123
    #from_unixtime的精度是秒级,数据的精度是毫秒级,要对数据进行精度的裁剪
    province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince","storeID"
    F.from_unixtime(df['dateTs']sbstr(0,10),"yyyy-MM-dd").alias("day")).\ 
        sum("receivable").withColumnRenamed("sum(receivable)","money").\ #这里withColumnRenamed和上面alias都是重命名,只不过alias返回column对象,withColumnRenamed返回df
        filter("money > 1000").\
        dropDuplicates(subset=["storeID"]).\
        groupBy("storeProvince").count()
        province_hot_store_count_df.show()

    #写出MySQL
    province_hot_store_count_df.write.mode("overwrite").\
        format("jdbc").\
        option("url","jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\
        option("dbtable","province_hot_store_count").\
        option("user","root").\
        option("password","22120720k1").\
        option("encoding","utf-8").\
       save()
    #写出Hive
    province_hot_store_count_df.write.mode("overwrite").saveAsTable("default.province_hot_store_count","parquet")

    #T0D0需求3:T0P3省份中各个省份的平均订单价格(单单价)
    top3_province_order_avg_df = top3_province_df_joined.groupBy("storeProvince").\
        avg("receivable").\
        withColumnRenamed("avg(receivable)","money").\
        withColumn("money",F.round("money",2)).\
        orderBy("money",ascending=False)
    top3_province_order_avg_df.show(truncate=False)

    #T0D0需求4:T0P3省份中,各个省份的支付比例
    #湖南省支付宝33%
    #湖南省现金36%
    #广东省微信33%
    top3_province_df_joined.createTempView("province_pay")
    # 由于spark中没有将数字转换为含百分号的字符串函数,定义udf实现
    def udf_func(percent):
        return str(round(percent 100,2))+"%"
    #注册UDF
    my_udf = F.udf(udf_func,StringType())
    # 下面group by的total和storeProvince作用相同,只是为了语法正确加上,在 SELECT 列表中,除了聚合函数外,所有列都必须在 GROUP BY 子句中明确指定。否则,大多数数据库系统会抛出错误
    pay_type_df spark.sql("""
        SELECT storeProvince,payType, (COUNT(payType) / total) AS percent FROM
            (SELECT storeProvince,payType,count(1)OVER(PARTITION BY storeProvince)AS total FROM province_pay)AS sub
        GROUP BY storeProvince,payType,total
    """).withColumn("percent",my_udf("percent"))
    top3_province_df_joined.unpersist()
    

上面案例中,为什么 GROUP BY 需要包含 total 列:文章来源地址https://www.toymoban.com/news/detail-845677.html

语义一致性:在 SELECT 子句中,你使用了 COUNT(payType) / total 来计算百分比。由于 total 是通过窗口函数计算得出的,并且不是直接通过 GROUP BY 子句中的列聚合得到的,为了让查询的语义更加明确和一致,最好将 total 列也包含在 GROUP BY 子句中。这样,数据库知道如何根据 storeProvince、payType 和 total 的每个唯一组合来分组结果,并计算相应的 COUNT(payType)。

避免错误:如果不包含 total 在 GROUP BY 子句中,某些数据库系统可能会抛出错误,因为它们无法确定如何在没有显式分组的情况下处理这个非聚合列。即使某些数据库系统允许这样做(例如,通过假设 total 对于每个 storeProvince 和 payType 组合都是相同的),这也是一个不安全的做法,因为它可能导致逻辑错误或不可预测的结果。

逻辑准确性:在计算百分比时,确保每个 storeProvince 和 payType 组合都与其对应的 total 值匹配是非常重要的。通过将 total 列包含在 GROUP BY 子句中,你确保了这个匹配是精确的。

到了这里,关于spark 编程案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Spark大数据分析与实战笔记(第三章 Spark RDD 弹性分布式数据集-02)

    人生很长,不必慌张。你未长大,我要担当。 传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。 RDD是Spark提供的最重要的抽象概念

    2024年02月22日
    浏览(96)
  • 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署

    前言:七八九用于Spark的编程实验 大数据开源框架之基于Spark的气象数据处理与分析_木子一个Lee的博客-CSDN博客_spark舆情分析 目录 实验环境: 实验步骤: 一、解压 二、配置环境变量:  三、修改配置文件  1.修改spark-env.sh配置文件: 2.修改配置文件slaves: 3.分发配置文件:

    2024年02月11日
    浏览(54)
  • 云计算与大数据第16章 分布式内存计算平台Spark习题

    1、Spark是Hadoop生态(  B  )组件的替代方案。 A. Hadoop     B. MapReduce        C. Yarn             D.HDFS 2、以下(  D  )不是Spark的主要组件。 A. Driver      B. SparkContext       C. ClusterManager D. ResourceManager 3、Spark中的Executor是(  A  )。 A.执行器      B.主节

    2024年02月14日
    浏览(119)
  • 分布式计算中的大数据处理:Hadoop与Spark的性能优化

    大数据处理是现代计算机科学的一个重要领域,它涉及到处理海量数据的技术和方法。随着互联网的发展,数据的规模不断增长,传统的计算方法已经无法满足需求。因此,分布式计算技术逐渐成为了主流。 Hadoop和Spark是目前最为流行的分布式计算框架之一,它们都提供了高

    2024年01月23日
    浏览(57)
  • 数据存储和分布式计算的实际应用:如何使用Spark和Flink进行数据处理和分析

    作为一名人工智能专家,程序员和软件架构师,我经常涉及到数据处理和分析。在当前大数据和云计算的时代,分布式计算已经成为了一个重要的技术方向。Spark和Flink是当前比较流行的分布式计算框架,它们提供了强大的分布式计算和数据分析功能,为数据处理和分析提供了

    2024年02月16日
    浏览(62)
  • 使用分布式HTTP代理爬虫实现数据抓取与分析的案例研究

    在当今信息爆炸的时代,数据已经成为企业决策和发展的核心资源。然而,要获取大规模的数据并进行有效的分析是一项艰巨的任务。为了解决这一难题,我们进行了一项案例研究,通过使用分布式HTTP代理爬虫,实现数据抓取与分析的有效整合。本文旨在分享我们的研究成果

    2024年02月15日
    浏览(52)
  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(70)
  • Spark单机伪分布式环境搭建、完全分布式环境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala环境。三种Spark环境搭建互不关联,都是从零开始搭建。 如果将文章中的配置文件修改内容复制粘贴的话,所有配置文件添加的内容后面的注释记得删除,可能会报错。保险一点删除最好。 上传安装包解压并重命名 rz上传 如果没有安装rz可以使用命

    2024年02月06日
    浏览(83)
  • spark分布式解压工具

    ​ spark解压缩工具,目前支持tar、gz、zip、bz2、7z压缩格式,默认解压到当前路下,也支持自定义的解压输出路径。另外支持多种提交模式,进行解压任务,可通过自定义配置文件,作为spark任务的资源设定 2.1 使用hadoop的FileSystem类,对tos文件的进行读取、查找、写入等操作

    2024年02月02日
    浏览(48)
  • 分布式内存计算Spark环境部署与分布式内存计算Flink环境部署

    目录 分布式内存计算Spark环境部署 1.  简介 2.  安装 2.1【node1执行】下载并解压 2.2【node1执行】修改配置文件名称 2.3【node1执行】修改配置文件,spark-env.sh 2.4 【node1执行】修改配置文件,slaves 2.5【node1执行】分发 2.6【node2、node3执行】设置软链接 2.7【node1执行】启动Spark集群

    2024年02月08日
    浏览(76)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包