并行计算框架Polars、Dask的数据处理性能对比

这篇具有很好参考价值的文章主要介绍了并行计算框架Polars、Dask的数据处理性能对比。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在Pandas 2.0发布以后,我们发布过一些评测的文章,这次我们看看,除了Pandas以外,常用的两个都是为了大数据处理的并行数据框架的对比测试。

本文我们使用两个类似的脚本来执行提取、转换和加载(ETL)过程。

测试内容

这两个脚本主要功能包括:

从两个parquet 文件中提取数据,对于小型数据集,变量path1将为“yellow_tripdata/ yellow_tripdata_2014-01”,对于中等大小的数据集,变量path1将是“yellow_tripdata/yellow_tripdata”。对于大数据集,变量path1将是“yellow_tripdata/yellow_tripdata*.parquet”;

进行数据转换:a)连接两个DF,b)根据PULocationID计算行程距离的平均值,c)只选择某些条件的行,d)将步骤b的值四舍五入为2位小数,e)将列“trip_distance”重命名为“mean_trip_distance”,f)对列“mean_trip_distance”进行排序

将最终的结果保存到新的文件

脚本

1、Polars

数据加载读取

 def extraction():
     """
     Extract two datasets from parquet files
     """
     path1="yellow_tripdata/yellow_tripdata_2014-01.parquet"
     df_trips= pl_read_parquet(path1,)
     path2 = "taxi+_zone_lookup.parquet"
     df_zone = pl_read_parquet(path2,)
 
     return df_trips, df_zone
 
 def pl_read_parquet(path, ):
     """
     Converting parquet file into Polars dataframe
     """
     df= pl.scan_parquet(path,)
     return df

转换函数

 def transformation(df_trips, df_zone):
     """
     Proceed to several transformations
     """
     df_trips= mean_test_speed_pl(df_trips, )
     
     df = df_trips.join(df_zone,how="inner", left_on="PULocationID", right_on="LocationID",)
     df = df.select(["Borough","Zone","trip_distance",])
   
     df = get_Queens_test_speed_pd(df)
     df = round_column(df, "trip_distance",2)
     df = rename_column(df, "trip_distance","mean_trip_distance")
 
     df = sort_by_columns_desc(df, "mean_trip_distance")
     return df
 
 
 def mean_test_speed_pl(df_pl,):
     """
     Getting Mean per PULocationID
     """
     df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
     return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
     """
     Only getting Borough in Queens
     """
 
     df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
     return df_pl
 
 def round_column(df, column,to_round):
     """
     Round numbers on columns
     """
     df = df.with_columns(pl.col(column).round(to_round))
     return df
 
 def rename_column(df, column_old, column_new):
     """
     Renaming columns
     """
     df = df.rename({column_old: column_new})
     return df
 
 def sort_by_columns_desc(df, column):
     """
     Sort by column
     """
     df = df.sort(column, descending=True)
     return df

保存

 def loading_into_parquet(df_pl):
     """
     Save dataframe in parquet
     """
     df_pl.collect(streaming=True).write_parquet(f'yellow_tripdata_pl.parquet')

其他代码

 import polars as pl
 import time
 
 def pl_read_parquet(path, ):
     """
     Converting parquet file into Polars dataframe
     """
     df= pl.scan_parquet(path,)
     return df
 
 def mean_test_speed_pl(df_pl,):
     """
     Getting Mean per PULocationID
     """
     df_pl = df_pl.groupby('PULocationID').agg(pl.col(["trip_distance",]).mean())
     return df_pl
 
 def get_Queens_test_speed_pd(df_pl):
     """
     Only getting Borough in Queens
     """
 
     df_pl = df_pl.filter(pl.col("Borough")=='Queens')
 
     return df_pl
 
 def round_column(df, column,to_round):
     """
     Round numbers on columns
     """
     df = df.with_columns(pl.col(column).round(to_round))
     return df
 
 def rename_column(df, column_old, column_new):
     """
     Renaming columns
     """
     df = df.rename({column_old: column_new})
     return df
 
 
 def sort_by_columns_desc(df, column):
     """
     Sort by column
     """
     df = df.sort(column, descending=True)
     return df
 
 
 def main():
     
     print(f'Starting ETL for Polars')
     start_time = time.perf_counter()
 
     print('Extracting...')
     df_trips, df_zone =extraction()
        
     end_extract=time.perf_counter() 
     time_extract =end_extract- start_time
 
     print(f'Extraction Parquet end in {round(time_extract,5)} seconds')
     print('Transforming...')
     df = transformation(df_trips, df_zone)
     end_transform = time.perf_counter() 
     time_transformation =time.perf_counter() - end_extract
     print(f'Transformation end in {round(time_transformation,5)} seconds')
     print('Loading...')
     loading_into_parquet(df,)
     load_transformation =time.perf_counter() - end_transform
     print(f'Loading end in {round(load_transformation,5)} seconds')
     print(f"End ETL for Polars in {str(time.perf_counter()-start_time)}")
 
 
 if __name__ == "__main__":
     
     main()

2、Dask

函数功能与上面一样,所以我们把代码整合在一起:

 import dask.dataframe as dd
 from dask.distributed import Client
 import time
 
 def extraction():
     path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
     df_trips = dd.read_parquet(path1)
     path2 = "taxi+_zone_lookup.parquet"
     df_zone = dd.read_parquet(path2)
 
     return df_trips, df_zone
 
 def transformation(df_trips, df_zone):
     df_trips = mean_test_speed_dask(df_trips)
     df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
     df = df[["Borough", "Zone", "trip_distance"]]
 
     df = get_Queens_test_speed_dask(df)
     df = round_column(df, "trip_distance", 2)
     df = rename_column(df, "trip_distance", "mean_trip_distance")
 
     df = sort_by_columns_desc(df, "mean_trip_distance")
     return df
 
 def loading_into_parquet(df_dask):
     df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")
 
 def mean_test_speed_dask(df_dask):
     df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
     return df_dask
 
 def get_Queens_test_speed_dask(df_dask):
     df_dask = df_dask[df_dask["Borough"] == "Queens"]
     return df_dask
 
 def round_column(df, column, to_round):
     df[column] = df[column].round(to_round)
     return df
 
 def rename_column(df, column_old, column_new):
     df = df.rename(columns={column_old: column_new})
     return df
 
 def sort_by_columns_desc(df, column):
     df = df.sort_values(column, ascending=False)
     return df
 
 
 
 def main():
     print("Starting ETL for Dask")
     start_time = time.perf_counter()
 
     client = Client()  # Start Dask Client
 
     df_trips, df_zone = extraction()
 
     end_extract = time.perf_counter()
     time_extract = end_extract - start_time
 
     print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
     print("Transforming...")
     df = transformation(df_trips, df_zone)
     end_transform = time.perf_counter()
     time_transformation = time.perf_counter() - end_extract
     print(f"Transformation end in {round(time_transformation, 5)} seconds")
     print("Loading...")
     loading_into_parquet(df)
     load_transformation = time.perf_counter() - end_transform
     print(f"Loading end in {round(load_transformation, 5)} seconds")
     print(f"End ETL for Dask in {str(time.perf_counter() - start_time)}")
 
     client.close()  # Close Dask Client
 
 if __name__ == "__main__":
     main()

测试结果对比

1、小数据集

我们使用164 Mb的数据集,这样大小的数据集对我们来说比较小,在日常中也时非常常见的。

下面是每个库运行五次的结果:

Polars

并行计算框架Polars、Dask的数据处理性能对比,python,开发语言,机器学习,Dask

Dask

并行计算框架Polars、Dask的数据处理性能对比,python,开发语言,机器学习,Dask

2、中等数据集

我们使用1.1 Gb的数据集,这种类型的数据集是GB级别,虽然可以完整的加载到内存中,但是数据体量要比小数据集大很多。

Polars

并行计算框架Polars、Dask的数据处理性能对比,python,开发语言,机器学习,Dask

Dask

并行计算框架Polars、Dask的数据处理性能对比,python,开发语言,机器学习,Dask

3、大数据集

我们使用一个8gb的数据集,这样大的数据集可能一次性加载不到内存中,需要框架的处理。

Polars

并行计算框架Polars、Dask的数据处理性能对比,python,开发语言,机器学习,Dask

Dask

并行计算框架Polars、Dask的数据处理性能对比,python,开发语言,机器学习,Dask

总结

从结果中可以看出,Polars和Dask都可以使用惰性求值。所以读取和转换非常快,执行它们的时间几乎不随数据集大小而变化;

可以看到这两个库都非常擅长处理中等规模的数据集。

由于polar和Dask都是使用惰性运行的,所以下面展示了完整ETL的结果(平均运行5次)。

并行计算框架Polars、Dask的数据处理性能对比,python,开发语言,机器学习,Dask

Polars在小型数据集和中型数据集的测试中都取得了胜利。但是,Dask在大型数据集上的平均时间性能为26秒。

这可能和Dask的并行计算优化有关,因为官方的文档说“Dask任务的运行速度比Spark ETL查询快三倍,并且使用更少的CPU资源”。

并行计算框架Polars、Dask的数据处理性能对比,python,开发语言,机器学习,Dask

上面是测试使用的电脑配置,Dask在计算时占用的CPU更多,可以说并行性能更好。

https://avoid.overfit.cn/post/74128cd8803b43f2a51ca4ff4fed4a95

作者:Luís Oliveira文章来源地址https://www.toymoban.com/news/detail-560253.html

到了这里,关于并行计算框架Polars、Dask的数据处理性能对比的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 分布式计算框架:Spark、Dask、Ray 分布式计算哪家强:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月11日
    浏览(70)
  • 【PXIE301-211】基于PXIE总线的16路并行LVDS数据采集、4路低速、2路隔离RS422数据处理平台

    板卡概述 PXIE301-211A是一款基于PXIE总线架构的16路高速LVDS、4路低速LVDS采集、2路隔离RS422数据处理平台,该平台板卡采用Xilinx的高性能Kintex 7系列FPGA XC7K325T作为实时处理器,实现各个接口之间的互联。板载1组64位的DDR3 SDRAM用作数据缓存。板卡具有1个FMC(HPC)接口,通过扣上

    2024年02月03日
    浏览(34)
  • 分布式计算框架:Spark、Dask、Ray

    目录 什么是分布式计算 分布式计算哪家强:Spark、Dask、Ray 2 选择正确的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式计算是一种计算方法,和集中式计算是相对的。 随着计算技术的发展, 有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成

    2024年02月06日
    浏览(46)
  • THRUST:一个开源的、面向异构系统的并行编程语言:编程模型主要包括:数据并行性、任务并行性、内存管理、内存访问控制、原子操作、同步机制、错误处理机制、混合编程模型、运行时系统等

    作者:禅与计算机程序设计艺术 https://github.com/NVIDIA/thrust 2021年8月,当代科技巨头Facebook宣布其开发了名为THRUST的高性能计算语言,可用于在设备、集群和云环境中进行并行计算。它具有“易于学习”、“简单易用”等特征,正在逐步取代C++、CUDA、OpenCL等传统编程模型,成为

    2024年02月07日
    浏览(50)
  • 云计算与大数据处理技术_云计算与大数据处理

    AIoT技术分析:云计算一般的计算机技术很难支撑企业的运作,于是云计算顺应时代而生,广泛地应用到了企业中。 云计算的概念 云计算是一种新兴的商业计算模型。... 并支持大规模数据处理、高容错性和自我管理等特性,提供PB级的存储能力,使用结构化的文件来存储数据,并整个

    2024年02月01日
    浏览(66)
  • 大数据开源框架之基于Spark的气象数据处理与分析

    Spark配置请看: (30条消息) 大数据开源框架环境搭建(七)——Spark完全分布式集群的安装部署_木子一个Lee的博客-CSDN博客 目录 实验说明: 实验要求: 实验步骤: 数据获取: 数据分析: 可视化: 参考代码(适用于python3): 运行结果:         本次实验所采用的数据,从中

    2024年02月03日
    浏览(47)
  • 大数据分布式实时大数据处理框架Storm,入门到精通!

    介绍:Storm是一个分布式实时大数据处理框架,被业界称为实时版的Hadoop。 首先,Storm由Twitter开源,它解决了Hadoop MapReduce在处理实时数据方面的高延迟问题。Storm的设计目标是保证数据的实时处理,它可以在数据流入系统的同时进行处理,这与传统的先存储后处理的关系型数

    2024年01月23日
    浏览(53)
  • 数据流处理框架Flink与Kafka

    在大数据时代,数据流处理技术已经成为了一种重要的技术手段,用于处理和分析大量实时数据。Apache Flink和Apache Kafka是两个非常重要的开源项目,它们在数据流处理领域具有广泛的应用。本文将深入探讨Flink和Kafka的关系以及它们在数据流处理中的应用,并提供一些最佳实践

    2024年04月23日
    浏览(41)
  • 云计算与大数据处理:实时计算与数据流

    云计算和大数据处理是当今信息技术领域的两个热门话题。随着互联网的普及和人们生活中的各种设备的不断增多,我们生活中的数据量不断增加,这些数据需要存储和处理。云计算是一种基于互联网的计算资源共享和分配模式,可以让用户在需要时轻松获取计算资源,从而

    2024年04月13日
    浏览(46)
  • 数据架构与云计算:如何利用云计算资源进行数据处理

    随着数据的爆炸增长,数据处理和分析成为了企业和组织中的关键技能。云计算是一种新兴的技术,它可以让我们在分布式环境中进行数据处理和分析。在这篇文章中,我们将探讨如何利用云计算资源进行数据处理,以及相关的核心概念、算法原理、具体操作步骤和数学模型

    2024年04月14日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包