在Python中优雅地用多进程:进程池 Pool、管道通信 Pipe、队列通信 Queue、共享内存 Manager Value

这篇具有很好参考价值的文章主要介绍了在Python中优雅地用多进程:进程池 Pool、管道通信 Pipe、队列通信 Queue、共享内存 Manager Value。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Python 自带的多进程库 multiprocessing 可实现多进程。我想用这些短例子示范如何优雅地用多线程。中文网络上,有些人只是翻译了旧版的 Python 官网的多进程文档。而我这篇文章会额外讲一讲下方加粗部分的内容。

  • 创建进程 Process,fork 直接继承资源,所以初始化更快,spawn 只继承必要的资源,所以更省内存,「程序的入口」 if name == main
  • 进程池 Pool,Pool 只能接受一个参数,但有办法传入多个
  • 管道通信 Pipe,最基本的功能,运行速度快
  • 队列通信 Queue,有最常用的功能,运行速度稍慢
  • 共享内存 Manager Value,Python3.9 新特性 真正的共享内存 shared_memory

如下所示,中文网络上一些讲 Python 多进程的文章,很多重要的东西没讲(毕竟只是翻译了 Python 官网的多进程旧版文档)。上方的加粗部分他们没讲,但是这是做多进程总需要知道的内容。

  • 若你无法流畅阅读有专人更新的 Python 官网多进程英文文档 ,那么姑且可看写于 2019 不保证更新的南山南:一篇文章搞定 Python 多进程 (全)
  • 知我莫言:谈谈 python 的 GIL、多线程、多进程 ,点赞多但旧,写于 2016,你还不如看我下方写的「简述何为多线程 threading 与多进程 processing」

1.多线程与多进程的区别

多线程 threading: 一个人有与异性聊天和看剧两件事要做。单线程的她可以看完剧再去聊天,但这样子可能就没人陪她聊天了「哼,发消息不回」。我们把她看成一个 CPU 核心,为她开起多线程——先看一会剧,偶尔看看新消息,在两件事(线程)间来回切换。多线程:单个 CPU 核心可以同时做几件事,不至于卡在某一步傻等着。

用处:爬取网站信息(爬虫),等待多个用户输入

多进程 processing: 一个人有很多砖需要搬,他领取手套、推车各种物资(向系统申请了资源)然后开始搬砖。然而他身边有很多人,我们让这些人去帮他!(一核有难,八核围观)。于是他们做了分工,砖很快就搬完了。多进程让多个 CPU 核心可以一起做事,不至于只有一人干活而其他人傻站着。

用处:进行高性能计算。只有多进程方案设计合理,才能加速计算。

在Python中优雅地用多进程:进程池 Pool、管道通信 Pipe、队列通信 Queue、共享内存 Manager Value,python,python,服务器,linux,深度强化学习,人工智能,深度学习,强化学习

2. 全局锁与多进程

为何在 Python 里用多进程这么麻烦? 因为 Python 的线程是操作系统线程,因此要有 Python 全局解释器锁。一个 python 解释器进程内有一条主线程,以及多条用户程序的执行线程。即使在多核 CPU 平台上,由于 GIL 的存在,所以禁止多线程的并行执行。——来自百度百科词条 全局解释器锁。发展历程:

  1. Python 全局锁。Python 3.2 的时候更新过 GIL。在我小时候,由于 Python GIL 的存在(全局解释器锁 Global Interpreter Lock) ,此时 Python 无法靠自己实现多进程
  2. 外部多进程通信。Python3.5。在 2015 年,要么用 Python 调用 C 语言(如 Numpy 此类用其他语言在底层实现多进程的第三方库),要么需要在外部代码(MPI 2015)
  3. 内置多进程通信。Python 3.6 才让 multiprocessing 逐渐发展成一个能用的 Python 内置多进程库,可以进行进程间的通信,以及有限的内存共享
  4. 共享内存。Python 3.8 在 2019 年增加了新特性 shared_memory

3.子进程 Process

多进程的主进程一定要写在程序入口 if name ==‘main’: 内部

def function1(id):  # 这里是子进程
    print(f'id {id}')

def run__process():  # 这里是主进程
    from multiprocessing import Process
    process = [mp.Process(target=function1, args=(1,)),
               mp.Process(target=function1, args=(2,)), ]
    [p.start() for p in process]  # 开启了两个进程
    [p.join() for p in process]   # 等待两个进程依次结束

# run__process()  # 主线程不建议写在 if外部。由于这里的例子很简单,你强行这么做可能不会报错
if __name__ =='__main__':
    run__process()  # 正确做法:主线程只能写在 if内部

尽管在这个简单的例子里,把主进程 run__process() 写在程序入口 if 外部不会有报错。但是你最好还是按我要求去做。详细解释的内容过长,我写在→「Python 程序入口有重要功能(多线程)而非编程习惯」

上面的例子只是用 Process 开启了多进程,不涉及进程通信。当我准备把一个串行任务编排成多进程时,我还需要多进程通信。进程池 Pool 可以让主程序获得子进程的计算结果(不太灵活,适合简单任务),管道 Pipe 队列 Queue 等等 可以让进程之间进行通信(足够灵活)。共享值 Value 共享数组 Array 共享内容 shared_memory(Python 3.6 Python3.9 的新特性,还不太成熟)下面开讲。

Python 多进程可以选择两种创建进程的方式,spawn 与 fork。分支创建:fork 会直接复制一份自己给子进程运行,并把自己所有资源的 handle 都让子进程继承,因而创建速度很快,但更占用内存资源。分产创建:spawn 只会把必要的资源的 handle 交给子进程,因此创建速度稍慢。详细解释请看 Stack OverFlow multiprocessing fork vs spawn 。(分产 spawn 是我自己随便翻译的,有更好的翻译请推荐。我绝不把 handle 翻译成句柄)

multiprocessing.set_start_method('spawn')  # default on WinOS or MacOS
multiprocessing.set_start_method('fork')   # default on Linux (UnixOS)

请注意:我说 分支 fork 在初始化创建多进程的时候比 分产 spawn 快,而不是说高性能计算会比较快。通常高性能计算需要让程序运行很久,因此为了节省内存以及进程安全,我建议选择 spawn。

4.进程池 Pool

几乎 Python 多进程代码都需要你明明白白地调用 Process。而进程池 Pool 会自动帮我们管理子进程。Python 的 Pool 不方便传入多个参数,我这里提供两个解决思路:

思路 1:函数 func2 需要传入多个参数,现在把它改成一个参数,无论你直接让 args 作为一个元组 tuple、词典 dict、类 class 都可以

思路 2:使用 function.partial Passing multiple parameters to pool.map() function in Python。这个不灵活的方法固定了其他参数,且需要导入 Python 的内置库,我不推荐

import time

def func2(args):  # multiple parameters (arguments)
    # x, y = args
    x = args[0]  # write in this way, easier to locate errors
    y = args[1]  # write in this way, easier to locate errors

    time.sleep(1)  # pretend it is a time-consuming operation
    return x - y


def run__pool():  # main process
    from multiprocessing import Pool

    cpu_worker_num = 3
    process_args = [(1, 1), (9, 9), (4, 4), (3, 3), ]

    print(f'| inputs:  {process_args}')
    start_time = time.time()
    with Pool(cpu_worker_num) as p:
        outputs = p.map(func2, process_args)
    print(f'| outputs: {outputs}    TimeUsed: {time.time() - start_time:.1f}    \n')

    '''Another way (I don't recommend)
    Using 'functions.partial'. See https://stackoverflow.com/a/25553970/9293137
    from functools import partial
    # from functools import partial
    # pool.map(partial(f, a, b), iterable)
    '''

if __name__ =='__main__':
    run__pool()

5.管道 Pipe

顾名思义,管道 Pipe 有两端,因而 main_conn, child_conn = Pipe() ,管道的两端可以放在主进程或子进程内,我在实验中没发现主管道口 main_conn 和子管道口 child_conn 的区别。两端可以同时放进去东西,放进去的对象都经过了深拷贝:用 conn.send() 在一端放入,用 conn.recv() 另一端取出,管道的两端可以同时给多个进程。conn 是 connect 的缩写。

import time

def func_pipe1(conn, p_id):
    print(p_id)

    time.sleep(0.1)
    conn.send(f'{p_id}_send1')
    print(p_id, 'send1')

    time.sleep(0.1)
    conn.send(f'{p_id}_send2')
    print(p_id, 'send2')

    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)

    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)


def func_pipe2(conn, p_id):
    print(p_id)

    time.sleep(0.1)
    conn.send(p_id)
    print(p_id, 'send')
    time.sleep(0.1)
    rec = conn.recv()
    print(p_id, 'recv', rec)


def run__pipe():
    from multiprocessing import Process, Pipe

    conn1, conn2 = Pipe()

    process = [Process(target=func_pipe1, args=(conn1, 'I1')),
               Process(target=func_pipe2, args=(conn2, 'I2')),
               Process(target=func_pipe2, args=(conn2, 'I3')), ]

    [p.start() for p in process]
    print('| Main', 'send')
    conn1.send(None)
    print('| Main', conn2.recv())
    [p.join() for p in process]

if __name__ =='__main__':
    run__pipe()

如果追求运行更快,那么最好使用管道 Pipe 而非下面介绍的队列 Queue,详细请移步 Python pipes and queues performance ↓

So yes, pipes are faster than queues - but only by 1.5 to 2 times, what did surprise me was that Python 3 is MUCH slower than Python 2 - most other tests I have done have been a bit up and down (as long as it is Python 3.4 - Python 3.2 seems to be a bit of a dog - especially for memory usage).

曾经用到 Python 多线程队列功能写过一个实际例子 ↓,若追求极致性能,还可以把里面的 Queue 改为 Pipe。

Pipe 还有 duplex 参数poll() 方法 需要了解。默认情况下 duplex==True,若不开启双向管道,那么传数据的方向只能 conn1 ← conn2 。conn2.poll()==True 意味着可以马上使用 conn2.recv() 拿到传过来的数据。conn2.poll(n) 会让它等待 n 秒钟再进行查询。

from multiprocessing import Pipe

conn1, conn2 = Pipe(duplex=True)  # 开启双向管道,管道两端都能存取数据。默认开启
# 
conn1.send('A')
print(conn1.poll())  # 会print出 False,因为没有东西等待conn1去接收
print(conn2.poll())  # 会print出 True ,因为conn1 send 了个 'A' 等着conn2 去接收
print(conn2.recv(), conn2.poll(2))  # 会等待2秒钟再开始查询,然后print出 'A False'

尽管我下面的例子不会报错,但这是因为它过于简单,没有真的开多线程去跑,也没有写在程序入口的 if 内部。很多时候 Pipe 运行会快一点,但是它的功能太少了,得用 Queue。最明显的一个区别是:

conn1, conn2 = multiprocessing.Pipe()  # 管道有两端,某一端放入的东西,只能在另一端拿到
queue = multiprocessing.Queue()        # 队列只有一个,放进去的东西可以在任何地方拿到。

6. 队列 Queue

可以 import queue 调用 Python 内置的队列,在多线程里也有队列 from multiprocessing import Queue。下面提及的都是多线程的队列。

队列 Queue 的功能与前面的管道 Pipe 非常相似:无论主进程或子进程,都能访问到队列,放进去的对象都经过了深拷贝。不同的是:管道 Pipe 只有两个断开,而队列 Queue 有基本的队列属性,更加灵活,详细请移步 Stack Overflow Multiprocessing - Pipe vs Queue。

def func1(i):
    time.sleep(1)
    print(f'args {i}')

def run__queue():
    from multiprocessing import Process, Queue

    queue = Queue(maxsize=4)  # the following attribute can call in anywhere
    queue.put(True)
    queue.put([0, None, object])  # you can put deepcopy thing
    queue.qsize()  # the length of queue
    print(queue.get())  # First In First Out
    print(queue.get())  # First In First Out
    queue.qsize()  # the length of queue

    process = [Process(target=func1, args=(queue,)),
               Process(target=func1, args=(queue,)), ]
    [p.start() for p in process]
    [p.join() for p in process]

if __name__ =='__main__':
    run__queue()

除了上面提及的 Python 多线程,读取多个 (海康 \ 大华) 网络摄像头的视频流 ,我自己写的开源的强化学习库:小雅 ElegantRL 也使用了 Queue 进行多 CPU 多 GPU 训练,为了提速,我已经把 Queue 改为 Pipe。

7. 共享内存 Manager

为了在 Python 里面实现多进程通信,上面提及的 Pipe Queue 把需要通信的信息从内存里深拷贝了一份给其他线程使用(需要分发的线程越多,其占用的内存越多)。而共享内存会由解释器负责维护一块共享内存(而不用深拷贝),这块内存每个进程都能读取到,读写的时候遵守管理(因此不要以为用了共享内存就一定变快)。

Manager 可以创建一块共享的内存区域,但是存入其中的数据需要按照特定的格式,Value 可以保存数值,Array 可以保存数组,如下。这里不推荐认为自己写代码能力弱的人尝试。下面这里例子来自 Python 官网的 Document。

# https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing%20array#multiprocessing.Array

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

我删掉了 Python 3.8 的 shared_momery 介绍,这部分有 Bug

下文来自 Stack Overflow,问题 Shared memory in multiprocessing 下 thuzhf 的回答 2021-01 :

For those interested in using Python3.8 's shared_memory module, it still has a bug which hasn’t been fixed and is affecting Python3.8/3.9/3.10 by now (2021-01-15). The bug is about resource tracker destroys shared memory segments when other processes should still have valid access. So take care if you use it in your code.

PyTorch 也有自带的多进程 torch.multiprocessing

How to share a list of tensors in PyTorch multiprocessing? rozyang 的回答 ,非常简单,核心代码如下:

import torch.multiprocessing as mp
tensor.share_memory_()

正文已经结束,我把部分 multiprocessing 的代码都放在 github。希望大家能写出让自己满意的多线程。我设计高性能的多进程时,会遵守以下规则:

  • 尽可能少传一点数据
  • 尽可能减少主线程的负担
  • 尽可能不让某个进程傻等着
  • 尽可能减少进程间通信的频率

开源的深度强化学习 (DRL) 算法库 伯克利的 Ray-project Rllib 训练快,但太复杂,OpenAI 的 SpinningUp 简单,但不快。刚好我又懂一点多进程、Numpy、深度学习框架、深度强化学习这些双层优化算法,所以我觉得自己也写一个 DRL 库难度不大,于是开源了强化学习库:小雅 ElegantRL。让别人好好看看,DRL 库挺简单的一个东西弄那么复杂做什么?

尽管这个库会一直保持框架小巧、代码优雅来方便入门深度强化学习的人,但 ElegantRL 却把训练效率放在首位(正因如此,ElegantRL 与 SpinningUp 的定位不同),所以我需要用 Python 的多进程来加速 DRL 的训练。因而顺便写【在 Python 中优雅地用多进程】这篇东西。文章来源地址https://www.toymoban.com/news/detail-563394.html

到了这里,关于在Python中优雅地用多进程:进程池 Pool、管道通信 Pipe、队列通信 Queue、共享内存 Manager Value的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • (25)Linux IPC 进程间通信&&系统调用:pipe接口

    我们在之前讲过 \\\"进程之间是具有独立性\\\" 的,如果进程间想交互数据,成本会非常高! 因为独立性之本质即 \\\"封闭\\\",进程们你封闭你的我封闭我的,那么进程间的交流可谓是窒碍难行。 进程间的通信说白了就是 \\\"数据交互\\\",我们需要多进程进行协同处理一件事情。 刚才说的

    2024年02月02日
    浏览(37)
  • 【Linux】进程间通信——进程间通信的介绍和分类、管道、匿名管道、命名管道、匿名管道与命名管道的区别

      进程间通信(IPC,Interprocess communication)是一组编程接口,让程序员能够协调不同的进程,使之能在一个操作系统里同时运行,并相互传递、交换信息。这使得一个程序能够在同一时间里处理许多用户的要求。因为即使只有一个用户发出要求,也可能导致一个操作系统中

    2024年02月05日
    浏览(52)
  • Python实战:用多线程和多进程打造高效爬虫

    在网络爬虫的世界里,效率是关键。为了快速地获取大量数据,我们需要运用一些高级技巧,如多线程和多进程。在本篇博客中,我们将学习如何使用Python的多线程和多进程来构建一个高效的网络爬虫,以便更快速地获取目标网站上的信息。 在单线程爬虫中,我们按照顺序一

    2024年02月07日
    浏览(39)
  • 【Linux】进程通信之管道通信详解

    🍎 作者: 阿润菜菜 📖 专栏: Linux系统编程 其实管道通信是Unix中最古老的进程间通信的形式了: 管道通信是一种进程间通信的方式,它可以让一个进程的输出作为另一个进程的输入,实现数据的传输、资源的共享、事件的通知和进程的控制。 管道通信分为两种类型:匿名

    2023年04月19日
    浏览(39)
  • 【Linux】进程通信之匿名管道通信

    我们往往需要多个进程协同,共同完成一些事情。 数据传输:一个进程需要将它的数据发送给另一个进程 资源共享:多个进程之间共享同样的资源。 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。

    2024年04月14日
    浏览(44)
  • 【Linux】进程间通信(匿名管道 & 命名管道)-- 详解

    如何理解进程间通信? 进程具有独立性,所以进程想要通信难度是比较大的,成本高。 在日常生活中,通信的本质是传递信息,但站在程序员角度来看, 进程间通信的本质:让不同的进程看到同一份资源(内存空间) 。 进程间通信就是进程之间互相传递数据,那么进程间

    2024年04月28日
    浏览(49)
  • 进程间通信之利用命名管道进行通信

    命名管道(Named Pipe),也被称为FIFO(First In, First Out),是一种在Unix和Unix-like操作系统中用于进程间通信的特殊文件类型。它允许不相关的进程通过文件系统中的路径名进行通信。 命名管道(Named Pipe)是一种在Unix和Unix-like系统中用于进程间通信的特殊文件类型。它的作用主

    2024年01月19日
    浏览(37)
  • Linux——进程间通信、管道

    进程间的通信就是 在不同进程之间传播或交换信息。 举个例子: 古时,两军交战不斩来使; 因为两军互相是独立的,所以使节就是两军之间传话的进行传话的; 而在OS中,进程之间也是相互独立的,但某项工作并不是一个进程就可以完成,而是多个进程之间相互协助完成;

    2024年02月22日
    浏览(40)
  • 进程间通信(命名管道)

    目录:            1.命名管道            2.创建命名管道 --------------------------------------------------------------------------------------------------------------------------------- 1.命名管道 1.管道的一个应用限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信 2.如果我们想在不相

    2024年02月06日
    浏览(39)
  • Linux——进程间通信(管道)

    目录 进程通信的目的 管道 见见猪跑(举个例子) 文件描述符fd与管道的关系(深度理解管道) 什么是管道?  匿名管道 pipe函数概述 父子进程通信时与文件描述符的关系图(理解pipe函数的关键) pipe函数的使用  管道读写规则 管道的大小 自测  使用man 7 pipe查看 使用ulimit -a查看 管

    2024年02月03日
    浏览(85)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包