(编辑:jimmy 日期: 2024/12/28 浏览:2)
在实际开发中我们经常会碰上一些重复性或周期性的任务,比如像每天定时爬取某个网站的数据、一定周期定时运行代码训练模型等,类似这类的任务通常需要我们手动来进行设定或调度,以便其能够在我们设定好的时间内运行。
在 Windows 上我们可以通过计划任务来手动实现,而在 Linux 系统上往往我们会用到更多关于 crontab 的相关操作。但手动管理并不是一个很好的选择,如果我们需要有十几个不同的定时任务需要管理,那么每次通过人工来进行干预未免有些笨拙,那这时候就真的是「人工智能」了。
所以将这些定时任务的调度代码化才是能够让我们很好地从这种手动管理的纯人力操作中解脱出来。
在 Python 生态中对于定时任务的一些操作主要有那么几个:
所以为了满足能够相对复杂的时间条件,又不需要在前期的环境搭建上花费很多时间的前提下,选择 APScheduler 来对我们的调度任务或定时任务进行管理是个性价比极高的选择。而本文主要会带你快速上手有关 APScheduler 的使用。
虽然说官方文档上的内容不是很多,而且所列举的 API 不是很多,但这侧面也反映了这一框架的简单易用。所以在使用 APScheduler 之前,我们需要对这个框架的一些概念简单了解,主要有那么以下几个:
所谓的触发器就是用以触发定时任务的组件,在 APScheduler 中主要是指时间触发器,并且主要有三类时间触发器可供使用:
任务持久化主要是用于将设定好的调度任务进行存储,即便是程序因为意外情况,如断电、电脑或服务器重启时,只要重新运行程序时,APScheduler 就会根据对存储好的调度任务结果进行判断,如果出现已经过期但未执行的情况会进行相应的操作。
APScheduler 为我们提供了多种持久化任务的途径,默认是使用 memory 也就是内存的形式,但内存并不是持久化最好的方式。最好的方式则是通过像数据库这样的载体来将我们的定时任务写入到磁盘当中,只要磁盘没有损坏就能将数据给恢复。
APScheduler 支持的且常用的数据库主要有:
通常我们可以在创建 Scheduler 实例时创建,或是单独为任务指定。配置的方式相对简单,我们只需要指定对应的数据库链接即可。
执行器顾名思义就是执行我们任务的对象,在计算机内通常要么是 CPU 调度任务,要么是单独维护一个线程来运行任务。所以 APScheduler 里的执行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 这样的线程池和进程池两种。
当然如果是和协程或异步相关的任务调度,还可以使用对应的 AsyncIOExecutor、TwistedExecutor 和 GeventExecutor 三种执行器。
调度器的选择主要取决于你当前的程序环境以及 APScheduler 的用途。根据用途的不同,APScheduler 又提供了以下几种调度器:
通常情况下如果不是和 Web 项目或应用集成共存,那么往往都首选 BlockingScheduler 调度器来进行操作,它会在当前进程中启动相应的线程来进行任务调度与处理;反之,如果是和 Web 项目或应用共存,那么需要选择 BackgroundScheduler 调度器,因为它不会干扰当前应用的线程或进程状况。
基于对以上的概念和组件认识,我们就能基本上摸清 APScheduler 的运行流程:
虽然 APScheduler 里面的概念和组件看起来有点多,但在使用上并不算很复杂,我们可以通过本节的示例就能够很快使用。
选择对应的 scheduler
在使用之前我们需要先实例化一个 scheduler 对象,所有的 scheduler 对象都被放在了 apscheduler.schedulers 模块下,我们可以直接通过查看 API 文档或者借助 IDE 补全的提示来获取相应的 scheduler 对象。
这里我直接选取了最基础的 BlockingScheduler:
# main.py from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler()
配置 scheduler
对于 scheduler 的一些配置我们可以直接在实例化对象时就进行配置,当然也可以在创建实例化对象之后再进行配置。
实例化时进行参数配置:
# main.py from datetime import datetime from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.blocking import BlockingScheduler # 任务持久化 使用 SQLite jobstores = { 'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db') } # 执行器配置 executors = { 'default': ThreadPoolExecutor(20), } # 关于 Job 的相关配置,见官方文档 API job_defaults = { 'coalesce': False, 'next_run_time': datetime.now() } scheduler = BlockingScheduler( jobstores = jobstores, executors = executors, job_defaults = job_defaults, timezone = 'Asia/Shanghai' )
或是通过 scheduler.configure 方法进行同样的操作:
scheduler = BlockingScheduler() scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')
创建 scheduler 对象之后,我们需要调用其下的 add_job() 或是 scheduled_job() 方法来将我们需要执行的函数进行注册。前者是以传参的形式指定对应的函数名,而后者则是以装饰器的形式直接对我们要执行的函数进行修饰。
比如我现在有一个输出此时此刻时间的函数 now():
from datetime import datetime def now(trigger): print(f"trigger:{trigger} -> {datetime.now()}")
然后我打算每 5 秒的时候运行一次,那我们使用 add_job() 可以这样写:
if __name__ == '__main__': scheduler.add_job(now, trigger = "interval", args = ("interval",), seconds = 5) scheduler.start()
在调用 start() 方法之后调度器就会开始执行,并在控制台上看到对应的结果了:
trigger:interval -> 2021-01-16 21:19:43.356674
trigger:interval -> 2021-01-16 21:19:46.679849
trigger:interval -> 2021-01-16 21:19:48.356595
当然使用 @scheduled_job 的方式来装饰我们的任务或许会更加自由一些,于是上面的例子就可以写成这样:
@scheduler.scheduled_job(trigger = "interval", args = ("interval",), seconds = 5) def now(trigger): print(f"trigger:{trigger} -> {datetime.now()}") if __name__ == '__main__': scheduler.start()
运行之后就会在控制台看到同样的结果了。
不过需要注意的是,添加任务一定要在 start() 方法执行前调用,否则会找不到任务或是抛出异常。
如果你是正在做有关的 Web 项目且存在一些定时任务,那么得益于 APScheduler 由于多样的调度器,我们能够将其和我们的项目结合到一起。
如果你正在使用 Flask,那么 Flask-APScheduler 这一别人写好的第三方包装库就很适合你,虽然它没有相关的文档,但只要你了解了前面我所介绍的有关于 APScheduler 的概念和组件,你就能很轻易地看懂这个第三方库仓库里的示例代码。
如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些对任务或作业的增删改查操作,我们可以自己编写一套合适的 API。
这里我使用的是 FastAPI 这一目前流行的 Web 框架。demo 项目结构如下:
temp-scheduler ├── config.py # 配置项 ├── main.py # API 文件 └── scheduler.py # APScheduler 相关设置
这里我们需要的依赖不多,只需要简单几个即可:
pip install fastapi apscheduler sqlalchemy uvicorn
如果项目中模块过多,那么使用一个文件或模块来进行统一管理是最好的选择。这里的 config.py 我们主要像 Flask 的配置那样简单设定:
from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.blocking import BlockingScheduler class SchedulerConfig: JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")} EXECUTORS = {"default": ThreadPoolExecutor(20)} JOB_DEFAULTS = {"coalesce": False} @classmethod def to_dict(cls): return { "jobstores": cls.JOBSTORES, "executors": cls.EXECUTORS, "job_defaults": cls.JOB_DEFAULTS, }
在 SchedulerConfig 配置项中我们可以自己实现一个 to_dict() 类方法,以便我们后续传参时通过解包的方式直接传入配置参数即可。
scheduler.py 模块的设定也比较简单,即设定对应的 scheduler 调度器即可。由于是演示 demo 我还将要定期执行的任务也放在了这个模块当中:
import logging from datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from config import SchedulerConfig scheduler = BackgroundScheduler() logger = logging.getLogger(__name__) def init_scheduler() -> None: # config scheduler scheduler.configure(**SchedulerConfig.to_dict()) logger.info("scheduler is running...") # schedule test scheduler.add_job( func=mytask, trigger="date", args=("APScheduler Initialize.",), next_run_time=datetime.now(), ) scheduler.start() def mytask(message: str) -> None: print(f"[{datetime.now()}] message: {message}")
在这一部分中:
在 main.py 模块就主要存放着我们由 FastAPI 所构建的相关 API。如果在后续开发时存在多个接口,此时就需要将不同接口放在不同模块文件中,以达到路由的分发与管理,类似于 Flask 的蓝图模式。
import logging import uuid from datetime import datetime from typing import Any, Dict, Optional, Sequence, Union from fastapi import FastAPI from pydantic import BaseModel from scheduler import init_scheduler, mytask, scheduler logger = logging.getLogger(__name__) app = FastAPI(title="APScheduler API") app.add_event_handler("startup", init_scheduler) class Job(BaseModel): id: Union[int, str, uuid.UUID] name: Optional[str] = None func: Optional[str] = None args: Optional[Sequence[Optional[str]]] = None kwargs: Optional[Dict[str, Any]] = None executor: Optional[str] = None misfire_grace_time: Optional[str] = None coalesce: Optional[bool] = None max_instances: Optional[int] = None next_run_time: Optional[Union[str, datetime]] = None @app.post("/add") def add_job( message: str, trigger: str, trigger_args: Optional[dict], id: Union[str, int, uuid.UUID], ): try: scheduler.add_job( func=mytask, trigger=trigger, kwargs={"message": message}, id=id, **trigger_args, ) except Exception as e: logger.exception(e.args) return {"status_code": 0, "message": "添加失败"} return {"status_code": 1, "message": "添加成功"} @app.delete("/delete/{id}") def delete_job(id: Union[str, int, uuid.UUID]): """delete exist job by id""" try: scheduler.remove_job(job_id=id) except Exception: return dict( message="删除失败", status_code=0, ) return dict( message="删除成功", status_code=1, ) @app.put("/reschedule/{id}") def reschedule_job( id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict] ): try: scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args) except Exception as e: logger.exception(e.args) return dict( message="修改失败", status_code=0, ) return dict( message="修改成功", status_code=1, ) @app.get("/job") def get_all_jobs(): jobs = None try: job_list = scheduler.get_jobs() if job_list: jobs = [Job(**task.__getstate__()) for task in job_list] except Exception as e: logger.exception(e.args) return dict( message="查询失败", status_code=0, jobs=jobs, ) return dict( message="查询成功", status_code=1, jobs=jobs, ) @app.get("/job/{id}") def get_job_by_id(id: Union[int, str, uuid.UUID]): jobs = [] try: job = scheduler.get_job(job_id=id) if job: jobs = [Job(**job.__getstate__())] except Exception as e: logger.exception(e.args) return dict( message="查询失败", status_code=0, jobs=jobs, ) return dict( message="查询成功", status_code=1, jobs=jobs, )
以上代码看起来很多,其实核心的就那么几点:
FastAPI 对象 app 的初始化。这里用到的 add_event_handler() 方法就有点像 Flask 中的 before_first_request,会在 Web 服务请求伊始进行操作,理解为初始化相关的操作即可。
API 接口路由。路由通过 app 对象下的对应 HTTP 方法来实现,如 GET、POST、PUT 等。这里的装饰器用法其实也和 Flask 很类似,就不多赘述。
scheduler 对象的增删改查。从 scheduler.py 模块中引入我们创建好的 scheduler 对象之后就可以直接用来做增删改查的操作:
完成以上的所有操作之后,我们就可以打开控制台,进入到该目录下并激活我们的虚拟环境,之后运行:
uvicorn main:app
之后我们就能在 FastAPI 默认的地址 http://127.0.0.1:8000/docs 中看到关于全部接口的 Swagger 文档页面了:
fastapi 集成的 swagger 页面
之后我们可以直接在文档里面或使用 Postman 来自己进行接口测试即可。
本文介绍了有关于 APScheduler 框架的概念及其用法,并进行了简单的实践。
得益于 APScheduler 的模块化设计才可以让我们更方便地去理解、使用它,并将其运用到我们实际的开发过程中。
从 APScheduler 目前的 Github 仓库代码以及 issue 来看,作者已经在开始重构 4.0 版本,当中的一些源代码和 API 也有较大的变动,相信在 4.0 版本中将会引入更多的新特性。
但如果现阶段你正打算使用或已经使用 APScheduler 用于实际生产中,那么希望本文能对会你有所帮助。