拉取、入库的优化点:
-
请求接口:
-
场景:原来一次请求一个月的数据,可能会面临接口卡顿的问题,而且数据量过大,有时候会溢出
-
解决:可以考虑由一次请求 切分为 小而多次请求
-
加个for循环,分为小批量去请求,避免接口数据量过大
-
将多次请求采用多线程进行,避免单线程I/O阻塞( 可以异步操作,可以先发送请求而不等待结果返回)
with futures.ThreadPoolExecutor() as executor: # 可传入线程数 # 传入执行的方法 和 参数列表,会等待所有线程执行完毕 query_info_list = list(executor.map(self.api.get_pipeline_info,query_list))
-
-
注意点:
- 接口性能:需要接口方面也支持并发处理,不然你这边同时发送多个请求,接口方还是一个一个给你处理,时间上并无提升
- 风险与收益并存:在这过程发生了错误,会比较难处理,不能第一时间捕捉,而且可能会一错毁所有
- 限定场景:网上有传言python的多线程只用到了一个CPU核心,由于对python底层暂不了解,所以盲猜这里的多线程操作只适用于I/0密集的场景
-
-
数据入库:
-
场景:入库时候可能会一次性插入上万条大数据,数据库和本地内存会承受较大压力
-
解决:可以采用python yield关键字,把数据分片处理
-
使用举例:
def chunk_request(): for xxx in 一堆数据: yield xxx # 1. 先返回一个结果 def chunk(iterator, bulk_size=50): # 2. 会等待,凑满50个yield的结果,然后统一返回 """ 工具方法, 返回一批结果的迭代器 """ for item in iterator: yield chain([item], islice(iterator, bulk_size - 1)) for items in chunk(self.chunk_request(start=start, end=end)): # 3. 收满50个yield为一组 # 进行小批量入库操作 # obj.create(xxx)
-
-
对于yield自己的理解:有点像协程的概念,平时我们一个return,方法就直接结束,返回结果了,但是使用了yield就很好玩了,你可以先返回一个结果,然后再回到上次返回结果的地方,继续往下执行。所以利用这个特性,我们就可以将数据给分片处理了,先返回一小部分结果,然后入库,再返回执行,拿到下一批结果,再入库,而不是一次return一大堆数据。文章来源:https://www.toymoban.com/news/detail-401456.html
-
升级版本: 不仅做到控制数据小批量入库,还可以控制到源头数据小批量拉取(利用上边讲的特性,会返回方法里继续往下执行代码)文章来源地址https://www.toymoban.com/news/detail-401456.html
-
# 拉取: for 一次拉取 in 多次调用api拉取: yield 数据 # 这样可以做到先返回数据,再回来执行for循环 继续往下调用api拉取 # 入库: for 数据 in chunk(一小批数据): 入库
-
-
到了这里,关于python的多线程和yield的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!