简介 链接到标题
apscheduler 全称 Advanced Python Scheduler
,调度器,主要功能如下:
- 动态添加、删除任务
- 暂停、恢复任务
- 周期性调度:cron,date,interval
- …
那么接下来我们根据官方示例,看看 apscheduler 是如何进行处理任务的。
示例版本为 2.1,因为在 2.1 版本包含目前 master 分支上的主要功能,简单易懂。
代码结构如下:
yiran@zhouyirandeMacBook-Pro:~/Documents/git-repo/apscheduler
2.1 ✔ $ tree apscheduler
apscheduler
├── __init__.py
├── events.py
├── job.py
├── jobstores
│ ├── __init__.py
│ ├── base.py
│ ├── mongodb_store.py
│ ├── ram_store.py
│ ├── redis_store.py
│ ├── shelve_store.py
│ └── sqlalchemy_store.py
├── scheduler.py
├── threadpool.py
├── triggers
│ ├── __init__.py
│ ├── cron
│ │ ├── __init__.py
│ │ ├── expressions.py
│ │ └── fields.py
│ ├── interval.py
│ └── simple.py
└── util.py
示例:
from datetime import datetime, timedelta
from apscheduler.scheduler import Scheduler
from apscheduler.jobstores.shelve_store import ShelveJobStore
def alarm(time):
print('Alarm! This alarm was scheduled at %s.' % time)
if __name__ == '__main__':
scheduler = Scheduler(standalone=True)
scheduler.add_jobstore(ShelveJobStore('example.db'), 'shelve')
alarm_time = datetime.now() + timedelta(seconds=10)
scheduler.add_date_job(alarm, alarm_time, name='alarm',
jobstore='shelve', args=[datetime.now()])
print('To clear the alarms, delete the example.db file.')
print('Press Ctrl+C to exit')
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass
Scheduler 链接到标题
上述示例很容易理解,首先对 Scheduler
实例化,然后添加 jobstore,定义一个名为 alarm
的 job,并指定其运行时间为当前时间 + 10s,将该 job 添加到 scheduler 中,添加 job 类型为 date_job
,然后启动 scheduler。
可以看到我们所有的操作都是通过 scheduler 方法实现的,那么我们来看下 Scheduler
类具体实现了哪些功能:
class Scheduler(object):
"""
This class is responsible for scheduling jobs and triggering
their execution.
"""
_stopped = True
_thread = None
def __init__(self, gconfig={}, **options):
self._wakeup = Event()
self._jobstores = {}
self._jobstores_lock = Lock()
self._listeners = []
self._listeners_lock = Lock()
self._pending_jobs = []
self.configure(gconfig, **options)
在 Scheduler
构造函数中,对一些变量进行初始化,这里要注意 self._wakeup
,后续的一些主要功能都是通过它来实现的。接下来看看 add_jobstore
方法:
def add_jobstore(self, jobstore, alias, quiet=False):
self._jobstores_lock.acquire() # 请求锁
try:
if alias in self._jobstores:
raise KeyError('Alias "%s" is already in use' % alias)
self._jobstores[alias] = jobstore # 将 jobstore 别名作为 key,添加到 self._jobstores 中
jobstore.load_jobs() # 加载 jobstore 中所有 job
finally:
self._jobstores_lock.release() # 释放锁
# Notify listeners that a new job store has been added
self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias)) # 事件通知
# Notify the scheduler so it can scan the new job store for jobs
if not quiet:
self._wakeup.set() # 将 Event 置为 True
在 add_jobstore
中,将 jobstore 添加到 scheduler 中,并加载当前 jobstore 中的所有任务,接下来将具体的 job 添加到 scheduler 中:
def add_date_job(self, func, date, args=None, kwargs=None, **options):
trigger = SimpleTrigger(date)
return self.add_job(trigger, func, args, kwargs, **options)
这里的 SimpleTrigger
只是多种 Trigger 中的一种,根据 Trigger 类型的不同,最主要的差别在于 get_next_fire_time
计算方式不同。
如果添加的任务是 interval_job,那么对应 Trigger 为 IntervalTrigger
;如果添加的任务是 cron_job,那么对应的 Trigger 为 CronTrigger
。
可以看到不同的任务只是 Trigger 计算方式不同,最终还是通过 add_job
方法,继续看:
def add_job(self, trigger, func, args, kwargs, jobstore='default',
**options):
job = Job(trigger, func, args or [], kwargs or {},
options.pop('misfire_grace_time', self.misfire_grace_time),
options.pop('coalesce', self.coalesce), **options) # 将 job 实例化
if not self.running: # 如果 scheduler 未启动,那么将其添加到等待队列中
self._pending_jobs.append((job, jobstore))
logger.info('Adding job tentatively -- it will be properly '
'scheduled when the scheduler starts')
else:
self._real_add_job(job, jobstore, True) # 否则添加 job 到 jobstore 中
return job
继续看 _real_add_job
中的实现:
def _real_add_job(self, job, jobstore, wakeup):
job.compute_next_run_time(datetime.now()) # 计算job 下次运行时间
if not job.next_run_time:
raise ValueError('Not adding job since it would never be run')
self._jobstores_lock.acquire() # 请求锁
try:
try:
store = self._jobstores[jobstore]
except KeyError:
raise KeyError('No such job store: %s' % jobstore)
store.add_job(job) # 添加 job
finally:
self._jobstores_lock.release()
# Notify listeners that a new job has been added
event = JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job)
self._notify_listeners(event) # 事件通知
logger.info('Added job "%s" to job store "%s"', job, jobstore)
# Notify the scheduler about the new job
if wakeup:
self._wakeup.set() # # 将 Event 置为 True
在 _real_add_job
中我们终于看到 store.add_job(job)
,至于 store.add_job
如何实现我们之后看 JobStore
再说。
现在我们已经给 scheduler 添加了 jobstore 和 job,那么看下 scheduler 是如何运行的:
def start(self):
...
# Schedule all pending jobs
for job, jobstore in self._pending_jobs: # 将 scheduler 未运行时添加的 job,即在等待队列中的 job 添加到 jobstore 中
self._real_add_job(job, jobstore, False)
del self._pending_jobs[:]
self._stopped = False
if self.standalone:
self._main_loop()
else:
self._thread = Thread(target=self._main_loop, name='APScheduler')
self._thread.setDaemon(self.daemonic)
self._thread.start()
在 scheduler 运行时,会先将所有的 job 加载到 jobstore 中,然后调用 self._main_loop
,如果 Standalone 为 True,则会一直阻塞知道没有 job 需要运行,看看 self._main_loop
做了啥:
def _main_loop(self):
"""Executes jobs on schedule."""
logger.info('Scheduler started')
self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START)) # 事件通知
self._wakeup.clear()
while not self._stopped:
logger.debug('Looking for jobs to run')
now = datetime.now()
next_wakeup_time = self._process_jobs(now) # 计算下次唤醒时间
if next_wakeup_time is not None:
wait_seconds = time_difference(next_wakeup_time, now)
logger.debug('Next wakeup is due at %s (in %f seconds)',
next_wakeup_time, wait_seconds)
try:
self._wakeup.wait(wait_seconds) # 等待 Event flag
except IOError: # Catch errno 514 on some Linux kernels
pass
self._wakeup.clear()
elif self.standalone:
logger.debug('No jobs left; shutting down scheduler')
self.shutdown() # 若 scheduler standalone 为 True 且 jobs 为空,则停止 scheduler
break
else:
logger.debug('No jobs; waiting until a job is added')
try:
self._wakeup.wait() # 等待 Event flag
except IOError: # Catch errno 514 on some Linux kernels
pass
self._wakeup.clear()
还记得上面提到的 Scheduler
构造函数中的 self._wakeup
么,它实际上是 threading.Event
,它的 wait 方法会一直 block 直到 Event flag 为 True,也就是我们上面看到的 self._wakeup.set()
,那么我们可以知道在 Scheduler
中有几种场景会置为 True:
- Scheduler.shutdown
- Scheduler.add_jobstore
- Scheduler._real_add_job
如果没有触发上述场景,则 _main_loop
会根据 jobs 的执行时间一直循环等待。
JobStore 链接到标题
在 apscheduler 中,JobStore 只是单纯的实现了 Job 相关的方法:
class JobStore(object):
def add_job(self, job):
def update_job(self, job):
def remove_job(self, job):
def load_jobs(self):
def close(self):
其中,对 job 的操作会根据 JobStore 类型的不同,而采用不同的序列化方式,比如在 MongoDBJobStore
中采用的是 bson.binary
,而在其他 JobStore 比如 RedisJobStore
中采用的都是 pickle
。
Events 链接到标题
在 Scheduler
中,我们已经看到通过 threading.Event
来实现事件通知的,那么我们通知的 Event
都是在 apscheduler.events
中定义好的,比如:
class JobEvent(SchedulerEvent):
def __init__(self, code, job, scheduled_run_time, retval=None,
exception=None, traceback=None):
SchedulerEvent.__init__(self, code)
self.job = job
self.scheduled_run_time = scheduled_run_time
self.retval = retval
self.exception = exception
self.traceback = traceback
在 JobEvent
中,我们能看到 job 的执行时间,返回值,异常捕获等信息。如果看过之前关于 huey 博客 的同学应该知道,在 huey 中是可以直接通过 task id 获取 task 执行结果的,但是在 apscheduler 中,我们并没有直接获取该结果的方法,而是通过在 Scheduler
中的 add_listener
添加监听者,监控指定成功的 Job 获取该 Job 的返回值,感觉这里不太友好。
总结 链接到标题
到这里我们基本上已经将 apscheduler 的流程走了一遍,具体的 Trigger 计算时间的方法之后有机会单独写一下关于 cron,interval,date 的计算方法。
与 huey 相比,apscheduler 使用上要简单,但是简单也意味着功能的不足,比如获取 job 执行结果、job retry 机制等等。当然也有比较好的地方,apscheduler 在跟 web 框架比如 Flask,Django 集成的时候有一些第三方插件可以直接使用,不用像 Huey 一样要单独启动一个 consumer 进程,比较方便。