Python笔记二之多线程

这篇具有很好参考价值的文章主要介绍了Python笔记二之多线程。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本文首发于公众号:Hunter后端

原文链接:Python笔记二之多线程

这一篇笔记介绍一下在 Python 中使用多线程。

注意:以下的操作都是在 Python 3.8 版本中试验,不同版本可能有不同之处,需要注意。

本篇笔记目录如下:

  1. 概念
  2. 多线程的使用示例
    daemon
    run()
  3. 线程对象的属性和设置
  4. 线程模块相关函数
    1. threading.active_count()
    2. threading.current_thread()
    3. threading.enumerate()
  5. 线程的异常和函数结果获取
  6. 线程池
    1. result()
    2. done()
    3. exception()
    4. cancel()
    5. running()
  7. 如何探索出最佳的线程池线程数量

1、概念

关于进程与线程的概念,这里简单介绍下。

一个进程是一个独立的执行环境,包括代码、数据和系统资源等,每个进程都有自己的内存空间、文件描述符、环境变量等。

而线程存在于进程中,共享进程内的内存和资源。

至于多进程与多线程,多进程可以充分利用计算机的多核 CPU,适用于 CPU 密集型的任务,,比如进行大量计算操作

而多线程则适用于涉及到大量的 IO 操作的任务,比如网络请求,文件读写等,在 Python 中有一个 GIL 的概念,它的全称是 Global Interpreter Lock,为全局解释器锁。

GIL 的存在是为了使同一时刻只有一个线程在运行 Python 代码,保护解释器的内部数据避免收到并发访问的影响。

所以 Python 中的多线程操作实际上是在多个线程中进行切换,以此来实现想要的并发效果。

2、多线程的使用示例

前面介绍了 Python 中多线程的操作适用于 IO 密集型的任务,所以这里以访问某个接口为例介绍一下多线程的使用。

那个接口我们这里用 Flask 创建一个服务器,其内容如下:

# app/__init__.py

from flask import Flask
import time

def create_app():
    app = Flask(__name__)

    @app.route("/test/<int:delay>")
    def test(delay):
        time.sleep(delay)
        return str(time.time())

    return app 

这个接口通过 delay 参数可以指定接口的休眠时间返回,比如 /test/4,那么接口响应时间大约会是 4 秒。

在 Python 中,用到多线程的模块是 threading 模块,以下是一个使用示例:

import threading
import time

import requests

def get_response(url):
    response = requests.get(url)
    print(response.content)

def test_multi_threading():
    url = "http://192.168.1.6:5000/test/2"
    threads = []

    for i in range(20):
        threads.append(threading.Thread(target=get_response, args=(url,)))

    for t in threads:
        t.start()

    for t in threads:
        t.join()

def test_single():
    url = "http://192.168.1.6:5000/test/2"

    for i in range(5):
        get_response(url)

if __name__ == "__main__":
    start_time = time.time()
    test_multi_threading()
    print("运行耗时:", time.time() - start_time)
    
    start_time = time.time()
    test_single()
    print("运行耗时:", time.time() - start_time)

在这里我们可以比对单个线程执行五次,需要的时间大约是 10 秒,而使用多线程的方式虽然调用了 20 次接口,但是耗时大约只有 2 秒,这就是多线程在 IO 密集型的情况下的好处。

接下来具体介绍下多线程的使用方法:

def test_multi_threading():
    url = "http://192.168.1.6:5000/test/2"
    threads = []

    for i in range(20):
        threads.append(threading.Thread(target=get_response, args=(url,)))

    for t in threads:
        t.start()

    for t in threads:
        t.join()

在这里,我们通过 threading.Thread() 的方式创建一个线程,然后通过 .start() 方法开始线程活动。

接着通过 join() 方法阻塞调用这个方法的线程,在这里也就是主线程,等待 t 线程完成后再执行主线程后面的操作。

如果我们尝试注释掉 t.join() 这两行,那么主线程就会不等待 t 线程直接往后面执行,造成我们后面在主函数里计算的时间不准确。

daemon

可以根据这个参数设置线程是否为守护线程,所有线程创建的时候默认都不是守护线程,如果需要设置线程为守护线程,需要额外做设置。

守护线程是一种特殊类型的线程,生命周期受到主线程的影响,也就是说当主线程结束时,守护线程会被强制终止,它不会阻止主线程的正常执行,主线程也不会像其他线程调用了 join() 一样被阻塞。

守护线程通常用于执行一些辅助性任务,比如日志记录、定时任务等,示例如下,我们开启了一个守护线程用于定时 print() 某些信息:

def print_info():
    while True:
        print("daemon threading, curr_time:", time.time())
        time.sleep(1)


def test_daemon_threading():
    base_url = "http://192.168.1.6:5000/test/"

    t1 = threading.Thread(target=get_response, args=(base_url + str(6),))
    t2 = threading.Thread(target=get_response, args=(base_url + str(2),))

    daemon_t = threading.Thread(target=print_info, args=(), daemon=True)

    t1.start()
    t2.start()
    daemon_t.start()

    t1.join()
    t2.join()

这样,守护线程 daemon_t 就会在后台一直循环打印信息,直到主线程结束,守护线程也会被强制终止。

run()

run()start() 方法都和线程的执行有关。

start() 用于启动线程,线程变量调用 start() 后,比如前面的 t.start(),会立即开始执行线程,且线程的执行与主线程并行进行。

run() 定义的是线程内的执行逻辑,是线程的入口点,表示的是线程活动的方法,线程开启后就会调用 run() 方法,执行线程的任务。

在执行 start() 方法后,线程会自动调用 run() 方法,以此来执行线程内需要调用的函数,我们可以通过重写 run() 方法来实现我们想要的定制化功能,比如在后面我们就是通过重写 run() 方法来实现线程的异常信息以及函数的结果返回的,

3、线程对象的属性和设置

线程本身有一些属性可以用于设置和获取,我们先创建一条线程:

t1 = threading.Thread(target=get_response, args=(base_url + str(6),))

查看线程名称

线程名称只是用于标记线程的,并无实际意义,根据用户设置而定,比如前面创建了线程,默认名为 Thread-1,我们可以通过下面的两个操作获取,两个操作是等效的:

t1.name
t1.getName()

设置线程名称

设置线程名称的方法如下:

t1.setName("test_thread")

判断线程是否存活

在未进行 start() 操作前,不是存活状态:

t1.is_alive()
# False

判断线程是否是守护线程

t1.daemon
t1.isDaemon()
# False

设置线程为守护线程

将线程设置为守护线程:

t1.setDaemon(True)

True 为是,False 为否

4、线程模块相关函数

对于 threading 模块,有一些函数可以用于进行相关操作,比如当前存活的线程对象,异常处理等。

接下来先介绍这些函数及其功能,之后会用一个示例应用上这些函数

1. threading.active_count()

返回当前存活的 Thread 对象的数量

2. threading.current_thread()

返回当前对应调用者的线程

3. threading.enumerate()

列表形式返回当前所有存活的 Thread 对象

接下来我们修改 print_info() 函数,运用我们刚刚介绍的这几种函数:

def print_info():
    while True:
        active_count = threading.active_count()
        print("当前存活的线程数量为:", active_count)
        for thread in threading.enumerate():
            print("存活的线程分别是:", thread.getName())
        print("当前所处的的线程名称为:", threading.current_thread().getName())
        print("\n")
        time.sleep(1)

还是执行 test_daemon_threading() 就可以看到对应的输出信息。

5、线程的异常和函数结果获取

Python 中使用 threading 模块创建的线程中的默认异常以及函数执行结果是不会被主线程捕获的,因为线程是独立运行的,我们可以通过定义全局的变量,比如 dict 或者队列来获取对应的信息。

这里介绍一下通过改写 run() 方法来实现我们的功能。

import threading
import traceback
import time
import request

def get_response(url):
    response = requests.get(url)
    if url.endswith("2"):
        1/0
    return time.time()

def print_info():
    while True:
        active_count = threading.active_count()
        print("当前存活的线程数量为:", active_count)
        for thread in threading.enumerate():
            print("存活的线程分别是:", thread.getName())
        print("当前所处的的线程名称为:", threading.current_thread().getName())
        print("\n")
        time.sleep(1)

class MyThread(threading.Thread):
    def __init__(self, func, *args, **kwargs):
        super(MyThread, self).__init__()
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.result = None
        self.is_error = None
        self.trace_info = None

    def run(self):
        try:
            self.result = self.func(*self.args, **self.kwargs)
        except Exception as e:
            self.is_error = True
            self.trace_info = traceback.format_exc()

    def get_result(self):
        return self.result if self.is_error is not True else None


def test_get_exception_and_result():
    base_url = "http://192.168.1.6:5000/test/"

    t1 = MyThread(get_response, base_url + str(3))
    t2 = MyThread(get_response, base_url + str(2))

    daemon_t = MyThread(print_info)
    daemon_t.setDaemon(True)

    t1.start()
    t2.start()
    daemon_t.start()

    t1.join()
    t2.join()

    print(t1.get_result())
    print(t2.is_error)
    print(t2.trace_info)

if __name__ == "__main__":
    test_get_exception_and_result()

在这里,我们调用 get_response 函数时,通过判断 delay 的值,手动触发了报错,以及添加了一个 return 返回值,且通过 MyThread 这个重写的 threading.Thread 来进行操作,获取到线程执行是否有异常,以及异常信息,以及函数返回的结果。

6、锁

如果有时候多个线程需要访问同一个全局变量,可能会导致数据不一致的问题,我们使用线程里的锁来控制对相关资源的访问,以此来确保线程安全,下面是一个示例:

import threading

counter = 0
lock_counter = 0
lock = threading.Lock()


def test_no_lock():
    global counter
    for i in range(1000000):
        counter += 1
        counter -= 1


def run_no_lock_thread():
    t1 = threading.Thread(target=test_no_lock)
    t2 = threading.Thread(target=test_no_lock)

    t1.start()
    t2.start()

    t1.join()
    t2.join()


def test_lock():
    global lock_counter
    for i in range(1000000):
        lock.acquire()
        lock_counter += 1
        lock_counter -= 1
        lock.release()


def run_lock_thread():
    t1 = threading.Thread(target=test_lock)
    t2 = threading.Thread(target=test_lock)

    t1.start()
    t2.start()

    t1.join()
    t2.join()


if __name__ == "__main__":
    print("before: ", counter)
    run_no_lock_thread()
    print("after: ", counter)

    print("before: ", lock_counter)
    run_lock_thread()
    print("after: ", lock_counter)

在上面的示例中,通过比对两个加锁和不加锁的情况下全局变量的值,可以发现,多执行几次的话,可以看法 counter 的值并不总是为 0 的,而 lock_counter 的值的结果一直是 0。

我们通过这种加锁的方式来保证 lock_counter 的值是安全的。

锁的引入我们使用的是:

lock = threading.Lock()

获取以及释放的方法是:

lock.acquire()

lock.release()

在这里对于 lock.acquire() 获取锁,有两个参数,blockingtimeout

blocking 表示是否阻塞,默认为 True,表示如果锁没有被释放,则会一直阻塞到锁被其他线程释放,为 False 的话,则表示不阻塞地获取锁,获取到返回为 True,没有获取到返回为 False

lock.acquire()
# 返回为 True,表示获取到锁

lock.acquire()
lock.acquire(blocking=True)
# 这两个操作都是阻塞获取锁,因为前一个操作已经获取到锁,所以这一步会被一直阻塞


is_lock = lock.acquire(blocking=False)
# 不阻塞的获取锁,如果拿到了锁并加锁,则返回为 True,否则返回为 False,表示没有拿到锁

还有一个参数为 timeout,表示 blockingTrue,也就是阻塞的时候,等待的秒数之后,超时没有拿到锁,返回为 False

release() 表示为锁的释放,没有返回值,当前面获取锁之后,可以通过 lock.release() 的方式释放锁。

locked() 返回为布尔型数据,判断是否获得了锁。

7、线程池

我们可以通过线程池的方式来自动管理我们的线程,用到的模块是 concurrent.futures.ThreadPoolExecutor

以下是一个使用示例:

from concurrent.futures import ThreadPoolExecutor
import concurrent.futures


def get_response(url):
    return True


with ThreadPoolExecutor(max_workers=8) as executor:
    future_list = [executor.submit(get_response, base_url) for _ in range(20)]

    for future in concurrent.futures.as_completed(future_list):
        print(future.result()

在这里,首先实例化一个线程池,然后输入 max_workers 参数,表示线程池开启的最大的线程数。

之后通过 submit() 方法向线程池提交两个任务,并返回一个 Future 对象,我们可以通过这个 Future 对象获取线程函数执行的各种情况,比如线程函数的返回结果,线程异常情况等。

在这里有一个 concurrent.futures.as_completed() 输入的是一个 Future 列表,会按照 任务完成的顺序 逐个返回已经完成的 Future 对象,这个完成,可以是线程函数执行完成,也可以是出现异常的结果。

接下来介绍一下 Future 对象的几个方法,在此之前,我们设置一下用于试验的基本数据:

from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
import requests
import time

def get_response(url):
    response = requests.get(url)
    if url.endswith("2"):
        1/0
    return time.time()

base_url = "http://192.168.1.6:5000/test/"
executor = ThreadPoolExecutor(max_workers=2)

future_1 = executor.submit(get_response, base_url + "3")
future_2 = executor.submit(get_response, base_url + "2")

其中,future_1 线程是正常运行,future_2 在线程里执行报错了。

1. result()

用于获取线程执行的函数返回的结果,如果线程还未完成,那么调用这个方法会阻塞,直到返回结果。

而如果线程里函数执行异常了,调用 result() 方法会重新抛出异常,希望程序正常运行的话,可以加上一个 try-except 操作,或者先通过后面的 exception()方法进行判断。

我们调用 future_1.result() 可以正常返回,而 future_2.result() 会重新报异常。

2. done()

返回一个布尔值,表示线程是否已经完成:

future_1.done() # True
future_2.done() # True

线程执行发生异常也属于完成。

3. exception()

如果线程执行发生异常,可以用这个方法来获取异常对象,如果没有异常就会返回 None

future_2.exception()
# ZeroDivisionError('division by zero')

4. cancel()

尝试取消线程的执行,如果线程还没有开始执行,线程会被标记为取消状态,如果线程已经在执行中或者执行完毕,则不会被取消:

future.cancel()

判断一个线程是否已经被取消,使用方法 cancelled(),返回布尔型数据

5. running()

判断线程是否还在执行中,比如下面的操作:

future_3 = executor.submit(get_response, base_url + "65")
future_3.running()  # True

8、如何探索出最佳的线程池线程数量

对于线程池中线程的数量需要指定多少个,是一个需要探索的问题。

比如需要判断我们的任务是否是 IO 密集型的,比如网络请求等,这种的话可以设置相对较高,但也并非无限高,因为等待的过程中,线程间的切换也是一部分开销。

在执行真正的任务前,我们可以通过一小部分任务来进行性能测试,逐步调整线程池的线程数量,然后观察服务器的内存啊,CPU 利用率啊,以及整个操作的消耗时间等,来综合判断出比较适合的线程数量作为最终的结果。

如果想获取更多后端相关文章,可扫码关注阅读:
Python笔记二之多线程文章来源地址https://www.toymoban.com/news/detail-772800.html

到了这里,关于Python笔记二之多线程的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Java之多线程进阶

    目录 一.上节内容复习 1.线程池的实现 2.自定义一个线程池,构造方法的参数及含义 3.线程池的工作原理 4.拒绝策略 5.为什么不推荐系统提供的线程池 二.常见的锁策略 1.乐观锁和悲观锁 2.轻量级锁和重量级锁 3.读写锁和普通互斥锁 4.自旋锁和挂起等待锁 5.可重入锁和不可重入

    2024年02月05日
    浏览(47)
  • JavaEE之多线程编程:3. 线程的状态(易懂!)

    进程 最核心的状态,一个是就绪状态,一个是阻塞状态(对于线程同样使用)。 以线程为单位进行调度的。 在Java中,又给线程赋予了一些其他的状态。 线程的状态是一个枚举类型 Thread.State 线程的状态一共有6个: NEW:安排了工作, 还未开始行动。(Thread对象已经有了,.

    2024年01月19日
    浏览(50)
  • Java之多线程初阶

    目录 一.进程和线程 1.什么是进程 2.并发,并行和串行 3.线程和多线程 4.进程和线程的区别(重点) 5.多线程的优点 6.多线程的缺点 二.线程的创建 1.继承Thread类 2.实现Runnable接口重写run()方法 3.通过匿名内部类的方式创建Thread和实现Runnable 4.通过Lambda表达式来实现一个线程 三.查看

    2024年02月05日
    浏览(43)
  • Rust语言之多线程

    多线程 是一种并发执行的技术,它允许一个程序或进程同时执行多个线程。每个线程都是程序执行的一个独立路径,它们可以并行运行,共享进程的资源(如内存空间),但每个线程有自己的指令指针、堆栈和局部变量。多线程的主要目的是提高程序的执行效率,通过同时执

    2024年02月22日
    浏览(35)
  • Linux之多线程(上)——Linux下的线程概念

    本文介绍了地址空间和二级页表、Linux下的线程、线程的优缺点以及线程与进程的关系等概念。 地址空间是进程能看到的资源窗口 :一个进程可以看到代码区、堆栈区、共享区、内核区等,大部分的资源是在地址空间上看到的。 页表决定进程真正有用资源的情况 :进程认为

    2024年02月09日
    浏览(43)
  • es 笔记二之基础查询

    本文首发于公众号:Hunter后端 原文链接:es笔记二之基础查询 这一篇笔记介绍 es 的基础查询。 基础查询包括很多,比如排序,类似数据库 limit 的操作,like 操作,与或非等,对于这些操作,我会在介绍他们的用法之后加上对应的数据库 sql 便于理解。 注意: 下面的操作都在

    2024年02月05日
    浏览(63)
  • Java之多线程初阶2

    目录 一.上节内容复习 1.进程和线程的区别 2.创建线程的四种方式 二.多线程的优点的代码展示 1.多线程的优点 2.代码实现 三.Thread类常用的方法 1.Thread类中的构造方法 2.Thread类中的属性 1.为线程命名并获取线程的名字 2.演示isDaemon() 3.演示isAlive() 4.演示getState() 3.Thread类中的方

    2024年02月04日
    浏览(45)
  • Linux入门之多线程|线程的同步|生产消费模型

    文章目录 一、多线程的同步 1.概念 2.条件变量 2.1条件变量概念 2.2条件变量接口 1.条件变量初始化 2.等待条件满足 3.唤醒等待 3.销毁条件变量 2.3条件变量demo 二、生产消费模型 1.生产消费模型 2.基于BlockQueue的生产者消费者模型 3.基于C++用条件变量和互斥锁实现一个生产消费模

    2024年02月09日
    浏览(38)
  • Linux入门之多线程|线程|进程基本概念及库函数

    目录 一、线程 1.线程的概念 补充知识点:页表 2.线程的优点 3.线程的缺点 4.线程异常 5.线程用途 二、线程与进程的区别与联系 三、关于进程线程的问题 0.posix线程库 1.创建线程 关于pthread_create的后两个参数 1.传入指针 2.传入对象 2.线程终止 3.取消线程 4.线程等待(等待线程

    2024年02月10日
    浏览(37)
  • JavaEE之多线程编程:5. 死锁(详解!!!)

    死锁是这样的一种情形:多个同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。 【举个例子理解死锁】 张三李四两人去吃饺子,吃饺子需要酱油和醋。 张三抄起了酱油瓶, 李四抄起了醋瓶。 张三:你先把

    2024年01月25日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包