Python并行处理数据多进程/多线程,榨干你的CPU

这篇具有很好参考价值的文章主要介绍了Python并行处理数据多进程/多线程,榨干你的CPU。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

  最近在公司实习,给整了个活,像是数学建模一样的数据分析的活,目标是在几个互相有关联的大表中找出满足某条件的那些业务,其中第一步就是把两个表拼起来,就叫它们A和B吧,省略拼表过程中需要的逻辑判断。

串行

  两个长为M和N的表,在判断中需要一个M*N级别的判断,也就是N2的复杂度,当M和N都很大时,比如3w,那还是得花点时间的,比如七八分钟。所以得想办法加速。

使用concurrent.futures的线程池

  既然是在一个py进程里干的活,那自然就想到能不能多开几个线程,比如12个,反正每个判断是独立的,把一个表尽量平均拆成12份,让每个线程去做那一份的判断,最后再把结果返回给主进程进行拼接。这里我先使用了Python的concurrent.futures的线程池来看看效果。
  concurrent.futures 是 Python 的一个模块,它提供了一个高级接口,用于异步执行可调用对象,也可以理解为并发。异步执行可以使用线程(使用 ThreadPoolExecutor)或单独的进程(使用 ProcessPoolExecutor)来执行。两者都实现了相同的接口,由抽象的 Executor 类定义 。

  一个大致的使用框架:

import pandas as pd
import concurrent.futures

def sub_process(data: list) -> list:
	res = []
	# 处理
	return res

df = pd.read_csv('large_data.csv')
num_threads = 12
chunk_size = len(df) // num_threads

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    for i in range(num_threads):
        start = i * chunk_size
        end = (i + 1) * chunk_size if i < num_threads - 1 else len(df)
        data_chunk = df.iloc[start:end]
        future = executor.submit(sub_process, data_chunk)
        results.append(future)

final_result = []
for f in results:
    final_result.extend(f.result())

使用multiprocessing

  使用了上面这个后,效果并不好,CPU占用就没超过40%,肯定是那里出现了问题,查了一下可能是GIL锁的问题,它是一个互斥锁(mutex),用于防止多个本地线程同时执行 Python 字节码。这个锁主要是因为 CPython 的内存管理不是线程安全的,所以需要这个锁来保证线程安全。
  无奈,只能再尝试其他的,比如multiprocessing,既然不让我的多个线程同时执行,那我开多个进程总行了吧,无非就是多用点内存,多复制几个其他要用到的数据给进程。需要注意的是, 使用这个的话,在循环中启动进程时,没法接收到子进程处理后的结果返回值,所以需要在主进程中创建一个数据结构,让子进程执行完了把结果往里面放,完事儿再让主进程去接收,正好multiprocessing中就有队列的类,可以直接用。推荐进程数和CPU的超线程数一样。试过如果和物理核数一样的话,还是不能拉满CPU,总耗时也比不过设置为线程数。
  很喜欢看着CPU被拉满时任务管理器框框被占满的样子。

from multiprocessing import Process, Queue

def worker(pro_id, data_slice, q):
    result = [sum(data_slice)] * pro_id
    q.put((pro_id, result))

if __name__ == '__main__':
    data = list(range(100))
    num_worker = 12
    slice_size = len(data) // num_worker
    q = Queue()
    pool = []
    for i in range(num_worker):
        start = i * slice_size
        end = (i + 1) * slice_size if i < num_worker - 1 else len(data)
        data_slice = data[start:end]
        p = Process(target=worker, args=(i, data_slice, q))
        p.start()
        pool.append(p)
    result_list = []
    for _ in range(num_worker):
        result_list.append(q.get())
    for p in pool:
        p.join()
    # 如果不在意结果的顺序的话,可以不用排序,队列保存时也可去掉id项。
    result_list.sort(key=lambda x: x[0])
    final_result = [x[1] for x in result_list]
    print(final_result)

没了,主打一个短小精悍,浓缩的就是精华,希望能对你有所帮助

  其实如果要并发的任务只是纯计算的话,用上一个方法更加适合。文章来源地址https://www.toymoban.com/news/detail-609844.html

到了这里,关于Python并行处理数据多进程/多线程,榨干你的CPU的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Java基础教程】(四十二)多线程篇 · 上:多进程与多线程、并发与并行的关系,多线程的实现方式、线程流转状态、常用操作方法解析~

    理解进程与线程的区别; 掌握Java 中多线程的两种实现方式及区别; 掌握线程的基本操作方法; 进程是程序的一次动态执行过程,它经历了从代码加载、执行到执行完毕的一个完整过程,这个过程也是进程本身从产生、发展到最终消亡的过程 。多进程操作系统能同时运行多

    2024年02月16日
    浏览(46)
  • 【Python爬虫与数据分析】进程、线程、协程

    目录 一、概述 二、进程的创建 三、线程的创建 四、协程的创建 五、全局变量的共享问题 六、消息队列与互斥锁 七、池化技术 进程是系统分配资源的基本单位,线程是CPU调度的基本单位。 一个进程可包含多个线程,一个线程可包含多个协程,协程就是最小的任务执行单位

    2024年02月13日
    浏览(40)
  • Python 标准类库-并发执行之multiprocessing-基于进程的并行

    Python3.6 multiprocessing 是一个支持使用类似于线程模块的API派生进程的包。该包同时提供本地和远程并发,通过使用子进程而不是线程,有效地避开了全局解释器锁。因此, multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。 该模块还引

    2024年02月09日
    浏览(34)
  • 【linux 多线程并发】多线程模型下的信号通信处理,与多进程处理的比较,属于相同进程的线程信号分发机制

    ​ 专栏内容 : 参天引擎内核架构 本专栏一起来聊聊参天引擎内核架构,以及如何实现多机的数据库节点的多读多写,与传统主备,MPP的区别,技术难点的分析,数据元数据同步,多主节点的情况下对故障容灾的支持。 手写数据库toadb 本专栏主要介绍如何从零开发,开发的

    2024年01月17日
    浏览(46)
  • selenium并发处理多个窗口线程/进程任务

    这里以百度搜索为例,通过不同的浏览器来启动不同的线程。

    2024年01月20日
    浏览(42)
  • python 开启5个线程处理list数据

    你可以使用如下代码来开启5个线程来处理列表数据: 在这个例子中,我们首先将要处理的列表划分成了5个子列表,每个子列表包含5个元素。然后,我们创建5个线程,每个线程分别处理一个子列表。最后,等待所有线程执行完毕。这样可以同时处理多个子列表,在一定程度

    2024年02月12日
    浏览(37)
  • Python爬虫:单线程、多线程、多进程

    在使用爬虫爬取数据的时候,当需要爬取的数据量比较大,且急需很快获取到数据的时候,可以考虑将单线程的爬虫写成多线程的爬虫。下面来学习一些它的基础知识和代码编写方法。 进程可以理解为是正在运行的程序的实例。进程是拥有资源的独立单位,而线程不是独立的

    2024年02月13日
    浏览(41)
  • 解放计算力:使用并行处理提升python for循环速度

    Python 是一门功能强大的编程语言,但在处理大规模数据或复杂计算任务时,性能可能成为一个瓶颈。幸运的是,Python 提供了多种方法来提升性能,其中之一是利用并行处理来加速循环操作。本文将介绍如何使用并行处理技术来优化 for 循环,从而提高 Python 程序的执行速度。

    2024年02月10日
    浏览(50)
  • 使用Sqoop的并行处理:扩展数据传输

    使用Sqoop的并行处理是在大数据环境中高效传输数据的关键。它可以显著减少数据传输的时间,并充分利用集群资源。本文将深入探讨Sqoop的并行处理能力,提供详细的示例代码,以帮助大家更全面地了解和应用这一技术。 在开始介绍Sqoop的并行处理技术之前,首先了解一下为

    2024年01月19日
    浏览(33)
  • 【python】13.进程和线程

    今天我们使用的计算机早已进入多CPU或多核时代,而我们使用的操作系统都是支持“多任务”的操作系统,这使得我们可以同时运行多个程序,也可以将一个程序分解为若干个相对独立的子任务,让多个子任务并发的执行,从而缩短程序的执行时间,同时也让用户获得更好的

    2024年01月17日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包