1. Ranking functions
dense_rank(): equal values get the same rank, and the rank values are always consecutive (no gaps)
import pyspark.sql.functions as F
from pyspark.sql.window import Window

data = [(1, 'John'),
        (1, 'Mike'),
        (1, 'Emma'),
        (4, 'Sarah')]
df = spark.createDataFrame(data, ['id', 'name'])
window = Window.orderBy(F.col('id'))
df = df.withColumn("frame_id", F.dense_rank().over(window))
df.show()
Two other commonly used ranking functions (compared in the sketch below):
rank(): equal values get the same rank, and the following rank is skipped (gaps appear)
row_number(): equal values get different ranks, numbered consecutively
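A minimal sketch contrasting the three functions, reusing the df and window defined above:

df = (df.withColumn('dense_rank', F.dense_rank().over(window))
        .withColumn('rank', F.rank().over(window))
        .withColumn('row_number', F.row_number().over(window)))
df.show()
# For ids 1, 1, 1, 4: dense_rank gives 1, 1, 1, 2; rank gives 1, 1, 1, 4; row_number gives 1, 2, 3, 4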
2. pandas map(); PySpark has no map() for DataFrame columns, use when...otherwise instead
Example: a score column where values greater than 0 become 'good', otherwise 'no':
pandas:
df['score'] = df['score'].map(lambda x : 'good' if x> 0 else 'no')
pyspark:
from pyspark.sql.functions import when
df = df.withColumn('score', when(df.score > 0, 'good').otherwise('no'))
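When there are more than two buckets, when calls can be chained before otherwise. A minimal sketch with hypothetical thresholds and a hypothetical score_level column:

from pyspark.sql.functions import when

df = df.withColumn(
    'score_level',
    when(df.score > 10, 'high')
    .when(df.score > 0, 'good')
    .otherwise('no')
)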
3. groupBy
Simple grouped aggregations (max, min, avg, etc.):
Group by "obj_frame_id" and take the average of 'rel_pos_x' as a new column "rel_pos_x":
data_add = data.groupBy("obj_frame_id").agg(F.avg('rel_pos_x').alias("rel_pos_x"))
Another way, using a window:
window_spec = Window.partitionBy('follow_id')
df = df.withColumn('follow_start_time', F.min('ts').over(window_spec))
df = df.withColumn('follow_end_time', F.max('ts').over(window_spec))
df = df.withColumn('follow_count_time', F.count('ts').over(window_spec))
For more complex per-group logic, write a pandas UDF and use it with applyInPandas (a sketch of such a UDF follows the note below):
df = df.groupby(['obj_id']).applyInPandas(group_udf, schema=df.schema)
Note: the DataFrame returned by this method must not contain null values; otherwise it raises an error without telling you what actually went wrong.
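A minimal sketch of what such a group UDF could look like; the function body, and the obj_id/rel_pos_x columns it assumes, are hypothetical:

import pandas as pd

# Each call receives one group as a pandas DataFrame and must return a
# pandas DataFrame that matches the declared schema (df.schema here).
def group_udf(pdf: pd.DataFrame) -> pd.DataFrame:
    # e.g. keep only the row with the largest rel_pos_x in the group
    pdf = pdf.sort_values('rel_pos_x').tail(1)
    return pdf.fillna(0)  # avoid returning nulls, per the note above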
4. Replacing values in a column with a dict
Example: given a dict {'a': 111, 'b': 222} and a df column whose values are 'a' and 'b', replace them with 111 and 222:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# values are kept as strings here so they match the UDF's declared StringType
algo_dict = {'a': '111', 'b': '222'}

replace_udf = udf(lambda x: algo_dict.get(x, x), StringType())
data_scene = data_scene.withColumn("code_name", replace_udf(data_scene["algorithm_id"]))
pandas:
data_scene['code_name'] = data_scene['algorithm_id'].map(algo_dict)
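A UDF-free alternative in PySpark is to build a literal map expression from the dict; a sketch assuming the same algo_dict and columns:

from itertools import chain
import pyspark.sql.functions as F

# Build a map literal: map('a', '111', 'b', '222')
mapping_expr = F.create_map([F.lit(v) for v in chain(*algo_dict.items())])

# Look the column up in the map; coalesce falls back to the original value
# for missing keys, like dict.get(x, x) in the UDF version.
data_scene = data_scene.withColumn(
    "code_name",
    F.coalesce(mapping_expr[F.col("algorithm_id")], F.col("algorithm_id"))
)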
5. Sorting
pandas:
df = df.sort_values('header timestamp')
pyspark:
df = df.sort('header timestamp')
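For descending order (not part of the original example, just a quick sketch):

# pandas
df = df.sort_values('header timestamp', ascending=False)
# pyspark
df = df.sort(F.col('header timestamp').desc())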
6. Adding a constant column, or assigning from another column
Note: PySpark cannot assign a Python list directly to a DataFrame column (a join-based workaround is sketched at the end of this section)
pandas:
df['oritentation pitch'] = 0
pyspark:
from pyspark.sql.functions import lit
df = df.withColumn('oritentation pitch', lit(0))
pandas:
df['bag_timestamp'] = df['header_timestamp']
pyspark:
Create a new column bag_timestamp whose values come from the existing column header_timestamp:
df = df.withColumn('bag_timestamp', df['header_timestamp'])
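As noted above, a Python list cannot be assigned directly. One possible workaround (a sketch, not from the original post) is to give both the DataFrame and the list a row index and join on them; the row order produced by monotonically_increasing_id is only meaningful if df already has a stable ordering, so treat this as an assumption:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

values = [0.1, 0.2, 0.3]  # hypothetical: one value per row of df, in row order

w = Window.orderBy(F.monotonically_increasing_id())
df_idx = df.withColumn('_rn', F.row_number().over(w))

vals_df = spark.createDataFrame(
    [(i + 1, v) for i, v in enumerate(values)], ['_rn', 'new_col'])

df = df_idx.join(vals_df, on='_rn', how='left').drop('_rn')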
7. Deduplication: drop_duplicates()
The call looks the same in both libraries, but PySpark has no keep parameter (a workaround is sketched below)
In pandas:
- keep: 'first', 'last', or False, default 'first'
Determines which duplicated rows are kept:
first: keep the first occurrence of the duplicates
last: keep the last occurrence of the duplicates
False: drop all duplicated rows
pandas:
df = df.drop_duplicates(subset='col_name', keep='last')
pyspark:
df = df.drop_duplicates(subset=['col_name'])
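One common way to emulate pandas keep='last' in PySpark is a row_number window ordered descending; a sketch with hypothetical col_name/ts columns:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# keep the last row (largest ts) for each col_name value
w = Window.partitionBy('col_name').orderBy(F.col('ts').desc())
df = (df.withColumn('_rn', F.row_number().over(w))
        .filter(F.col('_rn') == 1)
        .drop('_rn'))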
8. Concatenating two columns: F.concat
Example: 'object_id' is 1 and 'class_label_pred' is car; the new 'obj_id_class' value should be '1_car'
pyspark:
obj_table = obj_table.withColumn('obj_id_class', F.concat(F.col('object_id'), F.lit('_'), F.col('class_label_pred').cast('string')))
Or:
obj_table = obj_table.withColumn('obj_id_class', F.concat_ws('_', 'object_id', 'class_label_pred'))
pandas:
obj_table['obj_id_class'] = obj_table["object_id"].map(str) + '_' + obj_table["class_label_pred"].map(str)
9. Common filtering
Keep the rows of obj_table whose 'position_x' absolute value is at most 10 (a couple of extra filter patterns are sketched at the end of this section):
pyspark:
obj_table = obj_table[F.abs(obj_table.position_x) <= 10]
pandas:
obj_table = obj_table[obj_table.position_x.abs() <= 10]
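Two more everyday filter patterns, assuming the same hypothetical columns; in PySpark, combined conditions use & / | and each condition needs its own parentheses:

# pyspark: multiple conditions and a membership test
obj_table = obj_table.filter(
    (F.abs(F.col('position_x')) <= 10) &
    (F.col('class_label_pred').isin('car', 'truck'))
)

# pandas equivalent
obj_table = obj_table[
    (obj_table.position_x.abs() <= 10) &
    (obj_table.class_label_pred.isin(['car', 'truck']))
]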
10. Taking rows: limit
Take the last row of df (the row with the largest ts) as a new DataFrame:
df2 = df.orderBy(F.col('ts').desc()).limit(1)
11. pandas diff() in PySpark
from pyspark.sql.functions import lag, lead, col
from pyspark.sql.window import Window
data = [(1, 10), (2, 20), (3, 15), (4, 25), (5, 30)]
df = spark.createDataFrame(data, ['id', 'value'])
windowSpec = Window.orderBy('id')
df = df.withColumn('diff', col('value') - lag('value').over(windowSpec))
lag is the equivalent of pandas shift()
lead is the equivalent of pandas shift(-1) (see the sketch after the example below)
window_spec = Window.orderBy('ts')
data = data.withColumn('last_front_count', lag(col('front_count')).over(window_spec))
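The first row of a window has no previous value, so lag yields null there. A small sketch, continuing the id/value example above, that also shows lead and fills the null:

# lead() looks one row ahead, the opposite direction of lag()
df = df.withColumn('next_value', lead('value').over(windowSpec))

# the first row of 'diff' is null; fill it with 0, like pandas .diff().fillna(0)
df = df.fillna(0, subset=['diff'])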
12. Filling nulls: fillna()
pandas:
df["yaw_flag"] = df["yaw_flag"].fillna(0)
pyspark:
df = df.fillna(0, subset=["yaw_flag"])
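fillna also accepts a per-column dict in both libraries; a quick sketch with one hypothetical extra column:

# pandas
df = df.fillna({'yaw_flag': 0, 'class_label_pred': 'unknown'})
# pyspark
df = df.fillna({'yaw_flag': 0, 'class_label_pred': 'unknown'})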
A few other quick notes:
1. When plotting with matplotlib, PySpark columns have to be collected first, e.g. df[['col']].collect()
import matplotlib
matplotlib.use('agg')  # select the non-interactive backend before importing pyplot
import matplotlib.pyplot as plt

# y1 is a matplotlib Axes created elsewhere, e.g. fig, y1 = plt.subplots()
y1.plot(data_result_i[['time']].collect(), data_result_i[['ego_velocity_x']].collect(),
        color='dodgerblue', label='ego_spd', linewidth=3)
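An alternative (assuming the selected columns fit in driver memory) is to convert them to pandas first, which keeps the plotting code identical to plain pandas:

pdf = data_result_i.select('time', 'ego_velocity_x').toPandas()
y1.plot(pdf['time'], pdf['ego_velocity_x'], color='dodgerblue', label='ego_spd', linewidth=3)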
2. An error I ran into
A first workaround was to shut down and restart Spark, or to move the custom UDF definition inside a function; I don't fully understand the root cause of this problem yet.