综合案例
以下案例结合了spark sql、dataframe、udf、读写文件等操作文章来源:https://www.toymoban.com/news/detail-845677.html
# encoding:utf8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
#需求1:各省销售额的统计
#需求2:T0P3销售省份中,有多少店铺达到过日销售额1000+
#需求3:T0P3省份中,各省的平均单单价
#需求4:T0P3省份中,各个省份的支付类型比例
#receivable:订单金额
#storeProvince:店铺省份
#dateTs:订单的销售日期
#payType:支付类型
#storeID:店铺ID
if __name__ == '__main__':
spark = SparkSession.builder.\
appName("SparkSQL ExampLe").\
master("local[*]").\
config("spark.sql.shuffle.partitions","2").\ #local模式下调整为2
config("spark.sql.warehouse.dir","hdfs://node1:8020/user/hive/warehouse").\ #hdfs配置
config("hive.metastore.uris","thrift://node3:9083").\ # metastore配置,配置spark on hive
enableHiveSupport().\
getorCreate()
#1.读取数据
#省份信息,缺失值过滤,同时省份信息中会有”nu1”字符串
#订单的金额,数据集中有的订单的金额是单笔超过10800的,这些是测试数
#列值剪(SparkSQL会自动做这个优化)
df = spark.read.format("json").load("../../data/input/mini.json").\
dropna(thresh=1, subset=['storeProvince']).\
filter("storeProvince != 'null'").\
filter("receivable 10000").\
select("storeProvince","storeID","receivable","dateTs","payType")
# 筛选必须数据
#T0D0需求1:各省销售额统计
province_sale_df = df.groupBy("storeProvince").sum("receivable").\
withColumnRenamed("sum(receivable)", "money").\ # sum求和后,新生成的列名默认为聚合函数名和操作的列名,此处重命名
withColumn("money", F.round("money",2)).\ # round四舍五入
orderBy ("money",ascending=False)
province_sale_df.show(truncate=False)
#写出MySQL
province_sale_df.write.mode("overwrite").\
format("jdbc").\
option("url","jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\
option("dbtable","province_sale").\
option("user","root").\
option("password","2212072ok1").\
option("encoding","utf-8").\
save()
#写出Hive表saveAsTable可以写出表要求已经配置好Spark On Hive,配置好后
#会将表写入到HiVe的数据仓库中
province_sale_df.write.mode ("overwrite").saveAsTable ("default.province_sale","parquet")
#T000需求2:T0P3销售省份中,有多少店铺达到过日销售额1000+
#2.1先找到T0P3的销售省份
top3_province_df = province_sale_df.limit(3).select("storeProvince").withColumnRenamed("storeProvince","top3_storeProvince") #这里需要对top3的stroprovince列重命名,否则下面groupby会有问题
#2.2和原始的DF进行内关联,数据关联后,就是全部都是T0P3省份的销售数据了
top3_province_df_joined = df.join(top3_province_df, on = df['storeProvince'] == top3_province_df['top3_province'])
top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)
#广东省1 2021-01-03 1005
#广东省2
#广东省3·
#湖南省1.
#湖南省2。
#广东省33
#湖南省123
#from_unixtime的精度是秒级,数据的精度是毫秒级,要对数据进行精度的裁剪
province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince","storeID"
F.from_unixtime(df['dateTs']sbstr(0,10),"yyyy-MM-dd").alias("day")).\
sum("receivable").withColumnRenamed("sum(receivable)","money").\ #这里withColumnRenamed和上面alias都是重命名,只不过alias返回column对象,withColumnRenamed返回df
filter("money > 1000").\
dropDuplicates(subset=["storeID"]).\
groupBy("storeProvince").count()
province_hot_store_count_df.show()
#写出MySQL
province_hot_store_count_df.write.mode("overwrite").\
format("jdbc").\
option("url","jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&characterEncoding=utf8").\
option("dbtable","province_hot_store_count").\
option("user","root").\
option("password","22120720k1").\
option("encoding","utf-8").\
save()
#写出Hive
province_hot_store_count_df.write.mode("overwrite").saveAsTable("default.province_hot_store_count","parquet")
#T0D0需求3:T0P3省份中各个省份的平均订单价格(单单价)
top3_province_order_avg_df = top3_province_df_joined.groupBy("storeProvince").\
avg("receivable").\
withColumnRenamed("avg(receivable)","money").\
withColumn("money",F.round("money",2)).\
orderBy("money",ascending=False)
top3_province_order_avg_df.show(truncate=False)
#T0D0需求4:T0P3省份中,各个省份的支付比例
#湖南省支付宝33%
#湖南省现金36%
#广东省微信33%
top3_province_df_joined.createTempView("province_pay")
# 由于spark中没有将数字转换为含百分号的字符串函数,定义udf实现
def udf_func(percent):
return str(round(percent 100,2))+"%"
#注册UDF
my_udf = F.udf(udf_func,StringType())
# 下面group by的total和storeProvince作用相同,只是为了语法正确加上,在 SELECT 列表中,除了聚合函数外,所有列都必须在 GROUP BY 子句中明确指定。否则,大多数数据库系统会抛出错误
pay_type_df spark.sql("""
SELECT storeProvince,payType, (COUNT(payType) / total) AS percent FROM
(SELECT storeProvince,payType,count(1)OVER(PARTITION BY storeProvince)AS total FROM province_pay)AS sub
GROUP BY storeProvince,payType,total
""").withColumn("percent",my_udf("percent"))
top3_province_df_joined.unpersist()
上面案例中,为什么 GROUP BY 需要包含 total 列:文章来源地址https://www.toymoban.com/news/detail-845677.html
语义一致性:在 SELECT 子句中,你使用了 COUNT(payType) / total 来计算百分比。由于 total 是通过窗口函数计算得出的,并且不是直接通过 GROUP BY 子句中的列聚合得到的,为了让查询的语义更加明确和一致,最好将 total 列也包含在 GROUP BY 子句中。这样,数据库知道如何根据 storeProvince、payType 和 total 的每个唯一组合来分组结果,并计算相应的 COUNT(payType)。
避免错误:如果不包含 total 在 GROUP BY 子句中,某些数据库系统可能会抛出错误,因为它们无法确定如何在没有显式分组的情况下处理这个非聚合列。即使某些数据库系统允许这样做(例如,通过假设 total 对于每个 storeProvince 和 payType 组合都是相同的),这也是一个不安全的做法,因为它可能导致逻辑错误或不可预测的结果。
逻辑准确性:在计算百分比时,确保每个 storeProvince 和 payType 组合都与其对应的 total 值匹配是非常重要的。通过将 total 列包含在 GROUP BY 子句中,你确保了这个匹配是精确的。
到了这里,关于spark 编程案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!