Celery 实现分布式任务队列

Celery 简介

Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。

在 Python 中定义 Celery 的时候,我们要引入 Broker,中文中有中间人的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。

这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。

Show me the code

# 以下为 dispatcher.py
from worker import divide

# 1
divide.delay(1,2)
# 2
divide.apply_async((1, 2))


# 以下为 worker.py
from celery import Celery
app = Celery('tasks', backend='amqp://guest@localhost//', broker='redis://')

@app.task
def divide(x, y):
    print x / y

worker.py 中新建了一个 Celery 实例,以 amqp 作为 broker,以 redis 作为 backend 储存所有 task 执行的历史记录。我们在此例中使用 RabbitMQ 作为我们的消息队列服务器。

我们一方面通过命令行中执行以下语句来启动 celery 服务。

celery -A worker worker --loglevel=info

另外一方面,我们运行 dispatch.py,代码中将 worker 中的 divide 函数导入,再接着以两种方式将 task 启动。第一种方法中的 delay 方法接收了两个参数,实际为第二种方法的便捷调用,第二种方法在使用时,要将我们要传给 divide 的参数作为 tuple 放在第一个参数位置。

apply_async 的其他参数

apply_async 还支持其他参数,比如设置回调。

设置 task 实例的回调可以采用 link:

divide.apply_async((16, 2), link=divide.s(8))

首先计算 16 / 2,然后把结果 8 / 8,最后执行的结果等于 1. 所以这里的 link 是指向一个后继的调用函数,即完成当前 divide 以后再进行下一个 divide 操作。 除了 link 之外,还有 link_error,只会在该任务执行失败时调用。在本例中,我们可以在 divide 执行失败时,执行 link_error 所指的函数,这个函数就是错误消息的处理句柄,它会接收到一个 task 的 UUID,我们可以通过 UUID 来访问出错的任务的异常状态。

# dispatcher
divide.apply_async((1, 0), link_error=error_handler.s())

# 这里我们把 1 和 0 放到了 divide 函数中执行,引发了除零异常,继而执行 link_error 对应的 error_handler,error_handler 接收到 uuid 参数,通过 AsyncResult 生产一个结果实例,我们可以用 result.state 打印出该任务的执行情况,用 result.info 来获取异常的具体信息。

# worker
@app.task
def divide(x, y):
    print x/y

@app.task
def error_handler(uuid):
    result = AsyncResult(uuid)
    print 'task error {0}'.format(uuid)
# [2015-09-01 13:43:26,569: WARNING/Worker-2] task error 8e516377-a6c0-4a40-934f-dd1b0692c5fa
    print result.state
# [2015-09-01 13:43:26,572: WARNING/Worker-2] FAILURE
    print result.info
# [2015-09-01 13:43:26,572: WARNING/Worker-2] integer division or modulo by zero

跟踪异常的成因

异常的成因我们可以如上述代码所示将 result.info 打印出来而得知。然而我们并不能满足于此,仅仅知晓出错的 task 的 UUID 和其状态是不够的,我们想要知道发生错误时,task 的传入参数是什么。我一开始没有尝试出通过 UUID 来获取到原来的 1 和 0 这两个参数,后来我追踪了 apply_async 这个函数,位于 task.py 中,再跟踪到 trace.py 中的 build_tracer 函数,果然在 link_error 的调用时只传递了 UUID 一个参数,代码如下:

    def on_error(request, exc, uuid, state=FAILURE, call_errbacks=True):
        if propagate:
            raise
        I = Info(state, exc)
        R = I.handle_error_state(task, eager=eager)
        if call_errbacks:
            group(
                [signature(errback, app=app)
                 for errback in request.errbacks or []], app=app,
            ).apply_async((uuid, ))
            # ).apply_async((uuid, request.args))
            # 可以改成上一行注释中的代码,这样就可以在 error_handler 中得到原来调用的任务的输入参数了。
        return I, R, I.state, I.retval

此处通过修改 celery 源代码来获取出错时 task 的传入参数,但是方法并不好。于是我想能不能通过 UUID 直接获取到原来的 task,然后查看 task 的 args,但是这篇文档有些晦涩难懂,我就先放弃了,便发现了以下方法。

class DebugTask(Task):
    abstract = True
    def on_failure(self, *args, **kwargs):
        print self.request.args

@app.task(base=DebugTask)
def divide(x, y):
    print x / y

这段代码将原来应该继承的 Task 类中的 on_failure 函数重写,当 divide 函数发生异常时,该 task 的 state 自动变成 failure,Task 会自动调用 on_failure 函数,从而打印出传入的 args。

任务的远程调用

关于 task 的调用,celery 还提供了另外一种 send_task 方法。

Celery 作为分布式系统,自然就支持远程 worker,这个时候我们可以利用 send_task 这个函数,以函数名的方式调用 task。代码如下:

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')
app.send_task('worker.divide', args=[1, 0])

# send_task 也支持 link_error,这个官方文档上没写详细,这里需要调用 signature 函数来生产函数的 signature,这时 divide 的 UUID 和我们通过修改源代码得到的 args。

app.send_task('worker.divide', args=[1, 0], link_error=app.signature('worker.error_handler'))

这里我们没有通过 module 的方式把 divide 函数给 import 到程序中来,也就意味着我们可以不将 worker 放在与 dispatcher 同一目录下。我们的想法是,将 worker 放在另外一台服务器上,通过 celery 调用它,本地 django 项目调用这个 dispatcher 后,将 task 发送到远程服务器的队列中,然后由远程服务器中的 worker 处理。

配置文件

此时需要注意的是,这里的 dispatcher 是通过文件的方式配置的,其配置文件应与 worker 端配置文件吻合,如下:

# celeryconfig.py
# coding=utf-8

# Broker 设置 RabbitMQ
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://'

# Tasks 位于 worker.py 中
CELERY_IMPORTS = ('worker', )

# 默认为1次/秒的任务
CELERY_ANNOTATIONS = {'worker.divide': {'rate_limit': '1/s'}}

CELERY_ROUTES = {'worker.divide': {'queue': 'divide'},
                 'worker.error_handler': {'queue': 'error'}}

# 默认所有格式为 json
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']

使用了配置文件以后,我们在 worker 中也可以采用相同的方式定义 app,如下:

# coding=utf-8
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')

@app.task
def divide(x, y):
    print x / y

我们在配置文件中为 worker.divide 这个 task 指定了 divide 这个队列,为 error_handler 定义了 error 这个队列用于错误处理。 在启动 celery 的时候可以通过 -Q 参数指定队列。在终端中执行了以下命令后,celery 服务器就启动了,当前 celery 会监视 divide 队列,取出参数执行任务。而如果我们不启动另外一个 celery 来监视 error 队列,error_handler 就不会前往队列去拿参数执行。

celery -A worker worker --loglevel=info -Q divide

关于 Celery,网上英文教程都不多,更别说中文的了。

网上有些关于 Celery 性能的讨论,我暂且没有做分析,如果有更好的解决方案能够替代它,请留言告知。

如果发现本文有错误,请指正。

未经允许不得转载:SuperMan's blog » Celery 实现分布式任务队列

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址