Python中实现定时任务

这篇具有很好参考价值的文章主要介绍了Python中实现定时任务。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Python中实现定时任务

在项目中,我们可能遇到有定时任务的需求。

  • 其一:每隔一个时间段就执行任务。 比如:服务中每隔20分钟消费消息队列中的消息。
  • 其二:定时执行任务。例如每天早上 8 点定时推送早报。

今天跟大家分享下 Python 定时任务的实现方法。下面先贴上一段示例代码,有需要定时执行的task,也有代表主程序的main函数。接下来就看如何用内置的方法来实现在main函数运行的同时,定时执行task?

import time
import logging

logging.basicConfig(
    level=logging.debug,
    format="%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s - %(message)s"
)


def task():
    logging.info("Task Start.")
    time.sleep(1)
    logging.info("Task Done.")

def main():
    logging.info("Start.")
    while True:
        try:
            time.sleep(1)
            logging.info("Main loop.")
        except KeyboardInterrupt:
            logging.warning("End loop.")
            break
    logging.info("Finish.")


if __name__ == "__main__":
    main()

1. time.sleep

第一种办法是最简单又最暴力。那就是在一个死循环中,使用函数 sleep()。

while True:
    task()
    time.sleep(5)

这个方法目前有几个问题:

  1. 阻塞主进程
  2. 一次循环只有一个task

第一个问题好解决,可以单独起一个线程去执行循环。第二个问题的话,一种是可以放在同一个循环里执行多个任务,还有一种更简单粗暴一点,多启几个线程来执行不同的循环😂。具体实现可以看如下代码:

STOPPED = threading.Event()

def schedule():
    while not STOPPED.is_set():
        task()
        time.sleep(5)

def main():
    threading.Thread(target=schedule).start()
    logging.info("Start.")
    while True:
        try:
            time.sleep(1)
            logging.info("Main loop.")
        except KeyboardInterrupt:
            STOPPED.set()  # 推出定时任务的循环
            logging.warning("End loop.")
            break
    logging.info("Finish.")

2. threading.Timer

既然第一种方法暴力,那么有没有比较优雅点的方法?
Python 标准库 threading 中有个 Timer 类。它会新启动一个线程来执行定时任务,所以它是非阻塞函式。示例如下:

from threading import Timer

t = Timer(task, 5)
t.start()

优点:不阻塞主进程,task在线程中执行

缺点:一个Timer只能执行一次

实现原理:
线程中预置一个finished的Event,通过finished.wait等待固定时间间隔。
超时则执行任务。如果需要取消任务,可以调用Timer.cancel来取消任务。

如果想循环,需要改造一下task函数。

2.1 添加新Timer

可以创建一个新函数repeat_task,每次在执行task后(前)再新建一个新的Timer定时执行。(start开始的位置决定了是任务之间等待固定间隔时间,还是每个任务的开始等待固定间隔时间)

from threading import Timer

def repeat_task():
    t = Timer(5, repeat_task)
    t.start()
    task()


def main():
    repeat_task()
    logging.info("Start.")
    while True:
        try:
            time.sleep(1)
            logging.info("Main loop.")
        except KeyboardInterrupt:
            for t in threading.enumerate():
                if t != threading._main_thread:
                    logging.info("Cancel +1")
                    t.cancel()
            logging.warning("End loop.")
            break
    logging.info("Finish.")

这样可以循环执行,但是仍然是只能一个线程一个任务。需要注意的是,停止整个程序时,取消所有除了主线程的线程。因为每次会生成一个新的Timer,所以在没有记录的情况下,只能遍历所有的线程来执行取消的动作。

2.2 改写Timer类

了解了Timer实现的原理,我们可以继承Timer类,run函数里再加一个循环用finished来控制,以此来实现在一个线程下重复执行任务。

class RepeatTimer(threading.Timer):
    def __init__(self, interval, function, args=None, kwargs=None):
        super().__init__(interval, function, args, kwargs)

    def run(self):
        while not self.finished.is_set():
            self.finished.wait(self.interval)
            if not self.finished.is_set():
                self.function(*self.args, **self.kwargs)

def main():
    t = RepeatTimer(5, task)
    t.start()
    logging.info("Start.")
    while True:
        try:
            time.sleep(1)
            logging.info("Main loop.")
        except KeyboardInterrupt:
            t.cancel()
            logging.warning("End loop.")
            break
    logging.info("Finish.")

3. sched

第三种方式是使用标准库中sched模块。sched 是事件调度器,它通过 scheduler 类来调度事件,从而达到定时执行任务的效果。sched 提供了两个添加调度任务的函数:

  1. enter(delay, priority, action, argument=(), kwargs={})
    该函数可以延迟一定时间执行任务。delay 表示延迟多长时间执行任务,单位是秒。

  2. scheduler.enterabs(time, priority, action, argument=(), kwargs={})
    该函数添加一项任务,这个任务会在 time 这时刻(time 是绝对时间)执行。

简单示例如下:

import sched

schedule = sched.scheduler()  # 初始化 sched 模块的 scheduler 类
schedule.enter(10, 1, task)  # 增加调度任务
schedule.run()  # 开始调度任务
  • 优点:

    1. 支持指定时间间隔和时间点执行任务
    2. 可以添加不同的任务
    3. 任务可以设置优先级
  • 缺点:
    scheduler 中的每个调度任务只会工作一次,不会无限循环被调用。如果想重复执行同一任务,需要重复添加调度任务。

如何自动重复添加调度任务呢?我们可以先添加一个repeat装饰器。

def repeat(interval):
    def wrapper(func):
        @wraps(func)
        def inner():
            if not STOP_FLAG.is_set():
                s.enter(interval, 0, inner)  # 每次执行inner时会再次将其加入到schedule
            func()  # 然后再执行func函数
            return inner
        s.enter(interval, 0, inner)  # 创建inner函数并加入到schedule
    return wrapper

然后,通过repeat装饰器来实现自动重复添加调度任务。

import sched
import threading
from functools import wraps

s = sched.scheduler()
STOP_FLAG = threading.Event()

@repeat(5)
def new_task():
    return task()

def main():
    t = threading.Thread(target=s.run)
    t.start()
    logging.info("Start.")
    while True:
        try:
            time.sleep(1)
            logging.info("Main loop.")
        except KeyboardInterrupt:
            STOP_FLAG.set()  # 先设置标志位,停止添加新任务
            for task in s.queue:
                s.cancel(task)  # 再取消所有队列中的任务
            logging.warning("End loop.")
            break
    logging.info("Finish.")

实现原理

介绍了那么多方法,那么多库,但是底层的实现逻辑都是差不多的。
一个完整的定时任务系统,有三个部分:

  1. 任务队列(task queue):根据执行时间和优先级进行排序
    • sorted(): 任务一多,整个队列重排,性能堪忧
    • heapq.pop():堆排序,只获取最高优先级的任务,不重复排序
  2. 调度器(scheduler)
  3. 执行器(executor)

前面的例子都是指定间隔时间的定时任务,那怎么执行指定时间点的任务呢?

上面的方法是可以做到的。有两种思路,

  1. 前面我们指定了间隔时间,那指定时间点,就先计算当前时间到指定时间点的间隔时间
  2. 不管间隔时间,而是记录任务时间点,然后实时去检查是否到达指定时间点

以上就是用内置方法实现比较简单的定时任务的三种方式。文章来源地址https://www.toymoban.com/news/detail-553352.html

到了这里,关于Python中实现定时任务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Django框架使用定时器-APScheduler实现定时任务:django实现简单的定时任务

    系统:windows10 python: python==3.9.0 djnago==3.2.0 APScheduler==3.10.1 1、创建utils包,在包里面创建schedulers包 utils/schedulers/task.py utils/schedulers/scheduler.py utils/schedulers/__init__.py 2、项目配置文件settings.py

    2024年02月12日
    浏览(46)
  • Quartz实战:基于Quartz实现定时任务的动态调度,实现定时任务的增删改查

    Quartz使用文档,使用Quartz实现动态任务,Spring集成Quartz,Quartz集群部署,Quartz源码分析 Quartz使用文档,使用Quartz实现动态任务,Spring集成Quartz,Quartz集群部署,Quartz源码分析 此处省略了SysJob实体类,以及Mapper等对数据库的操作。 本文只是大致实现一个基于Quartz实现定时任务

    2024年02月15日
    浏览(49)
  • python 定时任务执行命令行

    1.使用场景: 定时执行jmeter脚本,通过python定时器隔一段时间执行命令行命令。 2.库: os、datetime、threading (1)利用threading.Timer()定时器实现定时任务 Timer方法 说明 Timer(interval, function, args=None, kwargs=None) 创建定时器 cancel() 取消定时器 start() 使用线程方式执行 join(self, timeout

    2023年04月18日
    浏览(49)
  • 使用shedlock实现分布式定时任务锁【防止task定时任务重复执行】

    第一步:引入shedlock相关依赖 ShedLock还可以使用Mongo,Redis,Hazelcast,ZooKeeper等外部存储进行协调,例如使用redis则引入下面的包 第二步:创建数据库表结构,数据库表的脚本如下: 第三步:添加shedlock配置类 (定时任务防重复执行的配置类) 第四步:在启动类上添加启动注

    2024年02月10日
    浏览(42)
  • 【Python--定时任务的四种方法】

    当每隔一段时间就要执行一段程序,或者往复循环执行某一个任务,这就需要使用定时任务来执行程序。应用很广泛,可以实现程序的自动化,而不需要我们手动的在规定时间内执行。如爬虫,如定时器等等。以下将介绍几种pyhton常用的定时任务方法。 执行简单,代码容易理

    2024年02月04日
    浏览(51)
  • 青龙面版 定时任务执行python文件

    打开菜单:定时任务 第一步:新建任务 第二步:输入任务名称, 命令为:task 文件名称 (如果是根目录,直接写文件名称) 举例: 把依赖先安装好, 获取 下个节假日.py 代码

    2024年01月22日
    浏览(47)
  • springboot---定时任务实现

    任意类中创建一个方法,将该方法用@scheduled注解修饰,然后在项目的主方法上添加@EnableScheduling注解,定时任务就会生效。 但是需要注意的是定时任务不会一开始就执行,会等待设定的时间 1.2.1. cron cron表达式是一个字符串,字符串以5或6个空格隔开,分开共6或7个域,每一个

    2024年02月11日
    浏览(39)
  • SpringBoot 实现定时任务

    定时任务在实际项目开发中很常见,并且定时任务可以在各种场景中应用,通过自动化操作和任务的规模化管理,提高效率、可靠性和工作质量。可以减少手动操作,避免疏忽和错误,并节省时间和人力资源的投入 简单易用: 使用注解驱动的方式,简化了定时任务的配置和

    2024年02月12日
    浏览(57)
  • Java -- 定时任务实现方式

    在Java开发中,定时任务是一种十分常见的功能. 定时任务是在约定时间内执行的一段程序 如每天凌晨24点备份同步数据,又或者电商平台 30 分钟后自动取消未支付的订单,每隔一个小时拉取一次数据等都需要使用到定时器 批量处理数据:批量统计上个月的某个数据。 时间驱

    2024年02月02日
    浏览(38)
  • flask python 设置定时任务 flask 周期性执行任务方案

    flask 通常使用 flask_apscheduler 框架设计定时任务,flask_apscheduler 功能很全面,能按设定的时间规则执行任务,可以持久化到各类数据库(mysql,redis,mongodb),实现对定时任务增、删、改、查等操作。 方法三:通过调用 flask_apscheduler 的 api (推荐) 实例对象 scheduler 拥有增、删

    2024年01月21日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包