【pyspark从入门到放弃】DataFrame

这篇具有很好参考价值的文章主要介绍了【pyspark从入门到放弃】DataFrame。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

环境安装

pyspark支持通过pypip、conda下载,或者手动下载。
笔者通过pip install命令从pypip下载并配置安装了3.5.0版本的Spark。

创建实例

使用spark的第一步就是拿到一个SparkSession对象。最简单的方法是

SparkSession.builder.getOrCreate()

即,直接使用默认参数创建实例。也可以做一些配置,比如

SparkSession.builder \
        .appName(app_name) \
        .enableHiveSupport() \
        .getOrCreate()

DataFrame

创建DataFrame

DataFrame是类似pandas库中的DataFrame的类型,可以转换为SparkSession支持的View。
创建一个DataFrame通常使用SparkSession#createDataFrame命令。如要创建一个DataFrame满足第一列的名称是integer_value,类型是整形;第二列的名称是text_value,类型是字符串;共有三行数据,分别为(1, ‘a’), (2, ‘b’), (3, ‘abcdefghijklmnopqrstuvwxyz’),则可以使用下面命令创建

data_frame: pyspark.sql.DataFrame = spark.createDataFrame([
    Row(integer_value=1, text_value='a'),
    Row(integer_value=2, text_value='b'),
	Row(integer_value=3, text_value='abcdefghijklmnopqrstuvwxyz'),
], schema='integer_value int, text_value string')

查看DataFrame

  • 使用DataFrame#show可以查看数据内容。
  • 使用DataFrame#printSchema可以查看数据结构。
show

执行data_frame.show()即可查看DataFrame数据内容,得到结果如下:

+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            1|                   a|
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+

show方法其实有三个参数如下:

参数名 类型 描述
n int 只取头部的n个数据,不传取所有数据
truncate Union[bool, int] 不填或者True或者小于等于0的整数:
如果字段长度不超过20个字符,则全部展示;
如果字段长度超过20,则只展示前面17个字符,跟随...表示只展示部分

False:
展示字段的全部内容

大于0的整数:假设数字为num
如果字段长度不超过num,则全部展示;
如果字段长度超过num,而且num > 3,则只展示前面num-3个字符,跟随...表示只展示部分
如果字段长度超过num,而且num ≤ 3,则只展示前面num个字符
vertical bool 是否竖向展示数据。
不填或者False:表格形式展示数据
True:键值对列表形式展示数据

n=2,执行data_frame.show(n=2)得到结果如下:

+-------------+----------+
|integer_value|text_value|
+-------------+----------+
|            1|         a|
|            2|         b|
+-------------+----------+

truncate=3,执行data_frame.show(truncate=3),得到结果如下:

+-------------+----------+
|integer_value|text_value|
+-------------+----------+
|            1|         a|
|            2|         b|
|            3|       abc|
+-------------+----------+

truncate=4,执行data_frame.show(truncate=4),得到结果如下:


+-------------+----------+
|integer_value|text_value|
+-------------+----------+
|            1|         a|
|            2|         b|
|            3|      a...|
+-------------+----------+

truncate=True,执行data_frame.show(truncate=True),得到结果如下:


+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            1|                   a|
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+

vertical=True,执行data_frame.show(vertical=True),得到结果如下:

-RECORD 0-----------------------------
 integer_value | 1                    
 text_value    | a                    
-RECORD 1-----------------------------
 integer_value | 2                    
 text_value    | b                    
-RECORD 2-----------------------------
 integer_value | 3                    
 text_value    | abcdefghijklmnopq... 
columns

获取 列名组成的列表。执行print(data_frame.columns)得到打印结果如下:

['integer_value', 'text_value']
printSchema

执行data_frame.printSchema()查看DataFrame结构,得到结果如下:

root
 |-- integer_value: integer (nullable = true)
 |-- text_value: string (nullable = true)
select

select方法可以案列打印数据,如执行data_frame.select("integer_value", "text_value").show()得到结果:

+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            1|                   a|
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+

执行data_frame.select("integer_value").show()得到结果:

+-------------+
|integer_value|
+-------------+
|            1|
|            2|
|            3|
+-------------+
describe

describe方法用于计算数据的基本统计特征,包括数量(count)、平均数(mean)、标准差(stddev)、最小值(min)、最大值(max)等。
执行data_frame.describe().show()得到结果:

+-------+-------------+----------+
|summary|integer_value|text_value|
+-------+-------------+----------+
|  count|            3|         3|
|   mean|          2.0|      NULL|
| stddev|          1.0|      NULL|
|    min|            1|         a|
|    max|            3|         b|
+-------+-------------+----------+

describe方法也可以指定列计算。执行ddata_frame.describe(['integer_value', 'text_value']).show()得到相同结果

summary

summary方法和describe一样,用于计算数据的基本统计特征,不能指定要统计的列,但是拥有更多字段,包括数量(count)、平均数(mean)、标准差(stddev)、最小值(min)、25%位数(25%)中位数(100%)75%位数(75%)、最大值(max)等。
执行data_frame.summary().show()得到结果:

+-------+-------------+----------+
|summary|integer_value|text_value|
+-------+-------------+----------+
|  count|            3|         3|
|   mean|          2.0|      NULL|
| stddev|          1.0|      NULL|
|    min|            1|         a|
|    25%|            1|      NULL|
|    50%|            2|      NULL|
|    75%|            3|      NULL|
|    max|            3|         b|
+-------+-------------+----------+

配合select方法指定目标列,即可解决summary方法不能指定列的问题。

collect

collect方法用于将DataFrame中的数据写入内存中,数据量太大可能导致out-of-memory异常。执行print(data_frame.collect())得到打印结果:

[Row(integer_value=1, text_value='a'), Row(integer_value=2, text_value='b'), Row(integer_value=3, text_value='abcdefghijklmnopqrstuvwxyz')]
take和tail

数据量太大的情况下使用collect方法可能导致内存溢出,take方法和tail方法通过限制查询的数据条数,来规避此问题。take方法支持从前往后查询指定个数的数据;tail方法支持从后往前查询指定个数的数据。

filter

filter方法支持对DataFrame中的数据进行过滤,便于准确找到目标数据。执行data_frame.filter('integer_value > 1').show(),得到结果:

+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+
列名

直接.列名就可以获得列对象,执行print(data_frame.integer_value),得到结果Column<'integer_value'>;执行data_frame.select(data_frame.integer_value).show(),得到结果

+-------------+
|integer_value|
+-------------+
|            1|
|            2|
|            3|
+-------------+

DataFrame的修改

toPandas

toPandas方法用于把pysparkDataFrame类型数据转换为pandasDataFrame类型数据。和collect方法一样,如果数据量太大,可能导致out-of-memory异常。

withColumn

withColumn会生成一个新的DataFrame实例,内部数据和有原有DataFrame数据新增或更新一列,后相同,原有DataFrame的值不变。
执行data_frame.withColumn('upper_text', pyspark.sql.functions.upper(data_frame.text_value)).show(),得到结果:

+-------------+--------------------+--------------------+
|integer_value|          text_value|          first_char|
+-------------+--------------------+--------------------+
|            1|                   a|                   A|
|            2|                   b|                   B|
|            3|abcdefghijklmnopq...|ABCDEFGHIJKLMNOPQ...|
+-------------+--------------------+--------------------+

再执行data_frame.show(),还是和调用withColumn方法前一样。

createOrReplaceTempView

执行下面代码

data_frame.createOrReplaceTempView('temp_table')
spark.sql('select * from temp_table').show()

代码将DataFrame转换为临时视图,并查看此临时视图的内容得到结果如下:文章来源地址https://www.toymoban.com/news/detail-795646.html

+-------------+--------------------+
|integer_value|          text_value|
+-------------+--------------------+
|            1|                   a|
|            2|                   b|
|            3|abcdefghijklmnopq...|
+-------------+--------------------+

到了这里,关于【pyspark从入门到放弃】DataFrame的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • pyspark学习_dataframe常用操作_01

    1. 创建DataFrame    本文使用DataFrame通过读取json文件获取数据,代码如下:       2.  DataFrame常见操作       2.1 printSchema 2.2 show  2.3 select 2.4 groupBy  2.5 filter 2.6 sort 2.7 replace 2.8 alias 2.9 withColumn 2.10 foreach

    2024年01月25日
    浏览(49)
  • [spark] DataFrame 的 checkpoint

    在 Apache Spark 中,DataFrame 的 checkpoint 方法用于强制执行一个物理计划并将结果缓存到分布式文件系统,以防止在计算过程中临时数据丢失。这对于长时间运行的计算过程或复杂的转换操作是有用的。 具体来说, checkpoint 方法执行以下操作: 将 DataFrame 的物理计划执行,并将结

    2024年02月03日
    浏览(42)
  • 【spark】dataframe慎用limit

    官方:limit通常和order by一起使用,保证结果是确定的 limit 会有两个步骤: LocalLimit ,发生在每个partition GlobalLimit,发生shuffle,聚合到一个parttion 当提取的n大时,第二步是比较耗时的 如果对取样顺序没有要求,可用tablesample替代,使用详解。 官方 Stop using the LIMIT clause wrong

    2024年02月10日
    浏览(45)
  • Spark与PySpark(1.概述、框架、模块)

    目录 1.Spark 概念 2. Hadoop和Spark的对比 3. Spark特点 3.1 运行速度快 3.2 简单易用 3.3 通用性强 3.4 可以允许运行在很多地方 4. Spark框架模块 4.1 Spark Core 4.2 SparkSQL 4.3 SparkStreaming 4.4 MLlib 4.5 GraphX 5. Spark的运行模式 5.1 本地模式(单机) Local运行模式 5.2 Standalone模式(集群) 5.3 Hado

    2024年02月02日
    浏览(43)
  • Spark(15):SparkSQL之DataFrame

    目录 0. 相关文章链接 1. DataFrame的作用 2. 创建DataFrame 3. SQL 语法 4. DSL 语法 5. RDD 转换为 DataFrame 6. DataFrame 转换为 RDD  Spark文章汇总          Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有

    2024年02月13日
    浏览(42)
  • spark、pyspark 常用的模版 demo 网址

    1、我自己有时候用百度或者其他的搜索出来的spark 常用案例,质量有的好有的差有时候就很烦。特地分享一个我常用的质量高的网站地址

    2024年02月11日
    浏览(240)
  • PySpark-Spark SQL基本介绍

    目录 Spark SQL基本介绍 Spark SQL特点 Spark SQL与Hive的异同 Spark SQL的数据结构 Spark SQL的入门 创建SparkSession对象 DataFrame详解 DataFrame基本介绍  DataFrame的构建方式 RDD构建DataFrame  内部初始化数据得到DataFrame schema总结 读取外部文件得到DataFrame Text方式读取 CSV方式读取 JSON方式读取 概

    2024年01月16日
    浏览(62)
  • Spark RDD、DataFrame、DataSet比较

    在Spark的学习当中,RDD、DataFrame、DataSet可以说都是需要着重理解的专业名词概念。尤其是在涉及到数据结构的部分,理解清楚这三者的共性与区别,非常有必要。 RDD,作为Spark的核心数据抽象,是Spark当中不可或缺的存在,而在SparkSQL中,Spark为我们提供了两个新的抽象,分别

    2024年02月04日
    浏览(36)
  • 10-用PySpark建立第一个Spark RDD

    PySpark实战笔记系列第一篇 Apache Spark的核心组件的基础是RDD。所谓的RDD,即 弹性分布式数据集(Resiliennt Distributed Datasets) ,基于RDD可以实现Apache Spark各个组件在多个计算机组成的集群中进行无缝集成,从而能够在一个应用程序中完成海量数据处理。 只读不能修改 :只能通过

    2024年04月08日
    浏览(47)
  • 基于Headless构建高可用spark+pyspark集群

    Headless 服务类型并不分配容器云虚拟 IP,而是直接暴露所属 Pod 的 DNS 记录。没有默认负载均衡器,可直接访问 Pod IP 地址。因此,当我们需要与集群内真实的 Pod IP 地址进行直接交互时,Headless 服务就很有用。 其中Service的关键配置如下: clusterIP: None ,不让其获取clusterIP ,

    2024年02月06日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包