Python中实现定时任务
在项目中,我们可能遇到有定时任务的需求。
- 其一:每隔一个时间段就执行任务。 比如:服务中每隔20分钟消费消息队列中的消息。
- 其二:定时执行任务。例如每天早上 8 点定时推送早报。
今天跟大家分享下 Python 定时任务的实现方法。下面先贴上一段示例代码,有需要定时执行的task,也有代表主程序的main函数。接下来就看如何用内置的方法来实现在main函数运行的同时,定时执行task?
import time
import logging
logging.basicConfig(
level=logging.debug,
format="%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s - %(message)s"
)
def task():
logging.info("Task Start.")
time.sleep(1)
logging.info("Task Done.")
def main():
logging.info("Start.")
while True:
try:
time.sleep(1)
logging.info("Main loop.")
except KeyboardInterrupt:
logging.warning("End loop.")
break
logging.info("Finish.")
if __name__ == "__main__":
main()
1. time.sleep
第一种办法是最简单又最暴力。那就是在一个死循环中,使用函数 sleep()。
while True:
task()
time.sleep(5)
这个方法目前有几个问题:
- 阻塞主进程
- 一次循环只有一个task
第一个问题好解决,可以单独起一个线程去执行循环。第二个问题的话,一种是可以放在同一个循环里执行多个任务,还有一种更简单粗暴一点,多启几个线程来执行不同的循环😂。具体实现可以看如下代码:
STOPPED = threading.Event()
def schedule():
while not STOPPED.is_set():
task()
time.sleep(5)
def main():
threading.Thread(target=schedule).start()
logging.info("Start.")
while True:
try:
time.sleep(1)
logging.info("Main loop.")
except KeyboardInterrupt:
STOPPED.set() # 推出定时任务的循环
logging.warning("End loop.")
break
logging.info("Finish.")
2. threading.Timer
既然第一种方法暴力,那么有没有比较优雅点的方法?
Python 标准库 threading 中有个 Timer 类。它会新启动一个线程来执行定时任务,所以它是非阻塞函式。示例如下:
from threading import Timer
t = Timer(task, 5)
t.start()
优点:不阻塞主进程,task在线程中执行
缺点:一个Timer只能执行一次
实现原理:
线程中预置一个finished的Event,通过finished.wait等待固定时间间隔。
超时则执行任务。如果需要取消任务,可以调用Timer.cancel来取消任务。
如果想循环,需要改造一下task函数。
2.1 添加新Timer
可以创建一个新函数repeat_task,每次在执行task后(前)再新建一个新的Timer定时执行。(start开始的位置决定了是任务之间等待固定间隔时间,还是每个任务的开始等待固定间隔时间)
from threading import Timer
def repeat_task():
t = Timer(5, repeat_task)
t.start()
task()
def main():
repeat_task()
logging.info("Start.")
while True:
try:
time.sleep(1)
logging.info("Main loop.")
except KeyboardInterrupt:
for t in threading.enumerate():
if t != threading._main_thread:
logging.info("Cancel +1")
t.cancel()
logging.warning("End loop.")
break
logging.info("Finish.")
这样可以循环执行,但是仍然是只能一个线程一个任务。需要注意的是,停止整个程序时,取消所有除了主线程的线程。因为每次会生成一个新的Timer,所以在没有记录的情况下,只能遍历所有的线程来执行取消的动作。
2.2 改写Timer类
了解了Timer实现的原理,我们可以继承Timer类,run函数里再加一个循环用finished来控制,以此来实现在一个线程下重复执行任务。
class RepeatTimer(threading.Timer):
def __init__(self, interval, function, args=None, kwargs=None):
super().__init__(interval, function, args, kwargs)
def run(self):
while not self.finished.is_set():
self.finished.wait(self.interval)
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
def main():
t = RepeatTimer(5, task)
t.start()
logging.info("Start.")
while True:
try:
time.sleep(1)
logging.info("Main loop.")
except KeyboardInterrupt:
t.cancel()
logging.warning("End loop.")
break
logging.info("Finish.")
3. sched
第三种方式是使用标准库中sched
模块。sched
是事件调度器,它通过 scheduler
类来调度事件,从而达到定时执行任务的效果。sched
提供了两个添加调度任务的函数:
-
enter(delay, priority, action, argument=(), kwargs={})
该函数可以延迟一定时间执行任务。delay 表示延迟多长时间执行任务,单位是秒。 -
scheduler.enterabs(time, priority, action, argument=(), kwargs={})
该函数添加一项任务,这个任务会在 time 这时刻(time 是绝对时间)执行。
简单示例如下:
import sched
schedule = sched.scheduler() # 初始化 sched 模块的 scheduler 类
schedule.enter(10, 1, task) # 增加调度任务
schedule.run() # 开始调度任务
-
优点:
- 支持指定时间间隔和时间点执行任务
- 可以添加不同的任务
- 任务可以设置优先级
-
缺点:
scheduler 中的每个调度任务只会工作一次,不会无限循环被调用。如果想重复执行同一任务,需要重复添加调度任务。
如何自动重复添加调度任务呢?我们可以先添加一个repeat
装饰器。
def repeat(interval):
def wrapper(func):
@wraps(func)
def inner():
if not STOP_FLAG.is_set():
s.enter(interval, 0, inner) # 每次执行inner时会再次将其加入到schedule
func() # 然后再执行func函数
return inner
s.enter(interval, 0, inner) # 创建inner函数并加入到schedule
return wrapper
然后,通过repeat装饰器来实现自动重复添加调度任务。
import sched
import threading
from functools import wraps
s = sched.scheduler()
STOP_FLAG = threading.Event()
@repeat(5)
def new_task():
return task()
def main():
t = threading.Thread(target=s.run)
t.start()
logging.info("Start.")
while True:
try:
time.sleep(1)
logging.info("Main loop.")
except KeyboardInterrupt:
STOP_FLAG.set() # 先设置标志位,停止添加新任务
for task in s.queue:
s.cancel(task) # 再取消所有队列中的任务
logging.warning("End loop.")
break
logging.info("Finish.")
实现原理
介绍了那么多方法,那么多库,但是底层的实现逻辑都是差不多的。
一个完整的定时任务系统,有三个部分:
- 任务队列(task queue):根据执行时间和优先级进行排序
- sorted(): 任务一多,整个队列重排,性能堪忧
- heapq.pop():堆排序,只获取最高优先级的任务,不重复排序
- 调度器(scheduler)
- 执行器(executor)
前面的例子都是指定间隔时间的定时任务,那怎么执行指定时间点的任务呢?
上面的方法是可以做到的。有两种思路,文章来源:https://www.toymoban.com/news/detail-553352.html
- 前面我们指定了间隔时间,那指定时间点,就先计算当前时间到指定时间点的间隔时间
- 不管间隔时间,而是记录任务时间点,然后实时去检查是否到达指定时间点
以上就是用内置方法实现比较简单的定时任务的三种方式。文章来源地址https://www.toymoban.com/news/detail-553352.html
到了这里,关于Python中实现定时任务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!