python异步切片下载文件(内置redis获取任务 mongo更新任务状态等)

这篇具有很好参考价值的文章主要介绍了python异步切片下载文件(内置redis获取任务 mongo更新任务状态等)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

异步切片下载二进制文件并上传桶删除本地文件

import json
import os
import asyncio
from urllib import parse

import aiohttp
import aioredis
from motor.motor_asyncio import AsyncIOMotorClient
from retrying import retry
from minio import Minio
from minio.error import S3Error
import loguru


class Async_download:
    def __init__(self, sema_number=20, redis_address='redis://ip:6379/db', redis_password='passwd'):
        self.__sema_number = sema_number
        self.__sema = None
        self.redis_address = redis_address
        self.redis_password = redis_password
        self.minio_config = {
            "endpoint": "ip:port",
            "access_key": "******",
            "secret_key": "******",
            "secure": False
        }
        self.mongo_config = {
            "uri": "mongodb://{0}:{1}@ip:27017".format(parse.quote_plus("user"),
                                                                   parse.quote_plus("passwd")),
            "db": "******",
            "collection": "******",
        }

    @classmethod
    def _mkdir(cls, file_path, full_path):
        if os.path.exists(file_path):
            return True
        try:
            os.makedirs(full_path)
        except Exception as e:
            pass
        return False

    def __init_check(self, file_path: str):
        full_path, file_name = file_path.rsplit('/', 1)
        file_size = os.path.getsize(file_path) if self._mkdir(file_path, full_path) else 0
        return file_name, file_size

    @classmethod
    def __sync_save_local(cls, r_headers, results, file_path):
        done, padding = results
        for d in done:
            for index, value in d.result().items():
                r_headers[index] = value

        with open(file_path, 'ab') as f:
            for _, value in r_headers.items():
                f.write(value)
        return True

    @classmethod
    def __generate_headers(cls, headers, file_size, first_byte):
        r_headers = {}
        index = 0
        if first_byte > 51200000:
            byte = 2048000  # 2M 为一片
        else:
            byte = 1024000  # 1M 为一片
        while True:
            file_size_two = file_size + byte
            if file_size_two >= first_byte:
                r_headers[index] = {"Range": f"bytes={file_size}-{first_byte}"}
                break
            r_headers[index] = {"Range": f"bytes={file_size}-{file_size_two - 1}"}
            index += 1
            file_size = file_size_two

        for key in r_headers:
            r_headers[key].update(headers)
        return r_headers

    @retry(stop_max_attempt_number=3)
    async def __download_one(self, session, method, url, r_headers, **kwargs):
        index, headers = r_headers
        async with self.__sema:
            async with session.request(method, url, headers=headers, **kwargs) as response:
                binary = await response.content.read()
                return {index: binary}

    async def __async_section_download(self, session, method, url, r_headers, **kwargs):
        tasks = [
            asyncio.create_task(self.__download_one(session, method, url, (key, r_headers[key]), **kwargs)) for key in
            r_headers
        ]
        return await asyncio.wait(tasks)

    @classmethod
    async def __get_content_length(cls, session, method, url, headers, **kwargs):
        async with session.request(method, url, headers=headers, **kwargs) as response:
            return response.headers.get('Content-Length') or response.headers.get('content-length') or 0

    @classmethod
    async def __sync_download(cls, session, method, url, headers, file_path, **kwargs):
        async with session.request(method, url, headers=headers, **kwargs) as response:
            with open(file_path, 'wb') as f:
                binary = await response.content.read()
                f.write(binary)

    async def __async_download_main(self, method, url, headers, file_path, **kwargs):
        file_name, file_size = self.__init_check(file_path)
        self.__sema = asyncio.Semaphore(self.__sema_number)
        async with aiohttp.ClientSession() as session:
            content_length = await self.__get_content_length(session, method, url, headers, **kwargs)

            if content_length and content_length.isdigit():
                content_length = int(content_length)
                if file_size >= content_length:
                    await self.__upload_to_minio(file_path, file_name)  # Upload to MinIO
                    await self.__update_mongo_status(file_name, True)  # Update MongoDB status
                    os.remove(file_path)  # Delete local file
                    return True, file_path

                r_headers = self.__generate_headers(headers, file_size, content_length)
                results = await self.__async_section_download(session, method, url, r_headers, **kwargs)
                self.__sync_save_local(r_headers, results, file_path)
            else:
                await self.__sync_download(session, method, url, headers, file_path, **kwargs)

            if os.path.getsize(file_path) >= int(content_length):
                await self.__upload_to_minio(file_path, file_name)  # Upload to MinIO
                await self.__update_mongo_status(file_name, True)  # Update MongoDB status
                os.remove(file_path)  # Delete local file
                return True, file_path
            return False, file_path

    async def __get_task_from_redis(self):
        async with aioredis.from_url(self.redis_address, password=self.redis_password) as redis:
            task = await redis.lpop('file_file_all')
        return task

    async def __process_redis_tasks(self):
        while True:
            task_info = await self.__get_task_from_redis()
            if task_info is None:
                break
            task = json.loads(task_info)

            try:
                method = 'get'
                url = task["file_link"]
                file_path = './{}'.format(task["file_name"])
                headers = {
                    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36'}
                if url.startswith('http') or url.startswith('https'):
                    try:
                        ddd = url.split('http')
                        if len(ddd) != 0:
                            url = 'http' + ddd[-1]
                    except:
                        continue

                # Perform download for the current task
                await self.__async_download_main(method, url, headers, file_path)
            except Exception as e:
                loguru.logger.error("Error processing Redis task:", e)

    async def __upload_to_minio(self, file_path, object_name):
        """
        上传minio
        """
        try:
            minioClient = Minio(**self.minio_config)
            check_bucket = minioClient.bucket_exists("******")

            if not check_bucket:
                minioClient.make_bucket("******")

            loguru.logger.info("start upload file to MinIO")
            minioClient.fput_object(bucket_name="******", object_name=object_name, file_path=file_path)
            loguru.logger.info("file {0} is successfully uploaded to MinIO".format(object_name))
        except FileNotFoundError as err:
            loguru.logger.info('*' * 10)
            loguru.logger.error('MinIO upload failed: ' + str(err))
        except S3Error as err:
            loguru.logger.error("MinIO upload failed:", err)

    async def __update_mongo_status(self, file_name, status):
        """
        更新mongo采集状态
        """
        try:
            mongo_uri = self.mongo_config["uri"]
            db_name = self.mongo_config["db"]
            collection_name = self.mongo_config["collection"]
            client = AsyncIOMotorClient(mongo_uri)
            db = client.get_database(db_name)
            collection = db.get_collection(collection_name)
            await collection.update_one({"file_name": file_name}, {"$set": {"status": status}})
        except Exception as e:
            loguru.logger.error("MongoDB update failed:", e)

    async def start(self):
        await self.__process_redis_tasks()


loguru.logger.add("download_file_output.log", rotation="500 MB", level="DEBUG")
if __name__ == '__main__':
    as_dw = Async_download(20)
    asyncio.run(as_dw.start())

部分代码来源于y小白的笔记文章来源地址https://www.toymoban.com/news/detail-805597.html

到了这里,关于python异步切片下载文件(内置redis获取任务 mongo更新任务状态等)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • PHP大文件切片下载代码

    header(\\\"Content-Type: application/fORCe-download\\\"); 表示强制下载,服务端将一个完整的文件或者包一并输出浏览器,然后浏览器触发下载(用户看到的下载);缺点是文件或者包有多大脚本就占用多大内存; header(\\\"Content-type: application/octet-stream\\\"); 表示未知的应用程序文件,也是会触发浏

    2024年02月16日
    浏览(29)
  • Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

    Asynq [1] 是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的 sidekiq [2] 和Python的 celery [3] 。Go生态类似的还有 machinery [4] 和goworker 同时提供一个WebUI asynqmon [5] ,可以源码形式安装或使用Docker image, 还可以和Prometheus集成 docker run --rm --name asynqmon -p 8080:8080 hibiken/as

    2024年02月14日
    浏览(33)
  • 实现微信小程序web-view内嵌H5中的下载功能(大文件切片下载)

    微信小程序的开发框架是uniapp,使用uniapp脚手架搭建,其中有页面是展示另一个小程序,在这个页面主体内容使用了标签将H5的页面内容展示,H5中有页面存放了下载的路径。点击下载按钮下载文件,或者预览文件让用户手动保存。 如果是pc端,下载用一个 a 标签就很容易,但

    2024年02月10日
    浏览(82)
  • python 异步任务框架 Celery 入门,速看!

    Celery 是使用 python 编写的分布式任务调度框架。 它有几个主要的概念: celery 应用 用户编写的代码脚本,用来定义要执行的任务,然后通过 broker 将任务发送到消息队列中 broker 代理,通过消息队列在客户端和 worker 之间进行协调。 celery 本身并不包含消息队列,它支持一下消

    2024年02月13日
    浏览(30)
  • 【前端面试】中大文件上传/下载:中等文件代理服务器放行+大文件切片传输+并发请求+localstorage实现断点续传

    目录 切片上传~spark-md5 原理:流式计算+分块处理 文件标识spark-md5:A-B A.切片哈希值合并 B.首尾切片+其他切片前中后各取2M 计算hash:A-B(参考React的Fiber架构) A.线程:web-worker B.空闲:requestIdleCallback 异步并发控制:A-B(参考http2的多路复用) A.promise.allSettled() B.并发数max=

    2024年02月12日
    浏览(38)
  • 使用XMLHttpRequest实现文件异步下载

            我想通过异步的方式实现下载文化,请求为post请求。一开始我打算用ajax。         不过ajax的返回类型不支持二进制文件流(binary)!因此ajax的异步方式无法接到后端接口返回的文件流,就无法下载文件。 jQuery.ajax() | jQuery API Documentation         改用dom原生的XMLH

    2024年02月14日
    浏览(26)
  • Python异步编程之web框架异步vs同步 无IO任务压测对比

    在python编程中,通过协程实现的异步编程号称能够提高IO密集型任务的并发量。本系列比较web服务器同步框架和异步框架的性能差异,包括无IO接口和常见IO操作,如文件、mysql、redis等。使用压测工具locust测试相同条件下两种编程模式能够处理请求的速度。 主题: 单纯比较异

    2024年02月06日
    浏览(40)
  • uniapp安卓读取或写入txt文件,创建文件或文件夹,获取手机内置存储根路径

    原理:利用 H5plus 的 native ,引入 java 类来进行处理。 uniapp开发记录

    2024年02月11日
    浏览(39)
  • Python异步编程之web框架 异步vs同步 数据库IO任务并发支持对比

    主题: 比较异步框架和同步框架在数据库IO操作的性能差异 python版本 :python 3.8 数据库 :mysql 8.0.27 (docker部署) 压测工具 :locust web框架 :同步:flask 异步:starlette 请求并发量 : 模拟10个用户 服务器配置 : Intel(R) i7-12700F 客户端配置 :Intel(R) i7-8700 3.20GHz python中操作数据库通常

    2024年02月08日
    浏览(37)
  • Python异步编程之web框架 异步vs同步 数据库IO任务压测对比

    主题: 比较异步框架和同步框架在数据库IO操作的性能差异 python版本 :python 3.8 数据库 :mysql 8.0.27 (docker部署) 压测工具 :locust web框架 :同步:flask 异步:starlette 请求并发量 : 模拟10个用户 服务器配置 : Intel(R) i7-12700F 客户端配置 :Intel(R) i7-8700 3.20GHz python中操作数据库通常

    2024年02月08日
    浏览(79)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包