背景 链接到标题
最近读完了 《Redis 实战》对 Redis 有了一些了解,但是没有在实际项目中应用过,就想找一个使用 Redis 的项目来看看,找到 Huey 是因为之前使用过,趁机了解下具体实现。
简介 链接到标题
Huey 的定位是一个轻量级的任务队列,仅依赖于 Redis 作为任务相关信息存储,支持的功能有:
- 多种 worker 执行方式:thread,process,greenlet
- 支持多种任务类型:特定时间运行,周期性运行
- 包含重试机制,可以指定重试次数及重试间隔
- 支持任务锁
- …
我们根据官方的示例,来看看 Huey 是如何处理任务的,目录结构如下:
master ✔ $ pwd
/Users/yiran/Documents/git-repo/huey/examples/simple
yiran@zhouyirandeMacBook-Pro:~/Documents/git-repo/huey/examples/simple
master ✔ $ tree .
.
├── README
├── __init__.py
├── config.py
├── cons.sh
├── main.py
└── tasks.py
注意,这个目录结构是 Huey 官方建议的,具体原因为:
Behind-the-scenes when you decorate a function with task() or periodic_task(), the function registers itself with a centralized in-memory registry. When that function is called, a reference is put into the queue (along with the arguments the function was called with, etc), and when that message is consumed, the function is then looked-up in the consumer’s registry. Because of the way this works, it is strongly recommended that all decorated functions be imported when the consumer starts up.
Task 链接到标题
Huey 支持通过 @task 装饰器的方式创建任务,示例如下:
from huey import RedisHuey
huey = RedisHuey('simple.test', blocking=True)
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
return 'Counted %s beans' % num
从 huey.RedisHuey 创建了 huey 实例,我们看下 RedisHuey 是什么:
class RedisHuey(Huey):
def get_storage(self, read_timeout=1, max_errors=1000,
connection_pool=None, url=None, **connection_params):
return RedisStorage(
name=self.name,
blocking=self.blocking,
read_timeout=read_timeout,
max_errors=max_errors,
connection_pool=connection_pool,
url=url,
**connection_params)
在 huey.storage 中定义了 RedisHuey,继承自 Huey 类,重新实现了 get_storage
方法,我们看下 RedisStorage
是做什么用途的:
class RedisStorage(BaseStorage):
redis_client = Redis
def __init__(self, name='huey', blocking=False, read_timeout=1,
max_errors=1000, connection_pool=None, url=None,
client_name=None, **connection_params):
if Redis is None:
raise ImportError('"redis" python module not found, cannot use '
'Redis storage backend. Run "pip install redis" '
'to install.')
if sum(1 for p in (url, connection_pool, connection_params) if p) > 1:
raise ValueError(
'The connection configuration is over-determined. '
'Please specify only one of the the following: '
'"url", "connection_pool", or "connection_params"')
if url:
connection_pool = ConnectionPool.from_url(
url, decode_components=True)
elif connection_pool is None:
connection_pool = ConnectionPool(**connection_params)
...
def clean_name(self, name):
def convert_ts(self, ts):
def enqueue(self, data):
def dequeue(self):
...
def get_errors(self, limit=None, offset=0):
def flush_errors(self):
def emit(self, message):
def listener(self):
def __iter__(self):
RedisStorage 继承自 Storage,主要用于所有任务相关信息的写入及读取,目前 Huey 中只实现了 Storage 介质,如果想使用其他方式(如 MongoDB)就需要自己实现了。
知道了 RedisHuey 跟 Huey 的主要区别只是在于 get_storage
方法不同,那么看看 task
方法的具体实现:
def task(self, retries=0, retry_delay=0, retries_as_argument=False,
include_task=False, name=None, **task_settings):
def decorator(func):
"""
Decorator to execute a function out-of-band via the consumer.
"""
return TaskWrapper(
self,
func.func if isinstance(func, TaskWrapper) else func,
retries=retries,
retry_delay=retry_delay,
retries_as_argument=retries_as_argument,
include_task=include_task,
name=name,
**task_settings)
return decorator
我们看下 TaskWrapper
做了啥,一些变量复制忽略掉,关键的是这几行:
class TaskWrapper(object):
def __init__(self, huey, func, retries=0, retry_delay=0,
retries_as_argument=False, include_task=False, name=None,
task_base=None, **task_settings):
...
self.task_class = create_task(
QueueTask if task_base is None else task_base,
func,
retries_as_argument,
name,
include_task,
**task_settings)
self.huey.registry.register(self.task_class)
def is_revoked(self, dt=None, peek=True):
return self.huey.is_revoked(self.task_class, dt, peek)
def revoke(self, revoke_until=None, revoke_once=False):
self.huey.revoke_all(self.task_class, revoke_until, revoke_once)
将函数 func 创建为一个任务,并将任务注册到 Huey 中。同时实现了一些方法,这些方法最终调用的都是在 Huey 类中实现的。
Consumer 链接到标题
root@yiran30250:~/project/huey/examples/simple
master ✔ $ cat cons.sh
#!/bin/bash
echo "HUEY CONSUMER"
echo "-------------"
echo "In another terminal, run 'python main.py'"
echo "Stop the consumer using Ctrl+C"
PYTHONPATH=.:$PYTHONPATH
export WORKER_CLASS=${1:-thread}
python ../../huey/bin/huey_consumer.py main.huey --workers=2 -v -s 10 -k $WORKER_CLASS -C
接下来我们看下 Consumer 是如何实现的,在进程启动脚本里,先忽略其他参数,我们指定了 huey 实例,并指定了 worker 数量为 2,我们看下是如何执行的:
huey_consumer.py
def consumer_main():
...
huey_instance = load_huey(args[0])
config.setup_logger()
consumer = huey_instance.create_consumer(**config.values)
consumer.run()
if __name__ == '__main__':
consumer_main()
加载 huey 实例,创建相应 Consumer 实例并运行,我们看下 Consumer 做了什么:
class Consumer(object):
"""
Consumer sets up and coordinates the execution of the workers and scheduler
and registers signal handlers.
"""
根据注释我们可以知道 Consumer 主要是启动了 worker 和 scheduler,并截获相应的信息进行处理。
在 huey/consumer.py 中定义了 Worker
,Scheduler
,Environment
类,其中 Environment
根据所指定的 worker 类型的不同,分为 thread,process,greenlet。
我们看下 Worker
是如何执行对应的任务的:
Worker 链接到标题
def loop(self, now=None):
task = None
exc_raised = True
try:
task = self.huey.dequeue() # 从 hue 中获取任务,也就是从 redis 中获取任务
...
if task:
self.delay = self.default_delay
self.handle_task(task, now or self.get_now()) # 如果获取到了任务,对任务进行处理
elif exc_raised or not self.huey.blocking:
self.sleep()
def handle_task(self, task, ts):
if not self.huey.ready_to_run(task, ts):
self.add_schedule(task) # 若任务没有到达执行时间,则添加到 schedule 中
elif not self.is_revoked(task, ts):
self.process_task(task, ts) # 若任务没有被取消,则执行任务
else:
self.huey.emit_task(
EVENT_REVOKED,
task,
timestamp=ts)
self._logger.debug('Task %s was revoked, not running', task) # 任务被取消,通知 event
def process_task(self, task, ts):
...
self.run_pre_execute_hooks(task) # 执行 pre hook 动作
...
task_value = self.huey.execute(task) # 具体执行 task
...
self.run_post_execute_hooks(task, task_value, exception) # 执行 post hook 动作
...
看到 self.huey.execute(task) 我们发现最终还是通过 Huey 中的方法 execute 来执行 task,那么我们看下是如何实现的:
def execute(self, task):
if not isinstance(task, QueueTask):
raise TypeError('Unknown object: %s' % task)
try:
result = task.execute()
发现这里调用的是 task.execute,这里的 task 是谁呢,是我们最开始提到的 TaskWrapper
么,在 TaskWrapper
中没有实现 execute
方法,这里其实是上面提到的 create_task
返回的类,我们看下 create_task
的具体实现:
def create_task(task_class, func, retries_as_argument=False, task_name=None,
include_task=False, **kwargs):
def execute(self):
args, kwargs = self.data or ((), {})
if retries_as_argument:
kwargs['retries'] = self.retries
if include_task:
kwargs['task'] = self
return func(*args, **kwargs)
attrs = {
'execute': execute,
'__module__': func.__module__,
'__doc__': func.__doc__}
attrs.update(kwargs)
if not task_name:
task_name = func.__name__
return type(task_name, (task_class,), attrs)
可以看到 execute
函数中最终执行的就是 func 自己, func(*args, **kwargs)
,通过 type 返回真正的 task 类,指定父类为 task_class
(默认是 QueueTask)。
到这里我们就完成了整个流程:任务注册,任务调度,任务执行。那么我们再来看看 Huey 中的锁是怎么实现的。
Lock 链接到标题
class TaskLock(object):
"""
Utilize the Storage key/value APIs to implement simple locking. For more
details see :py:meth:`Huey.lock_task`.
"""
def __init__(self, huey, name):
self._huey = huey
self._name = name
self._key = '%s.lock.%s' % (self._huey.name, self._name)
self._huey._locks.add(self._key)
def __call__(self, fn):
@wraps(fn)
def inner(*args, **kwargs):
with self:
return fn(*args, **kwargs)
return inner
def __enter__(self):
if not self._huey._put_if_empty(self._key, '1'):
raise TaskLockedException('unable to set lock: %s' % self._name)
def __exit__(self, exc_type, exc_val, exc_tb):
self._huey._get_data(self._key)
def put_if_empty(self, key, value):
return self.conn.hsetnx(self.result_key, key, value)
在 Huey 中,任务锁的实现非常简单,单纯的利用了下 Redis 中的 HSETNX 机制:
Sets field in the hash stored at key to value, only if field does not yet exist. If key does not exist, a new key holding a hash is created. If field already exists, this operation has no effect.
总结 链接到标题
Huey 作为一个任务调度应用,整体代码量不多,但是有很多值得学习的地方,也许过段时间再看会有新的收获。