好好学习,天天向上

Celery从入门到放弃:celery是什么?

第一次知道celery是因为想给django加入异步。

众所周知,django对单个请求是同步的,这里同步的意思是,客户端发起一个request,django完全处理完毕之后才返回response。这种情况下,如果django处理的时间很长很长,客户端就会出现等待不到响应超时的情况。而当时我刚好需要通过django调用ansible进行一些长时间的任务。这些任务耗时之长,可以达到1h。而让客户端一直等待响应1h显然是非常不可取的。

示意图1

此时,一个解决方案就是,django把长耗时的任务交由某种机制进行处理,然后立即返回response。之后,客户端通过另外的接口,查询任务处理情况。

示意图2

于是,celery就闪亮登场了。

摘抄官方文档中对celery的定义: > celery是一个简单、灵活且可靠的分布式系统,用来处理大量的消息,同时提供维护这样一个系统所需的操作工具。

它是一个专注于实时处理的任务队列,同时也支持任务调度。

celery初体验

详情见First Steps with Celery

选择并安装消息传输方式(broker)

celery自身并不保存数据,而是借助broker来发送和接收消息。celery支持多种broker: * RabbitMQ * Redis * 其他(详见Broker Overview

因此,我们需要根据所需选择并安装合适的broker。 > 注:具体broker的安装方式详见所选broker的官网。这里只聚焦celery哈。

安装celery

通过pip,即可安装celery:

1
pip install celery
#### 创建任务 首先,我们需要创建一个Celery实例app,通过这个实例来访问celery的所有功能,例如创建人物或者管理worker等等。

让我们新建一个文件,起名为tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
import time
from celery import Celery

# 第一个参数是当前模块的名字
# 第二个参数是所选broker的URL。这里,我们使用RabbitMQ
app = Celery('eletest', broker='amqp://ele@localhost//')

# 我们定义一个简单的任务,休眠100秒后,打印一句"hello, {{name}}"
@app.task
def sayHello(name):
time.sleep(100)
print "hello,", name
#### 运行worker 运行以下命令即可:
1
$ celery -A eletest worker --loglevel=info
> 注:如果想通过守护模式启动celery的话,可以考虑借助supervisord

celery worker提供了多种选项,可以通过celery worker -h进行了解使用

celery还提供了很多种命令。这里就不一一说明了。

调用任务

通过上面的步骤,我们已经把接收消息进行处理这一步完成了。现在,我们需要看看怎么发送消息。

在没有使用celery之前,我们是这样调用我们的任务的:

1
sayHello("ele") # hello, ele
使用celery,则只需要将上面替换成下面的代码:
1
2
3
4
5
from tasks import sayHello

sayHello.delay("ele")
# 执行上面这一句代码的时候,celery会把相关参数整理成某种格式,然后扔到你指定的broker。然后,前面启动的worker就会从broker里面读取相关参数,调用任务sayHello。
# 可以通过worker的控制台输出来看到执行过程
这里有几点要说明一下: * 调用sayHello.delay会立即返回一个AsyncResult实例。通过这个实例,我们就可以检查任务执行的状态,获取任务执行完后的返回值等等。这样,我们在django中,就可以返回给客户端一个taskid(通过该实例获得),然后客户端就可以使用这个taskid来了解任务真正的处理状态了。 * delay方法apply_async方法的简略版,只接受两个参数,一个是任务的位置参数,另一个是任务的关键字参数。使用这个方法,只需要把task(...)直接改成task.delay(...)即可。懒人方法,后面的事情celery都帮你做好了。如果需要进一步的定制,则可以使用后者。后者提供了详尽的参数,你可以自己指定taskid、broker的相关信息、连接等等。

保存任务执行结果

默认情况下,调用任务是不会保存结果的。因此,可以指定backend来实现任务结果的存储。对于backend,可以选择SQLAlchemy/Django ORM, Memcached, Redis, RPC (RabbitMQ/AMQP),或者也可以自定义。

使用也很简单,只需要在初始化celery实例的时候给参数backend赋值即可:

1
app = Celery('eletest', backend='redis://localhost', broker='amqp://ele@localhost//')
### celery + django = django异步化 有了celery的助攻,我们终于可以把同步的django异步掉了!!!

示意图3

具体如何在django中加入celery,详见first steps with django

参考

请言小午吃个甜筒~~