Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装

这篇具有很好参考价值的文章主要介绍了Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Python多线程与线程池

一、Python多线程

在进行复杂的计算或处理大量数据时,可以通过创建多个线程来同时执行多个任务,从而提高程序的执行效率。这种技术称为多线程编程。

1.1 线程简介

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装,Python,python,开发语言

1.2 Python中的多线程

Python中的threading模块提供了对线程的支持。使用threading模块创建线程,直接从threading.Thread继承,然后重写__init__方法和run方法。

import threading

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()
        self.n = n
    
    def run(self):
        print('running task', self.n)

t1 = MyThread(1)
t2 = MyThread(2)

t1.start()
t2.start()

Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装,Python,python,开发语言

1.3 GIL限制

由于Python解释器设计中的全局解释器锁(Global Interpreter Lock,GIL)的存在,使得Python的多线程并不能利用多核优势。GIL是计算机程序设计语言解释器用于同步线程的工具,使得任何时刻只有一个线程在执行,即使在多核CPU平台上,Python的线程也无法同时执行。

Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装,Python,python,开发语言

二、线程池

线程池是一种基于池化思想管理线程的工具。在开始任务时不再重新创建新的线程,而是直接从线程池中获取一个空闲线程来执行。如果线程池中没有空闲线程,新的任务就会等待(排队),直到有线程空闲。当任务执行完毕后,线程并不立即销毁,而是返回线程池等待下次被利用。

2.1 Python中的线程池

Python的concurrent.futures模块提供了高级别的异步执行封装,包括线程池ThreadPoolExecutor和进程池ProcessPoolExecutor,它们都是Executor的子类。

from concurrent.futures import ThreadPoolExecutor

def func(n):
    print(n)

with ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(func, range(1,5))

其中max_workers参数表示线程池中最多可以同时运行的线程数量。

Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装,Python,python,开发语言

线程池优势

Python线程池具有以下优势:

  1. 控制并发数:Python的线程池可以控制系统中运行的线程数量,避免了因为创建过多线程而导致系统资源耗尽。

  2. 提高性能:通过复用已经创建的线程,可以避免频繁地创建和销毁线程所带来的性能开销。

  3. 简化代码:使用线程池,我们只需要将任务提交给线程池,无需手动管理每个线程的生命周期。

  4. 异步处理:Python的线程池提供了异步处理任务的能力。当我们提交任务给线程池后,线程池会在后台进行处理,不会阻塞主程序的执行。

  5. 调度方便:线程池还提供了一些调度功能,如定时执行、周期执行等。

  6. 任务队列:线程池内部维护了一个任务队列,如果线程池中的所有线程都在忙,新来的任务会被放入队列中等待执行,这样可以保证所有提交给线程池的任务都会被执行,不会丢失。

总的来说,使用Python线程池可以简化并发编程的复杂性,提高程序的性能和稳定性。

三、代码分析

import requests
from requests.models import PreparedRequest
import json
import concurrent.futures

def get_score_models(url):
    url_score = "https://bizapi.csdn.net/trends/api/v1/get-article-score"

    headers = {
        "accept": "application/json, text/plain, */*",
        "x-ca-key": "203930474",
        "x-ca-nonce": "22cd11a0-760a-45c1-8089-14e53123a852",
        "x-ca-signature": "RaEczPkQ22Ep/k9/AI737gCtn8qX67CV/uGdhQiPIdQ=",
        "x-ca-signature-headers": "x-ca-key,x-ca-nonce",
        "x-ca-signed-content-type": "multipart/form-data"
    }

    data = {"url": url}
    response = send_request(url_score, data, headers)
    data1 = response.json()

    score_model = data1["data"]

    return score_model


def send_request(url, data, headers):
    session = requests.Session()
    prepared_request = PreparedRequest()
    prepared_request.prepare(method='POST', url=url,
                             headers=headers, data=data)
    return session.send(prepared_request)


def process_article_json(article):
    # score_model = get_score_models(article['article_url'])
    score_model = get_score_models(article['url'])
    article['article_score'] = score_model['score']
    print(article["url"])
    return article


if __name__ == '__main__':
    # 读取articles.json文件
    with open('articles.json', 'r') as f:
        articles = json.load(f)

    # 创建一个 ThreadPoolExecutor 实例,max_workers 表示线程池中最多可以同时运行的线程数量
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # 使用 map 函数将 process_article_json 应用到每个元素,并在多线程环境下并行处理
        processed_articles = list(executor.map(process_article_json, articles))

    # 保存处理后的结果到新的JSON文件
    output_file = 'processed_articles.json'
    with open(output_file, 'w') as f:
        json.dump(processed_articles, f, ensure_ascii=False, indent=4)

import requests
from requests.models import PreparedRequest
import json
import concurrent.futures
def get_score_models(url):
url_score = “https://bizapi.csdn.net/trends/api/v1/get-article-score”
headers = {
“accept”: “application/json, text/plain, /”,
“x-ca-key”: “203930474”,
“x-ca-nonce”: “22cd11a0-760a-45c1-8089-14e53123a852”,
“x-ca-signature”: “RaEczPkQ22Ep/k9/AI737gCtn8qX67CV/uGdhQiPIdQ=”,
“x-ca-signature-headers”: “x-ca-key,x-ca-nonce”,
“x-ca-signed-content-type”: “multipart/form-data”
}
data = {“url”: url}
response = send_request(url_score, data, headers)
data1 = response.json()
score_model = data1[“data”]
return score_model
def send_request(url, data, headers):
session = requests.Session()
prepared_request = PreparedRequest()
prepared_request.prepare(method=‘POST’, url=url,
headers=headers, data=data)
return session.send(prepared_request)
def process_article_json(article):
# score_model = get_score_models(article[‘article_url’])
score_model = get_score_models(article[‘url’])
article[‘article_score’] = score_model[‘score’]
print(article[“url”])
return article
if name == ‘main’:
# 读取articles.json文件
with open(‘articles.json’, ‘r’) as f:
articles = json.load(f)
# 创建一个 ThreadPoolExecutor 实例,max_workers 表示线程池中最多可以同时运行的线程数量
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 使用 map 函数将 process_article_json 应用到每个元素,并在多线程环境下并行处理
processed_articles = list(executor.map(process_article_json, articles))
# 保存处理后的结果到新的JSON文件
output_file = ‘processed_articles.json’
with open(output_file, ‘w’) as f:
json.dump(processed_articles, f, ensure_ascii=False, indent=4)

Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装,Python,python,开发语言

以上给出的代码片段主要涉及到的是线程池的使用。具体来说,首先从一个名为articles.json的文件中读取文章信息,然后利用线程池并发地获取每篇文章的评分,并将评分添加到文章信息中,最后将处理后的文章信息保存到新的JSON文件。

代码的主要部分如下:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    processed_articles = list(executor.map(process_article_json, articles))

Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装,Python,python,开发语言

这里,首先创建了一个ThreadPoolExecutor实例,并设置最大并发线程数为5。然后使用executor.map()函数将process_article_json函数应用到articles列表的每个元素上,这样就可以在多线程环境下并行处理每篇文章了。由于executor.map()函数返回的是一个迭代器,因此需要用list()函数将其转换为列表。

这种方式可以有效地提高处理大量文章信息的效率,特别是当获取文章评分的过程涉及到网络请求等I/O操作时,通过线程池并发处理可以显著减少总的处理时间。

Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装,Python,python,开发语言

四、参考资料

  • Python官方文档:threading — Thread-based parallelism
  • Python官方文档:concurrent.futures — Launching parallel tasks
  • Python线程池使用示例
  • Python多线程与GIL
  • 3. 爬取自己CSDN博客列表(分页查询)(网站反爬虫策略,需要在代码中添加合适的请求头User-Agent,否则response返回空)

20230817

 with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            future_to_ip = {executor.submit(check_ping, ip): ip for ip in ips}
            for future in concurrent.futures.as_completed(future_to_ip):
                ip = future_to_ip[future]
                try:
                    if future.result():
                        print(f"IP [{ip}] 可以ping通")
                        return True
                except Exception as exc:
                    print('%r generated an exception: %s' % (ip, exc))

这段代码是使用Python的concurrent.futures模块创建一个线程池来并发执行任务。每个任务都在一个独立的线程中运行,最大并发线程数由max_workers参数指定。

  1. with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

    这一行创建了一个线程池执行器(ThreadPoolExecutor),最大工作线程数量为20。

  2. future_to_ip = {executor.submit(check_ping, ip): ip for ip in ips}

    这一行通过线程池执行器提交任务。每个任务都是对函数check_ping的调用,并传入一个IP地址。这会返回一个Future对象,表示待完成的操作。然后将每个Future对象与对应的IP地址映射到字典future_to_ip中。

  3. for future in concurrent.futures.as_completed(future_to_ip):

    这一行遍历所有已完成的Future对象。as_completed函数返回一个迭代器,当一个Future对象完成时,它就会产生一个新的值。

    for future in concurrent.futures.as_completed(future_to_ip)这段代码是阻塞等待的。这个语句会迭代返回已经完成的Future实例。如果没有完成的Future实例,它就会等待,直到有Future实例完成。

    concurrent.futures.as_completed函数返回一个迭代器,这个迭代器会在每次Future实例完成(无论成功还是失败)时产生该Future。所以,在这个循环中,你可以立即处理已完成的任务,而不必等待所有任务都完成。但是,如果所有的Future都还未完成,那么这个循环就会阻塞,直到至少有一个Future完成。

  4. ip = future_to_ip[future]

    这一行从字典中获取与当前Future对象对应的IP地址。

  5. if future.result():

    这一行获取Future对象的结果。如果check_ping函数返回True,那么就打印出可以ping通的IP地址,并返回True。

  6. 如果在获取Future对象的结果时发生异常,那么就捕获这个异常,并打印出引起异常的IP地址和异常信息。文章来源地址https://www.toymoban.com/news/detail-662499.html

到了这里,关于Python多线程与线程池(python线程池ThreadPoolExecutor)concurrent.futures高级别异步执行封装的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java - ThreadPoolExecutor线程池分析

    首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。 但是如果直接采用JDK提供的方式去构建,可见设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自

    2024年02月10日
    浏览(48)
  • java线程池ThreadPoolExecutor使用

    在开发服务端软件项目时,软件经常需要处理执行时间很短而数目巨大的请求,如果为每一个请求创建一个新的线程,则会导致性能上的瓶颈。因为JVM需要频繁的处理线程对象的创建与销毁,如果请求的执行时间很短,则有可能花在创建和销毁线程对象上的时间大于真正执行

    2024年02月06日
    浏览(53)
  • ThreadPoolExecutor线程池内部处理浅析

    我们知道如果程序中并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束时,会因为频繁创建线程而大大降低系统的效率,因此出现了线程池的使用方式,它可以提前创建好线程来执行任务。本文主要通过java的ThreadPoolExecutor来查看线程池的内部处理过程。

    2024年02月05日
    浏览(45)
  • 线程池ThreadPoolExecutor底层原理源码分析

    ThreadPoolExecutor中提供了两种执行任务的方法: void execute(Runnable command) Future? submit(Runnable task) 实际上submit中最终还是调用的execute()方法,只不过会返回⼀个Future对象,用来获取任务执行结果: execute(Runnable command)方法执行时会分为三步: 注意:提交⼀个Runnable时,不管当前线程

    2024年02月06日
    浏览(64)
  • Java多线程 - 创建线程池的方法 - ThreadPoolExecutor和Executors

    线程池介绍 什么是线程池 ? 线程池就是一个可以复用线程的技术。 不使用线程池的问题 : 如果用户每发起一个请求,后台就创建一个新线程来处理,下次新任务来了又要创建新线程,而创建新线程的开销是很大的,这样会严重影响系统的性能。 线程池工作原理 : 例如线程池

    2023年04月16日
    浏览(48)
  • 线程池高手进阶:揭秘ThreadPoolExecutor的小妙招!

    ThreadPoolExecutor 是 Java 中用于创建和管理线程池的接口,当线程池中的任务队列已满,并且线程池中的线程数量已经达到最大时,如果再有新的任务提交,就需要一个策略来处理这些无法执行的任务。它 提供了四种拒绝策略,都是 RejectedExecutionHandler 接口的实现,如下: Abor

    2024年01月25日
    浏览(41)
  • 全网最详细的线程池 ThreadPoolExecutor 详解,建议收藏!

    五种状态: 线程池的 shutdown() 方法,将线程池由 RUNNING(运行状态)转换为 SHUTDOWN状态 线程池的 shutdownNow() 方法,将线程池由RUNNING 或 SHUTDOWN 状态转换为 STOP 状态。 注: SHUTDOWN 状态 和 STOP 状态 先会转变为 TIDYING 状态,最终都会变为 TERMINATED ThreadPoolExecutor 继承自 AbstractExe

    2024年02月03日
    浏览(46)
  • 线程池创建线程异步获取Future超时

    其中,future.get是从开始进行get方法时进行计算的时间,非future生成开始计算的,即什么时候get什么时候开始计时。 线程池从生成线程,如果核心线程不为0,则有任务时一直生成核心线程,直至到核心线程,之后开始方队列中,最后任务多就开始开辟新线程到最大线程数。

    2024年02月02日
    浏览(37)
  • C++ 多线程编程(三) 获取线程的返回值——future

    C++11标准库增加了获取线程返回值的方法,头文件为future,主要包括 future 、 promise 、 packaged_task 、 async 四个类。 那么,了解一下各个类的构成以及功能。 future是一个模板类,它是传输线程返回值(也称为 共享状态 )的媒介,也可以理解为线程返回的结果就安置在future中。

    2024年02月02日
    浏览(45)
  • 多线程系列(十九) -Future使用详解

    在前几篇线程系列文章中,我们介绍了线程池的相关技术,任务执行类只需要实现 Runnable 接口,然后交给线程池,就可以轻松的实现异步执行多个任务的目标,提升程序的执行效率,比如如下异步执行任务下载。 而实际上 Runnable 接口并不能满足所有的需求,比如有些场景下

    2024年03月14日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包