【黑马程序员】PySpark学习

这篇具有很好参考价值的文章主要介绍了【黑马程序员】PySpark学习。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Spark

Spark是什么

  • 定义:Apache Spark是用于大规模数据处理的统一分析引擎
  • 简单来说,spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算PB、TB乃至EB级别的海量数据

PySpark

  • Spark对Python语言的支持重点体现在Python第三方库:PySpark上
  • PySpark是由Spark官方开发的Python语言第三方库
  • Python开发者可以使用pip程序快速的安装PySpark库

基础准备

Spark库安装

  • 使用pip命令直接安装
pip install pyspark

构建pyspark执行环境入口对象

  • 想要使用pyspark库完成数据处理,首先需要构建一个执行环境入口对象
  • pyspark的执行环境入口对象是:类SparkContext的类对象
  • 代码示例
# *_*coding:utf-8 *_*
# 导包
from pyspark import SparkConf, SparkContext

# 创建SparkConf对象
# setMaster("local[*]")表明spark运行模式是单机,运行在本地
# setAppName("test") 给当前spark程序起一个名字
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于sparkConf构建sparkContext类对象
sc = SparkContext(conf=conf)
# 查看spark运行版本
print(sc.version)
# 停止pyspark程序
sc.stop()

PySpark的编程模型

  • SparkContext类对象,是PySpark变成中一切功能的入口
  • PySpark的编程,主要分为三大步骤
    • 数据输入:通过SparkContext类对象的成员方法,完成数据的读取操作,读取后得到RDD类对象
    • 数据处理计算:通过RDD类对象的成员方法完成各种数据计算的需求
    • 数据输出:将处理完成后的RDD对象,调用各种成员方法完成,写出文件,转换为list等操作

数据输入

RDD对象

  • PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
  • RDD全称是:弹性分布式数据集
  • PySpark针对数据的处理都是以RDD对象作为载体
    • 数据存储在RDD内
    • 各类数据的计算方法,也都是RDD的成员方法
    • RDD的数据计算方法,返回值依旧是RDD对象

黑马程序员 pyspark,Python,大数据,python,spark,PySpark

Python数据容器转RDD对象

  • PySpark支持通过SparkContext对象的parallelize成员方法将list/tuple/set/dict/str转换为PySpark的RDD对象
  • 注意
    • 字符串会被拆分成一个个的字符存入RDD对象
    • 字典仅有key会被存入RDD对象
  • 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于sparkConf构建sparkContext类对象
sc = SparkContext(conf=conf)
# rdd = sc.parallelize(数据容器对象)
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize((1, 2, 3, 4))
rdd3 = sc.parallelize({1, 2, 3, 4})
rdd4 = sc.parallelize({"a": 1, "b": 2})
rdd5 = sc.parallelize("erdtfhdsadg")

print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

sc.stop()
  • 运行结果
[1, 2, 3, 4]                                                                    
[1, 2, 3, 4]
[1, 2, 3, 4]
['a', 'b']
['e', 'r', 'd', 't', 'f', 'h', 'd', 's', 'a', 'd', 'g']

读取文件转RDD对象

  • 通过textFile()方法将文件转成RDD对象

  • 代码示例

# *_*coding:utf-8 *_*
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local[*]').setAppName("test_spark_app")
sc = SparkContext(conf=conf)
rdd = sc.textFile('./读取文件转RDD对象.py')
print(rdd.collect())
sc.stop()

数据计算

map方法

  • PySpark的数据计算,都是基于RDD对象来进行的,RDD对象内置丰富的:成员方法(算子)

  • 功能:map算子是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD

  • 语法:

rdd.map(func)
# func: f:(T)->U
# (T)->U 表示的是方法的定义
# ()表示传入参数,(T)表示传入一个参数,()表示没有传入参数
# T是泛型的代称,在这里表示任意类型
# U也是泛型的代称,在这里表示任意类型

# ->U 表示返回值
# (T)->U 总结起来的意思是:这是一个方法,接收一个参数传入,传入参数类型不限,返回一个返回值,返回值类型不限
# (A)->A 总结起来的意思是:这是一个方法,接收一个参数传入,传入参数类型不限,返回一个返回值,返回值和传入参数类型一致
  • 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf

# map方法
sc = SparkContext(conf=SparkConf().setAppName("test_spark_app").setMaster("local[*]"))
# 准备一个rdd
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 调用map方法进行计算
# 对传入的值先进行*10在进行+5
rdd1 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
# 查看计算完毕后rdd中的内容
print(rdd1.collect())

flatMap方法

  • 功能:对rdd执行map操作,然后进行解除嵌套操作

  • 解除嵌套

# 嵌套的list
lst = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
# 解除嵌套后的list
lst = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  • 代码示例
# *_*coding:utf-8 *_*

from pyspark import SparkContext, SparkConf

# flatmap方法
sc = SparkContext(conf=SparkConf().setAppName("test_spark_app").setMaster("local[*]"))
# 准备一个rdd
rdd = sc.parallelize(["fdsf fsf eerer", "fdtfydus adas ouore", "ier wir hdgi ldre"])
# 需求:将RDD数据里面的一个个单词提取出来
rdd1 = rdd.map(lambda x: x.split(" "))
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(f'map方法转化完内容:{rdd1.collect()}')
print(f'flatmap方法转化完内容:{rdd2.collect()}')
sc.stop()

reduceByKey方法

  • 功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成数据的聚合操作

  • 用法文章来源地址https://www.toymoban.com/news/detail-854711.html

rdd.reduceByKey(func)
# func:(V,V)->V
# 接收两个传入参数(类型要一致),返回一个返回值,类型和传入要求一致
  • 代码示例
# *_*coding:utf-8 *_*

from pyspark import SparkContext, SparkConf

# flatmap方法
sc = SparkContext(conf=SparkConf().setAppName("test_spark_app").setMaster("local[*]"))
# 准备一个rdd
rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
# reduceByKey 自动按照key分组,然后根据聚合逻辑完成value聚合操作
# rdd中数据有两个key:'a'和'b'
# 聚合逻辑为:不断的将value值相加
rdd1 = rdd.reduceByKey(lambda a, b: a + b)
print(f'map方法转化完内容:{rdd1.collect()}')
sc.stop()

Filter方法

  • 功能:过滤想要的数据进行保留
  • 语法
rdd.filter(func)
# func: (T)->bool  传入一个随机类型参数,返回值必须是bool类型,返回为True的数据被保留,返回为False的被丢弃
  • 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf

sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app"))
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 需求:保留奇数
# rdd1 = rdd.filter(lambda x: x % 2 == 1)
# print(rdd1.collect())
print(rdd.filter(lambda x: x % 2 == 1).collect())

distinct方法

  • 功能:对rdd数据进行去重返回新的rdd
  • 语法:rdd.distinct()
  • 代码示例:
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf

sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app"))
rdd = sc.parallelize([1, 1, 2, 4, 2, 1])
print(rdd.distinct().collect())

sortby方法

  • 功能:基于指定的排序规则对RDD数据进行排序
  • 语法:
rdd.sortBy(func, ascending=False, numPartitions=1)
# func: (T)-> U:告知RDD中的那个数据进行排序,比如lambda x: x[1]表示按照RDD中的第二列元素进行排序
# ascending True升序,False降序
# numPartitions:用多少分区排序
  • 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf

sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app"))
rdd = sc.parallelize([("dasds", 5), ("ferew", 2), ("dsgyds", 7), ("dsdsfds", 4), ("dsfsfs", 2)])
print(rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1).collect())

数据输出

collect方法

  • 功能:将RDD各个分区的数据,统一收集到Driver中,形成一个list对象
  • 用法:rdd.collect(),返回值是一个list

reduce方法

  • 功能:对RDD数据按照传入的逻辑进行聚合
  • 语法:
rdd.reduce(func)
# func: (T,T) ->T
# 2个参数 1个返回值,返回值和传入参数要求类型一致
  • 代码示例
# *_*coding:utf-8 *_*
from pyspark import SparkContext, SparkConf

sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("test_spark_app"))
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda a, b: a + b))

take方法

  • 功能:取RDD的前N个元素,组成list后返回
  • 用法:rdd.take(N)

count方法

  • 功能:计算RDD有多少条数据,返回一个数字
  • 用法:rdd.count()

输出到文件

  • saveAsTextFile方法
  • 功能:将RDD的数据写入到文本文件中
  • 支持 本地写出,hdfs等文件系统
  • 代码示例
sc.parallelize([1, 2, 3, 4, 5]).saveAsTextFile("输出文件路径")

到了这里,关于【黑马程序员】PySpark学习的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • SpringBoot-黑马程序员-学习笔记(三)

    目录 30.springboot整合MyBatis-plus 32.SSM整合 38.MP中的条件查询 小知识:许多放在类前面的注解,比如@Mapper,@Service都是将该类定义成一个Bean,交给spring管理 39.Service模块 1.创建普通springboot项目,勾选Mysql 框架 2.在pom包里面导入mybatis-plus的坐标 3.把数据层的类继承BaseMapper这个接口

    2024年02月07日
    浏览(29)
  • 学习笔记-微服务基础(黑马程序员)

    spring cloud spring cloud alibaba eureka-server 注册中心 eureka-client 客户端 每30s发送心跳 服务 服务消费者 服务提供者 依赖 启动类 添加注解 @EnableEurekaServer 配置文件 application.yml 依赖 配置文件 application.yml 添加注解 @LoadBlanced 修改url 自定义负载均衡策略 1、定义新的IRule,将轮询策略(

    2024年04月13日
    浏览(39)
  • 学习笔记-微服务高级(黑马程序员)

    测试软件 jmeter 雪崩问题 个微服务往往依赖于多个其它微服务,服务提供者I发生了故障,依赖于当前服务的其它服务随着时间的推移形成级联失败 超时处理 设定超时时间,请求超过一定时间没有响应就返回错误信息 仓壁模式 限定每个业务能使用的线程数,避免耗尽整个tom

    2024年04月25日
    浏览(39)
  • Linux命令基础,黑马程序员学习笔记

    command [-options] [parameter] command:命令本身 -options:[可选,非必填]命令的一些选项,可以通过选项控制命令的行为细节 parameter:[可选,非必填]命令的参数,多数用于命令的指向目标等 示例: ls -l /home/itheima ls是命令本身,-l是选项, /home/itheima是参数意思是以列表的形式,显示/home

    2024年02月19日
    浏览(40)
  • [学习笔记]黑马程序员-Hadoop入门视频教程

    黑马程序员大数据Hadoop入门视频教程,适合零基础自学的大数据Hadoop教程 学习目标 1.理解大数据基本概念 2.掌握数据分析基本步骤 3.理解分布式、集群概念 4.学会VMware虚拟机的导入与使用 5.掌握Linux常用操作命令使用 6.掌握vi/vim编辑器基础使用 1.1.1 企业数据分析方向 数据分

    2024年02月13日
    浏览(41)
  • Python黑马程序员(Spark实战)笔记

     pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark 注意:字符串返回的是[\\\'a\\\',\\\'b\\\',\\\'c\\\',\\\'d\\\',\\\'e\\\',\\\'f\\\',\\\'g\\\']   字典返回的是[\\\'key1\\\',\\\'key2\\\']   读取hello.txt的内容: 注意: 如果没有添加上行代码程序会报出错误! Caused by: org.apache.spark.SparkException: Python worker failed to connect back.  解释器的位置

    2024年02月05日
    浏览(45)
  • 黑马程序员--分布式搜索ElasticSearch学习笔记

    黑马视频地址:https://www.bilibili.com/video/BV1LQ4y127n4/ 想获得最佳的阅读体验,请移步至我的个人博客 SpringCloud学习笔记 消息队列MQ学习笔记 Docker学习笔记 分布式搜索ElasticSearch学习笔记 ElasticSearch的作用 ElasticSearch 是一款非常强大的开源搜素引擎,具备非常强大的功能,可以帮

    2024年02月04日
    浏览(32)
  • 黑马程序员 Java设计模式学习笔记(一)

    目录 一、设计模式概述 1.1、23种设计模式有哪些? 1.2、软件设计模式的概念 1.3、学习设计模式的必要性 1.4、设计模式分类 二、UML图 2.1、类图概述 2.2、类图的作用 2.3、类图表示法 类的表示方式 类与类之间关系的表示方式 关联关系 聚合关系 组合关系 依赖关系 继承关系

    2024年01月19日
    浏览(36)
  • 黑马程序员Docker快速入门到项目部署(学习笔记)

    目录 一、Docker简介 二、安装Docker 2.1、卸载旧版 2.2、配置Docker的yum库 2.3、安装Docker 2.4、启动和校验 2.5、配置镜像加速 2.5.1、注册阿里云账号 2.5.2、开通镜像服务 2.5.3、配置镜像加速 三、快速入门 3.1、部署MYSQL 3.2、命令解读 四、Docker基础 4.1、常见命令 4.1.1、命令介绍 4.1

    2024年01月25日
    浏览(36)
  • 《黑马程序员2023新版黑马程序员大数据入门到实战教程,大数据开发必会的Hadoop、Hive,云平台实战项目》学习笔记总目录

    本文是对《黑马程序员新版大数据入门到实战教程》所有知识点的笔记进行总结分类。 学习视频:黑马程序员新版大数据 学习时总结的学习笔记以及思维导图会在后续更新,请敬请期待。 前言:配置三台虚拟机,为集群做准备(该篇章请到原视频进行观看,不在文章内详细

    2024年02月03日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包