pyspark常用语法(含pandas对比)

这篇具有很好参考价值的文章主要介绍了pyspark常用语法(含pandas对比)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.排名函数

dense_rank():相同数具有相同的排名,始终具有连续的排名值

import pyspark.sql.functions as F
from pyspark.sql.window import Window

data = [(1, 'John'),
        (1, 'Mike'),
        (1, 'Emma'),
        (4, 'Sarah')]
df = spark.createDataFrame(data, ['id', 'name'])
window = Window.orderBy(col('id'))
df = df.withColumn("frame_id", F.dense_rank().over(window))
df.show()

pyspark常用语法(含pandas对比),开发语言,python,pandas,大数据,spark

补充一个其他的常用的:

rank():相同数具有相同的排名,下一个跳过去

pyspark常用语法(含pandas对比),开发语言,python,pandas,大数据,spark
row_number():相同数具有不同的排名,下一个接着

pyspark常用语法(含pandas对比),开发语言,python,pandas,大数据,spark

2.pandas中的map函数,pyspark不支持map ,when...otherwise

pyspark df一列分数,如果大于零值变为‘good’, 小于变为‘no

pandas:
df['score'] = df['score'].map(lambda x : 'good' if x> 0 else 'no')

pyspark
from pyspark.sql.functions import when
df = df.withColumn('score', when(df.score > 0, 'good').otherwise('no'))

3.groupby

分组求简单的最大最小平均值等:

根据"obj_frame_id"分组,求'rel_pos_x'平均值生成新列为"rel_pos_x"

data_add = data.groupBy("obj_frame_id").agg(F.avg('rel_pos_x').alias("rel_pos_x")))

另一种写法: 

window_spec = Window.partitionBy('follow_id')
df = df.withColumn('follow_start_time', F.min('ts').over(window_spec))
df = df.withColumn('follow_end_time', F.max('ts').over(window_spec))
df = df.withColumn('follow_count_time', F.count('ts').over(window_spec))

 分组求复杂的写pandas_udf的

df = df.groupby(['obj_id']).applyInPandas(group_udf, schema=df.schema)
不能有空值

注意:这种方法返回值里不能有空值,会报错而且也不会具体告诉你是什么错

4.用字典去替换df一列的值

比如:我有一个字典{‘a’: 111, b:222}, df有一列值为‘a',‘b’,我想把它替换成111,222

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType
import pyspark.sql.functions as F
from pyspark.sql.functions import col,udf

replace_udf = udf(lambda x: algo_dict.get(x, x), StringType())
data_scene = data_scene.withColumn("code_name", replace_udf(data_scene["algorithm_id"]))


pandas:
data_scene['code_name']=data_scene['algorithm_id'].map(algo_dict)

 5.排序

pandas:

df = df.sort_values('header timestamp')

pyspark:

df = df.sort('header timestamp')

 6.增加一列常数或用另一列赋值

注:pyspark不支持把一列list赋给一列df

pandas:
df['oritentation pitch'] = 0

pyspark:
from pyspark.sql.functions import lit
df = df.withColumn('oritentation pitch', lit(0))


pandas:
df['bag_timestamp'] = df['header_timestamp']

pyspark:
新建一列bag_timestamp,其值是已有列header_timestamp
df = df.withColumn('bag_timestamp', df['header_timestamp'])

7.去重drop_duplicates()

虽然两个写法一样,但是pyspark没有keep这个参数

pandas里:

  • keep: 'first', 'last', False,默认为first

        决定保留的数据行。

        first:保留第一个出现的重复数据

        last:保留最后一个出现的重复数据

        False:删除所有的重复行

pandas:
df = df.drop_duplicates(subset='列名',keep='last')

pyspark:
df = df.drop_duplicates(subset='列名')

8.拼接两列 F.concat

举例:我的'object_id'是1,'class_label_pred'是car,新增的'obj_id_class'值为'1_car'

pyspark:

obj_table = obj_table.withColumn('obj_id_class', F.concat(F.col('object_id'), lit('_'), F.col('class_label_pred').cast('string')))

或者:
data=data.withColumn('obj_id_class',concat_ws('_',"obj_id","obj_class"))

pandas:
obj_table['obj_id_class'] = obj_table["object_id"].map(str) + '_' + 
                       obj_table["class_label_pred"].map(str)

9.常见的过滤

找df 'position_x'列绝对值大于10的

pyspark:
obj_table = obj_table[(F.abs(df.position_x)<= 10)]

pandas:
obj_table = obj_table[(df.position_x.abs()<= 10)]

 10.取行limit

取df最后一行作为一个新的df

df2 = df.orderBy(F.col('ts').desc()).limit(1)

11.pandas中的diff() 转 pyspark

from pyspark.sql.functions import lit, lag, lead, col,when

data = [(1, 10), (2, 20), (3, 15), (4, 25), (5, 30)]
df = spark.createDataFrame(data, ['id', 'value'])
windowSpec = Window.orderBy('id')
df = df.withColumn('diff', col('value') - lag('value').over(windowSpec))

lag相当于pandas中的shift()

lead相当于pandas中的shift(-1)

window_spec = Window.orderBy('ts')
data = data.withColumn('last_front_count', lag(col('front_count')).over(window_spec))

12.填空值fillna()

pandas:
df["yaw_flag"] = df["yaw_flag"].fillna(0)

pyspark:
df = df.fillna(0, subset=["yaw_flag"])

再简单介绍几个其他的

 1.使用matplotlib画图时,pyspark需要用df[['col']].collect()

import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('agg')

y1.plot(data_result_i[['time']].collect(), data_result_i[['ego_velocity_x']].collect(), color='dodgerblue', label='ego_spd',
            linewidth=3)

2.遇到的错误 

初步解决办法是关闭spark重启或将自定义udf放到函数里面去,我也不太理解这个问题

pyspark常用语法(含pandas对比),开发语言,python,pandas,大数据,spark

 附之前的pyspark:

pandas、pyspark、spark相互转换,语法对比(超详细)

python spark 求dataframe一列的max,min,median

python spark 纵向合并多个Datafame文章来源地址https://www.toymoban.com/news/detail-769325.html

到了这里,关于pyspark常用语法(含pandas对比)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )

    在 PySpark 中 RDD 对象 提供了一种 数据计算方法 RDD#map 方法 ; 该 RDD#map 函数 可以对 RDD 数据中的每个元素应用一个函数 , 该 被应用的函数 , 可以将每个元素转换为另一种类型 , 也可以针对 RDD 数据的 原始元素进行 指定操作 ; 计算完毕后 , 会返回一个新的 RDD 对象 ; map 方法 , 又

    2024年02月14日
    浏览(55)
  • 【Python】PySpark 数据计算 ③ ( RDD#reduceByKey 函数概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 语法 | 代码示例 )

    RDD#reduceByKey 方法 是 PySpark 中 提供的计算方法 , 首先 , 对 键值对 KV 类型 RDD 对象 数据 中 相同 键 key 对应的 值 value 进行分组 , 然后 , 按照 开发者 提供的 算子 ( 逻辑 / 函数 ) 进行 聚合操作 ; 上面提到的 键值对 KV 型 的数据 , 指的是 二元元组 , 也就是 RDD 对象中存储的数据是

    2024年02月14日
    浏览(54)
  • Python之Pandas的常用技能【写入数据】

    1、背景: 最近在工作中遇到越来越多的的使用pandas或者python来处里写入操作,尤其是对excel文件或者csv文件的操作更是常见,这里将写入操作总结如下,方便记忆,也分享给大家,希望对阅读者能够有所帮助 2、pandas写入数据的各种场景使用详解 2.1、df.to_excel()参数详解 2.2

    2024年01月17日
    浏览(48)
  • PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

    目录 前言 一、PySpark基础功能  1.Spark SQL 和DataFrame 2.Pandas API on Spark 3.Streaming 4.MLBase/MLlib 5.Spark Core 二、PySpark依赖 Dependencies 三、DataFrame 1.创建 创建不输入schema格式的DataFrame 创建带有schema的DataFrame 从Pandas DataFrame创建 通过由元组列表组成的RDD创建 2.查看 DataFrame.show() spark.sql.

    2024年01月18日
    浏览(56)
  • python-数据分析-numpy、pandas、matplotlib的常用方法

    输出方式不同 里面包含的元素类型 使用 索引/切片 访问ndarray元素 切片 左闭右开 np.array(list) np.arange() np.random.randn() - - - 服从标准正态分布- - - 数学期望 μ - - - 标准方差 s 使用matplotlib.pyplot模块验证标准正态分布 np.random.randint(起始数,终止数(行,列)) 数据分析 - - - 数据清洗

    2024年02月10日
    浏览(99)
  • 【100天精通Python】Day55:Python 数据分析_Pandas数据选取和常用操作

    目录 Pandas数据选择和操作 1 选择列和行 2 过滤数据 3 添加、删除和修改数据

    2024年02月09日
    浏览(63)
  • 【go语言开发】本地缓存的使用,从简单到复杂写一个本地缓存,并对比常用的开源库

    本文主要介绍go语言中本地缓存的使用,首先由简单到复杂手写3个本地缓存示例,使用内置的sync,map等数据结构封装cache,然后介绍常见的一些开源库,以及对比常用的开源库 本地缓存 是指将一部分数据存储在应用程序本地内存中,以提高数据访问速度和应用程序性能的技

    2024年02月04日
    浏览(36)
  • Rust vs Go:常用语法对比(十二)

    题图来自 Rust vs Go in 2023 [1] 221. Remove all non-digits characters Create string t from string s, keeping only digit characters 0, 1, 2, 3, 4, 5, 6, 7, 8, 9. 删除所有非数字字符 168 [src/main.rs:7] t = \\\"14\\\" 222. Find first index of an element in list Set i to the first index in list items at which the element x can be found, or -1 if items doe

    2024年02月15日
    浏览(51)
  • Rust vs Go:常用语法对比(九)

    题图来自 Golang vs Rust - The Race to Better and Ultimate Programming Language 161. Multiply all the elements of a list Multiply all the elements of the list elements by a constant c 将list中的每个元素都乘以一个数 [4.0, 7.0, 8.0] 162. Execute procedures depending on options execute bat if b is a program option and fox if f is a program optio

    2024年02月15日
    浏览(37)
  • Rust vs Go:常用语法对比(二)

    21. Swap values 交换变量a和b的值 输出 a: 10, b: 3 or 输出 22. Convert string to integer 将字符串转换为整型 or 输出 or 输出 or 输出 23. Convert real number to string with 2 decimal places Given a real number x, create its string representation s with 2 decimal digits following the dot. 给定一个实数,小数点后保留两位小数

    2024年02月16日
    浏览(45)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包