使用典型的异步 Python 库处理数百个 HTTP 请求、磁盘写入和其他 I/O 密集型任务。
当在单线程同步语言的范围内构建应用程序时,局限性很快就会变得非常明显。我首先想到的是writes:I/O 密集型任务的定义。将数据写入文件(或数据库)时,每个“写入”操作都会故意占用一个线程,直到写入完成。这对于确保大多数系统中的数据完整性非常有意义。例如,如果两个操作同时尝试更新数据库记录,哪一个是正确的?或者,如果脚本需要 HTTP 请求成功才能继续,那么我们如何继续操作,直到我们知道请求成功?
HTTP 请求是最常见的线程阻塞操作之一。当我们编写期望来自外部第三方的数据的脚本时,我们引入了无数的不确定性,这些不确定性只能由请求本身来回答,例如响应时间延迟、我们期望接收的数据的性质,或者请求是否会成功。即使使用我们有信心的 API,任何操作在完成之前也不一定会成功。因此,我们被“封锁”了。
随着应用程序的复杂性增加以支持更多的同时用户交互,软件正在远离线性执行的范例。因此,虽然我们可能不确定特定请求是否成功或数据库写入是否完成,但只要我们有办法优雅地处理和缓解这些问题,这是可以接受的。
一个值得异步执行的问题
您认为 Python 脚本执行数百个 HTTP 请求、解析每个响应并将输出写入单个文件需要多长时间?如果要在简单的 for 循环中使用请求,则需要等待相当长的时间让 Python 执行每个请求、打开文件、写入文件、关闭文件,然后继续执行下一个请求。
我们把asyncio提高脚本效率的能力放到实际测试中。我们将为数百个 URL 的每个任务执行两个 I/O 阻塞操作:执行和解析 HTTP 请求并将所需结果写入单个文件。我们实验的输入将是大量 URL,预期输出是从这些 URL 解析的元数据。让我们看看对数百个 URL 执行此操作需要多长时间。
该网站大约有 2000 篇自己发布的帖子,这使其成为这个小实验的绝佳实验对象。我创建了一个 CSV,其中包含这些帖子的 URL,这将是我们的输入。下面先睹为快:
输入样本
输入 CSV
样本输出
对于输入 CSV 中找到的每个 URL,我们的脚本将获取 URL、解析页面并将一些选择数据写入单个 CSV。结果将类似于以下示例:
我们的脚本将输出的示例
工作工具
我们需要三个核心 Python 库来实现这一目标:
Asyncio:Python 的基础库,用于运行异步 IO 绑定任务。该库在某种程度上已内置到 Python 核心语言中,引入了async/await关键字,分别表示函数何时异步运行以及何时等待此类函数。
Aiohttp:在客户端使用时,类似于Python的requests库,用于发出异步请求。或者, aiohttp可以反向使用:作为应用程序 Web 服务器 来处理传入请求和提供响应,但这是另一个故事了。
Aiofiles:使写入磁盘(例如创建字节并将字节写入文件)成为一项非阻塞任务,这样即使多个任务绑定到同一个文件,多个写入也可以在同一线程上发生而不会相互阻塞。
#安装必要的库 $ pip install asyncio aiohttp aiofiles
奖励:优化速度的依赖项
只需安装一些补充库,aiohttp就可以更快地执行请求。这些库是cchardet(字符编码检测)、aiodns(异步 DNS 解析)和brotlipy(无损压缩)。我强烈建议使用下面方便提供的快捷方式安装它们(从我这里获取,我是互联网上的陌生人):
#安装补充依赖项以加快请求速度 $ pip install aiohttp[speedups]
准备异步脚本/应用程序
我们将像任何其他 Python 脚本一样构造该脚本。我们的主模块aiohttp_aiofiles_tutorial将处理我们的所有逻辑。config.py和main.py都位于主模块之外,并分别为我们的脚本提供一些基本配置和入口点:
#我们的异步获取器/编写器的项目结构 /aiohttp-aiofiles-tutorial ├── /aiohttp_aiofiles_tutorial │ ├── __init__.py │ ├── fetcher.py │ ├── loops.py │ ├── tasks.py │ ├── parser.py │ └── /data # Source data │ ├── __init__.py │ ├── parser.py │ ├── tests │ └── urls.csv ├── /export # Destination for exported data ├── config.py ├── logger.py ├── main.py ├── pyproject.toml ├── Makefile ├── README.md └── requirements.txt
/export只是一个空目录,我们将在其中写入输出文件。
/data子模块包含上面提到的输入 CSV,以及解析它的一些基本逻辑。没什么值得打电话回家的,但如果你好奇的话,可以在Github 存储库上找到源代码。最后附上本文代码。
开始事情
我们卷起袖子,从强制性脚本“入口点” main.py开始。这将启动/aiohttp_aiofiles_tutorial中的核心函数,称为init_script():
#main.py """脚本入口点。""" import asyncio from aiohttp_aiofiles_tutorial import init_script if __name__ == "__main__": asyncio.run(init_script())
init_script()这看起来像是我们正在通过运行单个函数/协程asyncio.run(),乍一看这似乎违反直觉。您可能会问, asyncio 的目的不就是同时运行多个协程吗?
它的确是!init_script()是一个调用其他协程的协程。其中一些协程从其他协程创建任务,其他协程执行任务,等等。asyncio.run()创建一个事件循环,直到目标协程完成为止(包括父协程调用的所有协程),该循环不会停止运行。因此,如果我们保持干净,asyncio.run()就是一次性调用来初始化脚本。
初始化我们的脚本
这就是乐趣的开始。我们已经确定脚本的目的是输出单个 CSV 文件,这就是我们开始的地方:通过在整个脚本运行的上下文中创建并打开一个输出文件:
#aiohttp_aiofiles_tutorial/__init__.py """同时发出数百个请求并将响应保存到磁盘。""" import aiofiles from config import EXPORT_FILEPATH async def init_script(): """准备输出文件并启动任务创建/执行。""" async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile: await outfile.write( "title,description,primary_tag,url,published_at\n" ) # (我们其余的脚本逻辑将在这里执行). # ...
我们的脚本首先打开一个带有aiofiles. 只要我们的脚本通过 在打开的异步文件的上下文中运行async with aiofiles.open() as outfile:,我们就可以不断写入该文件,而不必担心打开和关闭文件。
将此与 Python 中处理文件 I/O 的同步with open() as outfile:(默认)实现进行比较。通过使用,我们几乎aiofiles可以同时从多个源将数据写入同一文件。
EXPORT_FILEPATH碰巧以 CSV ( /export/hackers_pages_metadata.csv ) 为目标。每个 CSV 都需要一行标题;因此,我们await outfile.write()在打开 CSV 后立即写入标题:
#将一行写入由列标题组成的 CSV ... await outfile.write( "title,description,primary_tag,url,published_at\n" )
前进
下面是__init__.py的完整版本,最终将使我们的脚本付诸实践。最值得注意的补充是协程的引入execute_fetcher_tasks();我们将一次剖析这一块:
#aiohttp_aiofiles_tutorial/__init__.py """同时发出数百个请求并将响应保存到磁盘。""" import asyncio import time import aiofiles from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile from aiohttp import ClientSession from config import EXPORT_FILEPATH, HTTP_HEADERS from .data import urls_to_fetch # URLs parsed from a CSV from .tasks import create_tasks # Creates one task per URL async def init_script(): """准备输出文件并启动任务创建/执行。""" async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile: await outfile.write( "title,description,primary_tag,url,published_at\n" ) await execute_fetcher_tasks(outfile) await outfile.close() async def execute_fetcher_tasks(outfile: AsyncIOFile): """ 打开异步 HTTP 会话并执行创建的任务。 :param AsyncIOFile outfile:要写入的本地文件的路径。 """ async with ClientSession(headers=HTTP_HEADERS) as session: task_list = await create_tasks(session, urls_to_fetch, outfile) await asyncio.gather(*task_list)
execute_fetcher_tasks()分解主要是为了组织我们的代码。该协程接受outfile一个参数,该参数将作为我们最终解析的数据的目的地。逐行来看:
async with ClientSession(headers=HTTP_HEADERS) as session:与 Python请求库不同,aiohttp使我们能够打开一个客户端会话,该会话创建一个连接池,该连接池一次 最多允许 100 个活动连接。因为我们将发出 200 个以下的请求,所以获取所有这些 URL 所需的时间将与 Python 在正常情况下获取两个 URL 所需的时间相当。
create_tasks():我们要定义的这个函数接受三个参数。第一个是ClientSession我们之前刚刚打开一行的异步。接下来,我们有urls_to_fetch变量(之前在脚本中导入)。这是一个简单的 Python 字符串列表,其中每个字符串都是从我们之前的“输入”CSV 解析而来的 URL。该逻辑通过一个简单的函数在其他地方处理(对于本教程来说并不重要)。 最后,outfile被传递,因为我们稍后将写入该文件。使用这些参数,create_tasks()将为 174 个 URL 中的每一个创建一个任务。其中每个都会将给定 URL 的内容下载到目标目录。该函数返回任务,但在我们发出指令之前不会执行它们,这是通过...
asyncio.gather(*task_list):Asyncio 的gather()方法在当前运行的事件循环内执行一组任务。一旦开始,异步 I/O 的速度优势将立即显现出来。
创建异步任务
如果您还记得的话,PythonTask包装了我们将来要执行的函数(协程)。此外,每个任务都可以暂时搁置以执行其他任务。在执行创建任务之前,必须传递预定义的协程以及适当的参数。
我分开create_tasks()返回一个 Python 任务列表,其中每个“任务”将执行获取我们的 URL 之一:
#aiohttp_aiofiles_tutorial/tasks.py """准备要执行的任务。""" import asyncio from asyncio import Task from typing import List from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile from aiohttp import ClientSession from .fetcher import fetch_url_and_save_data async def create_tasks( session: ClientSession, urls: List[str], outfile: AsyncIOFile ) -> List[Task]: """ 创建 asyncio 任务来解析 HTTP 请求响应。 :param ClientSession session: 异步 HTTP 请求会话。 :param List[str] urls:要获取的资源 URL。 :param AsyncIOFile outfile:要写入的本地文件的路径。 :returns: List[Task] """ task_list = [] for i, url in enumerate(urls): task = asyncio.create_task( fetch_url_and_save_data( session, url, outfile, len(urls), i ) ) task_list.append(task) return task_list
关于 asyncio 任务的一些值得注意的事情:
我们预先定义“工作要做” 。a 的创建Task并不执行代码。我们的脚本将使用不同的参数同时运行同一函数 174 次。我们希望预先定义这些任务是有道理的。
定义任务既快速又简单。瞬间,CSV 中的每个 URL 都会创建一个相应的任务并将其添加到task_list.
任务准备就绪后,只剩下一件事情要做了,那就是把它们全部启动并开始聚会。这就是asyncio.gather(*task_list)__ init __ .py中的行发挥作用的地方。
Asyncio 的 Task 对象本身就是一个类,具有其属性和方法,本质上提供了一个包装器,其中包含检查任务状态、取消任务等的方法。
执行我们的任务
回到 之前create_tasks(),我们创建了每个任务,每个任务单独执行一个称为每个任务的方法fetch_url_and_save_data()。这个函数做了三件事:
通过aiohttp的会话上下文(由 处理async with session.get(url) as resp:)向给定任务的 URL 发出异步请求
将响应正文作为字符串读取。
html通过传递给我们的最后一个函数,将响应正文的内容写入文件parse_html_page_metadata():
#aiohttp_aiofiles_tutorial/fetcher.py """获取 URL、提取其内容并将解析后的数据写入文件。""" from aiofiles.threadpool.text import AsyncTextIOWrapper as AsyncIOFile from aiohttp import ClientError, ClientSession, InvalidURL from logger import LOGGER from .parser import parse_html_page_metadata async def fetch_url_and_save_data( session: ClientSession, url: str, outfile: AsyncIOFile, total_count: int, i: int, ): """ 在解析之前从 URL 获取原始 HTML。 :param ClientSession session: 异步 HTTP 请求会话。 :param str url: 要获取的目标 URL。 :param AsyncIOFile outfile:要写入的本地文件的路径。 :param int Total_count:要获取的 URL 总数。 :param int i: URL 总数中当前迭代的 URL。 """ try: async with session.get(url) as resp: if resp.status != 200: pass html = await resp.text() page_metadata = await parse_html_page_metadata(html, url) await outfile.write(f"{page_metadata}\n") LOGGER.info( f"Fetched URL {i} of {total_count}: {page_metadata}" ) except InvalidURL as e: LOGGER.error(f"Unable to fetch invalid URL `{url}`: {e}") except ClientError as e: LOGGER.error(f"ClientError while fetching URL `{url}`: {e}") except Exception as e: LOGGER.error( f"Unexpected error while fetching URL `{url}`: {e}" )
当通过aiohttp获取URL 时ClientSession,调用.text()response() 上的方法将以字符串await resp.text()形式返回请求的响应。不要与 混淆,后者返回一个字节对象(对于拉取媒体文件或字符串以外的任何内容很有用)。.body()
如果您继续跟踪,我们现在已经深入了三个“上下文”:
我们通过打开一个aiofiles.open()上下文来开始我们的脚本,该上下文将保持打开状态,直到我们的脚本完成。outfile这允许我们在脚本运行期间从任何任务写入数据。
将标头写入 CSV 文件后,我们使用 打开了一个持久的客户端请求会话async with ClientSession() as session,这允许我们在会话打开时批量发出请求。
在上面的代码片段中,我们输入了第三个也是最后一个上下文:单个 URL 的响应上下文(通过async with session.get(url) as resp)。与其他两个上下文不同,我们将进入和离开此上下文 174 次(每个 URL 一次)。
在每个 URL 响应上下文中,我们最终开始生成一些输出。这给我们留下了最后的逻辑(await parse_html_page_metadata(html, url)),它解析每个URL响应并从页面返回一些抓取的元数据,然后将所述元数据写入我们的outfile下一行await outfile.write(f"{page_metadata}\n")。
将解析的元数据写入 CSV
您可能会问,我们计划如何从 HTML 页面中提取元数据?当然是使用BeautifulSoup !有了 HTTP 响应的 HTML,我们就可以bs4解析每个 URL 响应并返回以下各列的值outfile:title、description、Primary_tag、published at和url。
这五个值以逗号分隔的字符串形式返回,然后outfile作为单行写入 CSV。
#aiohttp_aiofiles_tutorial/parser.py """从原始 HTML 中解析元数据。""" from bs4 import BeautifulSoup from bs4.builder import ParserRejectedMarkup from logger import LOGGER async def parse_html_page_metadata(html: str, url: str) -> str: """ 将页面元数据从原始 HTML 提取到 CSV 行中。 :param str html: 给定获取的 URL 的原始 HTML 源。 :param str url:与提取的 HTML 关联的 URL。 :returns: str """ try: soup = BeautifulSoup(html, "html.parser") title = soup.title.string.replace(",", ";") description = ( soup.head.select_one("meta[name=description]") .get("content") .replace(",", ";") .replace('"', "`") .replace("'", "`") ) primary_tag = ( soup.head .select_one("meta[property='article:tag']") .get("content") ) published_at = ( soup.head .select_one("meta[property='article:published_time']") .get("content") .split("T")[0] ) if primary_tag is None: primary_tag = "" return f"{title}, {description}, {primary_tag}, {url}, {published_at}" except ParserRejectedMarkup as e: LOGGER.error( f"Failed to parse invalid html for {url}: {e}" ) except ValueError as e: LOGGER.error( f"ValueError occurred when parsing html for {url}: {e}" ) except Exception as e: LOGGER.error( f"Parsing failed when parsing html for {url}: {e}" )
运行珠宝,运行脚本
让我们带这个坏男孩去兜风吧。我在__init__.py中添加了一个计时器来记录脚本持续时间所经过的秒数:
#aiohttp_aiofiles_tutorial/__init__.py
"""同时发出数百个请求并将响应保存到磁盘。""" import time from time import perf_counter as timer ... async def init_script(): """Prepare output file & kickoff task creation/execution.""" start_time = timer() # Add timer to function async with aiofiles.open(EXPORT_FILEPATH, mode="w+") as outfile: await outfile.write( "title,description,primary_tag,url,published_at\n" ) await execute_fetcher_tasks(outfile) await outfile.close() LOGGER.success( f"Executed {__name__} in {time.perf_counter() - start_time:0.2f} seconds." ) # Log time of execution ...
make run如果您按照存储库进行操作(或者只是输入) ,请混合该 mfing命令python3 main.py。系好安全带:
#在约3 秒内获取174页后日志的尾部 ... 16:12:34 PM | INFO: Fetched URL 165 of 173: Setting up a MySQL Database on Ubuntu, Setting up MySQL the old-fashioned way: on a linux server, DevOps, https://hackersandslackers.com/set-up-mysql-database/, 2018-04-17 16:12:34 PM | INFO: Fetched URL 164 of 173: Dropping Rows of Data Using Pandas, Square one of cleaning your Pandas Dataframes: dropping empty or problematic data., Data Analysis, https://hackersandslackers.com/pandas-dataframe-drop/, 2018-04-18 16:12:34 PM | INFO: Fetched URL 167 of 173: Installing Django CMS on Ubuntu, Get the play-by-play on how to install DjangoCMS: the largest of three major CMS products for Python`s Django framework., Software, https://hackersandslackers.com/installing-django-cms/, 2017-11-19 16:12:34 PM | INFO: Fetched URL 166 of 173: Starting a Python Web App with Flask & Heroku, Pairing Flask with zero-effort container deployments is a deadly path to addiction., Architecture, https://hackersandslackers.com/flask-app-heroku/, 2018-02-13 16:12:34 PM | INFO: Fetched URL 171 of 173: Another 'Intro to Data Analysis in Python Using Pandas' Post, An introduction to Python`s quintessential data analysis library., Data Analysis, https://hackersandslackers.com/intro-python-pandas/, 2017-11-16 16:12:34 PM | INFO: Fetched URL 172 of 173: Managing Python Environments With Virtualenv, Embrace core best-practices in Python by managing your Python packages using virtualenv and virtualenvwrapper., Software, https://hackersandslackers.com/python-virtualenv-virtualenvwrapper/, 2017-11-15 16:12:34 PM | INFO: Fetched URL 170 of 173: Visualize Folder Structures with Python’s Treelib, Using Python`s treelib library to output the contents of local directories as visual tree representations., Data Engineering, https://hackersandslackers.com/python-tree-hierachies-treelib/, 2017-11-17 16:12:34 PM | INFO: Fetched URL 169 of 173: Merge Sets of Data in Python Using Pandas, Perform SQL-like merges of data using Python`s Pandas., Data Analysis, https://hackersandslackers.com/merge-dataframes-with-pandas/, 2017-11-17 16:12:34 PM | INFO: Fetched URL 168 of 173: Starting an ExpressJS App, Installation guide for ExpressJS with popular customization options., JavaScript, https://hackersandslackers.com/create-an-expressjs-app/, 2017-11-18 16:12:34 PM | SUCCESS: Executed aiohttp_aiofiles_tutorial in 2.96 seconds.
用 Python 编写异步脚本肯定需要更多的努力,但不会增加数百或数千倍的努力。即使您追求的不是速度,处理大型应用程序的数量也使 asyncio 变得绝对至关重要。例如,如果您的聊天机器人或网络服务器正在处理用户的请求,那么当第二个用户同时尝试与您的应用程序交互时会发生什么?通常答案是什么:用户 1得到了他们想要的东西,而用户 2则被困在阻塞的线程中。文章来源:https://www.toymoban.com/diary/python/582.html
源代码:github.com/hackersandslackers/aiohttp-aiofiles-tutorial/tree/master/aiohttp_aiofiles_tutorial/data文章来源地址https://www.toymoban.com/diary/python/582.html
到此这篇关于使用AIOHTTP和AIOFiles进行异步Python HTTP请求的文章就介绍到这了,更多相关内容可以在右上角搜索或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!