concurrent.futures模块ThreadPoolExecutor、ProcessPoolExecutor讲解及使用实例

这篇具有很好参考价值的文章主要介绍了concurrent.futures模块ThreadPoolExecutor、ProcessPoolExecutor讲解及使用实例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

导入concurrent.futures.ThreadPoolExecutor

import concurrent.futures

concurrent.futures模块详解

这个模块是python并发执行的标准库,具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。

  • 模块组成
    1、concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。
    2、submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。
    3、map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。
    4、shutdown(Wait=True): 发出让执行者释放所有资源的信号。
    5、concurrent.futures.Future: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。

Executor是抽象类(父类),可以通过子类访问,即线程或进程的 ExecutorPools 。ThreadPoolExecutor是Executor的子类,它使用线程池来异步执行调用。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的launcher或executor。

源码函数分析

init

class ThreadPoolExecutor(_base.Executor):

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        # max_workers参数为空时,默认为机器处理器个数+4,最大值为32
        # thread_name_prefix  线程可选名称前缀
        # initializer  初始化工作线程使,指定的可调用函数
        # initargs  传给可调用函数的参数元组
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.
        """
        if max_workers is None:
            # ThreadPoolExecutor is often used to:
            # * CPU bound task which releases GIL
            # * I/O bound task (which releases GIL, of course)
            #
            # We use cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large resource
            # on many core machine.
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        self._broken = False
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs
  • 举例
# demo1
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

# demo2
import os
import random
import threading
import requests as rq
import time

from threading import Thread, Lock
from queue import Queue  # 用于多线程之间线程安全的数据通信
from concurrent.futures import ThreadPoolExecutor, as_completed

pool = ThreadPoolExecutor()
'''
利用线程池对I/O密集型任务进行优化
with ThreadPoolExecutor() as pool:
        futures = [pool.submit(craw, url) for url in urls]
        for future in futures:#as_completed后的结果顺序是不固定的
            print(future.result())
            html_queue.put(future.result())'''

def event_1():
    print("event_1 started")
    time.sleep(1)
    print("event_1 ended")
    return 1


def event_2():
    print("event_2 started")
    time.sleep(2)
    print("event_2 ended")
    return 2


def event_3():
    print("event_3 started")
    time.sleep(3)
    print("event_3 ended")
    return 3


def main():
    t0 = time.time()
    res1 = pool.submit(event_1)
    res2 = pool.submit(event_2)
    res3 = pool.submit(event_3)
    print(res1.result())
    print(res2.result())
    print(res3.result())
    t1 = time.time()
    print(t1 - t0)


if __name__ == '__main__':
    main()


submit

submit(*args, **kwargs)
安排可调用对象 fn 以 fn(*args, **kwargs) 的形式执行,并返回 Future 对象来表示它的执行。

 with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

map

  • map(func, *iterables, timeout=None, chunksize=1)

  • 类似内置函数 map(func, *iterables),但是有两点不同:
    1、立即获取 iterables 而不会惰性获取;
    2、异步执行 func,并支持多次并发调用。
    它返回一个迭代器。

  • 从调用 Executor.map() 开始的 timeout 秒之后,如果在迭代器上调用了 next() 并且无可用结果的话,迭代器会抛出 concurrent.futures.TimeoutError 异常。

  • timeout 秒数可以是浮点数或者整数,如果设置为 None 或者不指定,则不限制等待时间。

  • 如果 func 调用抛出了异常,那么该异常会在从迭代器获取值的时候抛出。

  • 当使用 ProcessPoolExecutor 的时候,这个方法会把 iterables 划分成多个块,作为独立的任务提交到进程池。这些块的近似大小可以通过给 chunksize 指定一个正整数。对于很长的 iterables,使用较大的 chunksize 而不是采用默认值 1,可以显著提高性能。对于 ThreadPoolExecutor,chunksize 不起作用。

不管并发任务的执行次序如何,map 总是基于输入顺序来返回值。map 返回的迭代器,在主程序迭代的时候,会等待每一项的响应。

    def xiaotu_receive_thread(self, *phone):
        for i in phone:
            res = self.number_parameter(i)
            userid, nickname, token, sign = res
            print(res)
            s = f"{token['S']}"
            t = f"{token['T']}"
            c_list = (s, t, userid)
        cooke_list = [c_list] * 10
        with concurrent.futures.ThreadPoolExecutor() as pool:
            pool.map(AccountNumber().xiaotu_receive, cooke_list)

shutdown(wait=True)

  • 告诉执行器 executor 在当前所有等待的 future 对象运行完毕后,应该释放执行器用到的所有资源。
  • 在 shutdown 之后再调用 Executor.submit() 和 Executor.map() 会报运行时错误 RuntimeError。
  • 如果 wait 为 True,那么这个方法会在所有等待的 future 都执行完毕,并且属于执行器 executor 的资源都释放完之后才会返回。
  • 如果 wait 为 False,本方法会立即返回。属于执行器的资源会在所有等待的 future 执行完毕之后释放。
  • 不管 wait 取值如何,整个 Python 程序在等待的 future 执行完毕之前不会退出。
  • 你可以通过 with 语句来避免显式调用本方法。with 语句会用 wait=True 的默认参数调用 Executor.shutdown() 方法。

执行器类 Executor 实现了上下文协议,可以用做上下文管理器。它能并发执行任务,等待它们全部完成。当上下文管理器退出时,自动调用 shutdown() 方法。

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ProcessPoolExecutor 进程池执行器

  • ProcessPoolExecutor 使用了 multiprocessing 模块,这允许它可以规避 Global Interpreter Lock,但是也意味着只能执行和返回可序列化的(picklable)对象。

  • main 模块必须被 worker 子进程导入,这意味着 ProcessPoolExecutor 在交互解释器中无法工作。

  • 在已经被提交到 ProcessPoolExecutor 中的可调用对象内使用 Executor 或者 Future 方法会导致死锁。

  • concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
    1、这个 Executor 子类最多用 max_workers 个进程来异步执行调用。
    2、如果不指定 max_workers 或者为 None,它默认为本机的处理器数量。
    3、如果 max_workers 小于等于 0,会抛出 ValueError 异常。
    4、mp_context 是多进程上下文(multiprocessing context)或者 None,它会被用来启动 workers。
    5、如果不指定 mp_context 或者为 None,会使用默认的多进程上下文环境。

  • initializer 是一个可选的可调用对象,会在每个 worker 进程启动之前调用。

  • initargs 是传递给 initializer 的参数元组。

  • 如果 initializer 抛出了异常,那么当前所有等待的任务都会抛出 BrokenProcessPool 异常,继续提交 submit 任务也会抛出此异常。文章来源地址https://www.toymoban.com/news/detail-622436.html

ProcessPoolExecutor使用实例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

到了这里,关于concurrent.futures模块ThreadPoolExecutor、ProcessPoolExecutor讲解及使用实例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • python爬虫——request模块讲解,Python详解

    对于GET方式的请求,浏览器会把http header和data一并发送出去,服务器响应200(返回数据); 而对于POST, 浏览器先发送header,服务器响应100 continue,浏览器再发送data,服务器响应200 ok(返回数据)。 (二)http常见请求参数 url:请求url地址 headers:请求头 **data:发送编码为表

    2024年04月26日
    浏览(36)
  • 自定义线程池 ThreadPoolExecutor

    Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类,主要由下列4个组件构成。 ·corePool:核心线程池的大小。 ·maximumPool:最大线程池的大小。 ·BlockingQueue:用来暂时保存任务的工作队列。 ·RejectedExecutionHandler:当ThreadPoolExecutor已经关闭或ThreadPoolExecutor已经饱和时

    2024年02月05日
    浏览(34)
  • ThreadPoolExecutor源码学习

    继承结构如图所示: ThreadPoolExecutor - AbstractExecutorService - ExecutorService - Executor 线程池状态参数 线程池状态转移图如下所示 RUNNING: 线程池创建后进入的状态 SHUTDOWN: 调用 shutdown 方法进入该状态,该方法主要包含如下操作 更新线程池状态为 SHUTDOWN 中断空闲线程 interruptIdleWorker

    2023年04月11日
    浏览(32)
  • ThreadPoolExecutor使用浅谈

    ThreadPoolExecutor 是Python标准库 concurrent.futures 模块中的一个类,用于实现线程池的功能。 ThreadPoolExecutor 模块相比于 threading 等模块,通过 submit 方法返回的是一个 Future 对象,它代表了一个未来可期的结果。通过 Future 对象,我们可以在主线程(或主进程)中获取某个线程(或任

    2024年02月06日
    浏览(35)
  • 13.STM32超声波模块讲解与实战

    目录 1.超声波模块讲解 2.超声波时序图 3.超声波测距步骤 4.项目实战   超声波传感器模块上面通常有两个超声波元器件,一个用于发射,一个用于接收。电路板上有4个引脚:VCC GND Trig(触发),Echo(回应)主要参数: 工作电压与电流:5V,15ma 感应距离:2-400cm 感测角度:不

    2023年04月12日
    浏览(40)
  • Java - ThreadPoolExecutor线程池分析

    首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。 但是如果直接采用JDK提供的方式去构建,可见设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自

    2024年02月10日
    浏览(48)
  • java线程池ThreadPoolExecutor使用

    在开发服务端软件项目时,软件经常需要处理执行时间很短而数目巨大的请求,如果为每一个请求创建一个新的线程,则会导致性能上的瓶颈。因为JVM需要频繁的处理线程对象的创建与销毁,如果请求的执行时间很短,则有可能花在创建和销毁线程对象上的时间大于真正执行

    2024年02月06日
    浏览(53)
  • ThreadPoolExecutor线程池内部处理浅析

    我们知道如果程序中并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束时,会因为频繁创建线程而大大降低系统的效率,因此出现了线程池的使用方式,它可以提前创建好线程来执行任务。本文主要通过java的ThreadPoolExecutor来查看线程池的内部处理过程。

    2024年02月05日
    浏览(45)
  • 线程池ThreadPoolExecutor底层原理源码分析

    ThreadPoolExecutor中提供了两种执行任务的方法: void execute(Runnable command) Future? submit(Runnable task) 实际上submit中最终还是调用的execute()方法,只不过会返回⼀个Future对象,用来获取任务执行结果: execute(Runnable command)方法执行时会分为三步: 注意:提交⼀个Runnable时,不管当前线程

    2024年02月06日
    浏览(64)
  • ThreadPoolExecutor——高效处理并发任务的必备良器

      ThreadPoolExecutor是Java concurrent中用于管理线程池的类,它是Executor框架的一个实现。线程池是一种提高应用程序性能和可靠性的技术,它将多个任务分配给多个线程执行,从而实现并发处理。ThreadPoolExecutor提供了一种灵活的方式来管理线程池,可以控制线程池的大小、阻塞

    2024年02月02日
    浏览(30)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包