前言
本文将和大家一起探讨python提供高级接口(进程池、线程池)的并发编程,使用内置基本库concurrent.futures来实现并发,先通过官方来简单使用这个模块。先打好基础,能够有个基本的用法与认知,后续文章,我们再进行详细使用。为什么说是concurrent.futures,而不是concurrent呢?因为concurrent只有futures模块。
本文为python并发编程的第十三篇,上一篇文章地址如下:
python:并发编程(十二)_Lion King的博客-CSDN博客
下一篇文章地址如下:
python:并发编程(十四)_Lion King的博客-CSDN博客
一、快速开始
1、ProcessPoolExecutor
import concurrent.futures
import math
# 定义一些待判断的素数
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
# 判断一个数是否为素数
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
# 使用ProcessPoolExecutor作为执行器
with concurrent.futures.ProcessPoolExecutor() as executor:
# 使用executor.map()方法将is_prime函数应用到PRIMES列表中的每个元素上
# 返回的结果是一个迭代器,每个迭代项是一个二元组(number, prime),表示待判断的数和是否为素数的结果
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
该程序使用concurrent.futures
模块的ProcessPoolExecutor
作为执行器,将is_prime
函数应用到PRIMES
列表中的每个元素上,判断每个数是否为素数。
在main()
函数中,通过executor.map()
方法将is_prime
函数应用到PRIMES
列表中的每个元素上,返回的结果是一个迭代器。迭代器的每个迭代项是一个二元组(number, prime)
,表示待判断的数和是否为素数的结果。
通过zip(PRIMES, executor.map(is_prime, PRIMES))
将待判断的数和结果进行配对,然后使用for
循环遍历迭代器,打印每个数是否为素数的结果。
该程序使用多进程并行执行判断素数的任务,提高了程序的执行效率。
2、ThreadPoolExecutor
import concurrent.futures
import urllib.request
# 定义要下载的网页URL列表
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistant-subdomain.python.org/']
# 定义加载URL的函数
def load_url(url, timeout):
# 使用urllib.request.urlopen打开URL并读取内容
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# 使用ThreadPoolExecutor作为执行器,最大线程数为5
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 启动下载任务,并将每个任务与对应的URL关联起来
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
# 遍历已完成的Future对象
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
# 获取任务的结果
data = future.result()
except Exception as exc:
# 如果任务发生异常,则打印异常信息
print('%r generated an exception: %s' % (url, exc))
else:
# 如果任务正常完成,则打印网页URL和内容的长度
print('%r page is %d bytes' % (url, len(data)))
该程序使用concurrent.futures
模块进行并发的网页下载。它创建了一个ThreadPoolExecutor
执行器,最大线程数为5。然后,它遍历URL列表,使用executor.submit()
方法提交下载任务,并将每个任务与对应的URL关联起来。
在as_completed()
函数中,它迭代已完成的Future
对象,即已下载完成的任务。对于每个已完成的任务,它首先获取任务的结果(即网页内容),如果任务发生异常,则打印异常信息,否则打印网页的URL和内容的长度。
请注意,在运行该程序之前,请确保您的计算机已连接到互联网,以便能够成功下载网页内容。
3、模块函数
concurrent.futures模块除了提供ThreadPoolExecutor和ProcessPoolExecutor之外,还提供了一些其他的函数来执行异步任务和处理结果。以下是一些常用的concurrent.futures模块函数:
(1)concurrent.futures.as_completed(fs, timeout=None):
①接收一个可迭代的Future对象集合fs,返回一个生成器,在每个Future对象完成时产生结果。
②可选地指定timeout参数,用于限制等待结果的最长时间。
(2)concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):
①接收一个可迭代的Future对象集合fs,等待所有的Future对象完成。
②可选地指定timeout参数,用于限制等待结果的最长时间。
③可选地指定return_when参数,用于指定何时返回结果,可选值包括FIRST_COMPLETED、FIRST_EXCEPTION和ALL_COMPLETED。
这些函数提供了更灵活的方式来管理和处理Future对象的执行和结果。您可以根据需要选择适合的函数来处理异步任务。
以下是使用模块函数的示例:
import concurrent.futures
def task(n):
return n ** 2
# 创建线程池执行器
with concurrent.futures.ThreadPoolExecutor() as executor:
# 提交任务到线程池
futures = [executor.submit(task, i) for i in range(5)]
# 使用wait函数等待所有任务完成
done, not_done = concurrent.futures.wait(futures, timeout=None)
for future in done:
result = future.result()
print(result)
# 使用as_completed函数按照完成顺序获取结果
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(result)
4、Executor 对象
concurrent.futures
模块中的Executor
是一个抽象基类,用于表示执行器对象。它定义了一些共同的方法和行为,用于管理并发执行的任务。但要通过它的子类调用,而不是直接调用。
Executor
类并不直接实例化,而是通过具体的子类如ThreadPoolExecutor
和ProcessPoolExecutor
来创建实例。
以下是一些常用的Executor
类方法:
(1)submit(fn, *args, **kwargs)
: 提交一个可调用对象和它的参数给执行器,返回一个Future
对象,表示该任务的未来结果。
(2)map(fn, *iterables, timeout=None)
: 批量提交任务,并按原始迭代器的顺序返回结果。它类似于内置函数map()
,但是可以异步地并发执行任务。
(3)shutdown(wait=True)
: 关闭执行器,不再接受新的任务。如果wait
参数为True(默认值),则在所有任务完成后再关闭执行器。
(4)submit_to_executor(fn, executor, *args, **kwargs)
: 将任务提交给指定的执行器对象,并返回一个Future
对象。
(5)map_to_executor(fn, executor, *iterables, timeout=None)
: 将任务批量提交给指定的执行器对象,并返回结果。
这些方法使得在执行任务时更加方便和灵活。可以根据具体的需求选择合适的方法和执行器类型。
5、Future 对象
concurrent.futures.Future
是concurrent.futures
模块中的一个类,用于表示一个异步任务的未来结果。它将可调用对象封装为异步执行。Future 实例由 Executor.submit() 创建,除非测试,不应直接创建。
以下是concurrent.futures.Future
的一些常用方法:
(1)result()
: 等待并返回异步任务的结果。如果任务尚未完成,该方法会阻塞直到任务完成并返回结果。
(2)done()
: 判断异步任务是否已经完成,返回布尔值。
(3)cancel()
: 取消异步任务的执行。如果任务已经开始执行或已经完成,则无法取消。文章来源:https://www.toymoban.com/news/detail-488753.html
(4)add_done_callback(fn)
: 添加一个回调函数,当异步任务完成时会调用该回调函数。文章来源地址https://www.toymoban.com/news/detail-488753.html
到了这里,关于python:并发编程(十三)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!