python multiprocessing库使用记录

这篇具有很好参考价值的文章主要介绍了python multiprocessing库使用记录。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

python multiprocessing库使用记录

需求是想并行调用形式化分析工具proverif,同时发起对多个query的分析(378个)。实验室有40核心80线程的服务器(双cpu,至强gold 5218R*2)。

观察到单个命令在分析时内存占用不大,且只使用单核心执行,因此考虑同时调用多个命令同时执行分析,加快结果输出。

最底层的逻辑是调用多个命令行语句,和在命令行直接执行proverif语句类似。在python中也就是使用 os.system()函数实现命令调用。然而由于存在如下问题,需要考虑使用多进程multiprocessing库。

  • 如果使用多线程threading库,由于GIL的存在,是否会因为一个进程未执行结束而无法发起新的进程?
  • query数量很大的原因来自于多场景分析,同时对于同一场景下的query也希望可以并行推进,同时分析。
  • query数量大+场景多,得到很多结果,每条分析语句都有各自不同的位置,需要生成大量的命令。
  • 每条query执行完成后会给出分析结果。虽然分析结果会以html文件的形式输出到指定结果文件夹,但是不能对分析结果做统一的分析,仍旧需要逐个阅读。希望能在输出后即时统计,原有输出不变,还能给出分析结果表。
  • 尽管proverif在分析上速度已经很好了,但是仍然有62条query在30000秒(8.3h)后未给出结果。希望能够统计每一条query的运行时间并记录,并能够提供当前仍在执行的query数量。
    • 进一步的,设置最高分析时长上限(如48h),若超出上限则终止分析。
    • 对于一些可达性查询(reachability,实现方法是:在实体执行最后,在公开信道上发送执行完成标记,检查攻击者是否检验实体代码是否正确,以及攻击者是否能够阻止合法实体正常执行程序(如何做?)),会出现构建攻击路径很慢的情况。但是实际上已经给出了goal reachable的结果。对于这种其实无需浪费更多时间,可以把reachability的query添加 set reconstructTrace = false .以提前结束。
    • 对于数量监控,需要多进程读写共享变量;对于运行时间记录,需要多进程读写同一个文件。

mutliprocessing库使用

主要使用multiprocessing.Pool()来创建进程池,当前python进程会创建新的python进程用于执行函数。(win下是子进程,linux下是fork)

由于存在操作系统上的差异,请使用if __name__ == '__main__':来编写主函数,否则可能出现问题。主函数内容如下。

query_num = multiprocessing.Value('i', 0)

def long_time_task(c, ):
    start = time.time()
    os.system(c)
    end = time.time()
    # task_name=...
    with query_num.get_lock():
        query_num.value -= 1
        print('Task %s runs %0.2f seconds. ' % (task_name, (end - start)) + str(query_num.value) + ' left.')
    return 'Task %s runs %0.2f seconds.' % (task_name, (end - start))


def call_back(s):
    with open('/home/dell/proverif/DDS/time.txt', "a+") as file:
        file.writelines(s + '\n')
        

if __name__ == '__main__':
    query_list = extract(path_query, 'query', '.')
    query_file_path_list = query_file(query_list)
    whole_cS = compromise_Scenarios(path_compromise, path_process_whole, work_path)
    MAC_cS = compromise_Scenarios(path_compromise, path_process_MAC, work_path)
    cmd = []
    cmd += (pv_cmd(query_file_path_list, whole_cS, path_result))
    cmd += (pv_cmd(query_file_path_list, MAC_cS, path_result))
    p = Pool(len(cmd))
    query_num.value = len(cmd)
    # for i in cmd:
    #     p.apply_async(long_time_task, args=(i,), callback=call_back)

    results = [p.apply_async(long_time_task, (i,), callback=call_back) for i in cmd]

    print('Waiting for all subprocesses done...')
    output = [result.get(timeout=24*60*60) for result in results]
    # p.close()
    # p.join()
    print('All subprocesses done.')
    

主函数前7行为文本处理,其内容不细表。

第8行p = Pool(len(cmd))创建了进程池,其长度为cmd的个数,也就是我们要同时发起这么多个进程。接下来注释掉的循环是常规的多进程发起办法,即使用apply_async函数执行我们要的函数。args是long_time_task的参数,由于需要为Iterable且只有一个参数,因此以元组形式传入。

call_back参数为回调函数,这里很像go语言下的defer,会在函数执行后再执行。回调函数接受long_time_task的返回值作为参数,我们使用这个机制实现多进程写文件。long_time_task在返回后会受到进程池p的调度,依次执行写文件操作,因此避免了同时写引起冲突。

对于剩余的query数量,使用全局变量query_num = multiprocessing.Value('i', 0),这样的变量具有锁,可以供多进程读写。每个query在完成后会将数量减一,输出时间和剩余数量。使用with query_num.get_lock():获得锁,避免读写冲突,并在使用完成后自动释放。

这已经满足了基本需求。还有一个定时终止的功能有待实现。接下来再介绍我不断修改的思路。

多进程定时终止

单进程定时终止

process = multiprocessing.Process(target=long_time_task)

# 启动进程
process.start()

# 设置运行时长上限(48小时)
timeout = 48 * 60 * 60  # 以秒为单位

# 创建定时器,在指定时间后终止进程
timer = multiprocessing.Timer(timeout, process.terminate)
timer.start()

# 等待进程结束
process.join()

使用定时器的办法,在一定时间后调用我们创建进程的process.terminate()方法结束进程。但我们需要多进程并行。

多进程定时终止

pool = multiprocessing.Pool()

# 准备要执行函数的参数列表
inputs = [1, 2, 3, 4, 5]

# 执行函数,并设置最大运行时长为30秒
result = pool.map_async(long_time_task, inputs)

# 获取结果,最多等待30秒
output = result.get(timeout=48 * 60 * 60)

map_async方法可以将函数应用于可迭代的参数列表,并返回一个AsyncResult对象,可以使用该对象的get方法获取结果。map_async方法将任务提交给进程池后会立即返回,并不会等待所有任务执行完成。如果在get方法获取结果时,其中某些任务仍在执行,将会等待直到超时。get方法拥有timeout参数,超时后会raise TimeoutError,报错终止python程序的运行。因此如果想输出已完成的结果,有两个思路:

  1. try-except捕获TimeoutError,并针对处理。
  2. 对每个结果都使用get方法并设置超时时间。

列表推导式

results = [p.apply_async(long_time_task, (i,), callback=call_back) for i in cmd]

print('Waiting for all subprocesses done...')
output = [result.get(timeout=24*60*60) for result in results]
# p.close()
# p.join()
print('All subprocesses done.')

使用apply_async方法来执行函数,该方法会也会返回一个AsyncResult对象。我们将这些对象放入results数组,接着使用数组中每个元素的结果组成output数组并定义超时时间。这样就可以执行call_back函数了。output内容其实不是很重要,主要是为了使用AsyncResult对象的get方法来设置定时器。

不过这样还是需要try-except捕获TimeoutError,以处理超时未完成的query。这样做比map_async好在哪里?我在使用的时候map_async似乎不能成功调用回调函数,还有待试验。此外,该方法并不能在设定时间时准时停下,例如我设置时间5s,则会在约12秒时才停止。

还有一个问题是,在pycharm里运行脚本时,会有部分进程无法结束。暂不清楚其原因,也不确定命令行下执行脚本是否存在同样的问题。

与Go相比

显著的感觉到python在处理多进程、多线程、并发等问题上有一定的弱点。虽然能够通过一系列操作实现,但是做起来比较吃力,也不算太优雅。现在的脚本已经可以并行分析了。然而在任务管理器中,除了看到了378个proverif进程,还看到了378个sh和378个python🤣

这378个python其实是没必要的,如果使用goroutine,发起多个协程执行shell命令即可。这样在开销和效率上都会更好。不过其实总的内存占用并不太高,所以这个点不算非常大的问题。但是如果使用更复杂的分析工具,mutliprocessing多进程调用就太笨拙了。

此外,在共享变量的访问上也不那么容易。例如query_num这个共享变量,多进程的访问就不是很方便,如果使用goroutine,则可以使用channel,创建一个monitor goroutine来接受各个goroutine的返回值,并做计数处理;文件读写也可以在channel+监视器内完成,无需考虑文件读写争用,或者也可以用mutex来互斥地读写文件。

在计时器的操作上,目前只了解到python的解决办法是抛出超时异常。这种方法会使得没执行完毕的脚本无法正常返回,不能给出信息。需要根据执行结果做一些特定的异常处理。也可能这是会有部分进程无法正常结束的原因。go语言有Timer类可以执行计时器操作,有望更优雅的解决问题。

后续计划

后续会考虑使用go语言编写一个专用于proverif的多进程并发分析调用工具,使用更优雅、效率更高的方法,实现:文章来源地址https://www.toymoban.com/news/detail-529754.html

  • 多进程并发调用
  • query分析时长记录、分析结果导出
  • 总分析时长上限设定,超时后正常退出,并标记超时query。

到了这里,关于python multiprocessing库使用记录的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python 中的 multiprocessing 模块和 threading 模块有什么区别?什么情况下应该使用哪一个?解释 Python 中的 __del__ 方法的作用。有什么需要注意的地方解释

    multiprocessing 模块和 threading 模块都是用于在 Python 中进行并发编程的工具,但它们有一些关键的区别。以下是它们之间的比较: 区别: 并行性 vs 并发性: multiprocessing 模块用于创建独立的进程,每个进程都有自己的 Python 解释器和全局解释器锁(GIL)。因此,multiprocessing 允许

    2024年02月22日
    浏览(45)
  • Python进程池multiprocessing.Pool

    鲲鹏920:192核心 内存:756G python:3.9 在做单纯的cpu计算的场景,使用单进程核多进程的耗时做如下测试: 单进程情况下cpu的占用了如下,占用一半的核心数: 每一步和总耗时如下: cpu占用如下,每个进程基本占用48个左右核心数; 多进程的耗时如下: 每一个进程的耗时为

    2024年01月17日
    浏览(39)
  • Python分享之多进程探索 (multiprocessing包)

    在初步了解Python多进程之后,我们可以继续探索multiprocessing包中更加高级的工具。这些工具可以让我们更加便利地实现多进程。   进程池 进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的士兵。

    2024年02月08日
    浏览(34)
  • Python进程池multiprocessing.Pool八个函数对比

    Python的multiprocessing.Pool类提供了多种方法来分发任务给进程池中的工作进程。这些方法在功能和用途上有所不同,适用于不同的场景。以下是multiprocessing.Pool中八个主要函数的对比 apply() 功能:阻塞地执行一个函数,直到这个函数的执行完成。 用法:apply(func, args=(), kwds={}) 特

    2024年02月04日
    浏览(37)
  • 使用 multiprocessing 多进程处理批量数据

    示例代码 multiprocessing.Pool 创建进程池, 传入的参数是要要使用的 CPU 内核数量, 直接用 cpu_count() 可以拿到当前硬件配置所有的 CPU 内核数. pool.map 可以直接将处理后的结果拼接成一个 list 对象 应用在实际数据处理代码的效果对比: 普通处理方式, 用时 221 秒 多进程处理方式, 用时

    2024年02月09日
    浏览(43)
  • Python 标准类库-并发执行之multiprocessing-基于进程的并行

    Python3.6 multiprocessing 是一个支持使用类似于线程模块的API派生进程的包。该包同时提供本地和远程并发,通过使用子进程而不是线程,有效地避开了全局解释器锁。因此, multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。 该模块还引

    2024年02月09日
    浏览(34)
  • Python__模块(TIME-进程/线程)__concurrent / multiprocessing

    进程或线程的使用。 线程是共享内存空间。 进程是内存独立状态。 同一个进程的线程之间可以直接交流。 两个进程想通信,必须通过一个中间代理来实现。 使用多线程能充分利用 CPU 来提供程序的执行效率。 每一个进程启动时都会最先产生一个线程,即主线程,然后主线程

    2024年02月04日
    浏览(35)
  • 【神行百里】python开启多线程(threading)与多进程(multiprocessing)运行

      由于处理数据过多,程序运行很慢,就学习了一下python开启多线程与多进程的方法,虽然最后也没用上,但还是记录总结一下,以备不时之需。   传送门:进程与线程认识,进程与线程通俗理解   简言之, 进程为资源分配的最小单元,线程为程序执行的最小单元

    2024年02月02日
    浏览(43)
  • 由浅入深走进Python异步编程【多进程】(含代码实例讲解 || multiprocessing、异步进程池、进程通信)

    从底层到第三方库,全面讲解python的异步编程。这节讲述的是python的多线程实现,纯干货,无概念,代码实例讲解。 本系列有6章左右,点击头像或者专栏查看更多内容,陆续更新,欢迎关注。 部分资料来源及参考链接: https://www.bilibili.com/video/BV1Li4y1j7RY/ 现在让我们初步进入

    2024年02月03日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包