Python Celery队列、定时任务
介绍
Celery (开源)是一个 Python 编写的简单、灵活、可靠的用来处理大量信息的异步的分布式任务队列,主要用于实时处理和任务调度, 同时提供操作和维护分布式系统所需的工具。Celery可以支持多台不同的计算机执行不同的任务或者相同的任务。
组件
Brokers
Celery 有生产者和消费者的角色,brokers 是生产者和消费者存放/拿取产品的地方(队列),常见的 brokers 有 rabbitmq、redis、Zookeeper、etcd 等。
Result Stores / backend
队列中的任务运行完后的结果或者状态需要被任务发送者知道,需要一个地方储存这些结果。常见的 backend 有redis、Memcached等。
Workers
消费者,从队列中取出任务并执行。
Tasks
队列中进行的任务,由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。
Celery Beat
任务调度器,用来调度周期任务。
Producer
任务生产者,调用 Celery 产生任务。
流程
Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和 Worker 调节。
启动一个任务,客户端向消息队列发送一条消息,然后中间人(Broker)将消息传递给一个 Worker,最后由 Worker进行执行中间人(Broker)分配的任务。
架构
简单说,可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的"消息队列"中去。Exchange 对应一个消息队列(queue),即:通过"消息路由"的机制使Exchange对应queue,每个queue对应每个worker。
注:任务队列与消息队列都是由队列实现的异步协议,只是消息队列(Message Queue) 用来做异步通信,而任务队列(Task Queue) 更强调异步执行的任务。实际上发送消息也是一个任务,也就是说任务队列是在消息队列之上的管理工作,任务队列的很多典型应用就是发送消息,如发送邮件,发送短信,发送消息推送等。
安装
Python: 2.7.10 Celery: 4.4.0 Redis: 3.3.11(库) 5.0.7(包)
我这里使用 Redis 当做 Celery 的 Broker 和 Backend
pip install redis==3.3.11 pip install celery==4.4.0
写个task任务,例子1
vim tasks.py #!/usr/bin/env python from celery import Celery # 配置 backend 和 broker app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') # 使用装饰器方式将函数装饰成 celery task @app.task def add(x, y): return x + y
根据状态执行不同操作, 例子2
from celery import Celery class BTask(Task): def on_success(self, retval, task_id, args, kwargs): print 'task done: {0}'.format(retval) return super(BTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print 'task fail, reason: {0}'.format(exc) return super(BTask, self).on_failure(exc, task_id, args, kwargs, einfo) @app.task(base=BTask) def add(x, y): # raise KeyError return x + y
启动worker
celery -A tasks worker --loglevel=info
触发任务
vim trigger.py #!/usr/bin/env python import time from tasks import add result = add.delay(4, 10) while not result.ready(): time.sleep(1) print 'task done: {0}'.format(result.get())
delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready() 为 true,然后用 result.get() 取结果。
任务状态
PENDING 任务等待中
STARTED 任务已开始
SUCCESS 任务执行成功
FAILURE 任务执行失败
RETRY 任务将被重试
REVOKED 任务取消
当我们有个耗时时间较长的任务进行时一般我们想得知它的实时进度,这里就需要我们自定义一个任务状态用来说明进度并手动更新状态,从而告诉回调当前任务的进度, 列子3
vim tasks.py from celery import Celery import time @app.task(bind=True) def mTasks(self): for i in xrange(1, 11): time.sleep(0.1) self.update_state(state="PROGRESS", meta={'p': i*10}) return 'done'
vim trigger.py #!/usr/bin/env python #coding: utf-8 from task import mTasks import sys def pm(body): res = body.get('result') if body.get('status') == 'PROGRESS': sys.stdout.write('\r任务进度: {0}%'.format(res.get('p'))) sys.stdout.flush() else: print '\r' print res r = mTasks.delay() print r.get(on_message=pm, propagate=False)
定时/周期性任务
vim celery_config.py from datetime import timedelta from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'ptask': { 'task': 'tasks.period_task', 'schedule': timedelta(seconds=5), # 间隔执行的时间可以用 datetime.timedelta 或者 crontab }, } CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
定时参考:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules
vim tasks.py app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') app.config_from_object('celery_config') @app.task(bind=True) def period_task(self): print 'period task done: {0}'.format(self.request.id)
链式任务
def update_page_info(url): # fetch_page -> parse_page -> store_page chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url) chain() # 或 #fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)]) @app.task() def fetch_page(url): return myhttplib.get(url) @app.task() def parse_page(page): return myparser.parse_document(page) @app.task(ignore_result=True) def store_page_info(info, url): PageInfo.objects.create(url=url, info=info)
链式任务中前一个任务的返回值默认是下一个任务的输入值之一 ( 不想让返回值做默认参数可以用 si() 或者 s(immutable=True) 的方式调用 )。
这里的 s() 是方法 celery.signature() 的快捷调用方式,signature 具体作用就是生成一个包含调用任务及其调用参数与其他信息的对象,有点类似偏函数的概念:先不执行任务,而是把任务与任务参数存起来以供其他地方调用。
多个消息队列
vim tasks.py from celery import Celery app = Celery(broker="redis://localhost:6379/1" ) app.config_from_object("celery_config") @app.task def t1(x, y): return x * y @app.task def t2(x, y, z): return x + y + z @app.task def add(x, y): return x + y
vim celery_config.py from kombu import Queue, Exchange CELERY_RESULT_BACKEND = "redis://localhost:6379/1" CELERY_QUEUES = { Queue("default", Exchange("default"), routing_key = "default"), Queue("t1", Exchange("t1"), routing_key = "t1"), Queue("t2", Exchange("t2"), routing_key = "t2") } #路由 CELERY_ROUTES = { "tasks.t1":{"queue": "t1", "routing_key": "t1"}, "tasks.t2":{"queue": "t2", "routing_key": "t2"} } CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json'] # CELERY_TIMEZONE = 'UTC' # 定时任务 # CELERYBEAT_SCHEDULE = # 't1_schedule' : { # 'task':'tasks.t1', # 'schedule':20, 3 'args':(5,6) # }
启动worker
celery -A tasks worker -l info -n workerA.%h -Q t1 celery -A tasks worker -l info -n workerA.%h -Q t2 celery -A tasks worker -l info -n worker.%h -Q celery # 默认的 # celery -A tasks beat
触发执行
vim trigger.py import time from tasks import t1,t2,add r1 = t1.delay(10, 20) time.sleep(1) print (r1.result) print (r1.status) r2 = t2.delay(10, 20, 30) time.sleep(1) print (r2.result) print (r2.status) r3 = add.delay(100, 200) time.sleep(1) print (r3.result) print (r3.status)
任务调用方式
add.delay(2, 2)
add.apply_async(2, 2)
实际上 delay 只是 apply_async 的快捷方式,只是 apply_async 可以进行更多的任务属性设置,更多参考
关于 AsyncResult
AsyncResult 主要用来储存任务执行信息与执行结果,有点类似 tornado 中的 Future 对象,都有储存异步结果与任务执行状态的功能。更多参考
参考