Python进阶知识:整理1 -> pySpark入门

这篇具有很好参考价值的文章主要介绍了Python进阶知识:整理1 -> pySpark入门。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 编写执行入口 

# 1.导包
from pyspark import SparkConf, SparkContext

# 2. 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")

# 3. 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)   # 执行入口

# 4.打印pySpark的运行版本
# print(sc.version)

# 5.停止SparkContext对象的运行
sc.stop()

pySpark大数据分析过程分为3步:数据输入、数据计算、数据输出 ,以下内容将重点介绍这三个过程


 

2 数据输入

在数据输入完成后,都会得到一个RDD类的对象(RDD全称为弹性分布式数据集)

# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 2.通过parallelize方法将Python对象加载到Spark内,成为RDD对象
# 通过sc对象构建RDD
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((6, 7, 8, 9, 10))
rdd3 = sc.parallelize("adjsjfjsg")
rdd4 = sc.parallelize({1, 2, 3, 4})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})

# 如果要查看RDD对象的内容,可以通过collect方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())


# 3.用textFiled方法,读取文件数据加载到Spark内,成为RDD对象
rdd6 = sc.textFile("D:/hello.txt")
print(rdd6.collect())


sc.stop()

Python进阶知识:整理1 -> pySpark入门,Python,python,spark

3 数据计算

3.1 map算子

map算子是将RDD的数据进行一条条处理(处理的逻辑基于map算子接收的处理函数),返回新的RDD

import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量,因为Spark找不到python解释器在什么地方

# 构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 1. map 算子
rdd = sc.parallelize([1, 2, 3, 4])

# 通过map方法将全部的元素都乘10
rdd_map = rdd.map(lambda x: x * 10)

print(rdd_map.collect())

# 链式调用
rdd_map1 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd_map1.collect())

Python进阶知识:整理1 -> pySpark入门,Python,python,spark

3.2 flatMap算子

对RDD进行map操作后,进行解除嵌套的作用

import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量

# 构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(["a b c", "d e f", "h i j"])
# 需求:将RDD数据里面的一个个单词都提取出来
rdd2 = rdd.map(lambda x: x.split(" "))
print(f"map操作后的结果:{rdd2.collect()}")

#解嵌套
rdd3 = rdd.flatMap(lambda x: x.split(" "))
print(f"flatMap操作后的结果:{rdd3.collect()}")

Python进阶知识:整理1 -> pySpark入门,Python,python,spark

3.3 reduceByKey算子

reduceByKey算子:
       
功能:针对(K,V)类型的数据,按照K进行分组,然后根据你提供的聚合逻辑,完成
        组内数据(value)的聚合操作。

        (K,V)类型的数据 -> 二元元组

import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量

# 构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5)])
result = rdd.reduceByKey(lambda x, y: x + y)

print(result.collect())

Python进阶知识:整理1 -> pySpark入门,Python,python,spark

3.4 单词计数案例

import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量

# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 2.读取数据文件
rdd = sc.textFile("D:/hello.txt")

word_rdd = rdd.flatMap(lambda line: line.split(" "))
# print(word_rdd.collect())

#  3.对数据进行转换为二元元组
word_count_rdd = word_rdd.map(lambda word: (word, 1))

# 4. 对二元元组进行聚合
word_count_rdd_result = word_count_rdd.reduceByKey(lambda a, b: a + b)

print(word_count_rdd_result.collect())

3.5 filter算子

过滤想要的数据,进行保留

import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量

# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9,10])
filter_rdd = rdd.filter(lambda x: x % 2 == 0)  # 得到True则保留

print(filter_rdd.collect())

Python进阶知识:整理1 -> pySpark入门,Python,python,spark

3.6 distinct算子

对RDD数据进行去重,返回新的RDD

import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量

# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 1, 2, 3, 4])
distinct_rdd = rdd.distinct()

print(distinct_rdd.collect())

Python进阶知识:整理1 -> pySpark入门,Python,python,spark

3.7 sortBy算子

对RDD数据进行排序,基于你指定的排序依据

import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量

# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 2.读取数据文件
rdd = sc.textFile("D:/hello.txt")

word_rdd = rdd.flatMap(lambda line: line.split(" "))
# print(word_rdd.collect())

#  3.对数据进行转换为二元元组
word_count_rdd = word_rdd.map(lambda word: (word, 1))

# 4. 对二元元组进行聚合
word_count_rdd_result = word_count_rdd.reduceByKey(lambda a, b: a + b)

# 5.对步骤四求的结果进行排序
word_count_rdd_result_sort = word_count_rdd_result.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
# 参数1设置排序的依据;参数2设置升序还是降序;参数3全局排序需要设置分区数为1
print(word_count_rdd_result_sort.collect())

3.8 数据计算综合案例

准备需要的文件

Python进阶知识:整理1 -> pySpark入门,Python,python,spark


import json
import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量

# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# TODO 需求1:城市销售额排名
# 1.1 读取文件得到RDD
rdd = sc.textFile("D:/PyCharm_projects/python_study_projects/text/orders.txt")
# 1.2 取出JSON字符串
rdd_json = rdd.flatMap(lambda x: x.split("|"))
# print(rdd_json.collect())
# 1.3 json字符串转为字典
rdd_dict = rdd_json.map(lambda x: json.loads(x))
# print(rdd_dict.collect())
# 1.4 取出城市和销售额数据
# (城市, 销售额)
rdd_city_with_money = rdd_dict.map(lambda x: (x["areaName"], int(x["money"])))
# 1.5 按照城市分组
rdd_group = rdd_city_with_money.reduceByKey(lambda x, y: x + y)
#  1.6 按照销售额降序排序
result_rdd1 = rdd_group.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(f"需求1的结果是:{result_rdd1.collect()}")



# TODO 需求2:全部城市有哪些商品类别在售卖
# 2.1 取出所有的商品类别
category_rdd = rdd_dict.map(lambda x: x["category"]).distinct()
print(f"需求2的结果是:{category_rdd.collect()}")



# TODO 需求3:北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = rdd_dict.filter(lambda x: x["areaName"] == "北京")
# 3.2 取出所有商品类别
beijing_category_data_rdd = beijing_data_rdd.map(lambda x: x["category"]).distinct()
print(f"需求3的结果是:{beijing_category_data_rdd.collect()}")

Python进阶知识:整理1 -> pySpark入门,Python,python,spark

4 数据输出

import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量
os.environ["HADOOP_HOME"] = "D:/Hadoop/hadoop-3.0.0"  # 输出为文件需要的配置

# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# conf.set("spark.default.parallelism", 1)  # 设置全局的并行度为1
sc = SparkContext(conf=conf)

# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])

# 1. 将RDD数据输出为Python对象
"""
    collect 算子:  ->  将RDD输出为list对象
        功能:将RDD各个分区内的数据统一收集到Driver中,形成一个List对象
        用法:rdd.collect()
"""
# print(rdd.collect())

"""
    reduce 算子:
        功能:将RDD数据按照你传入的逻辑进行聚合
        用法:rdd.reduce(func)
        # func: (T, T) ->  T      返回值和参数要求类型相同
"""
# result = rdd.reduce(lambda x, y: x + y)
# print(result)


"""
    take 算子:
        功能:取RDD的前N个元素,组合成list返回给你
        用法:rdd.take(N)
"""
# result1 = rdd.take(3)
# print(result1)

"""
    count 算子:
        功能:计算RDD有多少条数据,返回值是一个数字
        用法:rdd.count()
"""
# result2 = rdd.count()
# print(result2)




# 2. 将RDD数据输出为文件
"""
    saveAsTextFile 算子:
        功能:将RDD的数据写入文本文件中
        用法:rdd.saveAsTextFile(path)
"""
rdd.saveAsTextFile("D:/output")

5 pySaprk综合案例

表示当前行还未写完,下一行仍是这行的内容

以下都采取链式的写法:文章来源地址https://www.toymoban.com/news/detail-800359.html

import os
os.environ["PYSPARK_PYTHON"] = "D:/python3.7/python.exe"  # 设置环境变量

# 1.构建执行环境入口对象
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", 1)  # 设置全局的并行度为1
sc = SparkContext(conf=conf)

# 读取文件
file_rdd = sc.textFile("D:/PyCharm_projects/python_study_projects/text/search_log.txt")

# TODO 需求1:热门搜索时间段Top3 (小时精度)
# 1.1 取出所有的时间并转换为小时
# 1.2 转换为(小时,1)的二元元组
# 1.3 Key分组,集合Value
# 1.4 降序排序,取前3
# \表示当前行还未写完,下一行仍是这行的内容
result1 = file_rdd.map(lambda x: x.split("\t")).\
    map(lambda x: x[0][:2]).\
    map(lambda x: (x, 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)

print(f"需求1的结果是:{result1}")





# TODO 需求2:热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词,1) 二元元组
# 2.3 分组集合
# 2.4 排序,取Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)
print(f"需求2的结果是:{result2}")






# TODO 需求3:统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键字
# 3.2 转换为(小时, 1)  的二元元组
# 3.3 Key 分组聚合Value
# 3.4 排序,取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\
    filter(lambda x: x[2] == "黑马程序员").\
    map(lambda x: (x[0][:2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(1)
print(f"需求3的结果是:{result3}")






# TODO 需求4:将数据转换为JSON格式,写到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出到文件
file_rdd.map(lambda x: x.split("\t")).\
    map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
    saveAsTextFile("D:\output_json")  # hadoop报错,无法实现,是我自己的环境问题,代码没有问题

sc.stop()

到了这里,关于Python进阶知识:整理1 -> pySpark入门的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 林子雨 VirtualBox + Ubuntu[linux] 配置 java、hadoop、Spark[python]、pyspark快速配置流程

    按照步骤快速执行shell,最快速配置。 读者可以根据该篇随记快速回顾流程,以及用到的shell指令和相关配置文件。 是林老师教程的精简版,初次配置者只能作为流程参考,主要和林子雨Spark[python]版课程配套。  林老师厦大实验指南链接如下: Spark编程基础(Python版)教材官

    2024年04月12日
    浏览(34)
  • 后悔没早学这份Python神级文档!2023最新入门到进阶核心知识点学习文档!

    如今学 Python 的程序员越来越多,甚至不少人会把 Python 当作第一语言来学习。不过尽管 Python 功能强大上手轻松,但并不代表它的学习曲线不陡峭,得来全不费工夫。 当推开 Python 的大门,你会发现 Python 入门简单但精通很难。看似语法记得滚瓜烂熟,但一进入实际项目,就

    2024年02月06日
    浏览(36)
  • Python基础知识:整理10 异常相关知识

        当在主函数中没有捕获处理异常时,会报异常错误 处理后   

    2024年01月18日
    浏览(47)
  • Python计算机二级知识点整理

    1.  一个完整的二叉树包括根节点,左子树和右子树,不同的遍历方式的区别就是访问的顺序的不同,前序遍历是首先访问根节点然后再访问左子树和右子树,中序遍历是访问根节点在访问左子树和右子树之间,后序遍历是先访问左子树和右子树再访问根节点。 2.结构化程序

    2024年02月09日
    浏览(33)
  • 关于“Python”的核心知识点整理大全13

    目录 6.4.3 在字典中存储字典 6.5 小结 第7章 用户输入和while循环 7.1 函数 input()的工作原理 7.1.1 编写清晰的程序 7.1.2 使用 int()来获取数值输入 7.1.3 求模运算符 7.1.4 在 Python 2.7 中获取输入 7.2 while 循环简介 7.2.1 使用 while 循环 往期快速传送门👆(在文章最后): 6.4.3 在字典中

    2024年02月04日
    浏览(35)
  • 关于“Python”的核心知识点整理大全42

    目录 game_functions.py game_functions.py game_functions.py alien_invasion.py 14.4 小结 第15 章 生成数据 15.1 安装 matplotlib 15.1.1 在 Linux 系统中安装 matplotlib 15.1.2 在 OS X 系统中安装 matplotlib 注意 15.1.3 在 Windows 系统中安装 matplotlib 注意 15.1.4 测试 matplotlib 注意 15.1.5 matplotlib 画廊 15.2 绘制简单的

    2024年02月03日
    浏览(102)
  • 关于“Python”的核心知识点整理大全53

    目录 18.2.7 Django shell 注意 18.3 创建网页:学习笔记主页 18.3.1 映射 URL urls.py urls.py 注意 18.3.2 编写视图 views.py 18.3.3 编写模板 index.html 往期快速传送门👆(在文章最后): 感谢大家的支持!欢迎订阅收藏!专栏将持续更新! 输入一些数据后,就可通过交互式终端会话以编程方

    2024年01月25日
    浏览(32)
  • 关于“Python”的核心知识点整理大全33

    目录 12.8.3 将子弹存储到编组中 alien_invasion.py 注意 12.8.4 开火 game_functions.py 12.8.5 删除已消失的子弹 alien_invasion.py 12.8.6 限制子弹数量 settings.py game_functions.py 12.8.7 创建函数 update_bullets() game_functions.py alien_invasion.py 12.8.8 创建函数 fire_bullet() game_functions.py 12.9 小结 往期快速传送门

    2024年02月03日
    浏览(34)
  • 关于“Python”的核心知识点整理大全59

    目录 19.3.2 将数据关联到用户 1. 修改模型Topic models.py 2. 确定当前有哪些用户 3. 迁移数据库 注意 19.3.3 只允许用户访问自己的主题 views.py 19.3.4 保护用户的主题 views.py views.py 19.3.6 将新主题关联到当前用户 views.py 往期快速传送门👆(在文章最后): 感谢大家的支持!欢迎订阅

    2024年01月18日
    浏览(34)
  • 关于“Python”的核心知识点整理大全45

    目录 15.4.6 绘制直方图 die_visual.py 注意 15.4.7 同时掷两个骰子 dice_visual.py 15.4.8 同时掷两个面数不同的骰子 different_dice.py 15.5 小结 第 16 章 16.1 CSV 文件格式 16.1.1 分析 CSV 文件头 highs_lows.py 注意 16.1.2 打印文件头及其位置 highs_lows.py 往期快速传送门👆(在文章最后): 感谢大家

    2024年02月04日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包