Python异步编程高并发执行爬虫采集,用回调函数解析响应

这篇具有很好参考价值的文章主要介绍了Python异步编程高并发执行爬虫采集,用回调函数解析响应。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、问题:当发送API请求,读写数据库任务较重时,程序运行效率急剧下降。

异步技术是Python编程中对提升性能非常重要的一项技术。在实际应用,经常面临对外发送网络请求,调用外部接口,或者不断更新数据库或文件等操作。 这这些操作,通常90%以上时间是在等待,如通过REST, gRPC向服务器发送请求,通常可能等待几十毫秒至几秒,甚至更长。如果业务较重,按顺序执行编程,会导致大量时间用在等待上,程序运行效率急剧下降。
常见的场景,就是爬虫软件通常会发起很多请求,如果采用同步编程方式工,往往运行时间很长。

二、异步编程的优势

通常的编程,如果有4个任务,采用同步编程模式,4个任务是按顺序执行的,分别用时:10s,7s,5s,6s,共耗时28s; 而异步方式,就是让4个任务同时执行,总耗时降为10s,改善效果是很明显的。
Python异步编程高并发执行爬虫采集,用回调函数解析响应

那时异步编程是如何做到的?

异步编程,将每个任务改成协程执行,在遇到需要等待的语句时,即暂时将执行权交还给主程序的控制循环event loop,其它协程可以继续使用CPU等资源。而当该协程收到响应后,会用事件通知event loop,申请继续执行。 这样就避免了由于等待期间还占用CPU资源的情形。 因此程序执行效率大为提高。

但如果任务是计算密集型的,那么异步技术对性能提升帮助不大,需要采用其它方式,如多进程编程。或者Cython 等。

三、用同步编程方式,抓取多个网站数据

先看一下,采用同步编程顺序执行,抓取多个网站数据的耗时。 这些网站中,
其中http://www.google.com 是无响应的,会超时。因此在 requests.get()方法,设置 timeout=3, 即超过3秒,会抛出TimeOutException 异常。

代码如下:


import requests
import time


# 测试时将测试网址替换
urls = [
    "http://www.bxxxx.com",
    "http://www.aaaa.com",
    "http://www.bbbb.com",
    "http://www.cccc.com",
    "http://www.sdddd.com",
    "http://www.jdddd.com",
    "http://www.zeeee.com",
    "http://www.tffff.com",
    "http://www.cgggg.com",
    "http://www.zhhhhh.com.cn",
    "http://www.google.com",
    "https://www.yiiiii.com/",
]


def check_one_ip(url):
    headers = {
        "user-ageng": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 \
            (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.69"
    }
    TIMEOUT = 3
    result = ()
    try:
        response = requests.get(url, headers=headers, timeout=TIMEOUT)
        print(f"response from {url} is : {response.status_code}")
        if 200 <= response.status_code < 300:
            print(f"length of response body is {len(response.text)}")
        result = (url, response.status_code)
    except Exception as e:
        print(f"{url} met timeout error")
        return (url, 999)
    return result


def main():
    results = []
    for url in urls:
        result = check_one_ip(url)
        results.append(result)


if __name__ == "__main__":
    t1 = time.time()
    main()
    t2 = time.time()
    print(f"total time: {t2-t1:.3f}s")

运行代码,向12个网站发送request, 打印response的状态码,总耗时为:6.035s,

response from url is : 200
length of response body is 2381
response from url is : 200
length of response body is 24000
response from url is : 200
length of response body is 106117
response from url is : 403
response from url is : 404
response from url is : 200
length of response body is 177104
response from url is : 200
length of response body is 37989
response from url is : 200
length of response body is 89513
response from url is : 200
length of response body is 32642
response from url is : 403
url met timeout error
response from url is : 200
length of response body is 834
total time: 6.035s

四、用异步方式,同时抓取多个网站数据

现在,采用Asyncio异步编程,以并发的运行方式,向多个网站同时发送request, 总耗时,应该是用时最长那个协程的用时。这里我们使用了timeout, 就是3秒左右。

AsyncIO异步编程步骤:

  1. 定义异步任务函数
    使用 asyc / await 关键字。在耗时操作前加await
  2. 创建asyncio.create_task() 方法创建协程任务
  3. 在main()方法中用gather() 汇集协程任务,以便并发执行。
    gather()方法返回结果是一个由所有返回值聚合而成的迭代器
  4. 在主线程的event loop中运行main()
    asyncio模块提供了1个.run()来启动 event loop 异步控制循环,并执行main()方法,
  5. 可选,给协程添加回调函数来解析网站响应结果
    对于每个Task, 可用 add_done_callback(task_callback) 方法添加回调函数,此例中,对显示response的状态码。

其它说明

  • 由于requests库的 response对象不支持 await语句,因此这里使用htppx 库来代替requests, 除了异步接口外,其它使用方式完全一致。

完整代码

import asyncio
import httpx
from concurrent.futures import ThreadPoolExecutor, Future
import time
import contextvars

# 测试时将测试网址替换
urls = [
    "http://www.bxxxx.com",
    "http://www.aaaa.com",
    "http://www.bbbb.com",
    "http://www.cccc.com",
    "http://www.sdddd.com",
    "http://www.jdddd.com",
    "http://www.zeeee.com",
    "http://www.tffff.com",
    "http://www.cgggg.com",
    "http://www.zhhhhh.com.cn",
    "http://www.google.com",
    "https://www.yiiiii.com/",
]


async def check_one_ip(url):
    headers = {
        "user-ageng": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 \
            (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36 Edg/116.0.1938.69"
    }
    TIMEOUT = 3
    result = ()
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, headers=headers,timeout=TIMEOUT)
            print(f"response from {url} is : {response.status_code}")
            if 200 <= response.status_code < 300:
                print(f"length of response body is {len(response.text)}")
            result = (url, response.status_code)
    except  Exception as e:
        print(f"{url} met timeout error")
        return (url, 999)
    return result 

def task_callback(context):
    # print response.status_code 
    url, code = context.result()
    print(f"It is callback,  got status_code: {code} of {url}")

async def main():
    tasks=[]
    for url in urls:
        task = asyncio.create_task(check_one_ip(url))
        task.add_done_callback(task_callback)
        tasks.append(task)
    await asyncio.gather(*tasks) 
        

if __name__=="__main__":
    t1 = time.time()
    asyncio.run(main())
    t2 = time.time()
    print(f"total time: {t2-t1:.3f}s")    

运行结果如下,可以看到,总耗时: 3.161s,相比同步编程方式,耗时减少了1半。 随着发送请求量的增加,可以看到更加明显的效果。

response from url is : 302
It is callback,  got status_code: 302 of url
response from url is : 302
It is callback,  got status_code: 302 of url
response from url is : 200
length of response body is 23508
It is callback,  got status_code: 200 of url
response from url is : 302
response from url is : 301
It is callback,  got status_code: 302 of url
It is callback,  got status_code: 301 of url
response from url is : 301
response from url is : 301
response from url is : 301
response from url is : 200
length of response body is 396837
It is callback,  got status_code: 301 of url
It is callback,  got status_code: 301 of url
It is callback,  got status_code: 301 of url
It is callback,  got status_code: 200 of url
response from url is : 404
It is callback,  got status_code: 404 of url
response from url is : 200
length of response body is 1151330
It is callback,  got status_code: 200 of url
url met timeout error
It is callback,  got status_code: 999 of url
total time: 3.161s

五、异步编程注意事项

1)协程不应该执行耗时长的任务

异步event loop执行期间,虽然各个协程是在工作,但主线程是被阻塞的。本例中,异步耗时的总时长与访问google.com超时时长相同,那么意味着,如果协程中如果有1个是耗时很长的任务,那么主线程还将被阻塞,异步解决不了这个问题,这时耗时协程应该拿出来,用子线程、或者子进程来执行。

2) 协程应该汇集后并发执行

遇到一些开发者咨询,为什么采用了异步编程,但性能没有明显提升呢? 创建多个协程任务后,必须按第3步,用gather()方法来汇集创建的协程任务,然后用asyncio.run()方法并发运行。 另外官方文档要求 event loop要在主线程main() 方法中运行。

3)慎用底层编程接口

另外由于官方文档并未清晰说明 event loop、future对象等低层编程接口,除非你很了解异步低层的实现机制,否则不建议使用低层接口,
使用ayncio.run() 来启动evnetloop, 使用 task 对象,而非future 对象。文章来源地址https://www.toymoban.com/news/detail-710123.html

到了这里,关于Python异步编程高并发执行爬虫采集,用回调函数解析响应的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Python异步编程之web框架 异步vs同步 数据库IO任务并发支持对比

    主题: 比较异步框架和同步框架在数据库IO操作的性能差异 python版本 :python 3.8 数据库 :mysql 8.0.27 (docker部署) 压测工具 :locust web框架 :同步:flask 异步:starlette 请求并发量 : 模拟10个用户 服务器配置 : Intel(R) i7-12700F 客户端配置 :Intel(R) i7-8700 3.20GHz python中操作数据库通常

    2024年02月08日
    浏览(58)
  • Flutter实现ControlExecutor进行多个异步任务执行时监听状态并可指定最后执行的异步并在指定的异步执行完毕后结束executor并回调。

    1.场景 当有多个接口请求时,且接口调用不是同时进行时,而且接口调用有可能时链式的,中间也有可能加入别的逻辑,但是需要在第一个接口调用时打开等待框,在最后一个接口调用完成时关闭等待框类似需求时,可以用到ControlExecutor进行接口执行过程的监听,并可标记最

    2024年02月09日
    浏览(38)
  • 如何处理异步编程中的回调地狱问题?

    前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发者,这里都将为你提供一个系统而又亲切的学习平台。在这个

    2024年02月09日
    浏览(47)
  • 一个月学通Python(三十三):Python并发编程在爬虫中的应用

    结合自身经验和内部资料总结的Python教程,每天3-5章,最短1个月就能全方位的完成Python的学习并进行实战开发,学完了定能成为大佬!加油吧!卷起来! 全部文章请访问专栏:《Python全栈教程(0基础)》 再推荐一下最近热更的:《大厂测试高频面试题详解》 该专栏对近年

    2024年02月13日
    浏览(42)
  • kafka入门,生产者异步发送、回调函数,同步发送(四)

    引入依赖 回调函数会在producer收到ack时调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明信息发送失败 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。 只需在异步发送的基础上,再调用一下 get(

    2024年02月11日
    浏览(47)
  • 什么是异步编程?什么是回调地狱(callback hell)以及如何避免它?

    前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发者,这里都将为你提供一

    2024年02月10日
    浏览(41)
  • vue js 回调函数 异步处理 为什么要 let that = this

    1 异步就是开个事务( 只有主线程 等主线程空闲 ),用that 值 做处理,然后返回处理结果,而that的值是开启 事务那一刻 的this的值.而在主线程处理的时候,this的一直在变化, that的值保留在那一刻 ps 或是将本obj 传递给其他的obj使用处理 ps 开启新事务或开启新子线程都是 在新的ob

    2024年02月11日
    浏览(54)
  • 巧用回调函数解决微信小程序与后台数据交互出现的异步问题

            微信小程序端需要发送一个包含文字与图片的表单数据给后端,我一开始的思路是 先 上传图片得到临时的URL, 后 执行POST请求将表单数据发送给后端,但后端只能获取到文字,而图片URL却始终获取不到。         注意看我上面的思路, 一先一后 ,无形中将两

    2024年02月16日
    浏览(44)
  • 并发编程 --- 异步方法的异常处理

    现在模拟一个异步方法抛出了异常: 思考一下, DontHandle() 方法是否能够捕获到异常? 答案是:不会捕获到异常! 因为 DontHandle() 方法在 ThrowAfter() 方法抛出异常之前,就已经执行完毕。 那么上述代码怎么才能捕获到异常呢? 若想要捕获异常则必须通过 await 等待 Th

    2024年02月15日
    浏览(46)
  • 登陆校验解决前端success回调函数始终执行问题

    有一串前端js登陆代码: 和后端代码: 存在问题: 不管前端传递的账号密码是否正确,都会成功调用success回调函数,从而导致无法判断账号密码是否匹配成功。 解决办法: 要避免无论账号密码是否正确都调用success回调函数,可以在后端代码中对账号密码进行验证,只有在

    2024年02月14日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包